diff --git a/lib/compress/huf_compress.c b/lib/compress/huf_compress.c
index ea0007232..3e68f76fb 100644
--- a/lib/compress/huf_compress.c
+++ b/lib/compress/huf_compress.c
@@ -379,7 +379,7 @@ static U32 HUF_setMaxHeight(nodeElt* huffNode, U32 lastNonNull, U32 targetNbBits
     /* early exit : no elt > targetNbBits, so the tree is already valid. */
     if (largestBits <= targetNbBits) return largestBits;
-    DEBUGLOG(5, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
+    DEBUGLOG(6, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
     /* there are several too large elements (at least >= 2) */
     {   int totalCost = 0;
@@ -685,7 +685,7 @@ static int HUF_buildTree(nodeElt* huffNode, U32 maxSymbolValue)
     int lowS, lowN;
     int nodeNb = STARTNODE;
     int n, nodeRoot;
-    DEBUGLOG(5, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
+    DEBUGLOG(6, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
     /* init for parents */
     nonNullRank = (int)maxSymbolValue;
     while(huffNode[nonNullRank].count == 0) nonNullRank--;
@@ -764,7 +764,7 @@ HUF_buildCTable_wksp(HUF_CElt* CTable, const unsigned* count, U32 maxSymbolValue
     HUF_STATIC_ASSERT(HUF_CTABLE_WORKSPACE_SIZE == sizeof(HUF_buildCTable_wksp_tables));
-    DEBUGLOG(5, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
+    DEBUGLOG(6, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
     /* safety checks */
     if (wkspSize < sizeof(HUF_buildCTable_wksp_tables))
diff --git a/lib/compress/zstd_opt.c b/lib/compress/zstd_opt.c
index 09de5a9d9..22693c82e 100644
--- a/lib/compress/zstd_opt.c
+++ b/lib/compress/zstd_opt.c
@@ -124,7 +124,7 @@ static U32 ZSTD_scaleStats(unsigned* table, U32 lastEltIndex, U32 logTarget)
 {
     U32 const prevsum = sum_u32(table, lastEltIndex+1);
     U32 const factor = prevsum >> logTarget;
-    DEBUGLOG(5, "ZSTD_scaleStats (nbElts=%u, target=%u)", (unsigned)lastEltIndex+1, (unsigned)logTarget);
+    DEBUGLOG(6, "ZSTD_scaleStats (nbElts=%u, target=%u)", (unsigned)lastEltIndex+1, (unsigned)logTarget);
     assert(logTarget < 30);
     if (factor <= 1) return prevsum;
     return ZSTD_downscaleStats(table, lastEltIndex, ZSTD_highbit32(factor), base_1guaranteed);
diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index 3b572e4b0..4af236fd0 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -62,6 +62,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
     do { \
         if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \
             unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
+            DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "acquiring mutex %s", #mutex); \
             ZSTD_pthread_mutex_lock(mutex); \
             { unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
               unsigned long long const elapsedTime = (afterTime-beforeTime); \
@@ -76,9 +77,22 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
         } \
     } while (0)
+#define COND_WAIT_DLEVEL 6
+#define ZSTD_PTHREAD_COND_WAIT(_cond, _mutex) \
+    do { \
+        if (DEBUGLEVEL >= COND_WAIT_DLEVEL) { \
+            DEBUGLOG(COND_WAIT_DLEVEL, "waiting on condition %s", #_cond); \
+            ZSTD_pthread_cond_wait(_cond,_mutex); \
+            DEBUGLOG(COND_WAIT_DLEVEL, "condition %s triggered", #_cond); \
+        } else { \
+            ZSTD_pthread_cond_wait(_cond,_mutex); \
+        } \
+    } while (0)
+
 #else
 # define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
+# define ZSTD_PTHREAD_COND_WAIT(c,m) ZSTD_pthread_cond_wait(c,m)
 # define DEBUG_PRINTHEX(l,p,n) do { } while (0)
 #endif
@@ -147,7 +161,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
     size_t const arraySize = bufPool->totalBuffers * sizeof(Buffer);
     unsigned u;
     size_t totalBufferSize = 0;
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     for (u=0; u<bufPool->totalBuffers; u++)
         totalBufferSize += bufPool->buffers[u].capacity;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
@@ -161,7 +175,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
  * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */
 static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
 {
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
     bufPool->bufferSize = bSize;
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
@@ -193,7 +207,7 @@ static Buffer ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
 {
     size_t const bSize = bufPool->bufferSize;
     DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     if (bufPool->nbBuffers) {   /* try to use an existing buffer */
         Buffer const buf = bufPool->buffers[--(bufPool->nbBuffers)];
         size_t const availBufferSize = buf.capacity;
@@ -256,7 +270,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, Buffer buf)
 {
     DEBUGLOG(5, "ZSTDMT_releaseBuffer");
     if (buf.start == NULL) return;   /* compatible with release on NULL */
-    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->buffers[bufPool->nbBuffers++] = buf;   /* stored for later use */
         DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
@@ -417,7 +431,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
 /* only works during initialization phase, not during compression */
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
-    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
     {   unsigned const nbWorkers = cctxPool->totalCCtx;
         size_t const poolSize = sizeof(*cctxPool);
         size_t const arraySize = cctxPool->totalCCtx * sizeof(ZSTD_CCtx*);
@@ -435,7 +449,7 @@ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
 {
     DEBUGLOG(5, "ZSTDMT_getCCtx");
-    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
     if (cctxPool->availCCtx) {
         cctxPool->availCCtx--;
         {   ZSTD_CCtx* const cctx = cctxPool->cctxs[cctxPool->availCCtx];
@@ -450,7 +464,7 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
 static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 {
     if (cctx==NULL) return;   /* compatibility with release on NULL */
-    ZSTD_pthread_mutex_lock(&pool->poolMutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&pool->poolMutex);
     if (pool->availCCtx < pool->totalCCtx)
         pool->cctxs[pool->availCCtx++] = cctx;
     else {
@@ -586,7 +600,7 @@ ZSTDMT_serialState_genSequences(SerialState* serialState,
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
     while (serialState->nextJobID < jobID) {
         DEBUGLOG(5, "wait for serialState->cond");
-        ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
+        ZSTD_PTHREAD_COND_WAIT(&serialState->cond, &serialState->mutex);
     }
     /* A future job may error and skip our job */
     if (serialState->nextJobID == jobID) {
@@ -663,7 +677,8 @@ typedef struct {
     size_t   consumed;                  /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
     size_t   cSize;                     /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
     ZSTD_pthread_mutex_t job_mutex;     /* Thread-safe - used by mtctx and worker */
-    ZSTD_pthread_cond_t job_cond;       /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_mutex_t* flush_mutex;  /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_cond_t* flush_cond;    /* Thread-safe - used by mtctx and worker */
     ZSTDMT_CCtxPool* cctxPool;          /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_bufferPool* bufPool;         /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_seqPool* seqPool;            /* Thread-safe - used by mtctx and (all) workers */
@@ -779,8 +794,14 @@ static void ZSTDMT_compressionJob(void* jobDescription)
             job->consumed = chunkSize * chunkNb;
             DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(&job->job_cond);   /* warns some more data is ready to be flushed */
-            ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            if (job->flush_mutex != NULL) {
+                ZSTD_pthread_mutex_unlock(&job->job_mutex);
+                ZSTD_PTHREAD_MUTEX_LOCK(job->flush_mutex);
+                ZSTD_pthread_cond_signal(job->flush_cond);   /* warns some more data is ready to be flushed */
+                ZSTD_pthread_mutex_unlock(job->flush_mutex);
+            } else {
+                ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            }
         }
         /* last block */
         assert(chunkSize > 0);
@@ -815,7 +836,15 @@ _endJob:
     if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
     job->cSize += lastCBlockSize;
     job->consumed = job->src.size;   /* when job->consumed == job->src.size , compression job is presumed completed */
-    ZSTD_pthread_cond_signal(&job->job_cond);
+    if (job->flush_mutex != NULL) {
+        ZSTD_pthread_mutex_unlock(&job->job_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(job->flush_mutex);
+        ZSTD_pthread_cond_signal(job->flush_cond);   /* warns some more data is ready to be flushed */
+        ZSTD_pthread_mutex_unlock(job->flush_mutex);
+        ZSTD_pthread_mutex_lock(&job->job_mutex);
+    }
+    job->flush_mutex = NULL;
+    job->flush_cond = NULL;
     ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
@@ -870,13 +899,15 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_CCtxPool* cctxPool;
     ZSTDMT_seqPool* seqPool;
     ZSTD_CCtx_params params;
-    size_t targetSectionSize;
+    size_t targetJobSize;
     size_t targetPrefixSize;
     int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
     InBuff_t inBuff;
     RoundBuff_t roundBuff;
     SerialState serial;
     RSyncState_t rsync;
+    ZSTD_pthread_mutex_t flushMutex;
+    ZSTD_pthread_cond_t flushCond;
     unsigned jobIDMask;
     unsigned doneJobID;
     unsigned nextJobID;
@@ -897,7 +928,6 @@ static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS
     if (jobTable == NULL) return;
     for (jobNb=0; jobNb<nbJobs; jobNb++) {
     mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
     mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
     initError = ZSTDMT_serialState_init(&mtctx->serial);
+    initError |= ZSTD_pthread_mutex_init(&mtctx->flushMutex, NULL);
+    initError |= ZSTD_pthread_cond_init(&mtctx->flushCond, NULL);
     mtctx->roundBuff = kNullRoundBuff;
     if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
         ZSTDMT_freeCCtx(mtctx);
@@ -1014,7 +1045,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
     for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
         /* Copy the mutex/cond out */
         ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
-        ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;
         DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
         ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
@@ -1022,7 +1052,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
         /* Clear the job description, but keep the mutex/cond */
         ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
         mtctx->jobs[jobID].job_mutex = mutex;
-        mtctx->jobs[jobID].job_cond = cond;
     }
     mtctx->inBuff.buffer = g_nullBuffer;
     mtctx->inBuff.filled = 0;
@@ -1036,8 +1065,17 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
         ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
-            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
+            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);
+            /* we want to block and wait for data to flush */
+            if (mtctx->jobs[jobID].flush_mutex == NULL) {
+                mtctx->jobs[jobID].flush_mutex = &mtctx->flushMutex;
+                mtctx->jobs[jobID].flush_cond = &mtctx->flushCond;
+            }
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
+            DEBUGLOG(2, "ZSTDMT_waitForAllJobsCompleted: let's wait for job progress");
+            ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);
+            DEBUGLOG(2, "ZSTDMT_waitForAllJobsCompleted: waiting completed");
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
         mtctx->doneJobID++;
@@ -1058,6 +1096,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     ZSTD_freeCDict(mtctx->cdictLocal);
     if (mtctx->roundBuff.buffer)
         ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);
+    ZSTD_pthread_mutex_destroy(&mtctx->flushMutex);
+    ZSTD_pthread_cond_destroy(&mtctx->flushCond);
     ZSTD_customFree(mtctx, mtctx->cMem);
     return 0;
 }
@@ -1129,7 +1169,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
             ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
-            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
            {   size_t const cResult = jobPtr->cSize;
                size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
                size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
@@ -1157,7 +1197,7 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
     /* look into oldest non-fully-flushed job */
     {   unsigned const wJobID = jobID & mtctx->jobIDMask;
         ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
-        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
         {   size_t const cResult = jobPtr->cSize;
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
             size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
@@ -1279,15 +1319,15 @@ size_t ZSTDMT_initCStream_internal(
     mtctx->frameContentSize = pledgedSrcSize;
     mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params);
     DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
-    mtctx->targetSectionSize = params.jobSize;
-    if (mtctx->targetSectionSize == 0) {
-        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
+    mtctx->targetJobSize = params.jobSize;
+    if (mtctx->targetJobSize == 0) {
+        mtctx->targetJobSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
     }
-    assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
+    assert(mtctx->targetJobSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
     if (params.rsyncable) {
         /* Aim for the targetsectionSize as the average job size. */
-        U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);
+        U32 const jobSizeKB = (U32)(mtctx->targetJobSize >> 10);
         U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);
         /* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our
          * expected job size is at least 4x larger. */
@@ -1297,24 +1337,24 @@ size_t ZSTDMT_initCStream_internal(
         mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
         mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
     }
-    if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
-    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
-    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
-    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
+    if (mtctx->targetJobSize < mtctx->targetPrefixSize) mtctx->targetJobSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
+    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetJobSize>>10), (U32)params.jobSize);
+    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetJobSize>>10));
+    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetJobSize));
     {   /* If ldm is enabled we need windowSize space. */
         size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;
         /* Two buffers of slack, plus extra space for the overlap
          * This is the minimum slack that LDM works with. One extra because
-         * flush might waste up to targetSectionSize-1 bytes. Another extra
+         * flush might waste up to targetJobSize-1 bytes. Another extra
          * for the overlap (if > 0), then one to fill which doesn't overlap
          * with the LDM window.
          */
         size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
-        size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
+        size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers;
         /* Compute the total size, and always have enough slack */
         size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
-        size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
+        size_t const sectionsSize = mtctx->targetJobSize * nbWorkers;
         size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
         if (mtctx->roundBuff.capacity < capacity) {
             if (mtctx->roundBuff.buffer)
@@ -1359,7 +1399,7 @@ size_t ZSTDMT_initCStream_internal(
         mtctx->cdict = cdict;
     }
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetJobSize,
                                  dict, dictSize, dictContentType))
         return ERROR(memory_allocation);
@@ -1436,6 +1476,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     mtctx->jobs[jobID].lastJob = endFrame;
     mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
     mtctx->jobs[jobID].dstFlushed = 0;
+    mtctx->jobs[jobID].flush_mutex = NULL;
+    mtctx->jobs[jobID].flush_cond = NULL;
     /* Update the round buffer pos and clear the input buffer to be reset */
     mtctx->roundBuff.pos += srcSize;
@@ -1455,12 +1497,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         }
     }
     if ( (srcSize == 0)
-      && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
+      && (mtctx->nextJobID>0) /*single job must also write frame header*/ ) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
         assert(endOp == ZSTD_e_end);  /* only possible case : need to end the frame with an empty last block */
         ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
         mtctx->nextJobID++;
-        return 0;
+        return 1;
     }
 }
@@ -1471,13 +1513,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
                 mtctx->nextJobID, jobID);
-    if (ZSTDMT_anythingToFlush(mtctx)) {
+    if (1 || ZSTDMT_anythingToFlush(mtctx)) {
         if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
             mtctx->nextJobID++;
             mtctx->jobReady = 0;
+            return 1;
         } else {
-            DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+            DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
             mtctx->jobReady = 1;
+            return 0;
         }
     } else {
         /* block here, wait for next available job */
@@ -1485,8 +1529,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->nextJobID++;
         mtctx->jobReady = 0;
     }
-
-    return 0;
+    return 1;
 }
@@ -1515,7 +1558,15 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             }
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex);  /* block when nothing to flush but some to come */
+            if (mtctx->jobs[wJobID].flush_mutex == NULL) {
+                mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex;
+                mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond;
+            }
+            DEBUGLOG(6, "waiting to flush something (%zu left)", mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed);
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+            ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);  /* block waiting for something to flush */
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
+            DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
     }   }
     /* try to flush something */
@@ -1565,6 +1616,8 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             DEBUGLOG(5, "dstBuffer released");
             mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
             mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
+            mtctx->jobs[wJobID].flush_mutex = NULL;
+            mtctx->jobs[wJobID].flush_cond = NULL;
             mtctx->consumed += srcSize;
             mtctx->produced += cSize;
             mtctx->doneJobID++;
@@ -1595,7 +1648,7 @@ static Range ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
         /* no need to check during first round */
         size_t roundBuffCapacity = mtctx->roundBuff.capacity;
-        size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetSectionSize;
+        size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetJobSize;
         if (lastJobID < nbJobs1stRoundMin) return kNullRange;
         for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
@@ -1676,7 +1729,7 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, Buffer buffer)
         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
             DEBUGLOG(5, "Waiting for LDM to finish...");
-            ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
+            ZSTD_PTHREAD_COND_WAIT(&mtctx->serial.ldmWindowCond, mutex);
         }
         DEBUGLOG(6, "Done waiting for LDM to finish");
         ZSTD_pthread_mutex_unlock(mutex);
@@ -1692,7 +1745,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
 {
     Range const inUse = ZSTDMT_getInputDataInUse(mtctx);
     size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
-    size_t const spaceNeeded = mtctx->targetSectionSize;
+    size_t const spaceNeeded = mtctx->targetJobSize;
     Buffer buffer;
     DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
@@ -1765,7 +1818,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
     BYTE const* prev;
     size_t pos;
-    syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
+    syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetJobSize - mtctx->inBuff.filled);
     syncPoint.flush = 0;
     if (!mtctx->params.rsyncable)
         /* Rsync is disabled. */
@@ -1781,7 +1834,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
      * window. However, since it depends only in the internal buffers, if the
      * state is already synchronized, we will remain synchronized.
      * Additionally, the probability that we miss a synchronization point is
-     * low: RSYNC_LENGTH / targetSectionSize.
+     * low: RSYNC_LENGTH / targetJobSize.
      */
         return syncPoint;
     /* Initialize the loop variables. */
@@ -1825,7 +1878,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
      * through the input. If we hit a synchronization point, then cut the
      * job off, and tell the compressor to flush the job. Otherwise, load
      * all the bytes and continue as normal.
-     * If we go too long without a synchronization point (targetSectionSize)
+     * If we go too long without a synchronization point (targetJobSize)
     * then a block will be emitted anyways, but this is okay, since if we
     * are already synchronized we will remain synchronized.
     */
@@ -1852,8 +1905,8 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
 size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
 {
-    size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
-    if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
+    size_t hintInSize = mtctx->targetJobSize - mtctx->inBuff.filled;
+    if (hintInSize==0) hintInSize = mtctx->targetJobSize;
     return hintInSize;
 }
@@ -1866,7 +1919,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                      ZSTD_inBuffer* input,
                                      ZSTD_EndDirective endOp)
 {
-    unsigned forwardInputProgress = 0;
+    unsigned forwardProgress = 0;
     DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
                 (U32)endOp, (U32)(input->size - input->pos));
     assert(output->pos <= output->size);
@@ -1896,13 +1949,13 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
             if (syncPoint.flush && endOp == ZSTD_e_continue) {
                 endOp = ZSTD_e_flush;
             }
-            assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
+            assert(mtctx->inBuff.buffer.capacity >= mtctx->targetJobSize);
             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
-                        (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
+                        (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetJobSize);
             ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
             input->pos += syncPoint.toLoad;
             mtctx->inBuff.filled += syncPoint.toLoad;
-            forwardInputProgress = syncPoint.toLoad>0;
+            forwardProgress = syncPoint.toLoad>0;
     }   }
     if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {
@@ -1912,21 +1965,23 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
      * - We filled the input buffer: flush this job but don't end the frame.
     * - We hit a synchronization point: flush this job but don't end the frame.
     */
-        assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable);
+        assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetJobSize || mtctx->params.rsyncable);
         endOp = ZSTD_e_flush;
     }
     if ( (mtctx->jobReady)
-      || (mtctx->inBuff.filled >= mtctx->targetSectionSize)  /* filled enough : let's compress */
+      || (mtctx->inBuff.filled >= mtctx->targetJobSize)  /* filled enough : let's compress */
       || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0))  /* something to flush : let's go */
       || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
         size_t const jobSize = mtctx->inBuff.filled;
-        assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
-        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
+        size_t const jobPosted = ZSTDMT_createCompressionJob(mtctx, jobSize, endOp);
+        assert(mtctx->inBuff.filled <= mtctx->targetJobSize);
+        FORWARD_IF_ERROR(jobPosted , "");
+        if (jobPosted) forwardProgress = 1;
     }
     /* check for potential compressed data ready to be flushed */
-    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp);  /* block if there was no forward input progress */
+    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardProgress, endOp);  /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
         DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
         return remainingToFlush;
diff --git a/tests/cli-tests/compression/adapt.sh b/tests/cli-tests/compression/adapt.sh
index 30b9afaa0..2981ada38 100755
--- a/tests/cli-tests/compression/adapt.sh
+++ b/tests/cli-tests/compression/adapt.sh
@@ -8,7 +8,7 @@ zstd -f file --adapt -c | zstd -t
 datagen -g100M > file100M
 # Pick parameters to force fast adaptation, even on slow systems
-zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+zstd --adapt -vvvv -19 --zstd=wlog=18 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
 # Adaption still happens with --no-progress
-zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+zstd --no-progress --adapt -vvvv -19 --zstd=wlog=18 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
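Note on the flush-signaling change in zstdmt_compress.c above: the per-job `job_cond` is replaced by `flush_mutex`/`flush_cond` pointers, which the consumer (mtctx) points at a single shared `flushMutex`/`flushCond` pair when it actually needs to block, and which workers signal after publishing progress under the job mutex. The sketch below is a minimal illustration of that hand-off, not the zstdmt implementation: it uses plain pthreads instead of the ZSTD_pthread_* wrappers, and the `Job` type, `worker` function, and 4-block loop are invented for the example.

```c
/* Minimal sketch of the shared-flush-condition pattern, under the assumptions above. */
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>

typedef struct {
    pthread_mutex_t job_mutex;     /* protects consumed/total */
    size_t consumed, total;
    pthread_mutex_t* flush_mutex;  /* set by the consumer when it wants wake-ups */
    pthread_cond_t*  flush_cond;
} Job;

static void* worker(void* arg)
{
    Job* const job = (Job*)arg;
    for (;;) {
        pthread_mutex_lock(&job->job_mutex);
        job->consumed += 1;                       /* pretend one block was compressed */
        int const done = (job->consumed == job->total);
        pthread_mutex_t* const fm = job->flush_mutex;
        pthread_cond_t*  const fc = job->flush_cond;
        pthread_mutex_unlock(&job->job_mutex);
        if (fm != NULL) {                         /* signal outside the job mutex */
            pthread_mutex_lock(fm);
            pthread_cond_signal(fc);
            pthread_mutex_unlock(fm);
        }
        if (done) return NULL;
    }
}

int main(void)
{
    pthread_mutex_t flushMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  flushCond  = PTHREAD_COND_INITIALIZER;
    Job job = { PTHREAD_MUTEX_INITIALIZER, 0, 4, NULL, NULL };
    pthread_t th;

    /* Consumer registers interest in progress notifications before blocking. */
    pthread_mutex_lock(&job.job_mutex);
    job.flush_mutex = &flushMutex;
    job.flush_cond  = &flushCond;
    pthread_mutex_unlock(&job.job_mutex);

    pthread_create(&th, NULL, worker, &job);

    pthread_mutex_lock(&flushMutex);
    for (;;) {
        pthread_mutex_lock(&job.job_mutex);
        size_t const consumed = job.consumed, total = job.total;
        pthread_mutex_unlock(&job.job_mutex);
        printf("progress: %zu/%zu blocks\n", consumed, total);
        if (consumed == total) break;
        pthread_cond_wait(&flushCond, &flushMutex);  /* flushMutex held, as pthreads requires */
    }
    pthread_mutex_unlock(&flushMutex);

    pthread_join(th, NULL);
    return 0;
}
```

In the sketch, the consumer holds the flush mutex across the predicate check and the wait, which is what prevents a worker's signal from being lost between the check and `pthread_cond_wait`; the worker, for its part, never holds the job mutex and the flush mutex at the same time, so the two locks cannot deadlock.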
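For the rsyncable sizing arithmetic visible in the ZSTDMT_initCStream_internal hunk (`rsyncBits = ZSTD_highbit32(jobSizeKB) + 10`, `hitMask = (1ULL << rsyncBits) - 1`), a worked example may help. The 2 MiB job size below is an assumed value chosen for illustration, and `highbit32()` is a local stand-in for zstd's internal `ZSTD_highbit32()`.

```c
#include <stdio.h>

/* Illustrative arithmetic only; not part of the patch. */
static unsigned highbit32(unsigned v) { unsigned n = 0; while (v >>= 1) n++; return n; }

int main(void)
{
    unsigned long long const targetJobSize = 2ULL << 20;         /* assumed: 2 MiB jobs */
    unsigned const jobSizeKB = (unsigned)(targetJobSize >> 10);  /* 2048 */
    unsigned const rsyncBits = highbit32(jobSizeKB) + 10;        /* 11 + 10 = 21 */
    unsigned long long const hitMask = (1ULL << rsyncBits) - 1;  /* 0x1FFFFF */
    printf("rsyncBits=%u hitMask=0x%llx avg distance=%llu bytes\n",
           rsyncBits, hitMask, 1ULL << rsyncBits);
    /* A rolling-hash value matches hitMask roughly once every 2^21 bytes, so the
     * average distance between synchronization points (~2 MiB) tracks targetJobSize. */
    return 0;
}
```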