diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index 244690cdc..39255fdcf 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -1530,7 +1530,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     mtctx->jobs[jobID].jobID = mtctx->nextJobID;
     mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
     mtctx->jobs[jobID].lastJob = endFrame;
-    mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
+    mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
     mtctx->jobs[jobID].dstFlushed = 0;
 
     /* Update the round buffer pos and clear the input buffer to be reset */
diff --git a/programs/fileio.c b/programs/fileio.c
index 701e30e8f..00f0bc263 100644
--- a/programs/fileio.c
+++ b/programs/fileio.c
@@ -807,6 +807,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     ZSTD_EndDirective directive = ZSTD_e_continue;
 
     /* stats */
+    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
+    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
     typedef enum { noChange, slower, faster } speedChange_e;
     speedChange_e speedChange = noChange;
     unsigned flushWaiting = 0;
@@ -820,7 +822,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     if (fileSize != UTIL_FILESIZE_UNKNOWN) {
         CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
     }
-    (void)compressionLevel; (void)srcFileName;
+    (void)srcFileName;
 
     /* Main compression loop */
     do {
@@ -863,69 +865,85 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                 ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                 double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
 
-                /* check output speed */
-                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
-                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };   /* note : requires fileio to run main thread */
-
-                    unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
-                    unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
-                    assert(zfp.produced >= cpszfp.produced);
-                    assert(g_nbWorkers >= 1);
-
-                    if ( (zfp.ingested == cpszfp.ingested)   /* no data read : input buffer full */
-                      && (zfp.consumed == cpszfp.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
-                      && (zfp.nbActiveWorkers == 0)          /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
-                      ) {
-                        DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
-                        speedChange = slower;
-                    }
-
-                    cpszfp = zfp;
-
-                    if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
-                      && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
-                      ) {
-                        DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
-                        speedChange = slower;
-                    }
-                    flushWaiting = 0;
+                /* display progress notifications */
+                if (g_displayLevel >= 3) {
+                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
+                                compressionLevel,
+                                (U32)((zfp.ingested - zfp.consumed) >> 20),
+                                (U32)(zfp.consumed >> 20),
+                                (U32)(zfp.produced >> 20),
+                                cShare );
+                } else {   /* summarized notifications if == 2; */
+                    DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
+                    if (fileSize != UTIL_FILESIZE_UNKNOWN)
+                        DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
+                    DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
+                    DELAY_NEXT_UPDATE();
                 }
 
-                /* course correct only if there is at least one new job completed */
-                if (zfp.currentJobID > lastJobID) {
-                    DISPLAYLEVEL(6, "compression level adaptation check \n")
+                /* adaptive mode : statistics measurement and speed correction */
+                if (g_adaptiveMode) {
 
-                    /* check input speed */
-                    if (zfp.currentJobID > g_nbWorkers+1) {   /* warm up period, to fill all workers */
-                        if (inputBlocked <= 0) {
-                            DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
+                    /* check output speed */
+                    if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
+
+                        unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
+                        unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
+                        assert(zfp.produced >= previous_zfp_update.produced);
+                        assert(g_nbWorkers >= 1);
+
+                        if ( (zfp.ingested == previous_zfp_update.ingested)   /* no data read : input buffer full */
+                          && (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
+                          && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
+                          ) {
+                            DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
                             speedChange = slower;
-                        } else if (speedChange == noChange) {
-                            static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
-                            unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
-                            unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
-                            unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
-                            unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
-                            csuzfp = zfp;
-                            assert(inputPresented > 0);
-                            DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
-                                            inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
-                                            (U32)newlyIngested, (U32)newlyConsumed,
-                                            (U32)newlyFlushed, (U32)newlyProduced);
-                            if ( (inputBlocked > inputPresented / 8)         /* input is waiting often, because input buffers is full : compression or output too slow */
-                              && (newlyFlushed * 33 / 32 > newlyProduced)    /* flush everything that is produced */
-                              && (newlyIngested * 33 / 32 > newlyConsumed)   /* input speed as fast or faster than compression speed */
-                              ) {
-                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
-                                             newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
-                                speedChange = faster;
-                            }
                         }
-                        inputBlocked = 0;
-                        inputPresented = 0;
+
+                        previous_zfp_update = zfp;
+
+                        if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
+                          && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
+                          ) {
+                            DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
+                            speedChange = slower;
+                        }
+                        flushWaiting = 0;
                     }
 
-                    if (g_adaptiveMode) {
+                    /* course correct only if there is at least one new job completed */
+                    if (zfp.currentJobID > lastJobID) {
+                        DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+                        /* check input speed */
+                        if (zfp.currentJobID > g_nbWorkers+1) {   /* warm up period, to fill all workers */
+                            if (inputBlocked <= 0) {
+                                DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
+                                speedChange = slower;
+                            } else if (speedChange == noChange) {
+                                unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
+                                unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
+                                unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
+                                unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
+                                previous_zfp_correction = zfp;
+                                assert(inputPresented > 0);
+                                DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
+                                                inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
+                                                (U32)newlyIngested, (U32)newlyConsumed,
+                                                (U32)newlyFlushed, (U32)newlyProduced);
+                                if ( (inputBlocked > inputPresented / 8)         /* input is waiting often, because input buffers is full : compression or output too slow */
+                                  && (newlyFlushed * 33 / 32 > newlyProduced)    /* flush everything that is produced */
+                                  && (newlyIngested * 33 / 32 > newlyConsumed)   /* input speed as fast or faster than compression speed */
+                                  ) {
+                                    DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
+                                                 newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
+                                    speedChange = faster;
+                                }
+                            }
+                            inputBlocked = 0;
+                            inputPresented = 0;
+                        }
+
                         if (speedChange == slower) {
                             DISPLAYLEVEL(6, "slower speed , higher compression \n")
                             compressionLevel ++;
@@ -940,27 +958,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                             ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
                         }
                         speedChange = noChange;
-                    }
-
-                    lastJobID = zfp.currentJobID;
-                }   /* if (zfp.currentJobID > lastJobID) */
-                if (g_displayLevel >= 3) {
-                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
-                                compressionLevel,
-                                (U32)((zfp.ingested - zfp.consumed) >> 20),
-                                (U32)(zfp.consumed >> 20),
-                                (U32)(zfp.produced >> 20),
-                                cShare );
-                } else {
-                    /* g_displayLevel <= 2; only display notifications if == 2; */
-                    DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
-                    if (fileSize != UTIL_FILESIZE_UNKNOWN)
-                        DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
-                    DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
-                    DELAY_NEXT_UPDATE();
-                }
-            }
-        }
+
+                        lastJobID = zfp.currentJobID;
+                    }   /* if (zfp.currentJobID > lastJobID) */
+                }   /* if (g_adaptiveMode) */
+            }   /* if (READY_FOR_UPDATE()) */
+        }   /* while ((inBuff.pos != inBuff.size) */
     } while (directive != ZSTD_e_end);
 
     if (ferror(srcFile)) {
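For readers tracing the adaptive logic gathered above under g_adaptiveMode, the sketch below restates the speed-correction decision as a standalone function. It is illustrative only, not code from fileio.c: the names progress_snapshot, speed_change_e and adapt_decision are hypothetical stand-ins for ZSTD_frameProgression, speedChange_e and the inline checks in FIO_compressZstdFrame. The thresholds (9/8, 33/32, inputPresented/8) mirror the ones in the diff; the currentJobID warm-up gating and the actual compression-level update are omitted.

#include <assert.h>

/* Hypothetical stand-in for ZSTD_frameProgression, for illustration only. */
typedef struct {
    unsigned long long ingested;   /* bytes read from the source so far */
    unsigned long long consumed;   /* bytes actually compressed so far */
    unsigned long long produced;   /* compressed bytes generated */
    unsigned long long flushed;    /* compressed bytes written to the output */
    unsigned nbActiveWorkers;      /* worker threads currently compressing */
} progress_snapshot;

typedef enum { no_change, go_slower, go_faster } speed_change_e;

/* Decide how the compression level should move, given the previous and the
 * current progress snapshot plus the input/flush availability counters.
 * go_slower => raise the level (compression has spare time)
 * go_faster => lower the level (compression is the bottleneck)              */
static speed_change_e
adapt_decision(const progress_snapshot* prev, const progress_snapshot* cur,
               unsigned inputBlocked, unsigned inputPresented, unsigned flushWaiting)
{
    unsigned long long const newlyIngested = cur->ingested - prev->ingested;
    unsigned long long const newlyConsumed = cur->consumed - prev->consumed;
    unsigned long long const newlyProduced = cur->produced - prev->produced;
    unsigned long long const newlyFlushed  = cur->flushed  - prev->flushed;
    assert(cur->produced >= prev->produced);

    /* Nothing read, nothing compressed, no active worker : every buffer is
     * full, so a slower (stronger) compression level costs nothing.         */
    if (newlyIngested == 0 && newlyConsumed == 0 && cur->nbActiveWorkers == 0)
        return go_slower;

    /* Compression produced clearly more than the output could flush, while
     * flushing was never starved of data : output is the bottleneck.        */
    if ( (newlyProduced > newlyFlushed * 9 / 8) && (flushWaiting == 0) )
        return go_slower;

    /* Input was never blocked : reading the source is the limiting factor,
     * leaving spare time for a stronger compression level.                  */
    if (inputBlocked == 0)
        return go_slower;

    /* Input waits often while flushing and ingestion both keep pace with
     * compression : compression itself is the bottleneck, so speed it up.   */
    if ( (inputBlocked > inputPresented / 8)
      && (newlyFlushed * 33 / 32 > newlyProduced)
      && (newlyIngested * 33 / 32 > newlyConsumed) )
        return go_faster;

    return no_change;
}

In fileio.c itself the chosen direction then drives compressionLevel ++ / -- followed by ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, ...), as shown in the last hunk above.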