diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 2121fe749..1412c1d6a 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3704,6 +3704,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ ZSTD_CCtx_reset(cctx); } + DEBUGLOG(5, "completed ZSTD_compress_generic delegating to ZSTDMT_compressStream_generic"); return flushMin; } } #endif diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index d2f06e4ee..49502bd0d 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -249,8 +249,8 @@ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer) /* store buffer for later re-use, up to pool capacity */ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) { - if (buf.start == NULL) return; /* compatible with release on NULL */ DEBUGLOG(5, "ZSTDMT_releaseBuffer"); + if (buf.start == NULL) return; /* compatible with release on NULL */ ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers < bufPool->totalBuffers) { bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */ @@ -541,6 +541,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, /* Wait for our turn */ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); while (serialState->nextJobID < jobID) { + DEBUGLOG(5, "wait for serialState->cond"); ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex); } /* A future job may error and skip our job */ @@ -932,7 +933,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ + 
DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); @@ -1079,7 +1080,7 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { ZSTD_frameProgression fps; - DEBUGLOG(6, "ZSTDMT_getFrameProgression"); + DEBUGLOG(5, "ZSTDMT_getFrameProgression"); fps.ingested = mtctx->consumed + mtctx->inBuff.filled; fps.consumed = mtctx->consumed; fps.produced = mtctx->produced; @@ -1100,6 +1101,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); } } + DEBUGLOG(5, "ZSTDMT_getFrameProgression : completed"); return fps; } @@ -1576,7 +1578,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u /* try to flush something */ { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ - size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ + size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", @@ -1615,6 +1617,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); + DEBUGLOG(5, "dstBuffer released"); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; 
mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ mtctx->consumed += srcSize; @@ -1691,6 +1694,7 @@ static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) range_t extDict; range_t prefix; + DEBUGLOG(5, "ZSTDMT_doesOverlapWindow"); extDict.start = window.dictBase + window.lowLimit; extDict.size = window.dictLimit - window.lowLimit; @@ -1711,12 +1715,13 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) { if (mtctx->params.ldmParams.enableLdm) { ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; + DEBUGLOG(5, "ZSTDMT_waitForLdmComplete"); DEBUGLOG(5, "source [0x%zx, 0x%zx)", (size_t)buffer.start, (size_t)buffer.start + buffer.capacity); ZSTD_PTHREAD_MUTEX_LOCK(mutex); while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { - DEBUGLOG(6, "Waiting for LDM to finish..."); + DEBUGLOG(5, "Waiting for LDM to finish..."); ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); } DEBUGLOG(6, "Done waiting for LDM to finish"); @@ -1736,6 +1741,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) size_t const target = mtctx->targetSectionSize; buffer_t buffer; + DEBUGLOG(5, "ZSTDMT_tryGetInputRange"); assert(mtctx->inBuff.buffer.start == NULL); assert(mtctx->roundBuff.capacity >= target); @@ -1749,7 +1755,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) buffer.start = start; buffer.capacity = prefixSize; if (ZSTDMT_isOverlapped(buffer, inUse)) { - DEBUGLOG(6, "Waiting for buffer..."); + DEBUGLOG(5, "Waiting for buffer..."); return 0; } ZSTDMT_waitForLdmComplete(mtctx, buffer); @@ -1761,7 +1767,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) buffer.capacity = target; if (ZSTDMT_isOverlapped(buffer, inUse)) { - DEBUGLOG(6, "Waiting for buffer..."); + DEBUGLOG(5, "Waiting for buffer..."); return 0; } assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); @@ -1834,8 +1840,10 @@ size_t 
ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* It is only possible for this operation to fail if there are * still compression jobs ongoing. */ + DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed"); assert(mtctx->doneJobID != mtctx->nextJobID); - } + } else + DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); } if (mtctx->inBuff.buffer.start != NULL) { size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); @@ -1863,6 +1871,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* check for potential compressed data ready to be flushed */ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ + DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush); return remainingToFlush; } } diff --git a/programs/fileio.c b/programs/fileio.c index c12126168..89ee524b3 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -226,6 +226,8 @@ void FIO_setOverlapLog(unsigned overlapLog){ DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n"); g_overlapLog = overlapLog; } +static U32 g_adaptiveMode = 0; +void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; } static U32 g_ldmFlag = 0; void FIO_setLdmFlag(unsigned ldmFlag) { g_ldmFlag = (ldmFlag>0); @@ -738,12 +740,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; + /* stats */ typedef enum { noChange, slower, faster } speedChange_e; speedChange_e speedChange = noChange; + unsigned inputPresented = 0; unsigned inputBlocked = 0; unsigned lastJobID = 0; - unsigned long long lastProduced = 0; - unsigned long long lastFlushedSize = 0; DISPLAYLEVEL(6, 
"compression using zstd format \n"); @@ -774,6 +776,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); /* count stats */ + inputPresented++; if (oldIPos == inBuff.pos) inputBlocked++; /* Write compressed stream */ @@ -792,41 +795,74 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; /* check output speed */ - if (zfp.currentJobID > 0) { - unsigned long long newlyProduced = zfp.produced - lastProduced; + if (zfp.currentJobID > 1) { + static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 }; + static unsigned long long lastFlushedSize = 0; + + unsigned long long newlyProduced = zfp.produced - cpszfp.produced; unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize; - assert(zfp.produced >= lastProduced); - if (newlyProduced == 0) { - DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n") + assert(zfp.produced >= cpszfp.produced); + + if ( (zfp.ingested == cpszfp.ingested) + && (zfp.consumed == cpszfp.consumed) ) { + DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. 
If buffers full : output is too slow => slow down \n") speedChange = slower; } if ( (newlyProduced > (newlyFlushed * 9 / 8)) && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) { - DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed); + DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush); speedChange = slower; } - lastProduced = zfp.produced; + cpszfp = zfp; lastFlushedSize = compressedfilesize; } - /* course correct only if there is at least one job completed */ + /* course correct only if there is at least one new job completed */ if (zfp.currentJobID > lastJobID) { DISPLAYLEVEL(6, "compression level adaptation check \n") /* check input speed */ if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */ - if (inputBlocked <= 1) { /* small tolerance */ + if (inputBlocked <= 0) { DISPLAYLEVEL(6, "input is never blocked => input is too slow \n"); speedChange = slower; + } else if (speedChange == noChange) { + static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 }; + static unsigned long long lastFlushedSize = 0; + unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested; + unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed; + unsigned long long newlyProduced = zfp.produced - csuzfp.produced; + unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize; + csuzfp = zfp; + lastFlushedSize = compressedfilesize; + assert(inputPresented > 0); + if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */ + && (newlyFlushed * 17 / 16 > newlyProduced) /* flush everything that is produced */ + && (newlyIngested * 17 / 16 > newlyConsumed) /* can't keep up with input speed */ + ) { + DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n", + newlyIngested, newlyConsumed, 
newlyProduced, newlyFlushed); + speedChange = faster; + } + } inputBlocked = 0; + inputPresented = 0; } - if (speedChange == slower) { - DISPLAYLEVEL(6, "slower speed , higher compression \n") - compressionLevel ++; - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel); + if (g_adaptiveMode) { + if (speedChange == slower) { + DISPLAYLEVEL(6, "slower speed , higher compression \n") + compressionLevel ++; + compressionLevel += (compressionLevel == 0); /* skip 0 */ + ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel); + } + if (speedChange == faster) { /* step compression level down */ + DISPLAYLEVEL(6, "faster speed , lighter compression \n") + compressionLevel --; + compressionLevel -= (compressionLevel == 0); /* skip 0 */ + ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel); + } speedChange = noChange; } lastJobID = zfp.currentJobID; diff --git a/programs/fileio.h b/programs/fileio.h index 69c83f71d..f4946c78a 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -57,6 +57,7 @@ void FIO_setMemLimit(unsigned memLimit); void FIO_setNbWorkers(unsigned nbWorkers); void FIO_setBlockSize(unsigned blockSize); void FIO_setOverlapLog(unsigned overlapLog); +void FIO_setAdaptiveMode(unsigned adapt); void FIO_setLdmFlag(unsigned ldmFlag); void FIO_setLdmHashLog(unsigned ldmHashLog); void FIO_setLdmMinMatch(unsigned ldmMinMatch); diff --git a/programs/zstd.1.md b/programs/zstd.1.md index 055c5c244..b71d5d5bf 100644 --- a/programs/zstd.1.md +++ b/programs/zstd.1.md @@ -102,6 +102,13 @@ the last one takes effect. * `-#`: `#` compression level \[1-19] (default: 3) +* `--fast[=#]`: + switch to ultra-fast compression levels. + If `=#` is not present, it defaults to `1`. + The higher the value, the faster the compression speed, + at the cost of some compression ratio. + This setting overwrites compression level if one was set previously. 
+ Similarly, if a compression level is set after `--fast`, it overrides it. * `--ultra`: unlocks high compression levels 20+ (maximum 22), using a lot more memory. Note that decompression will also require more memory when using these levels. @@ -115,25 +122,23 @@ the last one takes effect. Note: If `windowLog` is set to larger than 27, `--long=windowLog` or `--memory=windowSize` needs to be passed to the decompressor. -* `--fast[=#]`: - switch to ultra-fast compression levels. - If `=#` is not present, it defaults to `1`. - The higher the value, the faster the compression speed, - at the cost of some compression ratio. - This setting overwrites compression level if one was set previously. - Similarly, if a compression level is set after `--fast`, it overrides it. - * `-T#`, `--threads=#`: Compress using `#` working threads (default: 1). If `#` is 0, attempt to detect and use the number of physical CPU cores. In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==200. This modifier does nothing if `zstd` is compiled without multithread support. * `--single-thread`: - Does not spawn a thread for compression, use caller thread instead. - This is the only available mode when multithread support is disabled. - In this mode, compression is serialized with I/O. + Does not spawn a thread for compression, use a single thread for both I/O and compression. + In this mode, compression is serialized with I/O, which is slightly slower. (This is different from `-T1`, which spawns 1 compression thread in parallel of I/O). - Single-thread mode also features lower memory usage. + This mode is the only one available when multithread support is disabled. + Single-thread mode features lower memory usage. + Final compressed result is slightly different from `-T1`. +* `--adapt` : + `zstd` will dynamically adapt compression level to perceived I/O conditions. + The current compression level can be observed live by using command `-v`. 
+ Works with multi-threading and `--long` mode. + Does not work with `--single-thread`. * `-D file`: use `file` as Dictionary to compress or decompress FILE(s) * `--no-dictID`: diff --git a/programs/zstdcli.c b/programs/zstdcli.c index d5a2216d6..2e54b3b0f 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -135,6 +135,7 @@ static int usage_advanced(const char* programName) #ifndef ZSTD_NOCOMPRESS DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel()); DISPLAY( "--long[=#]: enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog); + DISPLAY( "--adapt : automatically adapt compression level to I/O conditions \n"); DISPLAY( "--fast[=#]: switch to ultra fast compression level (default: %u)\n", 1); #ifdef ZSTD_MULTITHREAD DISPLAY( " -T# : spawns # compression threads (default: 1, 0==# cores) \n"); @@ -395,6 +396,7 @@ int main(int argCount, const char* argv[]) ldmFlag = 0, main_pause = 0, nbWorkers = 0, + adapt = 0, nextArgumentIsOutFileName = 0, nextArgumentIsMaxDict = 0, nextArgumentIsDictID = 0, @@ -511,6 +513,7 @@ int main(int argCount, const char* argv[]) if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; } if (!strcmp(argument, "--rm")) { FIO_setRemoveSrcFile(1); continue; } if (!strcmp(argument, "--priority=rt")) { setRealTimePrio = 1; continue; } + if (!strcmp(argument, "--adapt")) { adapt = 1; continue; } if (!strcmp(argument, "--single-thread")) { nbWorkers = 0; singleThread = 1; continue; } if (!strcmp(argument, "--format=zstd")) { suffix = ZSTD_EXTENSION; FIO_setCompressionType(FIO_zstdCompression); continue; } #ifdef ZSTD_GZCOMPRESS @@ -935,17 +938,14 @@ int main(int argCount, const char* argv[]) #ifndef ZSTD_NOCOMPRESS FIO_setNbWorkers(nbWorkers); FIO_setBlockSize((U32)blockSize); + if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog); FIO_setLdmFlag(ldmFlag); FIO_setLdmHashLog(g_ldmHashLog); 
FIO_setLdmMinMatch(g_ldmMinMatch); - if (g_ldmBucketSizeLog != LDM_PARAM_DEFAULT) { - FIO_setLdmBucketSizeLog(g_ldmBucketSizeLog); - } - if (g_ldmHashEveryLog != LDM_PARAM_DEFAULT) { - FIO_setLdmHashEveryLog(g_ldmHashEveryLog); - } + if (g_ldmBucketSizeLog != LDM_PARAM_DEFAULT) FIO_setLdmBucketSizeLog(g_ldmBucketSizeLog); + if (g_ldmHashEveryLog != LDM_PARAM_DEFAULT) FIO_setLdmHashEveryLog(g_ldmHashEveryLog); + FIO_setAdaptiveMode(adapt); - if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog); if ((filenameIdx==1) && outFileName) operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams); else