From 2dd76037be46a8b7589ed95cece4cff5df62330c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 9 Aug 2018 15:51:30 -0700 Subject: [PATCH] zstd cli can increase level when input is too slow --- lib/compress/zstd_compress.c | 1 + lib/compress/zstdmt_compress.c | 1 + lib/zstd.h | 1 + programs/fileio.c | 29 ++++++++++++++++++++++++----- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index ed3aab871..2121fe749 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -900,6 +900,7 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx) fp.ingested = cctx->consumedSrcSize + buffered; fp.consumed = cctx->consumedSrcSize; fp.produced = cctx->producedCSize; + fp.currentJobID = 0; return fp; } } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 74f9dc29c..d2f06e4ee 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1083,6 +1083,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) fps.ingested = mtctx->consumed + mtctx->inBuff.filled; fps.consumed = mtctx->consumed; fps.produced = mtctx->produced; + fps.currentJobID = mtctx->nextJobID; { unsigned jobNb; unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", diff --git a/lib/zstd.h b/lib/zstd.h index 0c20bb768..edd0079c9 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -735,6 +735,7 @@ typedef struct { unsigned long long ingested; unsigned long long consumed; unsigned long long produced; + unsigned currentJobID; } ZSTD_frameProgression; /* ZSTD_getFrameProgression(): diff --git a/programs/fileio.c b/programs/fileio.c index c1587f8f8..b62e25703 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -737,6 +737,9 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, FILE* const dstFile = ress.dstFile; U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; + unsigned inputBlocked = 0; + unsigned lastJobID = 0; + DISPLAYLEVEL(6, "compression using zstd format \n"); /* init */ @@ -747,7 +750,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, /* Main compression loop */ do { - size_t result; + size_t stillToFlush; /* Fill input Buffer */ size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; @@ -757,14 +760,18 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, if ((inSize == 0) || (*readsize == fileSize)) directive = ZSTD_e_end; - result = 1; + stillToFlush = 1; while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */ - || (directive == ZSTD_e_end && result != 0) ) { + || (directive == ZSTD_e_end && stillToFlush != 0) ) { + size_t const oldIPos = inBuff.pos; ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; - CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); + CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); + + /* count stats */ + if (oldIPos == inBuff.pos) inputBlocked++; /* Write compressed stream */ - DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n", + DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos); if (outBuff.pos) { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); @@ -775,6 +782,18 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, if (READY_FOR_UPDATE()) { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; + + /* check input speed */ + if (zfp.currentJobID >= lastJobID+2) { + if (inputBlocked<=1) { /* small tolerance */ + DISPLAYLEVEL(6, "input is never blocked => input is too slow \n"); + compressionLevel++; + ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel); + } + lastJobID = zfp.currentJobID; + inputBlocked = 0; + } + if (g_displayLevel >= 3) { DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%", compressionLevel,