diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index 4dcf5a58a..1dea249d4 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -926,7 +926,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
typedef enum { - ZSTD_e_continue=0, /* collect more data, encoder transparently decides when to output result, for optimal conditions */ + ZSTD_e_continue=0, /* collect more data, encoder decides when to output compressed result, for optimal conditions */ ZSTD_e_flush, /* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */ ZSTD_e_end /* flush any remaining data and close current frame. Any additional data starts a new frame. */ } ZSTD_EndDirective; @@ -945,10 +945,11 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t and then immediately returns, just indicating that there is some data remaining to be flushed. The function nonetheless guarantees forward progress : it will return only after it reads or writes at least 1 byte. - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller. - - @return provides the minimum amount of data remaining to be flushed from internal buffers + - @return provides a minimum amount of data remaining to be flushed from internal buffers or an error code, which can be tested using ZSTD_isError(). if @return != 0, flush is not fully completed, there is still some data left within internal buffers. - This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed. + This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers. + For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed. - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0), only ZSTD_e_end or ZSTD_e_flush operations are allowed.
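As an illustration of this contract, here is a minimal sketch, assuming a valid cctx and caller-owned src/dst buffers, and using ZSTD_compress_generic(), the streaming entry point this section documents in this version of the API :

    ZSTD_inBuffer input = { src, srcSize, 0 };
    ZSTD_outBuffer output = { dst, dstCapacity, 0 };
    size_t remaining;
    do {
        remaining = ZSTD_compress_generic(cctx, &output, &input, ZSTD_e_end);
        if (ZSTD_isError(remaining)) return remaining;   /* compression error */
    } while (remaining != 0);   /* 0 : frame completed and fully flushed */

    Note : with a fixed-size dst, a real loop must also drain output once output.pos reaches output.size, otherwise no forward progress is possible on the output side.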
Before starting a new compression job, or changing compression parameters, diff --git a/lib/common/pool.c b/lib/common/pool.c index 98b109e72..773488b07 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -12,6 +12,7 @@ /* ====== Dependencies ======= */ #include <stddef.h> /* size_t */ #include "pool.h" +#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */ /* ====== Compiler specifics ====== */ #if defined(_MSC_VER) @@ -193,32 +194,54 @@ static int isQueueFull(POOL_ctx const* ctx) { } } -void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { - POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; - if (!ctx) { return; } - ZSTD_pthread_mutex_lock(&ctx->queueMutex); - { POOL_job const job = {function, opaque}; +static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) +{ + POOL_job const job = {function, opaque}; + assert(ctx != NULL); + if (ctx->shutdown) return; - /* Wait until there is space in the queue for the new job */ - while (isQueueFull(ctx) && !ctx->shutdown) { - ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); - } - /* The queue is still going => there is space */ - if (!ctx->shutdown) { - ctx->queueEmpty = 0; - ctx->queue[ctx->queueTail] = job; - ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; - } - } - ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + ctx->queueEmpty = 0; + ctx->queue[ctx->queueTail] = job; + ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; ZSTD_pthread_cond_signal(&ctx->queuePopCond); } -#else /* ZSTD_MULTITHREAD not defined */ -/* No multi-threading support */ +void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) +{ + assert(ctx != NULL); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + /* Wait until there is space in the queue for the new job */ + while (isQueueFull(ctx) && (!ctx->shutdown)) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + } + POOL_add_internal(ctx, function, opaque); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); +} -/* We don't need any data, but if it is empty malloc() might return NULL. */ + +int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) +{ + assert(ctx != NULL); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + if (isQueueFull(ctx)) { + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return 0; + } + POOL_add_internal(ctx, function, opaque); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return 1; +} + + +#else /* ZSTD_MULTITHREAD not defined */ + +/* ========================== */ +/* No multi-threading support */ +/* ========================== */ + + +/* We don't need any data, but if it is empty, malloc() might return NULL. */
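/* Usage sketch (illustration only ; pool, myTask and myTaskData are placeholder names) :
 *
 *    POOL_ctx* const pool = POOL_create(4, 8);   // 4 worker threads, queue of up to 8 pending jobs
 *    if (pool != NULL) {
 *        if (!POOL_tryAdd(pool, myTask, &myTaskData)) {
 *            // all workers busy and queue full : retry later, or accept blocking
 *            POOL_add(pool, myTask, &myTaskData);
 *        }
 *        POOL_free(pool);   // shuts the pool down and joins worker threads
 *    }
 *
 * This is the pattern ZSTDMT itself relies on : when POOL_tryAdd() returns 0,
 * it keeps the prepared job around (mtctx->jobReady) instead of blocking the caller. */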
struct POOL_ctx_s { int dummy; }; @@ -240,11 +263,17 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } -void POOL_add(void* ctx, POOL_function function, void* opaque) { +void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { (void)ctx; function(opaque); } +int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { + (void)ctx; + function(opaque); + return 1; +} + size_t POOL_sizeof(POOL_ctx* ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ assert(ctx == &g_ctx); diff --git a/lib/common/pool.h b/lib/common/pool.h index 08c63715a..a57e9b4fa 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -17,7 +17,8 @@ extern "C" { #include <stddef.h> /* size_t */ -#include "zstd_internal.h" /* ZSTD_customMem */ +#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_customMem */ +#include "zstd.h" typedef struct POOL_ctx_s POOL_ctx; @@ -27,35 +28,43 @@ typedef struct POOL_ctx_s POOL_ctx; * The maximum number of queued jobs before blocking is `queueSize`. * @return : POOL_ctx pointer on success, else NULL. */ -POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); +POOL_ctx* POOL_create(size_t numThreads, size_t queueSize); -POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem); +POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem); /*! POOL_free() : Free a thread pool returned by POOL_create(). */ -void POOL_free(POOL_ctx *ctx); +void POOL_free(POOL_ctx* ctx); /*! POOL_sizeof() : return memory usage of pool returned by POOL_create(). */ -size_t POOL_sizeof(POOL_ctx *ctx); +size_t POOL_sizeof(POOL_ctx* ctx); /*! POOL_function : The function type that can be added to a thread pool. */ -typedef void (*POOL_function)(void *); +typedef void (*POOL_function)(void*); /*! POOL_add_function : The function type for a generic thread pool add function. */ -typedef void (*POOL_add_function)(void *, POOL_function, void *); +typedef void (*POOL_add_function)(void*, POOL_function, void*); /*! POOL_add() : - Add the job `function(opaque)` to the thread pool. + Add the job `function(opaque)` to the thread pool. `ctx` must be valid. Possibly blocks until there is room in the queue. Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed. */ -void POOL_add(void *ctx, POOL_function function, void *opaque); +void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque); + + +/*! POOL_tryAdd() : + Add the job `function(opaque)` to the thread pool if a worker is available. + Returns immediately otherwise. + @return : 1 if successful, 0 if not.
+*/ +int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque); #if defined (__cplusplus) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 5211384e0..307478e09 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -1824,7 +1824,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, void* dst, size_t dstCapacity, const void* src, size_t srcSize) { - DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u) (dictLimit=%u, nextToUpdate=%u)", + DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)", (U32)dstCapacity, zc->blockState.matchState.dictLimit, zc->blockState.matchState.nextToUpdate); if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) return 0; /* don't even attempt compression below a certain srcSize */ @@ -1837,9 +1837,9 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, if (current > zc->blockState.matchState.nextToUpdate + 384) zc->blockState.matchState.nextToUpdate = current - MIN(192, (U32)(current - zc->blockState.matchState.nextToUpdate - 384)); } - /* find and store sequences */ - { - U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit; + + /* select and store sequences */ + { U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit; size_t lastLLSize; { int i; for (i = 0; i < ZSTD_REP_NUM; ++i) zc->blockState.nextCBlock->rep[i] = zc->blockState.prevCBlock->rep[i]; } if (zc->appliedParams.ldmParams.enableLdm) { @@ -1848,26 +1848,20 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, U32 rep[ZSTD_REP_NUM], ZSTD_CCtx_params const* params, void const* src, size_t srcSize); ZSTD_ldmBlockCompressor const ldmBlockCompressor = extDict ? ZSTD_compressBlock_ldm_extDict : ZSTD_compressBlock_ldm; - lastLLSize = ldmBlockCompressor(&zc->ldmState, &zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams, src, srcSize); - } else { + } else { /* not long range mode */ ZSTD_blockCompressor const blockCompressor = ZSTD_selectBlockCompressor(zc->appliedParams.cParams.strategy, extDict); - lastLLSize = blockCompressor(&zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams.cParams, src, srcSize); } - { - const BYTE* const anchor = (const BYTE*)src + srcSize - lastLLSize; - ZSTD_storeLastLiterals(&zc->seqStore, anchor, lastLLSize); - } - } - /* encode */ - { - size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace); - if (ZSTD_isError(cSize) || cSize == 0) - return cSize; + { const BYTE* const lastLiterals = (const BYTE*)src + srcSize - lastLLSize; + ZSTD_storeLastLiterals(&zc->seqStore, lastLiterals, lastLLSize); + } } + + /* encode sequences and literals */ + { size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace); + if (ZSTD_isError(cSize) || cSize == 0) return cSize; /* confirm repcodes and entropy tables */ - { - ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock; + { ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock; zc->blockState.prevCBlock = zc->blockState.nextCBlock; zc->blockState.nextCBlock = tmp; } @@ -2030,6 +2024,20 @@ static size_t ZSTD_writeFrameHeader(void* dst, 
size_t dstCapacity, return pos; } +/* ZSTD_writeLastEmptyBlock() : + * output an empty Block with end-of-frame mark to complete a frame + * @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h)) + * or an error code if `dstCapacity` is too small (< ZSTD_blockHeaderSize) + */ +size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity) +{ + if (dstCapacity < ZSTD_blockHeaderSize) return ERROR(dstSize_tooSmall); + { U32 const cBlockHeader24 = 1 /*lastBlock*/ + (((U32)bt_raw)<<1); /* 0 size */ + MEM_writeLE24(dst, cBlockHeader24); + return ZSTD_blockHeaderSize; + } +} if (cctx->stage == ZSTDcs_created) return ERROR(stage_wrong); /* init missing */ /* special case : empty frame */ @@ -2420,6 +2428,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity) if (cctx->appliedParams.fParams.checksumFlag) { U32 const checksum = (U32) XXH64_digest(&cctx->xxhState); if (dstCapacity<4) return ERROR(dstSize_tooSmall); + DEBUGLOG(4, "ZSTD_writeEpilogue: write checksum : %08X", checksum); MEM_writeLE32(op, checksum); op += 4; } diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index 64f03a0d2..cf5936587 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -481,4 +481,13 @@ size_t ZSTD_compress_advanced_internal(ZSTD_CCtx* cctx, const void* dict,size_t dictSize, ZSTD_CCtx_params params); + +/* ZSTD_writeLastEmptyBlock() : + * output an empty Block with end-of-frame mark to complete a frame + * @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h)) + * or an error code if `dstCapacity` is too small (< ZSTD_blockHeaderSize) + */ +size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity); diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c ZSTD_pthread_mutex_lock(&bufPool->poolMutex); for (u=0; u<bufPool->totalBuffers; u++) - totalBufferSize += bufPool->bTable[u].size; + totalBufferSize += bufPool->bTable[u].capacity; ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return poolSize + totalBufferSize; } +/* ZSTDMT_setBufferSize() : + * all future buffers provided by this buffer pool will have _at least_ this size + * note : it's better for all buffers to have same size, + * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize) { ZSTD_pthread_mutex_lock(&bufPool->poolMutex); @@ -161,12 +165,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers) { /* try to use an existing buffer */ buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)]; - size_t const availBufferSize = buf.size; + size_t const availBufferSize = buf.capacity; bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer; if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) { /* large enough, but not too much */ DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u", - bufPool->nbBuffers, (U32)buf.size); + bufPool->nbBuffers, (U32)buf.capacity); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return buf; } @@ -180,7 +184,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) { buffer_t buffer; void* const start = ZSTD_malloc(bSize, bufPool->cMem); buffer.start = start; /* note : start can be NULL if malloc fails ! */ - buffer.size = (start==NULL) ? 0 : bSize; + buffer.capacity = (start==NULL) ? 0 : bSize;
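/* illustration : the reuse test above, (availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize),
 * only recycles a cached buffer whose capacity lies roughly within [bSize, 8*bSize].
 * For bSize == 1 MB : a 1.5 MB cached buffer is handed out as-is,
 * while a 16 MB one is rejected and a fresh buffer of the requested size
 * is allocated instead, so the pool never pins oversized memory. */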
if (start==NULL) { DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!"); } else { @@ -199,7 +203,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) if (bufPool->nbBuffers < bufPool->totalBuffers) { bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", - (U32)buf.size, (U32)(bufPool->nbBuffers-1)); + (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return; } @@ -300,37 +304,36 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ------------------------------------------ */ -/* ===== Thread worker ===== */ +/* ===== Worker thread ===== */ /* ------------------------------------------ */ typedef struct { - buffer_t src; - const void* srcStart; - size_t prefixSize; - size_t srcSize; - size_t consumed; - buffer_t dstBuff; - size_t cSize; - size_t dstFlushed; - unsigned firstChunk; - unsigned lastChunk; - unsigned jobCompleted; - unsigned jobScanned; - ZSTD_pthread_mutex_t* jobCompleted_mutex; - ZSTD_pthread_cond_t* jobCompleted_cond; - ZSTD_CCtx_params params; - const ZSTD_CDict* cdict; - ZSTDMT_CCtxPool* cctxPool; - ZSTDMT_bufferPool* bufPool; - unsigned long long fullFrameSize; + size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ + size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */ + ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */ + ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ + ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ + buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ + buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */ + const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */ + size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ + size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */ + unsigned firstJob; /* set by mtctx, then read by worker => no barrier */ + unsigned lastJob; /* set by mtctx, then read by worker => no barrier */ + ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ + const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ + unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ + size_t dstFlushed; /* used only by mtctx */ + unsigned frameChecksumNeeded; /* used only by mtctx */ } ZSTDMT_jobDescription; -/* ZSTDMT_compressChunk() is a POOL_function type */ -void ZSTDMT_compressChunk(void* jobDescription) +/* ZSTDMT_compressionJob() is a POOL_function type */ +void ZSTDMT_compressionJob(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); - const void* const src = (const char*)job->srcStart + job->prefixSize; + const void* const src = (const char*)job->prefixStart + job->prefixSize; buffer_t dstBuff = job->dstBuff; /* resources */ @@ -338,99 +341,93 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = ERROR(memory_allocation); goto _endJob; } - if (dstBuff.start == NULL) { /* streaming job : doesn't provide a
dstBuffer */ dstBuff = ZSTDMT_getBuffer(job->bufPool); if (dstBuff.start==NULL) { job->cSize = ERROR(memory_allocation); goto _endJob; } - job->dstBuff = dstBuff; + job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ } /* init */ if (job->cdict) { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize); - assert(job->firstChunk); /* only allowed for first job */ + assert(job->firstJob); /* only allowed for first job */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ - U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : job->srcSize; + U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->srcSize; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ - { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); + { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); if (ZSTD_isError(forceWindowError)) { job->cSize = forceWindowError; goto _endJob; } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, - job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ + job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ NULL, /*cdict*/ jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; - } } - } - if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */ - size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0); + } } } + if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ + size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } - DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize); + DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); ZSTD_invalidateRepCodes(cctx); } /* compress */ -#if 0 - job->cSize = (job->lastChunk) ? 
- ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : - ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); -#else - if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ - { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX); + { size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX; + int const nbChunks = (int)((job->srcSize + (chunkSize-1)) / chunkSize); const BYTE* ip = (const BYTE*) src; BYTE* const ostart = (BYTE*)dstBuff.start; BYTE* op = ostart; - BYTE* oend = op + dstBuff.size; - int blockNb; - DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); + BYTE* oend = op + dstBuff.capacity; + int chunkNb; + if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * chunkSize); /* check overflow */ + DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i chunks", (U32)job->srcSize, nbChunks); assert(job->cSize == 0); - for (blockNb = 1; blockNb < nbBlocks; blockNb++) { - size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); + for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { + size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } - ip += ZSTD_BLOCKSIZE_MAX; + ip += chunkSize; op += cSize; assert(op < oend); /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->cSize += cSize; - job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; + job->consumed = chunkSize * chunkNb; + DEBUGLOG(5, "ZSTDMT_compressionJob: compress new chunk : cSize==%u bytes (total: %u)", + (U32)cSize, (U32)job->cSize); + ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ + ZSTD_pthread_mutex_unlock(&job->job_mutex); } /* last block */ - if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) { - size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); - size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; - size_t const cSize = (job->lastChunk) ? + assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ + if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { + size_t const lastBlockSize1 = job->srcSize & (chunkSize-1); + size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=chunkSize)) ? chunkSize : lastBlockSize1; + size_t const cSize = (job->lastJob) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->cSize += cSize; - job->consumed = job->srcSize; - ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); - } - } -#endif + ZSTD_pthread_mutex_unlock(&job->job_mutex); + } }
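/* design note (illustration) : compressing in fixed 512 KB chunks (4*ZSTD_BLOCKSIZE_MAX)
 * instead of a single ZSTD_compressEnd() call is what enables streaming flush :
 * after each chunk, job->cSize and job->consumed are published under job_mutex and
 * job_cond is signalled, so ZSTDMT_flushProduced() can hand compressed data back to
 * the caller while the worker is still running. A 2 MB job thus exposes 4 flush
 * points instead of 1. */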
_endJob: - /* release */ + /* release resources */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); - ZSTDMT_releaseBuffer(job->bufPool, job->src); - job->src = g_nullBuffer; job->srcStart = NULL; + ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff); + job->srcBuff = g_nullBuffer; job->prefixStart = NULL; /* report */ - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->consumed = job->srcSize; - job->jobCompleted = 1; - job->jobScanned = 0; - ZSTD_pthread_cond_signal(job->jobCompleted_cond); - ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); + ZSTD_pthread_cond_signal(&job->job_cond); + ZSTD_pthread_mutex_unlock(&job->job_mutex); } @@ -440,6 +437,8 @@ _endJob: typedef struct { buffer_t buffer; + size_t targetCapacity; /* note : buffers provided by the pool may be larger than target capacity */ + size_t prefixSize; size_t filled; } inBuff_t; @@ -448,14 +447,11 @@ struct ZSTDMT_CCtx_s { ZSTDMT_jobDescription* jobs; ZSTDMT_bufferPool* bufPool; ZSTDMT_CCtxPool* cctxPool; - ZSTD_pthread_mutex_t jobCompleted_mutex; - ZSTD_pthread_cond_t jobCompleted_cond; ZSTD_CCtx_params params; size_t targetSectionSize; - size_t inBuffSize; - size_t prefixSize; size_t targetPrefixSize; inBuff_t inBuff; + int jobReady; /* 1 => one job is already prepared, but the pool has a shortage of workers. Don't create another one. */ XXH64_state_t xxhState; unsigned singleBlockingThread; unsigned jobIDMask; @@ -471,28 +467,39 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; -/* Sets parameters relevant to the compression job, initializing others to - * default values. Notably, nbThreads should probably be zero.
*/ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) +static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem) { - ZSTD_CCtx_params jobParams; - memset(&jobParams, 0, sizeof(jobParams)); - - jobParams.cParams = params.cParams; - jobParams.fParams = params.fParams; - jobParams.compressionLevel = params.compressionLevel; - - jobParams.ldmParams = params.ldmParams; - return jobParams; + U32 jobNb; + if (jobTable == NULL) return; + for (jobNb=0; jobNb<nbJobs; jobNb++) { + ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex); + ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond); + } + ZSTD_free(jobTable, cMem); +} mtctx->cMem = cMem; mtctx->allJobsCompleted = 1; mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); - mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); + mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); + assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); @@ -533,14 +541,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_freeCCtx(mtctx); return NULL; } - if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) { - ZSTDMT_freeCCtx(mtctx); - return NULL; - } - if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) { - ZSTDMT_freeCCtx(mtctx); - return NULL; - } DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads); return mtctx; } @@ -561,9 +561,10 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; - DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src); - mtctx->jobs[jobID].src = g_nullBuffer; + mtctx->jobs[jobID].cSize = 0; + DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start); + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff); + mtctx->jobs[jobID].srcBuff = g_nullBuffer; } memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start); @@ -572,18 +573,18 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) mtctx->allJobsCompleted = 1; } -static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) +static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) { DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); - while (zcs->doneJobID < zcs->nextJobID) { - unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; - ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); - while (zcs->jobs[jobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ - ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); + while (mtctx->doneJobID < mtctx->nextJobID) { + unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); + while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ + ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } - ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); -
zcs->doneJobID++; + ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); + mtctx->doneJobID++; } } @@ -592,12 +593,10 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) if (mtctx==NULL) return 0; /* compatible with free on NULL */ POOL_free(mtctx->factory); /* stop and free worker threads */ ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools first */ - ZSTD_free(mtctx->jobs, mtctx->cMem); + ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTD_freeCDict(mtctx->cdictLocal); - ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex); - ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond); ZSTD_free(mtctx, mtctx->cMem); return 0; } @@ -650,6 +649,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, } } +/* Sets parameters relevant to the compression job, initializing others to + * default values. Notably, nbThreads should probably be zero. */ +static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) +{ + ZSTD_CCtx_params jobParams; + memset(&jobParams, 0, sizeof(jobParams)); + + jobParams.cParams = params.cParams; + jobParams.fParams = params.fParams; + jobParams.compressionLevel = params.compressionLevel; + + jobParams.ldmParams = params.ldmParams; + return jobParams; +} + /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ @@ -665,25 +679,29 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) * Note : mutex will be acquired during statistics collection. */ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { - ZSTD_frameProgression fs; - DEBUGLOG(5, "ZSTDMT_getFrameProgression"); - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); - fs.consumed = mtctx->consumed; - fs.produced = mtctx->produced; - assert(mtctx->inBuff.filled >= mtctx->prefixSize); - fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize); + ZSTD_frameProgression fps; + DEBUGLOG(6, "ZSTDMT_getFrameProgression"); + fps.consumed = mtctx->consumed; + fps.produced = mtctx->produced; + assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize); + fps.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->inBuff.prefixSize); { unsigned jobNb; - for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) { + unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); + DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", + mtctx->doneJobID, lastJobNb, mtctx->jobReady) + for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { unsigned const wJobID = jobNb & mtctx->jobIDMask; - size_t const cResult = mtctx->jobs[wJobID].cSize; - size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; - fs.consumed += mtctx->jobs[wJobID].consumed; - fs.ingested += mtctx->jobs[wJobID].srcSize; - fs.produced += produced; + ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex); + { size_t const cResult = mtctx->jobs[wJobID].cSize; + size_t const produced = ZSTD_isError(cResult) ? 
0 : cResult; + fps.consumed += mtctx->jobs[wJobID].consumed; + fps.ingested += mtctx->jobs[wJobID].srcSize; + fps.produced += produced; + } + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); } } - ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex); - return fs; + return fps; } @@ -691,16 +709,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -static unsigned ZSTDMT_computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) { +static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) { assert(nbThreads>0); - { size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2); - size_t const chunkMaxSize = chunkSizeTarget << 2; - size_t const passSizeMax = chunkMaxSize * nbThreads; + { size_t const jobSizeTarget = (size_t)1 << (windowLog + 2); + size_t const jobMaxSize = jobSizeTarget << 2; + size_t const passSizeMax = jobMaxSize * nbThreads; unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned const nbChunksLarge = multiplier * nbThreads; - unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; - unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads); - return (multiplier>1) ? nbChunksLarge : nbChunksSmall; + unsigned const nbJobsLarge = multiplier * nbThreads; + unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; + unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads); + return (multiplier>1) ? nbJobsLarge : nbJobsSmall; } } /* ZSTDMT_compress_advanced_internal() : @@ -716,52 +734,52 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - unsigned nbChunks = ZSTDMT_computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads); - size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; - size_t const avgChunkSize = (((proposedChunkSize-1) & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ + unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads); + size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; + size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; size_t remainingSrcSize = srcSize; - unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ + unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? 
nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; XXH64_state_t xxh64; assert(jobParams.nbThreads == 0); assert(mtctx->cctxPool->totalCCtx == params.nbThreads); - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ", - nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize); + DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", + nbJobs, (U32)proposedJobSize, (U32)avgJobSize); - if ((nbChunks==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ + if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); } - assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ - ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) ); + assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ + ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); XXH64_reset(&xxh64, 0); - if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */ - U32 nbJobs = nbChunks; - ZSTD_free(mtctx->jobs, mtctx->cMem); + if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ + U32 jobsTableSize = nbJobs; + ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); mtctx->jobIDMask = 0; - mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem); + mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem); if (mtctx->jobs==NULL) return ERROR(memory_allocation); - assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */ - mtctx->jobIDMask = nbJobs - 1; + assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0)); /* ensure jobsTableSize is a power of 2 */ + mtctx->jobIDMask = jobsTableSize - 1; } { unsigned u; - for (u=0; u jobs[u].src = g_nullBuffer; - mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; + mtctx->jobs[u].srcBuff = g_nullBuffer; + mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].prefixSize = dictSize; - mtctx->jobs[u].srcSize = chunkSize; + mtctx->jobs[u].srcSize = jobSize; assert(jobSize > 0); /* avoid job.srcSize == 0 */ mtctx->jobs[u].consumed = 0; mtctx->jobs[u].cSize = 0; mtctx->jobs[u].cdict = (u==0) ? 
cdict : NULL; @@ -772,53 +790,51 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].bufPool = mtctx->bufPool; - mtctx->jobs[u].firstChunk = (u==0); - mtctx->jobs[u].lastChunk = (u==nbChunks-1); - mtctx->jobs[u].jobCompleted = 0; - mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; - mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; + mtctx->jobs[u].firstJob = (u==0); + mtctx->jobs[u].lastJob = (u==nbJobs-1); if (params.fParams.checksumFlag) { - XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize); + XXH64_update(&xxh64, srcStart + frameStartPos, jobSize); } - DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize); - DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12); - POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); + DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize); + DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12); + POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]); - frameStartPos += chunkSize; + frameStartPos += jobSize; dstBufferPos += dstBufferCapacity; - remainingSrcSize -= chunkSize; + remainingSrcSize -= jobSize; } } /* collect result */ { size_t error = 0, dstPos = 0; - unsigned chunkID; - for (chunkID=0; chunkID jobCompleted_mutex); - while (mtctx->jobs[chunkID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID); - ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); + unsigned jobID; + for (jobID=0; jobID jobs[jobID].job_mutex); + while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID); + ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } - ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex); - DEBUGLOG(5, "ready to write chunk %u ", chunkID); + ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); + DEBUGLOG(5, "ready to write job %u ", jobID); - mtctx->jobs[chunkID].srcStart = NULL; - { size_t const cSize = mtctx->jobs[chunkID].cSize; + mtctx->jobs[jobID].prefixStart = NULL; + { size_t const cSize = mtctx->jobs[jobID].cSize; if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); - if (chunkID) { /* note : chunk 0 is written directly at dst, which is correct position */ + if (jobID) { /* note : job 0 is written directly at dst, which is correct position */ if (!error) - memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */ - if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */ - DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff); + memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */ + if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */ + DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst); + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); } } - mtctx->jobs[chunkID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].cSize = 0; dstPos += cSize ; } - } /* for (chunkID=0; chunkID cctxPool->totalCCtx == 
params.nbThreads); - zcs->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ + assert(mtctx->cctxPool->totalCCtx == params.nbThreads); + mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ if (params.jobSize == 0) { if (params.cParams.windowLog >= 29) params.jobSize = ZSTDMT_JOBSIZE_MAX; @@ -890,56 +906,56 @@ size_t ZSTDMT_initCStream_internal( } if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; - if (zcs->singleBlockingThread) { + if (mtctx->singleBlockingThread) { ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); assert(singleThreadParams.nbThreads == 0); - return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], + return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], dict, dictSize, cdict, singleThreadParams, pledgedSrcSize); } DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads); - if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - zcs->allJobsCompleted = 1; + if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ + ZSTDMT_waitForAllJobsCompleted(mtctx); + ZSTDMT_releaseAllJobResources(mtctx); + mtctx->allJobsCompleted = 1; } - zcs->params = params; - zcs->frameContentSize = pledgedSrcSize; + mtctx->params = params; + mtctx->frameContentSize = pledgedSrcSize; if (dict) { - ZSTD_freeCDict(zcs->cdictLocal); - zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, + ZSTD_freeCDict(mtctx->cdictLocal); + mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */ - params.cParams, zcs->cMem); - zcs->cdict = zcs->cdictLocal; - if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); + params.cParams, mtctx->cMem); + mtctx->cdict = mtctx->cdictLocal; + if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); } else { - ZSTD_freeCDict(zcs->cdictLocal); - zcs->cdictLocal = NULL; - zcs->cdict = cdict; + ZSTD_freeCDict(mtctx->cdictLocal); + mtctx->cdictLocal = NULL; + mtctx->cdict = cdict; } assert(params.overlapSizeLog <= 9); - zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 
0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); - DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10)); - zcs->targetSectionSize = params.jobSize; - if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN; - if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize; /* job size must be >= overlap size */ - DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize); - zcs->inBuffSize = zcs->targetPrefixSize + zcs->targetSectionSize; - DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10)); - ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) ); - zcs->inBuff.buffer = g_nullBuffer; - zcs->prefixSize = 0; - zcs->doneJobID = 0; - zcs->nextJobID = 0; - zcs->frameEnded = 0; - zcs->allJobsCompleted = 0; - zcs->consumed = 0; - zcs->produced = 0; - if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); + mtctx->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); + DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); + mtctx->targetSectionSize = params.jobSize; + if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; + if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ + DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); + mtctx->inBuff.targetCapacity = mtctx->targetPrefixSize + mtctx->targetSectionSize; + DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuff.targetCapacity>>10)); + ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuff.targetCapacity, ZSTD_compressBound(mtctx->targetSectionSize)) ); + mtctx->inBuff.buffer = g_nullBuffer; + mtctx->inBuff.prefixSize = 0; + mtctx->doneJobID = 0; + mtctx->nextJobID = 0; + mtctx->frameEnded = 0; + mtctx->allJobsCompleted = 0; + mtctx->consumed = 0; + mtctx->produced = 0; + if (params.fParams.checksumFlag) XXH64_reset(&mtctx->xxhState, 0); return 0; } @@ -974,153 +990,226 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, * pledgedSrcSize can be zero == unknown (for the time being) * prefer using ZSTD_CONTENTSIZE_UNKNOWN, * as `0` might mean "empty" in the future */ -size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) +size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) { if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; - if (zcs->params.nbThreads==1) - return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize); - return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, 0, zcs->params, + if (mtctx->params.nbThreads==1) + return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize); + return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params, pledgedSrcSize); } -size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { +size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); - ZSTD_CCtx_params cctxParams = zcs->params; /* retrieve sticky params */ + ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ DEBUGLOG(4, "ZSTDMT_initCStream 
(cLevel=%i)", compressionLevel); cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; - return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); + return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); } -static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame) +/* ZSTDMT_writeLastEmptyBlock() + * Write a single empty block with an end-of-frame to finish a frame. + * Job must be created from streaming variant. + * This function is always successfull if expected conditions are fulfilled. + */ +static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) { - unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + assert(job->lastJob == 1); + assert(job->srcSize == 0); /* last job is empty -> will be simplified into a last empty block */ + assert(job->firstJob == 0); /* cannot be first job, as it also needs to create frame header */ + /* A job created by streaming variant starts with a src buffer, but no dst buffer. + * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. + * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. + * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ + assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ + assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ + assert(job->srcBuff.capacity >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ + job->dstBuff = job->srcBuff; + job->srcBuff = g_nullBuffer; + job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity); + assert(!ZSTD_isError(job->cSize)); + assert(job->consumed == 0); +} - DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", - zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); - zcs->jobs[jobID].src = zcs->inBuff.buffer; - zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; - zcs->jobs[jobID].srcSize = srcSize; - zcs->jobs[jobID].consumed = 0; - zcs->jobs[jobID].cSize = 0; - zcs->jobs[jobID].prefixSize = zcs->prefixSize; - assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); - zcs->jobs[jobID].params = zcs->params; - /* do not calculate checksum within sections, but write it in header for first section */ - if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; - zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? 
zcs->cdict : NULL; - zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; - zcs->jobs[jobID].dstBuff = g_nullBuffer; - zcs->jobs[jobID].cctxPool = zcs->cctxPool; - zcs->jobs[jobID].bufPool = zcs->bufPool; - zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); - zcs->jobs[jobID].lastChunk = endFrame; - zcs->jobs[jobID].jobCompleted = 0; - zcs->jobs[jobID].dstFlushed = 0; - zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; - zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; +static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp) +{ + unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask; + int const endFrame = (endOp == ZSTD_e_end); - if (zcs->params.fParams.checksumFlag) - XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); + if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full"); + assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask)); + return 0; + } - /* get a new buffer for next input */ - if (!endFrame) { - size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize); - zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool); - if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return ERROR(memory_allocation); + if (!mtctx->jobReady) { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", + mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefixSize); + assert(mtctx->jobs[jobID].srcBuff.start == NULL); /* no buffer left : supposed already released */ + mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; + mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; + mtctx->jobs[jobID].prefixSize = mtctx->inBuff.prefixSize; + mtctx->jobs[jobID].srcSize = srcSize; + assert(mtctx->inBuff.filled >= srcSize + mtctx->inBuff.prefixSize); + mtctx->jobs[jobID].consumed = 0; + mtctx->jobs[jobID].cSize = 0; + mtctx->jobs[jobID].params = mtctx->params; + /* do not calculate checksum within sections, but write it in header for first section */ + if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0; + mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? 
mtctx->cdict : NULL; + mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize; + mtctx->jobs[jobID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; + mtctx->jobs[jobID].bufPool = mtctx->bufPool; + mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); + mtctx->jobs[jobID].lastJob = endFrame; + mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag; + mtctx->jobs[jobID].dstFlushed = 0; + + if (mtctx->params.fParams.checksumFlag) + XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize); + + /* get a new buffer for next input */ + if (!endFrame) { + size_t const newPrefixSize = MIN(mtctx->inBuff.filled, mtctx->targetPrefixSize); + mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); + if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate a new input buffer */ + mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0; + mtctx->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(mtctx); + ZSTDMT_releaseAllJobResources(mtctx); + return ERROR(memory_allocation); + } + mtctx->inBuff.filled -= (mtctx->inBuff.prefixSize + srcSize) - newPrefixSize; + memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ + (const char*)mtctx->jobs[jobID].prefixStart + mtctx->inBuff.prefixSize + srcSize - newPrefixSize, + mtctx->inBuff.filled); + mtctx->inBuff.prefixSize = newPrefixSize; + } else { /* endFrame==1 => no need for another input buffer */ + mtctx->inBuff.buffer = g_nullBuffer; + mtctx->inBuff.filled = 0; + mtctx->inBuff.prefixSize = 0; + mtctx->frameEnded = endFrame; + if (mtctx->nextJobID == 0) { + /* single job exception : checksum is already calculated directly within worker thread */ + mtctx->params.fParams.checksumFlag = 0; + } } + + if ( (srcSize == 0) + && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame"); + assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ + ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); + mtctx->nextJobID++; + return 0; } - zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize; - memmove(zcs->inBuff.buffer.start, - (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize, - zcs->inBuff.filled); - zcs->prefixSize = newPrefixSize; - } else { /* if (endFrame==1) */ - zcs->inBuff.buffer = g_nullBuffer; - zcs->inBuff.filled = 0; - zcs->prefixSize = 0; - zcs->frameEnded = 1; - if (zcs->nextJobID == 0) { - /* single chunk exception : checksum is calculated directly within worker thread */ - zcs->params.fParams.checksumFlag = 0; - } } + } - DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", - zcs->nextJobID, - (U32)zcs->jobs[jobID].srcSize, - zcs->jobs[jobID].lastChunk, - zcs->doneJobID, - zcs->doneJobID & zcs->jobIDMask); - POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ - zcs->nextJobID++; + DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))", + mtctx->nextJobID, + (U32)mtctx->jobs[jobID].srcSize, + mtctx->jobs[jobID].lastJob, + mtctx->nextJobID, + jobID); + if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { + mtctx->nextJobID++; + mtctx->jobReady = 0; + } else { + DEBUGLOG(5, 
"ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID); + mtctx->jobReady = 1; + } return 0; } -/* ZSTDMT_flushNextJob() : - * output : will be updated with amount of data flushed . - * blockToFlush : if >0, the function will block and wait if there is no data available to flush . - * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */ -static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) +/*! ZSTDMT_flushProduced() : + * `output` : `pos` will be updated with amount of data flushed . + * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . + * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ +static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) { - unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); - if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ - ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); - while (zcs->jobs[wJobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); - if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ - ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */ - } - ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); - /* compression job completed : output can be flushed */ - { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - if (!job.jobScanned) { - if (ZSTD_isError(job.cSize)) { - DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s", - zcs->doneJobID, ZSTD_getErrorName(job.cSize)); - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return job.cSize; + unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; + DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)", + blockToFlush, mtctx->doneJobID, mtctx->nextJobID); + assert(output->size >= output->pos); + + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); + if ( blockToFlush + && (mtctx->doneJobID < mtctx->nextJobID) ) { + assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); + while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */ + if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) { + DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none", + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize); + break; } - DEBUGLOG(5, "ZSTDMT_flushNextJob: zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); - if (zcs->params.fParams.checksumFlag) { - if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ - U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); - DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); - MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); - job.cSize += 4; - zcs->jobs[wJobID].cSize += 4; - } } - zcs->jobs[wJobID].jobScanned = 1; + DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); + 
ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block when nothing to flush but some to come */ + } } + + /* try to flush something */ + { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ + size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ + size_t const srcSize = mtctx->jobs[wJobID].srcSize; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); + if (ZSTD_isError(cSize)) { + DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", + mtctx->doneJobID, ZSTD_getErrorName(cSize)); + ZSTDMT_waitForAllJobsCompleted(mtctx); + ZSTDMT_releaseAllJobResources(mtctx); + return cSize; } - { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); - memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); - output->pos += toWrite; - job.dstFlushed += toWrite; + /* add frame checksum if necessary (can only happen once) */ + assert(srcConsumed <= srcSize); + if ( (srcConsumed == srcSize) /* job completed -> worker no longer active */ + && mtctx->jobs[wJobID].frameChecksumNeeded ) { + U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState); + DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); + MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); + cSize += 4; + mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ + mtctx->jobs[wJobID].frameChecksumNeeded = 0; } - if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ - ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); - zcs->jobs[wJobID].dstBuff = g_nullBuffer; - zcs->jobs[wJobID].jobCompleted = 0; - zcs->doneJobID++; - zcs->consumed += job.srcSize; - zcs->produced += job.cSize; - } else { - zcs->jobs[wJobID].dstFlushed = job.dstFlushed; - } - /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ - if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */ - zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */ - return 0; /* everything flushed */ -} } + if (cSize > 0) { /* compression is ongoing or completed */ + size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); + DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", + (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); + assert(mtctx->doneJobID < mtctx->nextJobID); + assert(cSize >= mtctx->jobs[wJobID].dstFlushed); + assert(mtctx->jobs[wJobID].dstBuff.start != NULL); + memcpy((char*)output->dst + output->pos, + (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, + toFlush); + output->pos += toFlush; + mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ + + if ( (srcConsumed == srcSize) /* job completed */ + && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ + DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); + assert(mtctx->jobs[wJobID].srcBuff.start == 
NULL); /* srcBuff supposed already released */ + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); + mtctx->jobs[wJobID].dstBuff = g_nullBuffer; + mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ + mtctx->consumed += srcSize; + mtctx->produced += cSize; + mtctx->doneJobID++; + } } + + /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ + if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed); + if (srcSize > srcConsumed) return 1; /* current job not completely compressed */ + } + if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ + if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */ + if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */ + mtctx->allJobsCompleted = mtctx->frameEnded; /* all jobs are entirely flushed => if this one is last one, frame is completed */ + if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */ + return 0; /* internal buffers fully flushed */ +} /** ZSTDMT_compressStream_generic() : @@ -1132,9 +1221,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - size_t const newJobThreshold = mtctx->prefixSize + mtctx->targetSectionSize; + size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; - DEBUGLOG(5, "ZSTDMT_compressStream_generic "); + DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)", + (U32)endOp, (U32)(input->size - input->pos)); assert(output->pos <= output->size); assert(input->pos <= input->size); @@ -1148,10 +1238,11 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* single-pass shortcut (note : synchronous-mode) */ - if ( (mtctx->nextJobID == 0) /* just started */ - && (mtctx->inBuff.filled == 0) /* nothing buffered */ - && (endOp == ZSTD_e_end) /* end order */ - && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */ + if ( (mtctx->nextJobID == 0) /* just started */ + && (mtctx->inBuff.filled == 0) /* nothing buffered */ + && (!mtctx->jobReady) /* no job already created */ + && (endOp == ZSTD_e_end) /* end order */ + && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, (char*)output->dst + output->pos, output->size - output->pos, (const char*)input->src + input->pos, input->size - input->pos, @@ -1166,70 +1257,70 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* fill input buffer */ - if (input->size > input->pos) { /* support NULL input */ + if ( (!mtctx->jobReady) + && (input->size > input->pos) ) { /* support NULL input */ if (mtctx->inBuff.buffer.start == NULL) { - mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, no forward input progress */ + mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, buffer.start==NULL */ mtctx->inBuff.filled = 0; - if ( (mtctx->inBuff.buffer.start == NULL) /* allocation failure */ + if ( (mtctx->inBuff.buffer.start == NULL) /* allocation failure */ && (mtctx->doneJobID == mtctx->nextJobID) ) { /* and nothing to flush */ - return 
ERROR(memory_allocation); /* no forward progress possible => output an error */ - } } - if (mtctx->inBuff.buffer.start != NULL) { - size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); - DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad); + return ERROR(memory_allocation); /* no forward progress possible => output an error */ + } + assert(mtctx->inBuff.buffer.capacity >= mtctx->inBuff.targetCapacity); /* pool must provide a buffer >= targetCapacity */ + } + if (mtctx->inBuff.buffer.start != NULL) { /* no buffer for input, but it's possible to flush, and then reclaim the buffer */ + size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuff.targetCapacity - mtctx->inBuff.filled); + DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", + (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuff.targetCapacity); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); input->pos += toLoad; mtctx->inBuff.filled += toLoad; forwardInputProgress = toLoad>0; - } } + } + if ((input->pos < input->size) && (endOp == ZSTD_e_end)) + endOp = ZSTD_e_flush; /* can't end now : not all input consumed */ + } - if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ - && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { /* avoid overwriting job round buffer */ - CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) ); + if ( (mtctx->jobReady) + || (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ + || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */ + || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */ + size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->inBuff.prefixSize, mtctx->targetSectionSize); + CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) ); } /* check for potential compressed data ready to be flushed */ - CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */ - - if (input->pos < input->size) /* input not consumed : do not flush yet */ - endOp = ZSTD_e_continue; - - switch(endOp) - { - case ZSTD_e_flush: - return ZSTDMT_flushStream(mtctx, output); - case ZSTD_e_end: - return ZSTDMT_endStream(mtctx, output); - case ZSTD_e_continue: - return 1; - default: - return ERROR(GENERIC); /* invalid endDirective */ + { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ + if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ + return remainingToFlush; } } -size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { - CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) ); + CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) ); /* recommended next input size : fill current input buffer */ - return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ + return 
mtctx->inBuff.targetCapacity - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ } -static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame) +static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) { - size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize; + size_t const srcSize = mtctx->inBuff.filled - mtctx->inBuff.prefixSize; DEBUGLOG(5, "ZSTDMT_flushStream_internal"); - if ( ((srcSize > 0) || (endFrame && !mtctx->frameEnded)) - && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { - DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job"); + if ( mtctx->jobReady /* one job ready for a worker to pick up */ + || (srcSize > 0) /* still some data within input buffer */ + || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ + DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", + (U32)srcSize, (U32)endFrame); CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); } /* check if there is any data available to flush */ - return ZSTDMT_flushNextJob(mtctx, output, 1 /* blockToFlush */); + return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame); } @@ -1238,7 +1329,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) DEBUGLOG(5, "ZSTDMT_flushStream"); if (mtctx->singleBlockingThread) return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(mtctx, output, 0 /* endFrame */); + return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush); } size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) @@ -1246,5 +1337,5 @@ size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) DEBUGLOG(4, "ZSTDMT_endStream"); if (mtctx->singleBlockingThread) return ZSTD_endStream(mtctx->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(mtctx, output, 1 /* endFrame */); + return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end); } diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 7c1e7e27b..34dfc488f 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -97,11 +97,12 @@ ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter /*! ZSTDMT_compressStream_generic() : - * Combines ZSTDMT_compressStream() with ZSTDMT_flushStream() or ZSTDMT_endStream() + * Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream() * depending on flush directive. 
* @return : minimum amount of data still to be flushed * 0 if fully flushed - * or an error code */ + * or an error code + * note : needs to be initialized using any ZSTD_initCStream*() variant */ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, diff --git a/lib/zstd.h b/lib/zstd.h index 5b0ce7e87..29c3ced18 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1122,7 +1122,7 @@ ZSTDLIB_API size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* pre typedef enum { - ZSTD_e_continue=0, /* collect more data, encoder transparently decides when to output result, for optimal conditions */ + ZSTD_e_continue=0, /* collect more data, encoder decides when to output compressed result, for optimal conditions */ ZSTD_e_flush, /* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */ ZSTD_e_end /* flush any remaining data and close current frame. Any additional data starts a new frame. */ } ZSTD_EndDirective; @@ -1138,10 +1138,11 @@ typedef enum { * and then immediately returns, just indicating that there is some data remaining to be flushed. * The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. * - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller. - * - @return provides the minimum amount of data remaining to be flushed from internal buffers + * - @return provides a minimum amount of data remaining to be flushed from internal buffers * or an error code, which can be tested using ZSTD_isError(). * if @return != 0, flush is not fully completed, there is still some data left within internal buffers. - * This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed. + * This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers. + * For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed. * - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0), * only ZSTD_e_end or ZSTD_e_flush operations are allowed. 
* Before starting a new compression job, or changing compression parameters, diff --git a/programs/fileio.c b/programs/fileio.c index 7045a5323..6039a75a2 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -792,6 +792,7 @@ static int FIO_compressFilename_internal(cRess_t ress, /* Fill input Buffer */ size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; + DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize); readsize += inSize; if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize)) @@ -803,32 +804,23 @@ static int FIO_compressFilename_internal(cRess_t ress, CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); /* Write compressed stream */ - DISPLAYLEVEL(6, "ZSTD_compress_generic,ZSTD_e_continue: generated %u bytes \n", - (U32)outBuff.pos); + DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", + (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos); if (outBuff.pos) { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); compressedfilesize += outBuff.pos; } + if (READY_FOR_UPDATE()) { + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); + DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", + (U32)(zfp.ingested >> 20), + (U32)(zfp.consumed >> 20), + (U32)(zfp.produced >> 20), + (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 ); + } } -#if 1 - if (READY_FOR_UPDATE()) { - ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); - DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", - (U32)(zfp.ingested >> 20), - (U32)(zfp.consumed >> 20), - (U32)(zfp.produced >> 20), - (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 ); - } -#else - if (fileSize == UTIL_FILESIZE_UNKNOWN) { - DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20)); - } else { - DISPLAYUPDATE(2, "\rRead : %u / %u MB", - (U32)(readsize>>20), (U32)(fileSize>>20)); - } -#endif } while (directive != ZSTD_e_end); finish: diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 5cd1ea0fb..ecc477c71 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -99,7 +99,7 @@ unsigned int FUZ_rand(unsigned int* seedPtr) if (cond) { \ DISPLAY("Error => "); \ DISPLAY(__VA_ARGS__); \ - DISPLAY(" (seed %u, test nb %u, line %u) \n", \ + DISPLAY(" (seed %u, test nb %u, line %u) \n", \ seed, testNb, __LINE__); \ goto _output_error; \ } } @@ -219,6 +219,7 @@ static int basicUnitTests(U32 seed, double compressibility) size_t cSize; int testResult = 0; U32 testNb = 1; + U32 coreSeed = 0; /* named to conform with CHECK_Z macro display */ ZSTD_CStream* zc = ZSTD_createCStream(); ZSTD_DStream* zd = ZSTD_createDStream(); ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2); @@ -372,7 +373,7 @@ static int basicUnitTests(U32 seed, double compressibility) DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s); } - /* Byte-by-byte decompression test */ + /* Decompression by small increments */ DISPLAYLEVEL(3, "test%3i : decompress byte-by-byte : ", testNb++); { /* skippable frame */ size_t r = 1; @@ -382,8 +383,10 @@ static int basicUnitTests(U32 seed, double compressibility) inBuff.pos = 0; outBuff.pos = 0; while (r) { /* skippable frame */ - inBuff.size = inBuff.pos + 1; 
- outBuff.size = outBuff.pos + 1; + size_t const inSize = FUZ_rand(&coreSeed) & 15; + size_t const outSize = FUZ_rand(&coreSeed) & 15; + inBuff.size = inBuff.pos + inSize; + outBuff.size = outBuff.pos + outSize; r = ZSTD_decompressStream(zd, &outBuff, &inBuff); if (ZSTD_isError(r)) goto _output_error; } @@ -391,8 +394,10 @@ static int basicUnitTests(U32 seed, double compressibility) ZSTD_initDStream_usingDict(zd, CNBuffer, dictSize); r=1; while (r) { - inBuff.size = inBuff.pos + 1; - outBuff.size = outBuff.pos + 1; + size_t const inSize = FUZ_rand(&coreSeed) & 15; + size_t const outSize = FUZ_rand(&coreSeed) & 15; + inBuff.size = inBuff.pos + inSize; + outBuff.size = outBuff.pos + outSize; r = ZSTD_decompressStream(zd, &outBuff, &inBuff); if (ZSTD_isError(r)) goto _output_error; } @@ -694,10 +699,13 @@ static int basicUnitTests(U32 seed, double compressibility) inBuff.src = CNBuffer; inBuff.size = CNBufferSize; inBuff.pos = 0; - CHECK_Z( ZSTDMT_compressStream_generic(mtctx, &outBuff, &inBuff, ZSTD_e_end) ); + { size_t const compressResult = ZSTDMT_compressStream_generic(mtctx, &outBuff, &inBuff, ZSTD_e_end); + if (compressResult != 0) goto _output_error; /* compression must be completed in a single round */ + } if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ - { size_t const r = ZSTDMT_endStream(mtctx, &outBuff); - if (r != 0) goto _output_error; } /* error, or some data not flushed */ + { size_t const compressedSize = ZSTD_findFrameCompressedSize(compressedBuffer, outBuff.pos); + if (compressedSize != outBuff.pos) goto _output_error; /* must be a full valid frame */ + } DISPLAYLEVEL(3, "OK \n"); /* Complex multithreading + dictionary test */ @@ -863,11 +871,21 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max) for (u=0; u No difference detected within %u bytes \n", (U32)max); + return u; + } DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max); - DISPLAY(" %02X %02X %02X :%02X: %02X %02X %02X %02X %02X \n", - b1[u-3], b1[u-2], b1[u-1], b1[u-0], b1[u+1], b1[u+2], b1[u+3], b1[u+4], b1[u+5]); - DISPLAY(" %02X %02X %02X :%02X: %02X %02X %02X %02X %02X \n", - b2[u-3], b2[u-2], b2[u-1], b2[u-0], b2[u+1], b2[u+2], b2[u+3], b2[u+4], b2[u+5]); + if (u>=3) + DISPLAY(" %02X %02X %02X ", + b1[u-3], b1[u-2], b1[u-1]); + DISPLAY(" :%02X: %02X %02X %02X %02X %02X \n", + b1[u], b1[u+1], b1[u+2], b1[u+3], b1[u+4], b1[u+5]); + if (u>=3) + DISPLAY(" %02X %02X %02X ", + b2[u-3], b2[u-2], b2[u-1]); + DISPLAY(" :%02X: %02X %02X %02X %02X %02X \n", + b2[u], b2[u+1], b2[u+2], b2[u+3], b2[u+4], b2[u+5]); return u; } @@ -948,10 +966,13 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres size_t maxTestSize; /* init */ - if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); } - else { DISPLAYUPDATE(2, "\r%6u ", testNb); } FUZ_rand(&coreSeed); lseed = coreSeed ^ prime32; + if (nbTests >= testNb) { + DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); + } else { + DISPLAYUPDATE(2, "\r%6u ", testNb); + } /* states full reset (deliberately not synchronized) */ /* some issues can only happen when reusing states */ @@ -1161,7 +1182,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp UTIL_time_t const startClock = UTIL_getTime(); const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */ size_t dictSize = 0; - U32 oldTestLog = 0; int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests; U32 const nbThreadsMax = bigTests ? 
4 : 2; @@ -1183,6 +1203,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */ memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */ ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */ + DISPLAYLEVEL(6, "Creating initial context with %u threads \n", nbThreads); /* catch up testNb */ for (testNb=1; testNb < startTest; testNb++) @@ -1195,14 +1216,14 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp size_t totalTestSize, totalGenSize, cSize; XXH64_state_t xxhState; U64 crcOrig; - U32 resetAllowed = 1; size_t maxTestSize; - /* init */ - if (testNb < nbTests) { - DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); - } else { DISPLAYUPDATE(2, "\r%6u ", testNb); } FUZ_rand(&coreSeed); + if (nbTests >= testNb) { + DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); + } else { + DISPLAYUPDATE(2, "\r%6u ", testNb); + } lseed = coreSeed ^ prime32; /* states full reset (deliberately not synchronized) */ @@ -1213,7 +1234,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp ZSTDMT_freeCCtx(zc); zc = ZSTDMT_createCCtx(nbThreads); CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error") - resetAllowed=0; } if ((FUZ_rand(&lseed) & 0xFF) == 132) { ZSTD_freeDStream(zd); @@ -1238,16 +1258,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp } /* compression init */ - if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */ - && oldTestLog /* at least one test happened */ && resetAllowed) { - maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2); - if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1; - { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; - DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel); - CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) ); - } - } else { - U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; + { U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog; int const cLevelCandidate = ( FUZ_rand(&lseed) % (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) ) @@ -1256,24 +1267,29 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */ int const cLevel = MIN(cLevelMin, cLevelMax); maxTestSize = FUZ_rLogLength(&lseed, testLog); - oldTestLog = testLog; - /* random dictionary selection */ - dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0; - { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); - dict = srcBuffer + dictStart; - } - { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 
ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; - ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); - DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n", - params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize); - params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; - params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; - params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; - DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); - CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) ); - CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */ - CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) ); - } } + + if (FUZ_rand(&lseed)&1) { /* simple init */ + int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; + DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel); + CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) ); + } else { /* advanced init */ + /* random dictionary selection */ + dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0; + { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); + dict = srcBuffer + dictStart; + } + { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; + ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); + DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n", + params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize); + params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; + params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; + params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; + DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); + CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) ); + CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custom job size */ + CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) ); + } } } /* multi-segments compression test */ XXH64_reset(&xxhState, 0); @@ -1326,10 +1342,13 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp } } crcOrig = XXH64_digest(&xxhState); cSize = outBuff.pos; - DISPLAYLEVEL(5, "Frame completed : %u bytes \n", (U32)cSize); + DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n", + (U32)totalTestSize, (U32)cSize); } /* multi - fragments decompression test */ + assert(totalTestSize < dstBufferSize); + memset(dstBuffer, 170, totalTestSize); /* init dest area */ if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) { CHECK_Z( ZSTD_resetDStream(zd) ); } else { @@ -1344,10 +1363,16 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); inBuff.size = inBuff.pos + readCSrcSize; outBuff.size = outBuff.pos + dstBuffSize; - DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); + DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n", + (U32)readCSrcSize, (U32)dstBuffSize); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); + if (ZSTD_isError(decompressionResult)) { + DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult)); + 
findDiff(copyBuffer, dstBuffer, totalTestSize); + } CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); - DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize); + DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n", + (U32)inBuff.pos, (U32)outBuff.pos); } CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize); @@ -1586,7 +1611,11 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double } /* mess with frame parameters */ - if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_checksumFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) ); + if (FUZ_rand(&lseed) & 1) { + U32 const checksumFlag = FUZ_rand(&lseed) & 1; + DISPLAYLEVEL(5, "t%u: frame checksum : %u \n", testNb, checksumFlag); + CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_checksumFlag, checksumFlag, useOpaqueAPI) ); + } if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_dictIDFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) ); if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_contentSizeFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) ); if (FUZ_rand(&lseed) & 1) { @@ -1651,8 +1680,8 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double outBuff.size = outBuff.pos + dstBuffSize; CHECK_Z( ZSTD_compress_generic(zc, &outBuff, &inBuff, flush) ); - DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) \n", - testNb, (U32)inBuff.pos, (U32)(totalTestSize + inBuff.pos)); + DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) ; flush: %u (total : %u) \n", + testNb, (U32)inBuff.pos, (U32)(totalTestSize + inBuff.pos), (U32)flush, (U32)outBuff.pos); XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); @@ -1660,14 +1689,15 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double } /* final frame epilogue */ - { size_t remainingToFlush = (size_t)(-1); + { size_t remainingToFlush = 1; while (remainingToFlush) { ZSTD_inBuffer inBuff = { NULL, 0, 0 }; size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); outBuff.size = outBuff.pos + adjustedDstSize; - DISPLAYLEVEL(6, "End-flush into dst buffer of size %u \n", (U32)adjustedDstSize); + DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (U32)adjustedDstSize); remainingToFlush = ZSTD_compress_generic(zc, &outBuff, &inBuff, ZSTD_e_end); + DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (U32)outBuff.pos); CHECK( ZSTD_isError(remainingToFlush), "ZSTD_compress_generic w/ ZSTD_e_end error : %s", ZSTD_getErrorName(remainingToFlush) ); @@ -1694,14 +1724,20 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); inBuff.size = inBuff.pos + readCSrcSize; outBuff.size = outBuff.pos + dstBuffSize; - DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes (pos:%u/%u)\n", - (U32)readCSrcSize, (U32)inBuff.pos, (U32)cSize); + DISPLAYLEVEL(6, "decompression presented %u new bytes (pos:%u/%u)\n", + (U32)readCSrcSize, (U32)inBuff.pos, (U32)cSize); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); + 
DISPLAYLEVEL(6, "so far: consumed = %u, produced = %u \n", + (U32)inBuff.pos, (U32)outBuff.pos); + if (ZSTD_isError(decompressionResult)) { + DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult)); + findDiff(copyBuffer, dstBuffer, totalTestSize); + } CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); - DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize); + CHECK (inBuff.pos > cSize, "ZSTD_decompressStream consumes too much input : %u > %u ", (U32)inBuff.pos, (U32)cSize); } - CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize); + CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); { U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0); if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize); CHECK (crcDest!=crcOrig, "decompressed data corrupted");
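
Usage note : to make the revised @return contract of ZSTD_compress_generic() concrete, here is a minimal caller-side sketch. It assumes a ZSTD_CCtx already initialized by the caller, and caller-provided buffers and FILE* handles (the function name and buffer sizes are illustrative, not part of the patch). With ZSTD_e_end, the caller loops until @return == 0, which now means "internal buffers fully flushed and frame completed" :

#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_compress_generic() is in the experimental API section */
#include <stdio.h>
#include <zstd.h>

/* Sketch : drive one frame to completion through ZSTD_compress_generic().
 * Returns 0 on success, 1 on error. */
static int compressFile_sketch(ZSTD_CCtx* cctx, FILE* fin, FILE* fout,
                               void* inBuf, size_t inCapacity,
                               void* outBuf, size_t outCapacity)
{
    for (;;) {
        size_t const readSize = fread(inBuf, 1, inCapacity, fin);
        /* a short read signals the last chunk : close the frame with ZSTD_e_end */
        ZSTD_EndDirective const mode = (readSize < inCapacity) ? ZSTD_e_end : ZSTD_e_continue;
        ZSTD_inBuffer input = { inBuf, readSize, 0 };
        int finished = 0;
        while (!finished) {
            ZSTD_outBuffer output = { outBuf, outCapacity, 0 };
            size_t const remaining = ZSTD_compress_generic(cctx, &output, &input, mode);
            if (ZSTD_isError(remaining)) return 1;   /* @return may be an error code */
            if (output.pos) {
                if (fwrite(outBuf, 1, output.pos, fout) != output.pos) return 1;
            }
            /* ZSTD_e_end : @return == 0 <=> frame completed and fully flushed.
             * ZSTD_e_continue : done once all provided input is consumed. */
            finished = (mode == ZSTD_e_end) ? (remaining == 0)
                                            : (input.pos == input.size);
        }
        if (mode == ZSTD_e_end) return 0;   /* frame closed */
    }
}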
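The fileio.c change above replaces the plain byte-count display with ZSTD_getFrameProgression(), which can be polled from the calling thread while worker threads are still compressing. A condensed sketch of the same pattern follows (the function name and output format are illustrative assumptions; only the ZSTD_frameProgression fields ingested/consumed/produced are taken from the patch) :

#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_getFrameProgression() is experimental */
#include <stdio.h>
#include <zstd.h>

/* Sketch : snapshot and display progression of an ongoing (possibly
 * multithreaded) compression. The counters are byte totals, so at any
 * instant ingested >= consumed, and produced grows as jobs complete. */
static void displayProgress_sketch(const ZSTD_CCtx* cctx)
{
    ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(cctx);
    double const ratio = (double)zfp.produced
                       / (double)(zfp.consumed + !zfp.consumed);   /* +!x avoids div by 0 */
    fprintf(stderr, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
            (unsigned)(zfp.ingested >> 20),
            (unsigned)(zfp.consumed >> 20),
            (unsigned)(zfp.produced >> 20),
            ratio * 100);
}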
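Finally, the jobReady flag introduced in ZSTDMT_createCompressionJob() pairs with POOL_tryAdd() : instead of blocking inside POOL_add() when all workers are busy, the prepared job is kept aside and posting is retried on the next streaming call (note how ZSTDMT_flushProduced() returns 1 while jobReady is set, keeping the caller looping). A generic sketch of this defer-and-retry idiom, with hypothetical names (Dispatcher, tryPostJob); this is not the zstdmt implementation itself :

#include "pool.h"   /* POOL_ctx, POOL_function, POOL_tryAdd */

/* Hypothetical wrapper illustrating non-blocking job posting. */
typedef struct {
    POOL_ctx* pool;
    POOL_function fn;
    void* pendingJob;   /* fully prepared job, waiting for a free worker */
    int jobReady;       /* 1 : pendingJob still needs to be posted */
} Dispatcher;

/* Returns 1 if the job was queued, 0 if deferred for a later retry. */
static int tryPostJob(Dispatcher* d, void* job)
{
    if (POOL_tryAdd(d->pool, d->fn, job)) {
        d->jobReady = 0;        /* queued : a worker will pick it up */
        return 1;
    }
    d->pendingJob = job;        /* queue full : keep the job, stay non-blocking */
    d->jobReady = 1;            /* caller retries tryPostJob(d, d->pendingJob) next round */
    return 0;
}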