From 1b5d80d633827bdb4d934b54833d5bfbebc000bf Mon Sep 17 00:00:00 2001
From: Yann Collet
typedef struct { + unsigned long long ingested; + unsigned long long consumed; + unsigned long long produced; +} ZSTD_frameProgression; +
typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e; size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue);/* obsolete : this API will be removed in a future version */ size_t ZSTD_initDStream_usingDict(ZSTD_DStream* zds, const void* dict, size_t dictSize); /**< note: no dictionary will be used if dict == NULL or dictSize < 8 */ diff --git a/lib/common/pool.c b/lib/common/pool.c index 98b109e72..773488b07 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -12,6 +12,7 @@ /* ====== Dependencies ======= */ #include
typedef enum { - ZSTD_e_continue=0, /* collect more data, encoder transparently decides when to output result, for optimal conditions */ + ZSTD_e_continue=0, /* collect more data, encoder decides when to output compressed result, for optimal conditions */ ZSTD_e_flush, /* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */ ZSTD_e_end /* flush any remaining data and close current frame. Any additional data starts a new frame. */ } ZSTD_EndDirective; @@ -945,10 +945,11 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t and then immediately returns, just indicating that there is some data remaining to be flushed. The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller. - - @return provides the minimum amount of data remaining to be flushed from internal buffers + - @return provides a minimum amount of data remaining to be flushed from internal buffers or an error code, which can be tested using ZSTD_isError(). if @return != 0, flush is not fully completed, there is still some data left within internal buffers. - This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed. + This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers. + For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed. - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0), only ZSTD_e_end or ZSTD_e_flush operations are allowed. Before starting a new compression job, or changing compression parameters, diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 451bb5bf6..4e53367fd 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -300,28 +300,28 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ------------------------------------------ */ -/* ===== Thread worker ===== */ +/* ===== Worker thread ===== */ /* ------------------------------------------ */ typedef struct { - buffer_t src; - const void* srcStart; - size_t prefixSize; - size_t srcSize; - size_t consumed; - buffer_t dstBuff; - size_t cSize; - size_t dstFlushed; - unsigned firstChunk; - unsigned lastChunk; - unsigned frameChecksumNeeded; - ZSTD_pthread_mutex_t* mtctx_mutex; - ZSTD_pthread_cond_t* mtctx_cond; - ZSTD_CCtx_params params; - const ZSTD_CDict* cdict; - ZSTDMT_CCtxPool* cctxPool; - ZSTDMT_bufferPool* bufPool; - unsigned long long fullFrameSize; + size_t consumed; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */ + size_t cSize; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */ + ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */ + ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */ + ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */ + ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */ + buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */ + buffer_t src; /* set by mtctx, then modified by worker => no barrier */ + const void* srcStart; /* set by mtctx, then read by worker => no barrier */ + size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ + size_t srcSize; /* set by mtctx, then read by worker => no barrier */ + unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ + unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ + ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ + const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ + unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ + size_t dstFlushed; /* used only by mtctx */ + unsigned frameChecksumNeeded; /* used only by mtctx */ } ZSTDMT_jobDescription; /* ZSTDMT_compressChunk() is a POOL_function type */ diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index ef5cf65fb..16dcba73a 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1220,9 +1220,9 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp FUZ_rand(&coreSeed); if (nbTests >= testNb) { - DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, coreSeed); + DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); } else { - DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, coreSeed); + DISPLAYUPDATE(2, "\r%6u ", testNb); } lseed = coreSeed ^ prime32; From fca13c6855d28d6ba8628794465372cb6088442d Mon Sep 17 00:00:00 2001 From: Yann ColletDate: Fri, 26 Jan 2018 10:44:09 -0800 Subject: [PATCH 19/30] zstdmt : fixed memory leak writeLastEmptyBlock() must release srcBuffer as mtctx assumes it's done by job worker. minor : changed 2 job member names (src->srcBuffer, srcStart->prefixStart) for clarity --- lib/compress/zstdmt_compress.c | 73 ++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 4e53367fd..9fea4969f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -304,19 +304,19 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ------------------------------------------ */ typedef struct { - size_t consumed; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */ - size_t cSize; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */ + size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ + size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */ ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */ - buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */ - buffer_t src; /* set by mtctx, then modified by worker => no barrier */ - const void* srcStart; /* set by mtctx, then read by worker => no barrier */ - size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ - size_t srcSize; /* set by mtctx, then read by worker => no barrier */ - unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ - unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ + buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */ + buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */ + const void* prefixStart; /* set by mtctx, then read by worker => no barrier */ + size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ + size_t srcSize; /* set by mtctx, then read by worker => no barrier */ + unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ + unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ @@ -329,7 +329,7 @@ void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); - const void* const src = (const char*)job->srcStart + job->prefixSize; + const void* const src = (const char*)job->prefixStart + job->prefixSize; buffer_t dstBuff = job->dstBuff; /* ressources */ @@ -362,7 +362,7 @@ void ZSTDMT_compressChunk(void* jobDescription) goto _endJob; } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, - job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ + job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ NULL, /*cdict*/ jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { @@ -419,8 +419,8 @@ void ZSTDMT_compressChunk(void* jobDescription) _endJob: /* release resources */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); - ZSTDMT_releaseBuffer(job->bufPool, job->src); - job->src = g_nullBuffer; job->srcStart = NULL; + ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff); + job->srcBuff = g_nullBuffer; job->prefixStart = NULL; /* report */ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->consumed = job->srcSize; @@ -557,9 +557,9 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; - DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src); - mtctx->jobs[jobID].src = g_nullBuffer; + DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start); + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff); + mtctx->jobs[jobID].srcBuff = g_nullBuffer; } memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start); @@ -757,8 +757,8 @@ static size_t ZSTDMT_compress_advanced_internal( buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer; size_t dictSize = u ? overlapSize : 0; - mtctx->jobs[u].src = g_nullBuffer; - mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; + mtctx->jobs[u].srcBuff = g_nullBuffer; + mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].prefixSize = dictSize; mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */ mtctx->jobs[u].consumed = 0; @@ -781,7 +781,7 @@ static size_t ZSTDMT_compress_advanced_internal( } DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize); - DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12); + DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12); POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); frameStartPos += chunkSize; @@ -802,7 +802,7 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); DEBUGLOG(5, "ready to write chunk %u ", chunkID); - mtctx->jobs[chunkID].srcStart = NULL; + mtctx->jobs[chunkID].prefixStart = NULL; { size_t const cSize = mtctx->jobs[chunkID].cSize; if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); @@ -999,18 +999,21 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { */ static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) { - assert(job->srcSize == 0); assert(job->lastChunk == 1); - assert(job->firstChunk == 0); /* first chunk needs to create frame header too */ - assert(job->dstBuff.start == NULL); /* invoked from streaming variant only */ - { buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool); - if (dstBuff.start==NULL) return ERROR(memory_allocation); - job->dstBuff = dstBuff; /* will be released by ZSTDMT_flushProduced() */ - assert(dstBuff.size >= ZSTD_blockHeaderSize); - job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size); - assert(!ZSTD_isError(job->cSize)); - assert(job->consumed == 0); - } + assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */ + assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */ + /* A job created by streaming variant starts with a src buffer, but no dst buffer. + * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. + * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. + * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ + assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ + assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ + assert(job->srcBuff.size >= ZSTD_blockHeaderSize); + job->dstBuff = job->srcBuff; + job->srcBuff = g_nullBuffer; + job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size); + assert(!ZSTD_isError(job->cSize)); + assert(job->consumed == 0); return 0; } @@ -1028,12 +1031,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if (!mtctx->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize); - mtctx->jobs[jobID].src = mtctx->inBuff.buffer; - mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start; + mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; + mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; + mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; mtctx->jobs[jobID].srcSize = srcSize; mtctx->jobs[jobID].consumed = 0; mtctx->jobs[jobID].cSize = 0; - mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize); mtctx->jobs[jobID].params = mtctx->params; /* do not calculate checksum within sections, but write it in header for first section */ @@ -1066,7 +1069,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS } mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize; memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ - (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize, + (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize, mtctx->inBuff.filled); mtctx->prefixSize = newPrefixSize; } else { /* endFrame==1 => no need for another input buffer */ From d2b62b6fa5a752e8312a6c8b9fbedcf53bfa60ff Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 11:06:34 -0800 Subject: [PATCH 20/30] minor : ZSTDMT_writeLastEmptyBlock() is a void function because it cannot fail --- lib/compress/zstdmt_compress.c | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 9fea4969f..7b37c5b33 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -142,6 +142,10 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) return poolSize + totalBufferSize; } +/* ZSTDMT_setBufferSize() : + * all future buffers provided by this buffer pool will have _at least_ this size + * note : it's better for all buffers to have same size, + * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize) { ZSTD_pthread_mutex_lock(&bufPool->poolMutex); @@ -992,12 +996,11 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { /* ZSTDMT_writeLastEmptyBlock() - * Write a single empty block with an end-of-frame - * to finish a frame. - * Completed synchronously. - * @return : 0, or an error code (can fail due to memory allocation) + * Write a single empty block with an end-of-frame to finish a frame. + * Job must be created from streaming variant. + * This function is always successfull if expected conditions are fulfilled. */ -static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) +static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) { assert(job->lastChunk == 1); assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */ @@ -1006,15 +1009,14 @@ static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ - assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ - assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ - assert(job->srcBuff.size >= ZSTD_blockHeaderSize); + assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ + assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ + assert(job->srcBuff.size >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ job->dstBuff = job->srcBuff; job->srcBuff = g_nullBuffer; job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size); assert(!ZSTD_isError(job->cSize)); assert(job->consumed == 0); - return 0; } static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp) @@ -1031,6 +1033,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if (!mtctx->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize); + assert(mtctx->jobs[jobID].srcBuff.start == NULL); /* no buffer left : supposed already released */ mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; @@ -1085,7 +1088,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if ( (srcSize == 0) && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) { assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ - CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) ); + ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); mtctx->nextJobID++; return 0; } @@ -1162,10 +1165,11 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u output->pos += toWrite; job.dstFlushed += toWrite; - if ( (job.consumed == job.srcSize) - && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ + if ( (job.consumed == job.srcSize) /* job completed */ + && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)job.dstFlushed); + assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */ ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; mtctx->consumed += job.srcSize; From 79b6e28b0a168bb076dbaf653b1a7d71bbd1a201 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 12:15:43 -0800 Subject: [PATCH 21/30] zstdmt : flush() only lock to read shared job members Other job members are accessed directly. This avoids a full job copy, which would access everything, including a few members that are supposed to be used by worker only, uselessly requiring additional locks to avoid race conditions. --- lib/compress/zstdmt_compress.c | 115 +++++++++++++++++---------------- 1 file changed, 58 insertions(+), 57 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 7b37c5b33..c7df32d37 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -309,16 +309,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) typedef struct { size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ - size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ - ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */ - ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */ - ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */ - ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */ - buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */ + size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */ + ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) workers */ + ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ + buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */ - const void* prefixStart; /* set by mtctx, then read by worker => no barrier */ + const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */ size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ - size_t srcSize; /* set by mtctx, then read by worker => no barrier */ + size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */ unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ @@ -341,15 +341,13 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = ERROR(memory_allocation); goto _endJob; } - if (dstBuff.start == NULL) { + if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ dstBuff = ZSTDMT_getBuffer(job->bufPool); if (dstBuff.start==NULL) { job->cSize = ERROR(memory_allocation); goto _endJob; } - ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ - ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } /* init */ @@ -1087,6 +1085,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if ( (srcSize == 0) && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame"); assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); mtctx->nextJobID++; @@ -1094,12 +1093,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS } } - DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", + DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))", mtctx->nextJobID, (U32)mtctx->jobs[jobID].srcSize, mtctx->jobs[jobID].lastChunk, - mtctx->doneJobID, - mtctx->doneJobID & mtctx->jobIDMask); + mtctx->nextJobID, + jobID); if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) { mtctx->nextJobID++; mtctx->jobReady = 0; @@ -1118,15 +1117,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) { unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; - DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush); + DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)", + blockToFlush, mtctx->doneJobID, mtctx->nextJobID); assert(output->size >= output->pos); ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); - if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) { + if ( blockToFlush + && (mtctx->doneJobID < mtctx->nextJobID) ) { assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); - while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { + while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */ if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) { - DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond", + DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize); break; } @@ -1135,60 +1136,60 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */ } } - /* some output is available to be flushed */ - { ZSTDMT_jobDescription job = mtctx->jobs[wJobID]; + /* try to flush something */ + { size_t cSize = mtctx->jobs[wJobID].cSize; + size_t const srcConsumed = mtctx->jobs[wJobID].consumed; ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); - if (ZSTD_isError(job.cSize)) { + if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", - mtctx->doneJobID, ZSTD_getErrorName(job.cSize)); + mtctx->doneJobID, ZSTD_getErrorName(cSize)); ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_releaseAllJobResources(mtctx); - return job.cSize; + return cSize; } /* add frame checksum if necessary (can only happen once) */ - assert(job.consumed <= job.srcSize); - if ( (job.consumed == job.srcSize) - && job.frameChecksumNeeded ) { + assert(srcConsumed <= mtctx->jobs[wJobID].srcSize); + if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed -> worker no longer active */ + && mtctx->jobs[wJobID].frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState); DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); - MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); - job.cSize += 4; - mtctx->jobs[wJobID].cSize += 4; + MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); + cSize += 4; + mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ mtctx->jobs[wJobID].frameChecksumNeeded = 0; } - assert(job.cSize >= job.dstFlushed); - if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */ - size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)", - (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100); - memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); + if (cSize > 0) { /* compression is ongoing or completed */ + size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); + DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)", + (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize); + assert(cSize >= mtctx->jobs[wJobID].dstFlushed); + memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite); output->pos += toWrite; - job.dstFlushed += toWrite; + mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */ - if ( (job.consumed == job.srcSize) /* job completed */ - && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */ + if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed */ + && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", - mtctx->doneJobID, (U32)job.dstFlushed); - assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */ - ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff); + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); + assert(mtctx->jobs[wJobID].srcBuff.start == NULL); /* srcBuff supposed already released */ + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; - mtctx->consumed += job.srcSize; - mtctx->produced += job.cSize; + mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ + mtctx->consumed += mtctx->jobs[wJobID].srcSize; + mtctx->produced += cSize; mtctx->doneJobID++; - } else { - mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */ } } /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ - if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */ + if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed); + if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1; /* current job not completely compressed */ } - if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */ - if (mtctx->jobReady) return 1; /* one job is ready and queued! */ - if (mtctx->inBuff.filled > 0) return 1; /* input not empty */ - mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */ - if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */ - return 0; /* everything flushed */ + if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ + if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */ + if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */ + mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */ + if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */ + return 0; /* internal buffers fully flushed */ } @@ -1217,10 +1218,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* single-pass shortcut (note : synchronous-mode) */ - if ( (mtctx->nextJobID == 0) /* just started */ - && (mtctx->inBuff.filled == 0) /* nothing buffered */ - && (!mtctx->jobReady) /* no job already created */ - && (endOp == ZSTD_e_end) /* end order */ + if ( (mtctx->nextJobID == 0) /* just started */ + && (mtctx->inBuff.filled == 0) /* nothing buffered */ + && (!mtctx->jobReady) /* no job already created */ + && (endOp == ZSTD_e_end) /* end order */ && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, (char*)output->dst + output->pos, output->size - output->pos, From 0d426f6b8391c3b5c5714fd83c014609ec238e7d Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 13:00:14 -0800 Subject: [PATCH 22/30] zstdmt : refactor a few member names for clarity --- lib/compress/zstdmt_compress.c | 94 +++++++++++++++++----------------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index c7df32d37..3bf585d49 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -83,7 +83,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void) typedef struct buffer_s { void* start; - size_t size; + size_t capacity; } buffer_t; static const buffer_t g_nullBuffer = { NULL, 0 }; @@ -136,7 +136,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) size_t totalBufferSize = 0; ZSTD_pthread_mutex_lock(&bufPool->poolMutex); for (u=0; u totalBuffers; u++) - totalBufferSize += bufPool->bTable[u].size; + totalBufferSize += bufPool->bTable[u].capacity; ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return poolSize + totalBufferSize; @@ -165,12 +165,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers) { /* try to use an existing buffer */ buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)]; - size_t const availBufferSize = buf.size; + size_t const availBufferSize = buf.capacity; bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer; if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) { /* large enough, but not too much */ DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u", - bufPool->nbBuffers, (U32)buf.size); + bufPool->nbBuffers, (U32)buf.capacity); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return buf; } @@ -184,7 +184,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) { buffer_t buffer; void* const start = ZSTD_malloc(bSize, bufPool->cMem); buffer.start = start; /* note : start can be NULL if malloc fails ! */ - buffer.size = (start==NULL) ? 0 : bSize; + buffer.capacity = (start==NULL) ? 0 : bSize; if (start==NULL) { DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!"); } else { @@ -203,7 +203,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) if (bufPool->nbBuffers < bufPool->totalBuffers) { bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u", - (U32)buf.size, (U32)(bufPool->nbBuffers-1)); + (U32)buf.capacity, (U32)(bufPool->nbBuffers-1)); ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); return; } @@ -372,7 +372,7 @@ void ZSTDMT_compressChunk(void* jobDescription) goto _endJob; } } } if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */ - size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0); + size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize); ZSTD_invalidateRepCodes(cctx); @@ -386,7 +386,7 @@ void ZSTDMT_compressChunk(void* jobDescription) const BYTE* ip = (const BYTE*) src; BYTE* const ostart = (BYTE*)dstBuff.start; BYTE* op = ostart; - BYTE* oend = op + dstBuff.size; + BYTE* oend = op + dstBuff.capacity; int blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); assert(job->cSize == 0); @@ -437,6 +437,8 @@ _endJob: typedef struct { buffer_t buffer; + size_t targetCapacity; /* note : buffers provided by the pool may be larger than target capacity */ + size_t prefixSize; size_t filled; } inBuff_t; @@ -449,8 +451,6 @@ struct ZSTDMT_CCtx_s { ZSTD_pthread_cond_t mtctx_cond; ZSTD_CCtx_params params; size_t targetSectionSize; - size_t inBuffSize; - size_t prefixSize; size_t targetPrefixSize; inBuff_t inBuff; int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */ @@ -663,13 +663,13 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) * Note : mutex will be acquired during statistics collection. */ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { - ZSTD_frameProgression fs; + ZSTD_frameProgression fps; DEBUGLOG(6, "ZSTDMT_getFrameProgression"); ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); - fs.consumed = mtctx->consumed; - fs.produced = mtctx->produced; - assert(mtctx->inBuff.filled >= mtctx->prefixSize); - fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize); + fps.consumed = mtctx->consumed; + fps.produced = mtctx->produced; + assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize); + fps.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->inBuff.prefixSize); { unsigned jobNb; unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", @@ -678,13 +678,13 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) unsigned const wJobID = jobNb & mtctx->jobIDMask; size_t const cResult = mtctx->jobs[wJobID].cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; - fs.consumed += mtctx->jobs[wJobID].consumed; - fs.ingested += mtctx->jobs[wJobID].srcSize; - fs.produced += produced; + fps.consumed += mtctx->jobs[wJobID].consumed; + fps.ingested += mtctx->jobs[wJobID].srcSize; + fps.produced += produced; } } ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); - return fs; + return fps; } @@ -928,11 +928,11 @@ size_t ZSTDMT_initCStream_internal( if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); - mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize; - DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10)); - ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) ); + mtctx->inBuff.targetCapacity = mtctx->targetPrefixSize + mtctx->targetSectionSize; + DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuff.targetCapacity>>10)); + ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuff.targetCapacity, ZSTD_compressBound(mtctx->targetSectionSize)) ); mtctx->inBuff.buffer = g_nullBuffer; - mtctx->prefixSize = 0; + mtctx->inBuff.prefixSize = 0; mtctx->doneJobID = 0; mtctx->nextJobID = 0; mtctx->frameEnded = 0; @@ -1009,10 +1009,10 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ - assert(job->srcBuff.size >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ + assert(job->srcBuff.capacity >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ job->dstBuff = job->srcBuff; job->srcBuff = g_nullBuffer; - job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size); + job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity); assert(!ZSTD_isError(job->cSize)); assert(job->consumed == 0); } @@ -1030,15 +1030,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if (!mtctx->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", - mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize); + mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefixSize); assert(mtctx->jobs[jobID].srcBuff.start == NULL); /* no buffer left : supposed already released */ mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; - mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; + mtctx->jobs[jobID].prefixSize = mtctx->inBuff.prefixSize; mtctx->jobs[jobID].srcSize = srcSize; + assert(mtctx->inBuff.filled >= srcSize + mtctx->inBuff.prefixSize); mtctx->jobs[jobID].consumed = 0; mtctx->jobs[jobID].cSize = 0; - assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize); mtctx->jobs[jobID].params = mtctx->params; /* do not calculate checksum within sections, but write it in header for first section */ if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0; @@ -1055,28 +1055,28 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond; if (mtctx->params.fParams.checksumFlag) - XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize); + XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize); /* get a new buffer for next input */ if (!endFrame) { - size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize); + size_t const newPrefixSize = MIN(mtctx->inBuff.filled, mtctx->targetPrefixSize); mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); - if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ + if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate a new input buffer */ mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0; mtctx->nextJobID++; ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_releaseAllJobResources(mtctx); return ERROR(memory_allocation); } - mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize; + mtctx->inBuff.filled -= (mtctx->inBuff.prefixSize + srcSize) - newPrefixSize; memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ - (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize, + (const char*)mtctx->jobs[jobID].prefixStart + mtctx->inBuff.prefixSize + srcSize - newPrefixSize, mtctx->inBuff.filled); - mtctx->prefixSize = newPrefixSize; + mtctx->inBuff.prefixSize = newPrefixSize; } else { /* endFrame==1 => no need for another input buffer */ mtctx->inBuff.buffer = g_nullBuffer; mtctx->inBuff.filled = 0; - mtctx->prefixSize = 0; + mtctx->inBuff.prefixSize = 0; mtctx->frameEnded = endFrame; if (mtctx->nextJobID == 0) { /* single chunk exception : checksum is already calculated directly within worker thread */ @@ -1202,7 +1202,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - size_t const newJobThreshold = mtctx->prefixSize + mtctx->targetSectionSize; + size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u)", (U32)endOp); assert(output->pos <= output->size); @@ -1240,16 +1240,18 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, if ( (!mtctx->jobReady) && (input->size > input->pos) ) { /* support NULL input */ if (mtctx->inBuff.buffer.start == NULL) { - mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, no forward input progress */ + mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, buffer.start==NULL */ mtctx->inBuff.filled = 0; - if ( (mtctx->inBuff.buffer.start == NULL) /* allocation failure */ + if ( (mtctx->inBuff.buffer.start == NULL) /* allocation failure */ && (mtctx->doneJobID == mtctx->nextJobID) ) { /* and nothing to flush */ - return ERROR(memory_allocation); /* no forward progress possible => output an error */ - } } - if (mtctx->inBuff.buffer.start != NULL) { - size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); + return ERROR(memory_allocation); /* no forward progress possible => output an error */ + } + assert(mtctx->inBuff.buffer.capacity >= mtctx->inBuff.targetCapacity); /* pool must provide a buffer >= targetCapacity */ + } + if (mtctx->inBuff.buffer.start != NULL) { /* no buffer for input, but it's possible to flush, and then reclaim the buffer */ + size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuff.targetCapacity - mtctx->inBuff.filled); DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", - (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize); + (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuff.targetCapacity); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); input->pos += toLoad; mtctx->inBuff.filled += toLoad; @@ -1263,7 +1265,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, || (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */ || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */ - size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->prefixSize, mtctx->targetSectionSize); + size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->inBuff.prefixSize, mtctx->targetSectionSize); CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) ); } @@ -1280,13 +1282,13 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_in CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) ); /* recommended next input size : fill current input buffer */ - return mtctx->inBuffSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ + return mtctx->inBuff.targetCapacity - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ } static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) { - size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize; + size_t const srcSize = mtctx->inBuff.filled - mtctx->inBuff.prefixSize; DEBUGLOG(5, "ZSTDMT_flushStream_internal"); if ( mtctx->jobReady /* one job ready for a worker to pick up */ From 27c5853c4285fa678fb9f606a87d21852b533ac0 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 14:35:54 -0800 Subject: [PATCH 23/30] zstdmt: job table correctly cleaned after synchronous ZSTDMT_compress() --- lib/compress/zstdmt_compress.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3bf585d49..d78257b2e 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -559,6 +559,7 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].cSize = 0; DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff); mtctx->jobs[jobID].srcBuff = g_nullBuffer; @@ -816,6 +817,7 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff); } } mtctx->jobs[chunkID].dstBuff = g_nullBuffer; + mtctx->jobs[chunkID].cSize = 0; dstPos += cSize ; } } /* for (chunkID=0; chunkID jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ mtctx->jobs[wJobID].frameChecksumNeeded = 0; } - if (cSize > 0) { /* compression is ongoing or completed */ + if (cSize > 0) { /* compression is ongoing or completed */ size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)", - (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize); + DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", + (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize, (U32)cSize); + assert(mtctx->doneJobID < mtctx->nextJobID); assert(cSize >= mtctx->jobs[wJobID].dstFlushed); + assert(mtctx->jobs[wJobID].dstBuff.start != NULL); memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite); output->pos += toWrite; mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */ @@ -1204,7 +1208,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, { size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; - DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u)", (U32)endOp); + DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)", + (U32)endOp, (U32)(input->size - input->pos)); assert(output->pos <= output->size); assert(input->pos <= input->size); From 77e36273de31ded4fc35703514175af07bdf2fb5 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 17:08:58 -0800 Subject: [PATCH 24/30] zstdmt: minor code refactor for clarity --- lib/compress/zstdmt_compress.c | 80 +++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index d78257b2e..e04864507 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -379,35 +379,35 @@ void ZSTDMT_compressChunk(void* jobDescription) } /* compress */ - if (sizeof(size_t) > sizeof(int)) - assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ - - { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX); + { size_t const blockSize = ZSTD_BLOCKSIZE_MAX; + int const nbBlocks = (int)((job->srcSize + (blockSize-1)) / blockSize); const BYTE* ip = (const BYTE*) src; BYTE* const ostart = (BYTE*)dstBuff.start; BYTE* op = ostart; BYTE* oend = op + dstBuff.capacity; int blockNb; + if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * blockSize); /* check overflow */ DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); assert(job->cSize == 0); for (blockNb = 1; blockNb < nbBlocks; blockNb++) { - size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); + size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, blockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } - ip += ZSTD_BLOCKSIZE_MAX; + ip += blockSize; op += cSize; assert(op < oend); /* stats */ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; - job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; + job->consumed = blockSize * blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); ZSTD_pthread_cond_signal(job->mtctx_cond); /* warns some more data is ready to be flushed */ ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } /* last block */ + assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */ if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) { - size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); - size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; + size_t const lastBlockSize1 = job->srcSize & (blockSize-1); + size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=blockSize)) ? blockSize : lastBlockSize1; size_t const cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); @@ -469,21 +469,10 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; -/* Sets parameters relevant to the compression job, initializing others to - * default values. Notably, nbThreads should probably be zero. */ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) -{ - ZSTD_CCtx_params jobParams; - memset(&jobParams, 0, sizeof(jobParams)); - - jobParams.cParams = params.cParams; - jobParams.fParams = params.fParams; - jobParams.compressionLevel = params.compressionLevel; - - jobParams.ldmParams = params.ldmParams; - return jobParams; -} - +/* ZSTDMT_allocJobsTable() + * allocate, and just init to zero, a job table. + * update *nbJobsPtr to next power of 2 value, as size of table + * No reverse free() function is provided : just use ZSTD_free() */ static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) { U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; @@ -524,6 +513,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) mtctx->allJobsCompleted = 1; mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); + assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); @@ -649,6 +639,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, } } +/* Sets parameters relevant to the compression job, initializing others to + * default values. Notably, nbThreads should probably be zero. */ +static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) +{ + ZSTD_CCtx_params jobParams; + memset(&jobParams, 0, sizeof(jobParams)); + + jobParams.cParams = params.cParams; + jobParams.fParams = params.fParams; + jobParams.compressionLevel = params.compressionLevel; + + jobParams.ldmParams = params.ldmParams; + return jobParams; +} + /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ @@ -1139,8 +1144,9 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u } } /* try to flush something */ - { size_t cSize = mtctx->jobs[wJobID].cSize; - size_t const srcConsumed = mtctx->jobs[wJobID].consumed; + { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ + size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ + size_t const srcSize = mtctx->jobs[wJobID].srcSize; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", @@ -1150,8 +1156,8 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u return cSize; } /* add frame checksum if necessary (can only happen once) */ - assert(srcConsumed <= mtctx->jobs[wJobID].srcSize); - if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed -> worker no longer active */ + assert(srcConsumed <= srcSize); + if ( (srcConsumed == srcSize) /* job completed -> worker no longer active */ && mtctx->jobs[wJobID].frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState); DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); @@ -1161,17 +1167,19 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u mtctx->jobs[wJobID].frameChecksumNeeded = 0; } if (cSize > 0) { /* compression is ongoing or completed */ - size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); + size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", - (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize, (U32)cSize); + (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); assert(mtctx->doneJobID < mtctx->nextJobID); assert(cSize >= mtctx->jobs[wJobID].dstFlushed); assert(mtctx->jobs[wJobID].dstBuff.start != NULL); - memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite); - output->pos += toWrite; - mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */ + memcpy((char*)output->dst + output->pos, + (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, + toFlush); + output->pos += toFlush; + mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ - if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed */ + if ( (srcConsumed == srcSize) /* job completed */ && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); @@ -1179,14 +1187,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ - mtctx->consumed += mtctx->jobs[wJobID].srcSize; + mtctx->consumed += srcSize; mtctx->produced += cSize; mtctx->doneJobID++; } } /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed); - if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1; /* current job not completely compressed */ + if (srcSize > srcConsumed) return 1; /* current job not completely compressed */ } if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */ From 9c40ae7ff16a6561914092e130b47c7a45f7914b Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 17:48:33 -0800 Subject: [PATCH 25/30] zstdmt: there is now one mutex/cond per job --- lib/compress/zstdmt_compress.c | 103 +++++++++++++++++---------------- tests/zstreamtest.c | 4 +- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index e04864507..22560bfb1 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -310,8 +310,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) typedef struct { size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */ - ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) workers */ - ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) workers */ + ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */ + ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ @@ -395,13 +395,13 @@ void ZSTDMT_compressChunk(void* jobDescription) ip += blockSize; op += cSize; assert(op < oend); /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; job->consumed = blockSize * blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); - ZSTD_pthread_cond_signal(job->mtctx_cond); /* warns some more data is ready to be flushed */ - ZSTD_pthread_mutex_unlock(job->mtctx_mutex); + ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ + ZSTD_pthread_mutex_unlock(&job->job_mutex); } /* last block */ assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */ @@ -413,9 +413,9 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->cSize += cSize; - ZSTD_pthread_mutex_unlock(job->mtctx_mutex); + ZSTD_pthread_mutex_unlock(&job->job_mutex); } } _endJob: @@ -424,10 +424,10 @@ _endJob: ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff); job->srcBuff = g_nullBuffer; job->prefixStart = NULL; /* report */ - ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->consumed = job->srcSize; - ZSTD_pthread_cond_signal(job->mtctx_cond); - ZSTD_pthread_mutex_unlock(job->mtctx_mutex); + ZSTD_pthread_cond_signal(&job->job_cond); + ZSTD_pthread_mutex_unlock(&job->job_mutex); } @@ -447,8 +447,6 @@ struct ZSTDMT_CCtx_s { ZSTDMT_jobDescription* jobs; ZSTDMT_bufferPool* bufPool; ZSTDMT_CCtxPool* cctxPool; - ZSTD_pthread_mutex_t mtctx_mutex; - ZSTD_pthread_cond_t mtctx_cond; ZSTD_CCtx_params params; size_t targetSectionSize; size_t targetPrefixSize; @@ -470,16 +468,34 @@ struct ZSTDMT_CCtx_s { }; /* ZSTDMT_allocJobsTable() - * allocate, and just init to zero, a job table. + * allocate and init a job table. * update *nbJobsPtr to next power of 2 value, as size of table * No reverse free() function is provided : just use ZSTD_free() */ -static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) +static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) { U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; U32 const nbJobs = 1 << nbJobsLog2; - *nbJobsPtr = nbJobs; - return (ZSTDMT_jobDescription*) ZSTD_calloc( + ZSTDMT_jobDescription* const jobTable = ZSTD_calloc( nbJobs * sizeof(ZSTDMT_jobDescription), cMem); + U32 jobNb; + if (jobTable==NULL) return NULL; + *nbJobsPtr = nbJobs; + for (jobNb=0; jobNb cMem = cMem; mtctx->allJobsCompleted = 1; mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); - mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); + mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); @@ -521,14 +537,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_freeCCtx(mtctx); return NULL; } - if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) { - ZSTDMT_freeCCtx(mtctx); - return NULL; - } - if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) { - ZSTDMT_freeCCtx(mtctx); - return NULL; - } DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads); return mtctx; } @@ -566,12 +574,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); while (mtctx->doneJobID < mtctx->nextJobID) { unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ - ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); + ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } - ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); + ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); mtctx->doneJobID++; } } @@ -581,12 +589,10 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) if (mtctx==NULL) return 0; /* compatible with free on NULL */ POOL_free(mtctx->factory); /* stop and free worker threads */ ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools first */ - ZSTD_free(mtctx->jobs, mtctx->cMem); + ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTD_freeCDict(mtctx->cdictLocal); - ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex); - ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond); ZSTD_free(mtctx, mtctx->cMem); return 0; } @@ -671,7 +677,6 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { ZSTD_frameProgression fps; DEBUGLOG(6, "ZSTDMT_getFrameProgression"); - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); fps.consumed = mtctx->consumed; fps.produced = mtctx->produced; assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize); @@ -682,14 +687,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) mtctx->doneJobID, lastJobNb, mtctx->jobReady) for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { unsigned const wJobID = jobNb & mtctx->jobIDMask; - size_t const cResult = mtctx->jobs[wJobID].cSize; - size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; - fps.consumed += mtctx->jobs[wJobID].consumed; - fps.ingested += mtctx->jobs[wJobID].srcSize; - fps.produced += produced; + ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex); + { size_t const cResult = mtctx->jobs[wJobID].cSize; + size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; + fps.consumed += mtctx->jobs[wJobID].consumed; + fps.ingested += mtctx->jobs[wJobID].srcSize; + fps.produced += produced; + } + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); } } - ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); return fps; } @@ -749,9 +756,9 @@ static size_t ZSTDMT_compress_advanced_internal( if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */ U32 nbJobs = nbChunks; - ZSTD_free(mtctx->jobs, mtctx->cMem); + ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); mtctx->jobIDMask = 0; - mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem); + mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem); if (mtctx->jobs==NULL) return ERROR(memory_allocation); assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; @@ -781,8 +788,6 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].bufPool = mtctx->bufPool; mtctx->jobs[u].firstChunk = (u==0); mtctx->jobs[u].lastChunk = (u==nbChunks-1); - mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex; - mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond; if (params.fParams.checksumFlag) { XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize); @@ -802,12 +807,12 @@ static size_t ZSTDMT_compress_advanced_internal( unsigned chunkID; for (chunkID=0; chunkID mtctx_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex); while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) { DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID); - ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); + ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex); } - ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); + ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex); DEBUGLOG(5, "ready to write chunk %u ", chunkID); mtctx->jobs[chunkID].prefixStart = NULL; @@ -1058,8 +1063,6 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].lastChunk = endFrame; mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag; mtctx->jobs[jobID].dstFlushed = 0; - mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex; - mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond; if (mtctx->params.fParams.checksumFlag) XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize); @@ -1128,7 +1131,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u blockToFlush, mtctx->doneJobID, mtctx->nextJobID); assert(output->size >= output->pos); - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); if ( blockToFlush && (mtctx->doneJobID < mtctx->nextJobID) ) { assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); @@ -1140,14 +1143,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u } DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); - ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */ + ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block when nothing to flush but some to come */ } } /* try to flush something */ { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ size_t const srcSize = mtctx->jobs[wJobID].srcSize; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ - ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", mtctx->doneJobID, ZSTD_getErrorName(cSize)); diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 16dcba73a..ecc477c71 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -969,9 +969,9 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres FUZ_rand(&coreSeed); lseed = coreSeed ^ prime32; if (nbTests >= testNb) { - DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, lseed); + DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); } else { - DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, lseed); + DISPLAYUPDATE(2, "\r%6u ", testNb); } /* states full reset (deliberately not synchronized) */ From caf9e96dc3958850f14b3ddba31003e5f2f2a700 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 26 Jan 2018 18:09:25 -0800 Subject: [PATCH 26/30] job mutex creation is checked --- lib/compress/zstdmt_compress.c | 44 ++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 22560bfb1..865035691 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -467,26 +467,6 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; -/* ZSTDMT_allocJobsTable() - * allocate and init a job table. - * update *nbJobsPtr to next power of 2 value, as size of table - * No reverse free() function is provided : just use ZSTD_free() */ -static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) -{ - U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; - U32 const nbJobs = 1 << nbJobsLog2; - ZSTDMT_jobDescription* const jobTable = ZSTD_calloc( - nbJobs * sizeof(ZSTDMT_jobDescription), cMem); - U32 jobNb; - if (jobTable==NULL) return NULL; - *nbJobsPtr = nbJobs; - for (jobNb=0; jobNb Date: Fri, 26 Jan 2018 18:18:42 -0800 Subject: [PATCH 27/30] fixed minor conversion warning for C++ compilation mode --- lib/compress/zstdmt_compress.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 865035691..72515edb8 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -486,8 +486,8 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; U32 const nbJobs = 1 << nbJobsLog2; U32 jobNb; - ZSTDMT_jobDescription* const jobTable = ZSTD_calloc( - nbJobs * sizeof(ZSTDMT_jobDescription), cMem); + ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*) + ZSTD_calloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem); int initError = 0; if (jobTable==NULL) return NULL; *nbJobsPtr = nbJobs; From 2cb0740b6b24b42d0255046e5692539c7b8534f8 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 30 Jan 2018 14:43:36 -0800 Subject: [PATCH 28/30] zstdmt: changed naming convention to avoid confusion with blocks. also: - jobs are cut into chunks of 512KB now, to reduce nb of mutex calls. - fix function declaration ZSTD_getBlockSizeMax() - fix outdated comment --- doc/zstd_manual.html | 2 +- lib/compress/zstdmt_compress.c | 176 ++++++++++++++++----------------- lib/zstd.h | 2 +- 3 files changed, 90 insertions(+), 90 deletions(-) diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index b4aed91e5..debd37f7a 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -1153,7 +1153,7 @@ size_t ZSTD_DCtx_refPrefix_advanced(ZSTD_DCtx* dctx, const void* prefix, size_t Use ZSTD_insertBlock() for such a case.
size_t ZSTD_getBlockSize (const ZSTD_CCtx* cctx); +Raw zstd block functions
size_t ZSTD_getBlockSizeMax(const ZSTD_CCtx* cctx); size_t ZSTD_compressBlock (ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); size_t ZSTD_decompressBlock(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); size_t ZSTD_insertBlock (ZSTD_DCtx* dctx, const void* blockStart, size_t blockSize);/**< insert uncompressed block into `dctx` history. Useful for multi-blocks decompression. */ diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 72515edb8..ec5f0bbbd 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -319,8 +319,8 @@ typedef struct { const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */ size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */ - unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ - unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ + unsigned firstJob; /* set by mtctx, then read by worker => no barrier */ + unsigned lastJob; /* set by mtctx, then read by worker => no barrier */ ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ @@ -328,8 +328,8 @@ typedef struct { unsigned frameChecksumNeeded; /* used only by mtctx */ } ZSTDMT_jobDescription; -/* ZSTDMT_compressChunk() is a POOL_function type */ -void ZSTDMT_compressChunk(void* jobDescription) +/* ZSTDMT_compressionJob() is a POOL_function type */ +void ZSTDMT_compressionJob(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); @@ -353,12 +353,12 @@ void ZSTDMT_compressChunk(void* jobDescription) /* init */ if (job->cdict) { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize); - assert(job->firstChunk); /* only allowed for first job */ + assert(job->firstJob); /* only allowed for first job */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ - U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : job->srcSize; + U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->srcSize; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ - { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); + { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); if (ZSTD_isError(forceWindowError)) { job->cSize = forceWindowError; goto _endJob; @@ -371,44 +371,44 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = initError; goto _endJob; } } } - if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */ + if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } - DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize); + DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); ZSTD_invalidateRepCodes(cctx); } /* compress */ - { size_t const blockSize = ZSTD_BLOCKSIZE_MAX; - int const nbBlocks = (int)((job->srcSize + (blockSize-1)) / blockSize); + { size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX; + int const nbChunks = (int)((job->srcSize + (chunkSize-1)) / chunkSize); const BYTE* ip = (const BYTE*) src; BYTE* const ostart = (BYTE*)dstBuff.start; BYTE* op = ostart; BYTE* oend = op + dstBuff.capacity; - int blockNb; - if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * blockSize); /* check overflow */ - DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); + int chunkNb; + if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * chunkSize); /* check overflow */ + DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->srcSize, nbChunks); assert(job->cSize == 0); - for (blockNb = 1; blockNb < nbBlocks; blockNb++) { - size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, blockSize); + for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { + size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } - ip += blockSize; + ip += chunkSize; op += cSize; assert(op < oend); /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->cSize += cSize; - job->consumed = blockSize * blockNb; - DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", + job->consumed = chunkSize * chunkNb; + DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */ ZSTD_pthread_mutex_unlock(&job->job_mutex); } /* last block */ - assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */ - if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) { - size_t const lastBlockSize1 = job->srcSize & (blockSize-1); - size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=blockSize)) ? blockSize : lastBlockSize1; - size_t const cSize = (job->lastChunk) ? + assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ + if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { + size_t const lastBlockSize1 = job->srcSize & (chunkSize-1); + size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=chunkSize)) ? chunkSize : lastBlockSize1; + size_t const cSize = (job->lastJob) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } @@ -580,7 +580,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { - DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); @@ -709,16 +709,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -static unsigned ZSTDMT_computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) { +static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) { assert(nbThreads>0); - { size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2); - size_t const chunkMaxSize = chunkSizeTarget << 2; - size_t const passSizeMax = chunkMaxSize * nbThreads; + { size_t const jobSizeTarget = (size_t)1 << (windowLog + 2); + size_t const jobMaxSize = jobSizeTarget << 2; + size_t const passSizeMax = jobMaxSize * nbThreads; unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned const nbChunksLarge = multiplier * nbThreads; - unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; - unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads); - return (multiplier>1) ? nbChunksLarge : nbChunksSmall; + unsigned const nbJobsLarge = multiplier * nbThreads; + unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; + unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads); + return (multiplier>1) ? nbJobsLarge : nbJobsSmall; } } /* ZSTDMT_compress_advanced_internal() : @@ -734,44 +734,44 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - unsigned nbChunks = ZSTDMT_computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads); - size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; - size_t const avgChunkSize = (((proposedChunkSize-1) & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ + unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads); + size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; + size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; size_t remainingSrcSize = srcSize; - unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ + unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; XXH64_state_t xxh64; assert(jobParams.nbThreads == 0); assert(mtctx->cctxPool->totalCCtx == params.nbThreads); - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ", - nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize); + DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", + nbJobs, (U32)proposedJobSize, (U32)avgJobSize); - if ((nbChunks==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ + if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); } - assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ - ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) ); + assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ + ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); XXH64_reset(&xxh64, 0); - if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */ - U32 nbJobs = nbChunks; + if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ + U32 jobsTableSize = nbJobs; ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); mtctx->jobIDMask = 0; - mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem); + mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem); if (mtctx->jobs==NULL) return ERROR(memory_allocation); - assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */ - mtctx->jobIDMask = nbJobs - 1; + assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0)); /* ensure jobsTableSize is a power of 2 */ + mtctx->jobIDMask = jobsTableSize - 1; } { unsigned u; - for (u=0; ujobs[u].srcBuff = g_nullBuffer; mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].prefixSize = dictSize; - mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */ + mtctx->jobs[u].srcSize = jobSize; assert(jobSize > 0); /* avoid job.srcSize == 0 */ mtctx->jobs[u].consumed = 0; mtctx->jobs[u].cSize = 0; mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; @@ -790,51 +790,51 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].bufPool = mtctx->bufPool; - mtctx->jobs[u].firstChunk = (u==0); - mtctx->jobs[u].lastChunk = (u==nbChunks-1); + mtctx->jobs[u].firstJob = (u==0); + mtctx->jobs[u].lastJob = (u==nbJobs-1); if (params.fParams.checksumFlag) { - XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize); + XXH64_update(&xxh64, srcStart + frameStartPos, jobSize); } - DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize); + DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize); DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12); - POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); + POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]); - frameStartPos += chunkSize; + frameStartPos += jobSize; dstBufferPos += dstBufferCapacity; - remainingSrcSize -= chunkSize; + remainingSrcSize -= jobSize; } } /* collect result */ { size_t error = 0, dstPos = 0; - unsigned chunkID; - for (chunkID=0; chunkID jobs[chunkID].job_mutex); - while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) { - DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID); - ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex); + unsigned jobID; + for (jobID=0; jobID jobs[jobID].job_mutex); + while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID); + ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } - ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex); - DEBUGLOG(5, "ready to write chunk %u ", chunkID); + ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); + DEBUGLOG(5, "ready to write job %u ", jobID); - mtctx->jobs[chunkID].prefixStart = NULL; - { size_t const cSize = mtctx->jobs[chunkID].cSize; + mtctx->jobs[jobID].prefixStart = NULL; + { size_t const cSize = mtctx->jobs[jobID].cSize; if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); - if (chunkID) { /* note : chunk 0 is written directly at dst, which is correct position */ + if (jobID) { /* note : job 0 is written directly at dst, which is correct position */ if (!error) - memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */ - if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */ - DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff); + memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */ + if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */ + DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst); + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); } } - mtctx->jobs[chunkID].dstBuff = g_nullBuffer; - mtctx->jobs[chunkID].cSize = 0; + mtctx->jobs[jobID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].cSize = 0; dstPos += cSize ; } - } /* for (chunkID=0; chunkID lastChunk == 1); - assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */ - assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */ + assert(job->lastJob == 1); + assert(job->srcSize == 0); /* last job is empty -> will be simplified into a last empty block */ + assert(job->firstJob == 0); /* cannot be first job, as it also needs to create frame header */ /* A job created by streaming variant starts with a src buffer, but no dst buffer. * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. @@ -1063,8 +1063,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; mtctx->jobs[jobID].bufPool = mtctx->bufPool; - mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0); - mtctx->jobs[jobID].lastChunk = endFrame; + mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); + mtctx->jobs[jobID].lastJob = endFrame; mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag; mtctx->jobs[jobID].dstFlushed = 0; @@ -1093,12 +1093,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->inBuff.prefixSize = 0; mtctx->frameEnded = endFrame; if (mtctx->nextJobID == 0) { - /* single chunk exception : checksum is already calculated directly within worker thread */ + /* single job exception : checksum is already calculated directly within worker thread */ mtctx->params.fParams.checksumFlag = 0; } } if ( (srcSize == 0) - && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) { + && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame"); assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); @@ -1110,10 +1110,10 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))", mtctx->nextJobID, (U32)mtctx->jobs[jobID].srcSize, - mtctx->jobs[jobID].lastChunk, + mtctx->jobs[jobID].lastJob, mtctx->nextJobID, jobID); - if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) { + if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { mtctx->nextJobID++; mtctx->jobReady = 0; } else { @@ -1206,7 +1206,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */ if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */ - mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */ + mtctx->allJobsCompleted = mtctx->frameEnded; /* all jobs are entirely flushed => if this one is last one, frame is completed */ if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */ return 0; /* internal buffers fully flushed */ } diff --git a/lib/zstd.h b/lib/zstd.h index e573daf5b..4c77197d1 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1373,7 +1373,7 @@ ZSTDLIB_API void ZSTD_DCtx_reset(ZSTD_DCtx* dctx); #define ZSTD_BLOCKSIZELOG_MAX 17 #define ZSTD_BLOCKSIZE_MAX (1< Date: Tue, 30 Jan 2018 15:03:39 -0800 Subject: [PATCH 29/30] fixed function declaration ZSTD_getBlockSize() --- lib/zstd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/zstd.h b/lib/zstd.h index 4c77197d1..e573daf5b 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1373,7 +1373,7 @@ ZSTDLIB_API void ZSTD_DCtx_reset(ZSTD_DCtx* dctx); #define ZSTD_BLOCKSIZELOG_MAX 17 #define ZSTD_BLOCKSIZE_MAX (1< Date: Tue, 30 Jan 2018 15:05:12 -0800 Subject: [PATCH 30/30] updated zstd api manual --- doc/zstd_manual.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index debd37f7a..b4aed91e5 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -1153,7 +1153,7 @@ size_t ZSTD_DCtx_refPrefix_advanced(ZSTD_DCtx* dctx, const void* prefix, size_t Use ZSTD_insertBlock() for such a case.
size_t ZSTD_getBlockSizeMax(const ZSTD_CCtx* cctx); +Raw zstd block functions
size_t ZSTD_getBlockSize (const ZSTD_CCtx* cctx); size_t ZSTD_compressBlock (ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); size_t ZSTD_decompressBlock(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); size_t ZSTD_insertBlock (ZSTD_DCtx* dctx, const void* blockStart, size_t blockSize);/**< insert uncompressed block into `dctx` history. Useful for multi-blocks decompression. */