From b426bcc097355d82002d8bc7f531de7292384ceb Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 25 Jun 2018 15:21:08 -0700 Subject: [PATCH] [zstdmt] Fix jobsize bugs (#1205) [zstdmt] Fix jobsize bugs * `ZSTDMT_serialState_reset()` should use `targetSectionSize`, not `jobSize` when sizing the seqstore. Add an assert that checks that we sized the seqstore using the right job size. * `ZSTDMT_compressionJob()` should check if `rawSeqStore.seq == NULL`. * `ZSTDMT_initCStream_internal()` should not adjust `mtctx->params.jobSize` (clamping to MIN/MAX is okay). --- lib/compress/zstdmt_compress.c | 54 ++++++++++++++++++++++++---------- lib/compress/zstdmt_compress.h | 5 ++++ tests/zstreamtest.c | 5 ++++ 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index b180b1b7d..6daedca8b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -459,7 +459,7 @@ typedef struct { ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ } serialState_t; -static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params) +static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize) { /* Adjust parameters */ if (params.ldmParams.enableLdm) { @@ -486,7 +486,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* serialState->params.ldmParams.hashLog - serialState->params.ldmParams.bucketSizeLog; /* Size the seq pool tables */ - ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize)); + ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize)); /* Reset the window */ ZSTD_window_clear(&serialState->ldmState.window); serialState->ldmWindow = serialState->ldmState.window; @@ -506,6 +506,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* memset(serialState->ldmState.bucketOffsets, 0, bucketSize); } serialState->params = params; + serialState->params.jobSize = (U32)jobSize; return 0; } @@ -547,6 +548,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, size_t error; assert(seqStore.seq != NULL && seqStore.pos == 0 && seqStore.size == 0 && seqStore.capacity > 0); + assert(src.size <= serialState->params.jobSize); ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); error = ZSTD_ldm_generateSequences( &serialState->ldmState, &seqStore, @@ -635,13 +637,6 @@ void ZSTDMT_compressionJob(void* jobDescription) rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); buffer_t dstBuff = job->dstBuff; - /* Don't compute the checksum for chunks, since we compute it externally, - * but write it in the header. - */ - if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; - /* Don't run LDM for the chunks, since we handle it externally */ - jobParams.ldmParams.enableLdm = 0; - /* ressources */ if (cctx==NULL) { job->cSize = ERROR(memory_allocation); @@ -655,6 +650,18 @@ void ZSTDMT_compressionJob(void* jobDescription) } job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ } + if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) { + job->cSize = ERROR(memory_allocation); + goto _endJob; + } + + /* Don't compute the checksum for chunks, since we compute it externally, + * but write it in the header. + */ + if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; + /* Don't run LDM for the chunks, since we handle it externally */ + jobParams.ldmParams.enableLdm = 0; + /* init */ if (job->cdict) { @@ -972,6 +979,8 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, if ( (value > 0) /* value==0 => automatic job size */ & (value < ZSTDMT_JOBSIZE_MIN) ) value = ZSTDMT_JOBSIZE_MIN; + if (value > ZSTDMT_JOBSIZE_MAX) + value = ZSTDMT_JOBSIZE_MAX; params->jobSize = value; return value; case ZSTDMT_p_overlapSectionLog : @@ -998,6 +1007,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, } } +size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value) +{ + switch (parameter) { + case ZSTDMT_p_jobSize: + *value = mtctx->params.jobSize; + break; + case ZSTDMT_p_overlapSectionLog: + *value = mtctx->params.overlapSizeLog; + break; + default: + return ERROR(parameter_unsupported); + } + return 0; +} + /* Sets parameters relevant to the compression job, * initializing others to default values. */ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) @@ -1143,7 +1167,7 @@ static size_t ZSTDMT_compress_advanced_internal( assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); - if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize)) return ERROR(memory_allocation); CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */ @@ -1280,9 +1304,7 @@ size_t ZSTDMT_initCStream_internal( if (params.nbWorkers != mtctx->params.nbWorkers) CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) ); - if (params.jobSize == 0) { - params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); - } + if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ @@ -1321,7 +1343,9 @@ size_t ZSTDMT_initCStream_internal( mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); mtctx->targetSectionSize = params.jobSize; - if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; + if (mtctx->targetSectionSize == 0) { + mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params); + } if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); @@ -1363,7 +1387,7 @@ size_t ZSTDMT_initCStream_internal( mtctx->allJobsCompleted = 0; mtctx->consumed = 0; mtctx->produced = 0; - if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize)) return ERROR(memory_allocation); return 0; } diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 4249a82de..34a475a42 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -95,6 +95,11 @@ typedef enum { * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value); +/* ZSTDMT_getMTCtxParameter() : + * Query the ZSTDMT_CCtx for a parameter value. + * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value); + /*! ZSTDMT_compressStream_generic() : * Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream() diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 70da29728..22c49cb35 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -844,7 +844,12 @@ static int basicUnitTests(U32 seed, double compressibility) /* Basic multithreading compression test */ DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads : ", testNb++, COMPRESSIBLE_NOISE_LENGTH); { ZSTD_parameters const params = ZSTD_getParams(1, 0, 0); + unsigned jobSize; + CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize)); + CHECK(jobSize != 0, "job size non-zero"); CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) ); + CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize)); + CHECK(jobSize != 0, "job size non-zero"); } outBuff.dst = compressedBuffer; outBuff.size = compressedBufferSize;