diff --git a/programs/fileio.c b/programs/fileio.c index fe3c13527..c12126168 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -737,8 +737,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, FILE* const dstFile = ress.dstFile; U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; + + typedef enum { noChange, slower, faster } speedChange_e; + speedChange_e speedChange = noChange; unsigned inputBlocked = 0; unsigned lastJobID = 0; + unsigned long long lastProduced = 0; + unsigned long long lastFlushedSize = 0; DISPLAYLEVEL(6, "compression using zstd format \n"); @@ -763,6 +768,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, stillToFlush = 1; while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */ || (directive == ZSTD_e_end && stillToFlush != 0) ) { + size_t const oldIPos = inBuff.pos; ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); @@ -779,23 +785,55 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, EXM_THROW(25, "Write error : cannot write compressed block"); compressedfilesize += outBuff.pos; } + + /* display notification; and adapt compression level */ if (READY_FOR_UPDATE()) { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; - /* check input speed */ - if (zfp.currentJobID >= lastJobID+2) { - if (inputBlocked<=1) { /* small tolerance */ - DISPLAYLEVEL(6, "input is never blocked => input is too slow \n"); - compressionLevel++; - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel); + /* check output speed */ + if (zfp.currentJobID > 0) { + unsigned long long newlyProduced = zfp.produced - lastProduced; + unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize; + assert(zfp.produced >= lastProduced); + if (newlyProduced == 0) { + DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n") + speedChange = slower; } - lastJobID = zfp.currentJobID; - inputBlocked = 0; + + if ( (newlyProduced > (newlyFlushed * 9 / 8)) + && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) { + DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed); + speedChange = slower; + } + lastProduced = zfp.produced; + lastFlushedSize = compressedfilesize; } + /* course correct only if there is at least one job completed */ + if (zfp.currentJobID > lastJobID) { + DISPLAYLEVEL(6, "compression level adaptation check \n") + + /* check input speed */ + if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */ + if (inputBlocked <= 1) { /* small tolerance */ + DISPLAYLEVEL(6, "input is never blocked => input is too slow \n"); + speedChange = slower; + } + inputBlocked = 0; + } + + if (speedChange == slower) { + DISPLAYLEVEL(6, "slower speed , higher compression \n") + compressionLevel ++; + ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel); + speedChange = noChange; + } + lastJobID = zfp.currentJobID; + } /* if (zfp.currentJobID > lastJobID) */ + if (g_displayLevel >= 3) { - DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%", + DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ", compressionLevel, (U32)((zfp.ingested - zfp.consumed) >> 20), (U32)(zfp.consumed >> 20),