
switch to a single flush mutex+cond

which is transferred to the current oldest unfinished job.
Yann Collet
2025-02-01 18:37:23 -08:00
parent 5bda54abfb
commit 8803d1497a
4 changed files with 119 additions and 64 deletions
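For readers following the diff below: the per-job condition variable (job_cond) disappears, and the compression context instead owns a single flushMutex/flushCond pair; the thread that wants to flush registers pointers to that pair inside the current oldest unfinished job, and the worker signals the shared condition whenever that job makes progress. Here is a minimal sketch of that handoff pattern in plain pthreads. All names (job_t, ctx_t, job_report_progress, ctx_wait_for_job_progress) are illustrative, not the zstdmt_compress.c definitions, and the lock ordering around the wait is just one reasonable way to avoid missing a wakeup; the commit's own sequencing differs in detail.

/* Illustrative sketch only: simplified, hypothetical types and names,
 * not the actual zstd sources. Mutex/cond initialization is omitted. */
#include <pthread.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t  job_mutex;    /* protects consumed/size and the flush_* pointers */
    size_t           consumed;
    size_t           size;
    pthread_mutex_t* flush_mutex;  /* borrowed from the context while a flusher waits */
    pthread_cond_t*  flush_cond;   /* NULL when nobody is waiting on this job */
} job_t;

typedef struct {
    pthread_mutex_t flushMutex;    /* the single, context-wide flush pair */
    pthread_cond_t  flushCond;
} ctx_t;

/* Worker side: record progress, then wake the flusher only if one registered. */
static void job_report_progress(job_t* job, size_t newlyConsumed)
{
    pthread_mutex_t* fm;
    pthread_cond_t*  fc;
    pthread_mutex_lock(&job->job_mutex);
    job->consumed += newlyConsumed;
    fm = job->flush_mutex;
    fc = job->flush_cond;
    pthread_mutex_unlock(&job->job_mutex);
    if (fm != NULL) {
        pthread_mutex_lock(fm);    /* signal under the flusher's mutex */
        pthread_cond_signal(fc);
        pthread_mutex_unlock(fm);
    }
}

/* Flusher side: transfer the shared pair to the oldest unfinished job, then
 * block until that job reports progress. Acquiring flushMutex before releasing
 * job_mutex closes the window in which a signal could be emitted and missed
 * between the predicate check and the wait. */
static void ctx_wait_for_job_progress(ctx_t* ctx, job_t* oldestJob)
{
    pthread_mutex_lock(&oldestJob->job_mutex);
    while (oldestJob->consumed < oldestJob->size) {
        if (oldestJob->flush_mutex == NULL) {
            oldestJob->flush_mutex = &ctx->flushMutex;
            oldestJob->flush_cond  = &ctx->flushCond;
        }
        pthread_mutex_lock(&ctx->flushMutex);
        pthread_mutex_unlock(&oldestJob->job_mutex);
        pthread_cond_wait(&ctx->flushCond, &ctx->flushMutex);
        pthread_mutex_unlock(&ctx->flushMutex);
        pthread_mutex_lock(&oldestJob->job_mutex);
    }
    oldestJob->flush_mutex = NULL; /* hand the pair back once the job is done */
    oldestJob->flush_cond  = NULL;
    pthread_mutex_unlock(&oldestJob->job_mutex);
}

The point of the design is that only one condition variable ever needs to be created and signaled per context, instead of one per job slot, while the oldest unfinished job remains the only job that can wake the flusher.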

View File

@@ -379,7 +379,7 @@ static U32 HUF_setMaxHeight(nodeElt* huffNode, U32 lastNonNull, U32 targetNbBits
/* early exit : no elt > targetNbBits, so the tree is already valid. */
if (largestBits <= targetNbBits) return largestBits;
DEBUGLOG(5, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
DEBUGLOG(6, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
/* there are several too large elements (at least >= 2) */
{ int totalCost = 0;
@@ -685,7 +685,7 @@ static int HUF_buildTree(nodeElt* huffNode, U32 maxSymbolValue)
int lowS, lowN;
int nodeNb = STARTNODE;
int n, nodeRoot;
DEBUGLOG(5, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
DEBUGLOG(6, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
/* init for parents */
nonNullRank = (int)maxSymbolValue;
while(huffNode[nonNullRank].count == 0) nonNullRank--;
@@ -764,7 +764,7 @@ HUF_buildCTable_wksp(HUF_CElt* CTable, const unsigned* count, U32 maxSymbolValue
HUF_STATIC_ASSERT(HUF_CTABLE_WORKSPACE_SIZE == sizeof(HUF_buildCTable_wksp_tables));
DEBUGLOG(5, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
DEBUGLOG(6, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
/* safety checks */
if (wkspSize < sizeof(HUF_buildCTable_wksp_tables))

View File

@@ -124,7 +124,7 @@ static U32 ZSTD_scaleStats(unsigned* table, U32 lastEltIndex, U32 logTarget)
{
U32 const prevsum = sum_u32(table, lastEltIndex+1);
U32 const factor = prevsum >> logTarget;
DEBUGLOG(5, "ZSTD_scaleStats (nbElts=%u, target=%u)", (unsigned)lastEltIndex+1, (unsigned)logTarget);
DEBUGLOG(6, "ZSTD_scaleStats (nbElts=%u, target=%u)", (unsigned)lastEltIndex+1, (unsigned)logTarget);
assert(logTarget < 30);
if (factor <= 1) return prevsum;
return ZSTD_downscaleStats(table, lastEltIndex, ZSTD_highbit32(factor), base_1guaranteed);

View File

@@ -62,6 +62,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
do { \
if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
+ DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "acquiring mutex %s", #mutex); \
ZSTD_pthread_mutex_lock(mutex); \
{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
unsigned long long const elapsedTime = (afterTime-beforeTime); \
@@ -76,9 +77,22 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
} \
} while (0)
+ #define COND_WAIT_DLEVEL 6
+ #define ZSTD_PTHREAD_COND_WAIT(_cond, _mutex) \
+ do { \
+ if (DEBUGLEVEL >= COND_WAIT_DLEVEL) { \
+ DEBUGLOG(COND_WAIT_DLEVEL, "waiting on condition %s", #_cond); \
+ ZSTD_pthread_cond_wait(_cond,_mutex); \
+ DEBUGLOG(COND_WAIT_DLEVEL, "condition %s triggered", #_cond); \
+ } else { \
+ ZSTD_pthread_cond_wait(_cond,_mutex); \
+ } \
+ } while (0)
#else
# define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
+ # define ZSTD_PTHREAD_COND_WAIT(c,m) ZSTD_pthread_cond_wait(c,m)
# define DEBUG_PRINTHEX(l,p,n) do { } while (0)
#endif
@@ -147,7 +161,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
size_t const arraySize = bufPool->totalBuffers * sizeof(Buffer);
unsigned u;
size_t totalBufferSize = 0;
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
for (u=0; u<bufPool->totalBuffers; u++)
totalBufferSize += bufPool->buffers[u].capacity;
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
@@ -161,7 +175,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
* as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
bufPool->bufferSize = bSize;
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
@@ -193,7 +207,7 @@ static Buffer ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
size_t const bSize = bufPool->bufferSize;
DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
if (bufPool->nbBuffers) { /* try to use an existing buffer */
Buffer const buf = bufPool->buffers[--(bufPool->nbBuffers)];
size_t const availBufferSize = buf.capacity;
@@ -256,7 +270,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, Buffer buf)
{
DEBUGLOG(5, "ZSTDMT_releaseBuffer");
if (buf.start == NULL) return; /* compatible with release on NULL */
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
if (bufPool->nbBuffers < bufPool->totalBuffers) {
bufPool->buffers[bufPool->nbBuffers++] = buf; /* stored for later use */
DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
@@ -417,7 +431,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
/* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{
ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
{ unsigned const nbWorkers = cctxPool->totalCCtx;
size_t const poolSize = sizeof(*cctxPool);
size_t const arraySize = cctxPool->totalCCtx * sizeof(ZSTD_CCtx*);
@@ -435,7 +449,7 @@ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{
DEBUGLOG(5, "ZSTDMT_getCCtx");
ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
if (cctxPool->availCCtx) {
cctxPool->availCCtx--;
{ ZSTD_CCtx* const cctx = cctxPool->cctxs[cctxPool->availCCtx];
@@ -450,7 +464,7 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{
if (cctx==NULL) return; /* compatibility with release on NULL */
ZSTD_pthread_mutex_lock(&pool->poolMutex);
ZSTD_PTHREAD_MUTEX_LOCK(&pool->poolMutex);
if (pool->availCCtx < pool->totalCCtx)
pool->cctxs[pool->availCCtx++] = cctx;
else {
@@ -586,7 +600,7 @@ ZSTDMT_serialState_genSequences(SerialState* serialState,
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
while (serialState->nextJobID < jobID) {
DEBUGLOG(5, "wait for serialState->cond");
ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
ZSTD_PTHREAD_COND_WAIT(&serialState->cond, &serialState->mutex);
}
/* A future job may error and skip our job */
if (serialState->nextJobID == jobID) {
@@ -663,7 +677,8 @@ typedef struct {
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */
- ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
+ ZSTD_pthread_mutex_t* flush_mutex; /* Thread-safe - used by mtctx and worker */
+ ZSTD_pthread_cond_t* flush_cond; /* Thread-safe - used by mtctx and worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
@@ -779,8 +794,14 @@ static void ZSTDMT_compressionJob(void* jobDescription)
job->consumed = chunkSize * chunkNb;
DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(&job->job_mutex);
if (job->flush_mutex != NULL) {
ZSTD_pthread_mutex_unlock(&job->job_mutex);
ZSTD_PTHREAD_MUTEX_LOCK(job->flush_mutex);
ZSTD_pthread_cond_signal(job->flush_cond); /* warns some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(job->flush_mutex);
} else {
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
}
/* last block */
assert(chunkSize > 0);
@@ -815,7 +836,15 @@ _endJob:
if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
job->cSize += lastCBlockSize;
job->consumed = job->src.size; /* when job->consumed == job->src.size , compression job is presumed completed */
ZSTD_pthread_cond_signal(&job->job_cond);
if (job->flush_mutex != NULL) {
ZSTD_pthread_mutex_unlock(&job->job_mutex);
ZSTD_PTHREAD_MUTEX_LOCK(job->flush_mutex);
ZSTD_pthread_cond_signal(job->flush_cond); /* warns some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(job->flush_mutex);
ZSTD_pthread_mutex_lock(&job->job_mutex);
}
job->flush_mutex = NULL;
job->flush_cond = NULL;
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
@@ -870,13 +899,15 @@ struct ZSTDMT_CCtx_s {
ZSTDMT_CCtxPool* cctxPool;
ZSTDMT_seqPool* seqPool;
ZSTD_CCtx_params params;
- size_t targetSectionSize;
+ size_t targetJobSize;
size_t targetPrefixSize;
int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
InBuff_t inBuff;
RoundBuff_t roundBuff;
SerialState serial;
RSyncState_t rsync;
+ ZSTD_pthread_mutex_t flushMutex;
+ ZSTD_pthread_cond_t flushCond;
unsigned jobIDMask;
unsigned doneJobID;
unsigned nextJobID;
@@ -897,7 +928,6 @@ static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS
if (jobTable == NULL) return;
for (jobNb=0; jobNb<nbJobs; jobNb++) {
ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
}
ZSTD_customFree(jobTable, cMem);
}
@@ -918,7 +948,6 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
*nbJobsPtr = nbJobs;
for (jobNb=0; jobNb<nbJobs; jobNb++) {
initError |= ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);
initError |= ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);
}
if (initError != 0) {
ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);
@@ -983,6 +1012,8 @@ ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZST
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial);
initError |= ZSTD_pthread_mutex_init(&mtctx->flushMutex, NULL);
initError |= ZSTD_pthread_cond_init(&mtctx->flushCond, NULL);
mtctx->roundBuff = kNullRoundBuff;
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
ZSTDMT_freeCCtx(mtctx);
@@ -1014,7 +1045,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
/* Copy the mutex/cond out */
ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
- ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;
DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
@@ -1022,7 +1052,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
/* Clear the job description, but keep the mutex/cond */
ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
mtctx->jobs[jobID].job_mutex = mutex;
- mtctx->jobs[jobID].job_cond = cond;
}
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->inBuff.filled = 0;
@@ -1036,8 +1065,17 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);
/* we want to block and wait for data to flush */
if (mtctx->jobs[jobID].flush_mutex == NULL) {
mtctx->jobs[jobID].flush_mutex = &mtctx->flushMutex;
mtctx->jobs[jobID].flush_cond = &mtctx->flushCond;
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
DEBUGLOG(2, "ZSTDMT_waitForAllJobsCompleted: let's wait for job progress");
ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);
DEBUGLOG(2, "ZSTDMT_waitForAllJobsCompleted: waiting completed");
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
mtctx->doneJobID++;
@@ -1058,6 +1096,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
ZSTD_freeCDict(mtctx->cdictLocal);
if (mtctx->roundBuff.buffer)
ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);
ZSTD_pthread_mutex_destroy(&mtctx->flushMutex);
ZSTD_pthread_cond_destroy(&mtctx->flushCond);
ZSTD_customFree(mtctx, mtctx->cMem);
return 0;
}
@@ -1129,7 +1169,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
@@ -1157,7 +1197,7 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
/* look into oldest non-fully-flushed job */
{ unsigned const wJobID = jobID & mtctx->jobIDMask;
ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
@@ -1279,15 +1319,15 @@ size_t ZSTDMT_initCStream_internal(
mtctx->frameContentSize = pledgedSrcSize;
mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params);
DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
- mtctx->targetSectionSize = params.jobSize;
- if (mtctx->targetSectionSize == 0) {
- mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
+ mtctx->targetJobSize = params.jobSize;
+ if (mtctx->targetJobSize == 0) {
+ mtctx->targetJobSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
}
- assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
+ assert(mtctx->targetJobSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
if (params.rsyncable) {
/* Aim for the targetsectionSize as the average job size. */
- U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);
+ U32 const jobSizeKB = (U32)(mtctx->targetJobSize >> 10);
U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);
/* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our
* expected job size is at least 4x larger. */
@@ -1297,24 +1337,24 @@ size_t ZSTDMT_initCStream_internal(
mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
}
- if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
- DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
- DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
- ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
+ if (mtctx->targetJobSize < mtctx->targetPrefixSize) mtctx->targetJobSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
+ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetJobSize>>10), (U32)params.jobSize);
+ DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetJobSize>>10));
+ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetJobSize));
{
/* If ldm is enabled we need windowSize space. */
size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;
/* Two buffers of slack, plus extra space for the overlap
* This is the minimum slack that LDM works with. One extra because
- * flush might waste up to targetSectionSize-1 bytes. Another extra
+ * flush might waste up to targetJobSize-1 bytes. Another extra
* for the overlap (if > 0), then one to fill which doesn't overlap
* with the LDM window.
*/
size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
- size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
+ size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers;
/* Compute the total size, and always have enough slack */
size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
- size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
+ size_t const sectionsSize = mtctx->targetJobSize * nbWorkers;
size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
if (mtctx->roundBuff.capacity < capacity) {
if (mtctx->roundBuff.buffer)
@@ -1359,7 +1399,7 @@ size_t ZSTDMT_initCStream_internal(
mtctx->cdict = cdict;
}
- if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
+ if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetJobSize,
dict, dictSize, dictContentType))
return ERROR(memory_allocation);
@@ -1436,6 +1476,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->jobs[jobID].lastJob = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
mtctx->jobs[jobID].dstFlushed = 0;
+ mtctx->jobs[jobID].flush_mutex = NULL;
+ mtctx->jobs[jobID].flush_cond = NULL;
/* Update the round buffer pos and clear the input buffer to be reset */
mtctx->roundBuff.pos += srcSize;
@@ -1455,12 +1497,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
} }
if ( (srcSize == 0)
&& (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
&& (mtctx->nextJobID>0) /*single job must also write frame header*/ ) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
mtctx->nextJobID++;
- return 0;
+ return 1;
}
}
@@ -1471,13 +1513,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->nextJobID,
jobID);
- if (ZSTDMT_anythingToFlush(mtctx)) {
+ if (1 || ZSTDMT_anythingToFlush(mtctx)) {
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
+ return 1;
} else {
DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
mtctx->jobReady = 1;
+ return 0;
}
} else {
/* block here, wait for next available job */
@@ -1485,8 +1529,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->nextJobID++;
mtctx->jobReady = 0;
}
- return 0;
+ return 1;
}
@@ -1515,7 +1558,15 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
}
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block when nothing to flush but some to come */
if (mtctx->jobs[wJobID].flush_mutex == NULL) {
mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex;
mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond;
}
DEBUGLOG(6, "waiting to flush something (%zu left)", mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed);
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex); /* block waiting for something to flush */
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
} }
/* try to flush something */
@@ -1565,6 +1616,8 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
DEBUGLOG(5, "dstBuffer released");
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */
+ mtctx->jobs[wJobID].flush_mutex = NULL;
+ mtctx->jobs[wJobID].flush_cond = NULL;
mtctx->consumed += srcSize;
mtctx->produced += cSize;
mtctx->doneJobID++;
@@ -1595,7 +1648,7 @@ static Range ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
/* no need to check during first round */
size_t roundBuffCapacity = mtctx->roundBuff.capacity;
- size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetSectionSize;
+ size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetJobSize;
if (lastJobID < nbJobs1stRoundMin) return kNullRange;
for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
@@ -1676,7 +1729,7 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, Buffer buffer)
ZSTD_PTHREAD_MUTEX_LOCK(mutex);
while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
DEBUGLOG(5, "Waiting for LDM to finish...");
ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
ZSTD_PTHREAD_COND_WAIT(&mtctx->serial.ldmWindowCond, mutex);
}
DEBUGLOG(6, "Done waiting for LDM to finish");
ZSTD_pthread_mutex_unlock(mutex);
@@ -1692,7 +1745,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
{
Range const inUse = ZSTDMT_getInputDataInUse(mtctx);
size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
- size_t const spaceNeeded = mtctx->targetSectionSize;
+ size_t const spaceNeeded = mtctx->targetJobSize;
Buffer buffer;
DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
@@ -1765,7 +1818,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
BYTE const* prev;
size_t pos;
- syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
+ syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetJobSize - mtctx->inBuff.filled);
syncPoint.flush = 0;
if (!mtctx->params.rsyncable)
/* Rsync is disabled. */
@@ -1781,7 +1834,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
* window. However, since it depends only in the internal buffers, if the
* state is already synchronized, we will remain synchronized.
* Additionally, the probability that we miss a synchronization point is
- * low: RSYNC_LENGTH / targetSectionSize.
+ * low: RSYNC_LENGTH / targetJobSize.
*/
return syncPoint;
/* Initialize the loop variables. */
@@ -1825,7 +1878,7 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
* through the input. If we hit a synchronization point, then cut the
* job off, and tell the compressor to flush the job. Otherwise, load
* all the bytes and continue as normal.
- * If we go too long without a synchronization point (targetSectionSize)
+ * If we go too long without a synchronization point (targetJobSize)
* then a block will be emitted anyways, but this is okay, since if we
* are already synchronized we will remain synchronized.
*/
@@ -1852,8 +1905,8 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
{
- size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
- if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
+ size_t hintInSize = mtctx->targetJobSize - mtctx->inBuff.filled;
+ if (hintInSize==0) hintInSize = mtctx->targetJobSize;
return hintInSize;
}
@@ -1866,7 +1919,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp)
{
- unsigned forwardInputProgress = 0;
+ unsigned forwardProgress = 0;
DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
(U32)endOp, (U32)(input->size - input->pos));
assert(output->pos <= output->size);
@@ -1896,13 +1949,13 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
if (syncPoint.flush && endOp == ZSTD_e_continue) {
endOp = ZSTD_e_flush;
}
- assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
+ assert(mtctx->inBuff.buffer.capacity >= mtctx->targetJobSize);
DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
- (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
+ (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetJobSize);
ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
input->pos += syncPoint.toLoad;
mtctx->inBuff.filled += syncPoint.toLoad;
- forwardInputProgress = syncPoint.toLoad>0;
+ forwardProgress = syncPoint.toLoad>0;
}
}
if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {
@@ -1912,21 +1965,23 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
* - We filled the input buffer: flush this job but don't end the frame.
* - We hit a synchronization point: flush this job but don't end the frame.
*/
- assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable);
+ assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetJobSize || mtctx->params.rsyncable);
endOp = ZSTD_e_flush;
}
if ( (mtctx->jobReady)
- || (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */
+ || (mtctx->inBuff.filled >= mtctx->targetJobSize) /* filled enough : let's compress */
|| ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */
|| ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */
size_t const jobSize = mtctx->inBuff.filled;
- assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
- FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
+ size_t const jobPosted = ZSTDMT_createCompressionJob(mtctx, jobSize, endOp);
+ assert(mtctx->inBuff.filled <= mtctx->targetJobSize);
+ FORWARD_IF_ERROR(jobPosted , "");
+ if (jobPosted) forwardProgress = 1;
}
/* check for potential compressed data ready to be flushed */
- { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
+ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardProgress, endOp); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */
DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
return remainingToFlush;

View File

@@ -8,7 +8,7 @@ zstd -f file --adapt -c | zstd -t
datagen -g100M > file100M
# Pick parameters to force fast adaptation, even on slow systems
- zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+ zstd --adapt -vvvv -19 --zstd=wlog=18 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
# Adaption still happens with --no-progress
- zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+ zstd --no-progress --adapt -vvvv -19 --zstd=wlog=18 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"