From 5684bae4f666f2730f2121048b0aa8472ac30457 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 31 Dec 2021 16:25:12 -0800 Subject: [PATCH 01/13] minor refactoring on streaming compression implementation. --- lib/compress/zstd_compress.c | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 1cb229f7a..74f0cbfb5 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5325,19 +5325,18 @@ static size_t ZSTD_nextInputSizeHint(const ZSTD_CCtx* cctx) /** ZSTD_compressStream_generic(): * internal function for all *compressStream*() variants - * non-static, because can be called from zstdmt_compress.c - * @return : hint size for next input */ + * @return : hint size for next input to complete ongoing block */ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective const flushMode) { const char* const istart = (const char*)input->src; - const char* const iend = input->size != 0 ? istart + input->size : istart; - const char* ip = input->pos != 0 ? istart + input->pos : istart; + const char* const iend = (istart != NULL) ? istart + input->size : istart; + const char* ip = (istart != NULL) ? istart + input->pos : istart; char* const ostart = (char*)output->dst; - char* const oend = output->size != 0 ? ostart + output->size : ostart; - char* op = output->pos != 0 ? ostart + output->pos : ostart; + char* const oend = (ostart != NULL) ? ostart + output->size : ostart; + char* op = (ostart != NULL) ? ostart + output->pos : ostart; U32 someMoreWork = 1; /* check expectations */ @@ -5515,7 +5514,8 @@ size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuf /* After a compression call set the expected input/output buffer. * This is validated at the start of the next compression call. */ -static void ZSTD_setBufferExpectations(ZSTD_CCtx* cctx, ZSTD_outBuffer const* output, ZSTD_inBuffer const* input) +static void +ZSTD_setBufferExpectations(ZSTD_CCtx* cctx, const ZSTD_outBuffer* output, const ZSTD_inBuffer* input) { if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) { cctx->expectedInBuffer = *input; @@ -5550,7 +5550,8 @@ static size_t ZSTD_checkBufferStability(ZSTD_CCtx const* cctx, static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, ZSTD_EndDirective endOp, - size_t inSize) { + size_t inSize) +{ ZSTD_CCtx_params params = cctx->requestedParams; ZSTD_prefixDict const prefixDict = cctx->prefixDict; FORWARD_IF_ERROR( ZSTD_initLocalDict(cctx) , ""); /* Init the local dict if present. */ @@ -5564,9 +5565,9 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, params.compressionLevel = cctx->cdict->compressionLevel; } DEBUGLOG(4, "ZSTD_compressStream2 : transparent init stage"); - if (endOp == ZSTD_e_end) cctx->pledgedSrcSizePlusOne = inSize + 1; /* auto-fix pledgedSrcSize */ - { - size_t const dictSize = prefixDict.dict + if (endOp == ZSTD_e_end) cctx->pledgedSrcSizePlusOne = inSize + 1; /* auto-determine pledgedSrcSize */ + + { size_t const dictSize = prefixDict.dict ? prefixDict.dictSize : (cctx->cdict ? cctx->cdict->dictContentSize : 0); ZSTD_cParamMode_e const mode = ZSTD_getCParamMode(cctx->cdict, ¶ms, cctx->pledgedSrcSizePlusOne - 1); @@ -5607,7 +5608,7 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, cctx->streamStage = zcss_load; cctx->appliedParams = params; } else -#endif +#endif /* ZSTD_MULTITHREAD */ { U64 const pledgedSrcSize = cctx->pledgedSrcSizePlusOne - 1; assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); FORWARD_IF_ERROR( ZSTD_compressBegin_internal(cctx, @@ -5699,7 +5700,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, ZSTD_setBufferExpectations(cctx, output, input); return flushMin; } -#endif +#endif /* ZSTD_MULTITHREAD */ FORWARD_IF_ERROR( ZSTD_compressStream_generic(cctx, output, input, endOp) , ""); DEBUGLOG(5, "completed ZSTD_compressStream2"); ZSTD_setBufferExpectations(cctx, output, input); From c0c5ffa97385063ce9b24432e7c0c4130ed26e27 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 31 Dec 2021 18:20:36 -0800 Subject: [PATCH 02/13] streaming compression : lazy parameter adaptation with stable input effectively makes ZSTD_c_stableInput compatible ZSTD_compressStream() and zstd_e_continue operation mode. --- lib/compress/zstd_compress.c | 25 ++++++++++++++++--------- lib/zstd.h | 6 +++--- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 74f0cbfb5..724ac3887 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -177,12 +177,9 @@ size_t ZSTD_freeCCtx(ZSTD_CCtx* cctx) if (cctx==NULL) return 0; /* support free on NULL */ RETURN_ERROR_IF(cctx->staticSize, memory_allocation, "not compatible with static CCtx"); - { - int cctxInWorkspace = ZSTD_cwksp_owns_buffer(&cctx->workspace, cctx); + { int cctxInWorkspace = ZSTD_cwksp_owns_buffer(&cctx->workspace, cctx); ZSTD_freeCCtxContent(cctx); - if (!cctxInWorkspace) { - ZSTD_customFree(cctx, cctx->customMem); - } + if (!cctxInWorkspace) ZSTD_customFree(cctx, cctx->customMem); } return 0; } @@ -4487,6 +4484,7 @@ ZSTD_compress_insertDictionary(ZSTD_compressedBlockState_t* bs, #define ZSTD_USE_CDICT_PARAMS_DICTSIZE_MULTIPLIER (6ULL) /*! ZSTD_compressBegin_internal() : + * Assumption : either @dict OR @cdict (or none) is non-NULL, never both * @return : 0, or an error code */ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, @@ -4567,11 +4565,11 @@ size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, &cctxParams, pledgedSrcSize); } -size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel) +size_t +ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel) { ZSTD_CCtx_params cctxParams; - { - ZSTD_parameters const params = ZSTD_getParams_internal(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, dictSize, ZSTD_cpm_noAttachDict); + { ZSTD_parameters const params = ZSTD_getParams_internal(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, dictSize, ZSTD_cpm_noAttachDict); ZSTD_CCtxParams_init_internal(&cctxParams, ¶ms, (compressionLevel == 0) ? ZSTD_CLEVEL_DEFAULT : compressionLevel); } DEBUGLOG(4, "ZSTD_compressBegin_usingDict (dictSize=%u)", (unsigned)dictSize); @@ -5648,6 +5646,12 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, /* transparent initialization stage */ if (cctx->streamStage == zcss_init) { + if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */ + && (endOp == ZSTD_e_continue) /* more to come */ + && (input->pos < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ + cctx->expectedInBuffer = *input; + return (ZSTD_BLOCKSIZE_MAX - input->pos); /* don't do anything : allows lazy compression parameters adaptation */ + } FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, input->size), "CompressStream2 initialization failed"); ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */ } @@ -6159,6 +6163,7 @@ size_t ZSTD_compressSequences(ZSTD_CCtx* const cctx, void* dst, size_t dstCapaci /*====== Finalize ======*/ /*! ZSTD_flushStream() : +* Note : not compatible with ZSTD_c_stableInBuffer * @return : amount of data remaining to flush */ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { @@ -6169,7 +6174,9 @@ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { - ZSTD_inBuffer input = { NULL, 0, 0 }; + ZSTD_inBuffer const nullInput = { NULL, 0, 0 }; + int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); + ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput; size_t const remainingToFlush = ZSTD_compressStream2(zcs, output, &input, ZSTD_e_end); FORWARD_IF_ERROR( remainingToFlush , "ZSTD_compressStream2 failed"); if (zcs->appliedParams.nbWorkers > 0) return remainingToFlush; /* minimal estimation */ diff --git a/lib/zstd.h b/lib/zstd.h index 349d8017f..f25e17d4b 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1843,14 +1843,14 @@ ZSTDLIB_STATIC_API size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const vo * large enough to fit a block (see ZSTD_c_stableOutBuffer). This will also * avoid the memcpy() from the input buffer to the input window buffer. * - * NOTE: ZSTD_compressStream2() will error if ZSTD_e_end is not used. - * That means this flag cannot be used with ZSTD_compressStream(). + * NOTE: ZSTD_compressStream2() will error if ZSTD_e_flush is used. + * That means this flag cannot be used with ZSTD_flushStream(). * * NOTE: So long as the ZSTD_inBuffer always points to valid memory, using * this flag is ALWAYS memory safe, and will never access out-of-bounds * memory. However, compression WILL fail if you violate the preconditions. * - * WARNING: The data in the ZSTD_inBuffer in the range [dst, dst + pos) MUST + * WARNING: The data in the ZSTD_inBuffer in the range [src, src + pos) MUST * not be modified during compression or you will get data corruption. This * is because zstd needs to reference data in the ZSTD_inBuffer to find * matches. Normally zstd maintains its own window buffer for this purpose, From 37b87add7a158deee06a12743aac11c36c352340 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sat, 1 Jan 2022 23:15:34 -0800 Subject: [PATCH 03/13] make stableSrc compatible with regular streaming API including flushStream(). Now the only condition is for `input.size` to continuously grow. --- lib/compress/zstd_compress.c | 17 +++++++------ lib/zstd.h | 28 ++++++++++----------- tests/zstreamtest.c | 47 +++++++++++++++++++----------------- 3 files changed, 48 insertions(+), 44 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 724ac3887..c1cd53fe7 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5338,7 +5338,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, U32 someMoreWork = 1; /* check expectations */ - DEBUGLOG(5, "ZSTD_compressStream_generic, flush=%u", (unsigned)flushMode); + DEBUGLOG(5, "ZSTD_compressStream_generic, flush=%i", (int)flushMode); if (zcs->appliedParams.inBufferMode == ZSTD_bm_buffered) { assert(zcs->inBuff != NULL); assert(zcs->inBuffSize > 0); @@ -5424,9 +5424,8 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, if (!lastBlock) assert(zcs->inBuffTarget <= zcs->inBuffSize); zcs->inToCompress = zcs->inBuffPos; - } else { + } else { /* !inputBuffered, hence ZSTD_bm_stable */ unsigned const lastBlock = (ip + iSize == iend); - assert(flushMode == ZSTD_e_end /* Already validated */); cSize = lastBlock ? ZSTD_compressEnd(zcs, cDst, oSize, ip, iSize) : ZSTD_compressContinue(zcs, cDst, oSize, ip, iSize); @@ -5533,11 +5532,10 @@ static size_t ZSTD_checkBufferStability(ZSTD_CCtx const* cctx, { if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) { ZSTD_inBuffer const expect = cctx->expectedInBuffer; - if (expect.src != input->src || expect.pos != input->pos || expect.size != input->size) + if (expect.src != input->src || expect.pos != input->pos) RETURN_ERROR(srcBuffer_wrong, "ZSTD_c_stableInBuffer enabled but input differs!"); - if (endOp != ZSTD_e_end) - RETURN_ERROR(srcBuffer_wrong, "ZSTD_c_stableInBuffer can only be used with ZSTD_e_end!"); } + (void)endOp; if (cctx->appliedParams.outBufferMode == ZSTD_bm_stable) { size_t const outBufferSize = output->size - output->pos; if (cctx->expectedOutBufferSize != outBufferSize) @@ -5650,7 +5648,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, && (endOp == ZSTD_e_continue) /* more to come */ && (input->pos < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ cctx->expectedInBuffer = *input; - return (ZSTD_BLOCKSIZE_MAX - input->pos); /* don't do anything : allows lazy compression parameters adaptation */ + return (ZSTD_BLOCKSIZE_MAX - input->pos); /* don't do anything : allows lazy adaptation of compression parameters */ } FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, input->size), "CompressStream2 initialization failed"); ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */ @@ -6167,7 +6165,10 @@ size_t ZSTD_compressSequences(ZSTD_CCtx* const cctx, void* dst, size_t dstCapaci * @return : amount of data remaining to flush */ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { - ZSTD_inBuffer input = { NULL, 0, 0 }; + ZSTD_inBuffer const nullInput = { NULL, 0, 0 }; + int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); + ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput; + input.size = input.pos; /* do not ingest more input during flush */ return ZSTD_compressStream2(zcs, output, &input, ZSTD_e_flush); } diff --git a/lib/zstd.h b/lib/zstd.h index f25e17d4b..f503c1708 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1829,13 +1829,16 @@ ZSTDLIB_STATIC_API size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const vo * Experimental parameter. * Default is 0 == disabled. Set to 1 to enable. * - * Tells the compressor that the ZSTD_inBuffer will ALWAYS be the same - * between calls, except for the modifications that zstd makes to pos (the - * caller must not modify pos). This is checked by the compressor, and - * compression will fail if it ever changes. This means the only flush - * mode that makes sense is ZSTD_e_end, so zstd will error if ZSTD_e_end - * is not used. The data in the ZSTD_inBuffer in the range [src, src + pos) - * MUST not be modified during compression or you will get data corruption. + * Tells the compressor that input data presented with ZSTD_inBuffer + * will ALWAYS be the same between calls. + * Technically, the @src pointer must never be changed, + * and the @pos field can only be updated by zstd. + * However, it's possible to increase the @size field, + * allowing scenarios where more data can be appended after compressions starts. + * These conditions are checked by the compressor, + * and compression will fail if they are not respected. + * Also, data in the ZSTD_inBuffer within the range [src, src + pos) + * MUST not be modified during compression or it will result in data corruption. * * When this flag is enabled zstd won't allocate an input window buffer, * because the user guarantees it can reference the ZSTD_inBuffer until @@ -1843,18 +1846,15 @@ ZSTDLIB_STATIC_API size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const vo * large enough to fit a block (see ZSTD_c_stableOutBuffer). This will also * avoid the memcpy() from the input buffer to the input window buffer. * - * NOTE: ZSTD_compressStream2() will error if ZSTD_e_flush is used. - * That means this flag cannot be used with ZSTD_flushStream(). - * * NOTE: So long as the ZSTD_inBuffer always points to valid memory, using * this flag is ALWAYS memory safe, and will never access out-of-bounds - * memory. However, compression WILL fail if you violate the preconditions. + * memory. However, compression WILL fail if conditions are not respected. * * WARNING: The data in the ZSTD_inBuffer in the range [src, src + pos) MUST - * not be modified during compression or you will get data corruption. This - * is because zstd needs to reference data in the ZSTD_inBuffer to find + * not be modified during compression or it sill result in data corruption. + * This is because zstd needs to reference data in the ZSTD_inBuffer to find * matches. Normally zstd maintains its own window buffer for this purpose, - * but passing this flag tells zstd to use the user provided buffer. + * but passing this flag tells zstd to rely on user provided buffer instead. */ #define ZSTD_c_stableInBuffer ZSTD_c_experimentalParam9 diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 72fd72ea3..66e017c3d 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -613,7 +613,7 @@ static int basicUnitTests(U32 seed, double compressibility) DISPLAYLEVEL(3, "OK (error detected : %s) \n", ZSTD_getErrorName(r)); } } - /* Complex context re-use scenario */ + /* Compression state re-use scenario */ DISPLAYLEVEL(3, "test%3i : context re-use : ", testNb++); ZSTD_freeCStream(zc); zc = ZSTD_createCStream(); @@ -634,8 +634,7 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) ); if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ DISPLAYLEVEL(5, "end1 "); - { size_t const r = ZSTD_endStream(zc, &outBuff); - if (r != 0) goto _output_error; } /* error, or some data not flushed */ + if (ZSTD_endStream(zc, &outBuff) != 0) goto _output_error; /* error, or some data not flushed */ } /* use 2 */ { size_t const inSize = 1025; /* will not continue, because tables auto-adjust and are therefore different size */ @@ -653,8 +652,7 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) ); if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ DISPLAYLEVEL(5, "end2 "); - { size_t const r = ZSTD_endStream(zc, &outBuff); - if (r != 0) goto _output_error; } /* error, or some data not flushed */ + if (ZSTD_endStream(zc, &outBuff) != 0) goto _output_error; /* error, or some data not flushed */ } DISPLAYLEVEL(3, "OK \n"); @@ -786,16 +784,18 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK(!(cSize < ZSTD_compressBound(CNBufferSize)), "cSize too large for test"); CHECK_Z(cSize = ZSTD_compress2(cctx, compressedBuffer, cSize + 4, CNBuffer, CNBufferSize)); CHECK_Z(cctxSize1 = ZSTD_sizeof_CCtx(cctx)); - { ZSTD_CCtx* cctx2 = ZSTD_createCCtx(); + { ZSTD_CCtx* const cctx2 = ZSTD_createCCtx(); + assert(cctx2 != NULL); in.pos = out.pos = 0; CHECK_Z(ZSTD_compressStream2(cctx2, &out, &in, ZSTD_e_continue)); CHECK(!(ZSTD_compressStream2(cctx2, &out, &in, ZSTD_e_end) == 0), "Not finished"); CHECK_Z(cctxSize2 = ZSTD_sizeof_CCtx(cctx2)); ZSTD_freeCCtx(cctx2); } - { ZSTD_CCtx* cctx3 = ZSTD_createCCtx(); + { ZSTD_CCtx* const cctx3 = ZSTD_createCCtx(); ZSTD_parameters params = ZSTD_getParams(0, CNBufferSize, 0); size_t cSize3; + assert(cctx3 != NULL); params.fParams.checksumFlag = 1; cSize3 = ZSTD_compress_advanced(cctx3, compressedBuffer, compressedBufferSize, CNBuffer, CNBufferSize, NULL, 0, params); CHECK_Z(cSize3); @@ -808,8 +808,7 @@ static int basicUnitTests(U32 seed, double compressibility) DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "test%3i : ZSTD_compress2() doesn't modify user parameters : ", testNb++); - { - int stableInBuffer; + { int stableInBuffer; int stableOutBuffer; CHECK_Z(ZSTD_CCtx_getParameter(cctx, ZSTD_c_stableInBuffer, &stableInBuffer)); CHECK_Z(ZSTD_CCtx_getParameter(cctx, ZSTD_c_stableOutBuffer, &stableOutBuffer)); @@ -870,21 +869,24 @@ static int basicUnitTests(U32 seed, double compressibility) } DISPLAYLEVEL(3, "OK \n"); - DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() ZSTD_c_stableInBuffer with continue and flush : ", testNb++); + /* stableSrc + streaming */ + DISPLAYLEVEL(3, "test%3i : ZSTD_c_stableInBuffer compatibility with compressStream, flushStream and endStream : ", testNb++); + CHECK_Z( ZSTD_initCStream(cctx, 1) ); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableInBuffer, 1) ); in.src = CNBuffer; - in.size = CNBufferSize; + in.size = 100; in.pos = 0; - out.pos = 0; - out.size = compressedBufferSize; - CHECK_Z(ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only)); - { size_t const ret = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_continue); - CHECK(!ZSTD_isError(ret), "Must error"); - CHECK(!(ZSTD_getErrorCode(ret) == ZSTD_error_srcBuffer_wrong), "Must be this error"); - } - CHECK_Z(ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only)); - { size_t const ret = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_flush); - CHECK(!ZSTD_isError(ret), "Must error"); - CHECK(!(ZSTD_getErrorCode(ret) == ZSTD_error_srcBuffer_wrong), "Must be this error"); + { ZSTD_outBuffer outBuf; + outBuf.dst = (char*)(compressedBuffer)+cSize; + outBuf.size = ZSTD_compressBound(500); + outBuf.pos = 0; + CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &in) ); + in.size = 200; + CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &in) ); + CHECK_Z( ZSTD_flushStream(cctx, &outBuf) ); + in.size = 300; + CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &in) ); + if (ZSTD_endStream(cctx, &outBuf) != 0) goto _output_error; /* error, or some data not flushed */ } DISPLAYLEVEL(3, "OK \n"); @@ -902,6 +904,7 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z(ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableOutBuffer, 1)); in.pos = out.pos = 0; in.size = MIN(CNBufferSize, 10); + out.size = compressedBufferSize; CHECK_Z(ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_flush)); in.pos = 0; in.size = CNBufferSize - in.size; From 27d336b099d3e4b19969a24e7425a14af86d4879 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sun, 2 Jan 2022 20:06:46 -0800 Subject: [PATCH 04/13] minor behavior refinements specifically, there is no obligation to start streaming compression with pos=0. stableSrc mode is now compatible with this setup. --- lib/compress/zstd_compress.c | 12 ++++++++---- lib/zstd.h | 2 +- tests/zstreamtest.c | 22 ++++++++++++---------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index c1cd53fe7..015fd23c9 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5630,6 +5630,8 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, return 0; } +/* @return provides a minimum amount of data remaining to be flushed from internal buffers + */ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, @@ -5644,13 +5646,15 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, /* transparent initialization stage */ if (cctx->streamStage == zcss_init) { + size_t const inputSize = input->size - input->pos; /* no obligation to start from pos==0 */ if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */ - && (endOp == ZSTD_e_continue) /* more to come */ - && (input->pos < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ + && (endOp == ZSTD_e_continue) /* no flush requested, more input to come */ + && (inputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ + /* just wait, allows lazy adaptation of compression parameters */ cctx->expectedInBuffer = *input; - return (ZSTD_BLOCKSIZE_MAX - input->pos); /* don't do anything : allows lazy adaptation of compression parameters */ + return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format); /* at least some header to produce */ } - FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, input->size), "CompressStream2 initialization failed"); + FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, inputSize), "CompressStream2 initialization failed"); ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */ } /* end of transparent initialization stage */ diff --git a/lib/zstd.h b/lib/zstd.h index f503c1708..c3847a864 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1851,7 +1851,7 @@ ZSTDLIB_STATIC_API size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const vo * memory. However, compression WILL fail if conditions are not respected. * * WARNING: The data in the ZSTD_inBuffer in the range [src, src + pos) MUST - * not be modified during compression or it sill result in data corruption. + * not be modified during compression or it will result in data corruption. * This is because zstd needs to reference data in the ZSTD_inBuffer to find * matches. Normally zstd maintains its own window buffer for this purpose, * but passing this flag tells zstd to rely on user provided buffer instead. diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 66e017c3d..15a5cdb7c 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -873,20 +873,21 @@ static int basicUnitTests(U32 seed, double compressibility) DISPLAYLEVEL(3, "test%3i : ZSTD_c_stableInBuffer compatibility with compressStream, flushStream and endStream : ", testNb++); CHECK_Z( ZSTD_initCStream(cctx, 1) ); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableInBuffer, 1) ); - in.src = CNBuffer; - in.size = 100; - in.pos = 0; - { ZSTD_outBuffer outBuf; + { ZSTD_inBuffer inBuf; + ZSTD_outBuffer outBuf; + inBuf.src = CNBuffer; + inBuf.size = 100; + inBuf.pos = 0; outBuf.dst = (char*)(compressedBuffer)+cSize; outBuf.size = ZSTD_compressBound(500); outBuf.pos = 0; - CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &in) ); - in.size = 200; - CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &in) ); + CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); + inBuf.size = 200; + CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); CHECK_Z( ZSTD_flushStream(cctx, &outBuf) ); - in.size = 300; - CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &in) ); - if (ZSTD_endStream(cctx, &outBuf) != 0) goto _output_error; /* error, or some data not flushed */ + inBuf.size = 300; + CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); + CHECK(ZSTD_endStream(cctx, &outBuf) != 0, "compression should be successful and fully flushed"); } DISPLAYLEVEL(3, "OK \n"); @@ -902,6 +903,7 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z(ZSTD_CCtx_reset(cctx, ZSTD_reset_session_and_parameters)); CHECK_Z(ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1)); CHECK_Z(ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableOutBuffer, 1)); + in.src = CNBuffer; in.pos = out.pos = 0; in.size = MIN(CNBufferSize, 10); out.size = compressedBufferSize; From 4b9d1dd9ffc882ff9ac57e2363a411591a27b886 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 5 Jan 2022 13:21:43 -0800 Subject: [PATCH 05/13] fixed incorrect comment --- lib/compress/zstd_compress.c | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 015fd23c9..3e20818c6 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -6165,7 +6165,6 @@ size_t ZSTD_compressSequences(ZSTD_CCtx* const cctx, void* dst, size_t dstCapaci /*====== Finalize ======*/ /*! ZSTD_flushStream() : -* Note : not compatible with ZSTD_c_stableInBuffer * @return : amount of data remaining to flush */ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { From 8296be4a0a25b6e2f74f1cdd7bd4ae04981f6a45 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 5 Jan 2022 16:34:10 -0800 Subject: [PATCH 06/13] pretend consuming input to provide a sense of forward progress --- lib/compress/zstd_compress.c | 21 +++++++++++++++++---- lib/compress/zstd_compress_internal.h | 1 + 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 3e20818c6..553a0765d 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5647,15 +5647,28 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, /* transparent initialization stage */ if (cctx->streamStage == zcss_init) { size_t const inputSize = input->size - input->pos; /* no obligation to start from pos==0 */ + size_t const totalInputSize = (cctx->savedInPosPlusOne == 0) ? inputSize : input->size - (cctx->savedInPosPlusOne - 1); if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */ && (endOp == ZSTD_e_continue) /* no flush requested, more input to come */ - && (inputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ - /* just wait, allows lazy adaptation of compression parameters */ + && (totalInputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ + if (cctx->savedInPosPlusOne) { /* not the first time */ + /* check stable source guarantees */ + assert(input->src == cctx->expectedInBuffer.src); + assert(input->pos == cctx->expectedInBuffer.size); + } + /* keep track of first position */ + if (cctx->savedInPosPlusOne == 0) cctx->savedInPosPlusOne = input->pos + 1; cctx->expectedInBuffer = *input; + /* pretend input was consumed, to give a sense forward progress */ + input[0].pos = input[0].size; + /* but actually input wasn't consumed, so keep track of position from where compression shall resume */ + cctx->expectedInBuffer.pos = cctx->savedInPosPlusOne - 1; + /* don't initialize yet, wait for the first block of flush() order, for better parameters adaptation */ return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format); /* at least some header to produce */ } - FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, inputSize), "CompressStream2 initialization failed"); - ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */ + FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, totalInputSize), "compressStream2 initialization failed"); + cctx->savedInPosPlusOne = 0; + ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */ } /* end of transparent initialization stage */ diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index 9fe3affab..8cc2f81d6 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -411,6 +411,7 @@ struct ZSTD_CCtx_s { /* Stable in/out buffer verification */ ZSTD_inBuffer expectedInBuffer; size_t expectedOutBufferSize; + size_t savedInPosPlusOne; /* 0 == no savedInPos */ /* Dictionary */ ZSTD_localDict localDict; From 270f9bf00595160933652d92a809a2dab61cfeed Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Mon, 24 Jan 2022 14:45:22 -0800 Subject: [PATCH 07/13] better consistency in accessing @input as suggested by @terrelln. Also : commented zstreamtest more to ensure ZSTD_stableInBuffer is tested/ --- lib/compress/zstd_compress.c | 2 +- tests/zstreamtest.c | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 553a0765d..7be5d87a7 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5660,7 +5660,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, if (cctx->savedInPosPlusOne == 0) cctx->savedInPosPlusOne = input->pos + 1; cctx->expectedInBuffer = *input; /* pretend input was consumed, to give a sense forward progress */ - input[0].pos = input[0].size; + input->pos = input->size; /* but actually input wasn't consumed, so keep track of position from where compression shall resume */ cctx->expectedInBuffer.pos = cctx->savedInPosPlusOne - 1; /* don't initialize yet, wait for the first block of flush() order, for better parameters adaptation */ diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 15a5cdb7c..1444d27df 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -769,11 +769,12 @@ static int basicUnitTests(U32 seed, double compressibility) } /* Compression with ZSTD_c_stable{In,Out}Buffer */ - { ZSTD_CCtx* cctx = ZSTD_createCCtx(); + { ZSTD_CCtx* const cctx = ZSTD_createCCtx(); ZSTD_inBuffer in; ZSTD_outBuffer out; size_t cctxSize1; size_t cctxSize2; + assert(cctx != NULL); in.src = CNBuffer; in.size = CNBufferSize; out.dst = compressedBuffer; @@ -784,6 +785,7 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK(!(cSize < ZSTD_compressBound(CNBufferSize)), "cSize too large for test"); CHECK_Z(cSize = ZSTD_compress2(cctx, compressedBuffer, cSize + 4, CNBuffer, CNBufferSize)); CHECK_Z(cctxSize1 = ZSTD_sizeof_CCtx(cctx)); + /* @cctxSize2 : sizeof_CCtx when doing full streaming (no stable in/out) */ { ZSTD_CCtx* const cctx2 = ZSTD_createCCtx(); assert(cctx2 != NULL); in.pos = out.pos = 0; @@ -792,16 +794,17 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z(cctxSize2 = ZSTD_sizeof_CCtx(cctx2)); ZSTD_freeCCtx(cctx2); } - { ZSTD_CCtx* const cctx3 = ZSTD_createCCtx(); + /* @cctxSize1 : sizeof_CCtx when doing single-shot compression (no streaming) */ + { ZSTD_CCtx* const cctx1 = ZSTD_createCCtx(); ZSTD_parameters params = ZSTD_getParams(0, CNBufferSize, 0); size_t cSize3; - assert(cctx3 != NULL); + assert(cctx1 != NULL); params.fParams.checksumFlag = 1; - cSize3 = ZSTD_compress_advanced(cctx3, compressedBuffer, compressedBufferSize, CNBuffer, CNBufferSize, NULL, 0, params); + cSize3 = ZSTD_compress_advanced(cctx1, compressedBuffer, compressedBufferSize, CNBuffer, CNBufferSize, NULL, 0, params); CHECK_Z(cSize3); CHECK(!(cSize == cSize3), "Must be same compressed size"); - CHECK(!(cctxSize1 == ZSTD_sizeof_CCtx(cctx3)), "Must be same CCtx size"); - ZSTD_freeCCtx(cctx3); + CHECK(!(cctxSize1 == ZSTD_sizeof_CCtx(cctx1)), "Must be same CCtx size"); + ZSTD_freeCCtx(cctx1); } CHECK(!(cctxSize1 < cctxSize2), "Stable buffers means less allocated size"); CHECK_Z(ZSTD_decompress(decodedBuffer, CNBufferSize, compressedBuffer, cSize)); @@ -891,8 +894,9 @@ static int basicUnitTests(U32 seed, double compressibility) } DISPLAYLEVEL(3, "OK \n"); - DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() ZSTD_c_stableInBuffer allocated size : ", testNb++); + DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() with ZSTD_c_stableInBuffer: context size : ", testNb++); { size_t const cctxSize = ZSTD_sizeof_CCtx(cctx); + DISPLAYLEVEL(4, "cctxSize1=%zu; cctxSize=%zu; cctxSize2=%zu : ", cctxSize1, cctxSize, cctxSize2); CHECK(!(cctxSize1 < cctxSize), "Must be bigger than single-pass"); CHECK(!(cctxSize < cctxSize2), "Must be smaller than streaming"); cctxSize1 = cctxSize; @@ -925,8 +929,9 @@ static int basicUnitTests(U32 seed, double compressibility) } DISPLAYLEVEL(3, "OK \n"); - DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() ZSTD_c_stableOutBuffer allocated size : ", testNb++); + DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() with ZSTD_c_stableOutBuffer: context size : ", testNb++); { size_t const cctxSize = ZSTD_sizeof_CCtx(cctx); + DISPLAYLEVEL(4, "cctxSize1=%zu; cctxSize=%zu; cctxSize2=%zu : ", cctxSize1, cctxSize, cctxSize2); CHECK(!(cctxSize1 < cctxSize), "Must be bigger than single-pass and stableInBuffer"); CHECK(!(cctxSize < cctxSize2), "Must be smaller than streaming"); } From c1668a00d2d71915582a22c3c0082f59cfee53bd Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Mon, 24 Jan 2022 22:57:55 -0800 Subject: [PATCH 08/13] fix extended case combining stableInBuffer with continue() and flush() modes --- lib/compress/zstd_compress.c | 78 ++++++++++++++++++--------- lib/compress/zstd_compress_internal.h | 2 +- tests/fuzzer.c | 2 +- tests/zstreamtest.c | 12 +++-- 4 files changed, 65 insertions(+), 29 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 7be5d87a7..e2a6d9675 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5316,9 +5316,14 @@ size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel) static size_t ZSTD_nextInputSizeHint(const ZSTD_CCtx* cctx) { - size_t hintInSize = cctx->inBuffTarget - cctx->inBuffPos; - if (hintInSize==0) hintInSize = cctx->blockSize; - return hintInSize; + if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) { + return cctx->blockSize - cctx->stableIn_notConsumed; + } + assert(cctx->appliedParams.inBufferMode == ZSTD_bm_buffered); + { size_t hintInSize = cctx->inBuffTarget - cctx->inBuffPos; + if (hintInSize==0) hintInSize = cctx->blockSize; + return hintInSize; + } } /** ZSTD_compressStream_generic(): @@ -5329,16 +5334,23 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, ZSTD_inBuffer* input, ZSTD_EndDirective const flushMode) { - const char* const istart = (const char*)input->src; + const char* const istart = (assert(input != NULL), (const char*)input->src); const char* const iend = (istart != NULL) ? istart + input->size : istart; const char* ip = (istart != NULL) ? istart + input->pos : istart; - char* const ostart = (char*)output->dst; + char* const ostart = (assert(output != NULL), (char*)output->dst); char* const oend = (ostart != NULL) ? ostart + output->size : ostart; char* op = (ostart != NULL) ? ostart + output->pos : ostart; U32 someMoreWork = 1; /* check expectations */ - DEBUGLOG(5, "ZSTD_compressStream_generic, flush=%i", (int)flushMode); + DEBUGLOG(5, "ZSTD_compressStream_generic, flush=%i, srcSize = %zu", (int)flushMode, input->size - input->pos); + assert(zcs != NULL); + if (zcs->appliedParams.inBufferMode == ZSTD_bm_stable) { + assert(input->pos >= zcs->stableIn_notConsumed); + input->pos -= zcs->stableIn_notConsumed; + ip -= zcs->stableIn_notConsumed; + zcs->stableIn_notConsumed = 0; + } if (zcs->appliedParams.inBufferMode == ZSTD_bm_buffered) { assert(zcs->inBuff != NULL); assert(zcs->inBuffSize > 0); @@ -5347,8 +5359,10 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, assert(zcs->outBuff != NULL); assert(zcs->outBuffSize > 0); } - assert(output->pos <= output->size); + if (input->src == NULL) assert(input->size == 0); assert(input->pos <= input->size); + if (output->dst == NULL) assert(output->size == 0); + assert(output->pos <= output->size); assert((U32)flushMode <= (U32)ZSTD_e_end); while (someMoreWork) { @@ -5380,8 +5394,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, zcs->inBuff + zcs->inBuffPos, toLoad, ip, iend-ip); zcs->inBuffPos += loaded; - if (loaded != 0) - ip += loaded; + if (ip) ip += loaded; if ( (flushMode == ZSTD_e_continue) && (zcs->inBuffPos < zcs->inBuffTarget) ) { /* not enough input to fill full block : stop here */ @@ -5392,6 +5405,20 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, /* empty */ someMoreWork = 0; break; } + } else { + assert(zcs->appliedParams.inBufferMode == ZSTD_bm_stable); + if ( (flushMode == ZSTD_e_continue) + && ( (size_t)(iend - ip) < zcs->blockSize) ) { + /* can't compress a full block : stop here */ + zcs->stableIn_notConsumed = (size_t)(iend - ip); + ip = iend; /* pretend to have consumed input */ + someMoreWork = 0; break; + } + if ( (flushMode == ZSTD_e_flush) + && (ip == iend) ) { + /* empty */ + someMoreWork = 0; break; + } } /* compress current block (note : this stage cannot be stopped in the middle) */ DEBUGLOG(5, "stream compression stage (flushMode==%u)", flushMode); @@ -5399,9 +5426,8 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, void* cDst; size_t cSize; size_t oSize = oend-op; - size_t const iSize = inputBuffered - ? zcs->inBuffPos - zcs->inToCompress - : MIN((size_t)(iend - ip), zcs->blockSize); + size_t const iSize = inputBuffered ? zcs->inBuffPos - zcs->inToCompress + : MIN((size_t)(iend - ip), zcs->blockSize); if (oSize >= ZSTD_compressBound(iSize) || zcs->appliedParams.outBufferMode == ZSTD_bm_stable) cDst = op; /* compress into output buffer, to skip flush stage */ else @@ -5425,17 +5451,15 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, assert(zcs->inBuffTarget <= zcs->inBuffSize); zcs->inToCompress = zcs->inBuffPos; } else { /* !inputBuffered, hence ZSTD_bm_stable */ - unsigned const lastBlock = (ip + iSize == iend); + unsigned const lastBlock = (flushMode == ZSTD_e_end) && (ip + iSize == iend); cSize = lastBlock ? ZSTD_compressEnd(zcs, cDst, oSize, ip, iSize) : ZSTD_compressContinue(zcs, cDst, oSize, ip, iSize); /* Consume the input prior to error checking to mirror buffered mode. */ - if (iSize > 0) - ip += iSize; + if (ip) ip += iSize; FORWARD_IF_ERROR(cSize, "%s", lastBlock ? "ZSTD_compressEnd failed" : "ZSTD_compressContinue failed"); zcs->frameEnded = lastBlock; - if (lastBlock) - assert(ip == iend); + if (lastBlock) assert(ip == iend); } if (cDst == op) { /* no need to flush */ op += cSize; @@ -5514,6 +5538,7 @@ size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuf static void ZSTD_setBufferExpectations(ZSTD_CCtx* cctx, const ZSTD_outBuffer* output, const ZSTD_inBuffer* input) { + DEBUGLOG(5, "ZSTD_setBufferExpectations (for advanced stable in/out modes)"); if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) { cctx->expectedInBuffer = *input; } @@ -5647,27 +5672,25 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, /* transparent initialization stage */ if (cctx->streamStage == zcss_init) { size_t const inputSize = input->size - input->pos; /* no obligation to start from pos==0 */ - size_t const totalInputSize = (cctx->savedInPosPlusOne == 0) ? inputSize : input->size - (cctx->savedInPosPlusOne - 1); + size_t const totalInputSize = inputSize + cctx->stableIn_notConsumed; if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */ && (endOp == ZSTD_e_continue) /* no flush requested, more input to come */ && (totalInputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ - if (cctx->savedInPosPlusOne) { /* not the first time */ + if (cctx->stableIn_notConsumed) { /* not the first time */ /* check stable source guarantees */ assert(input->src == cctx->expectedInBuffer.src); assert(input->pos == cctx->expectedInBuffer.size); } - /* keep track of first position */ - if (cctx->savedInPosPlusOne == 0) cctx->savedInPosPlusOne = input->pos + 1; - cctx->expectedInBuffer = *input; /* pretend input was consumed, to give a sense forward progress */ input->pos = input->size; + /* save stable inBuffer, for later control, and flush/end */ + cctx->expectedInBuffer = *input; /* but actually input wasn't consumed, so keep track of position from where compression shall resume */ - cctx->expectedInBuffer.pos = cctx->savedInPosPlusOne - 1; + cctx->stableIn_notConsumed += inputSize; /* don't initialize yet, wait for the first block of flush() order, for better parameters adaptation */ return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format); /* at least some header to produce */ } FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, totalInputSize), "compressStream2 initialization failed"); - cctx->savedInPosPlusOne = 0; ZSTD_setBufferExpectations(cctx, output, input); /* Set initial buffer expectations now that we've initialized */ } /* end of transparent initialization stage */ @@ -5681,6 +5704,13 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams); cctx->cParamsChanged = 0; } + if (cctx->stableIn_notConsumed) { + assert(cctx->appliedParams.inBufferMode == ZSTD_bm_stable); + /* some early data was skipped - make it available for consumption */ + assert(input->pos >= cctx->stableIn_notConsumed); + input->pos -= cctx->stableIn_notConsumed; + cctx->stableIn_notConsumed = 0; + } for (;;) { size_t const ipos = input->pos; size_t const opos = output->pos; diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index 8cc2f81d6..efbf89ae3 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -410,8 +410,8 @@ struct ZSTD_CCtx_s { /* Stable in/out buffer verification */ ZSTD_inBuffer expectedInBuffer; + size_t stableIn_notConsumed; /* nb bytes within stable input buffer that are said to be consumed but are not */ size_t expectedOutBufferSize; - size_t savedInPosPlusOne; /* 0 == no savedInPos */ /* Dictionary */ ZSTD_localDict localDict; diff --git a/tests/fuzzer.c b/tests/fuzzer.c index 823db775f..ddb2ad393 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -1206,7 +1206,7 @@ static int basicUnitTests(U32 const seed, double compressibility) DISPLAYLEVEL(3, "test%3i : compress a NULL input with each level : ", testNb++); { int level = -1; - ZSTD_CCtx* cctx = ZSTD_createCCtx(); + ZSTD_CCtx* const cctx = ZSTD_createCCtx(); if (!cctx) goto _output_error; for (level = -1; level <= ZSTD_maxCLevel(); ++level) { CHECK_Z( ZSTD_compress(compressedBuffer, compressedBufferSize, NULL, 0, level) ); diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 1444d27df..e084924b2 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -878,20 +878,26 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableInBuffer, 1) ); { ZSTD_inBuffer inBuf; ZSTD_outBuffer outBuf; + const size_t inputSize = 500; inBuf.src = CNBuffer; inBuf.size = 100; inBuf.pos = 0; outBuf.dst = (char*)(compressedBuffer)+cSize; - outBuf.size = ZSTD_compressBound(500); + outBuf.size = ZSTD_compressBound(inputSize); outBuf.pos = 0; CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); inBuf.size = 200; CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); CHECK_Z( ZSTD_flushStream(cctx, &outBuf) ); - inBuf.size = 300; + inBuf.size = inputSize; CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); CHECK(ZSTD_endStream(cctx, &outBuf) != 0, "compression should be successful and fully flushed"); - } + { void* const verifBuf = (char*)outBuf.dst + outBuf.pos; + const size_t decSize = ZSTD_decompress(verifBuf, inputSize, outBuf.dst, outBuf.pos); + CHECK_Z(decSize); + CHECK(decSize != inputSize, "regenerated %zu bytes, instead of %zu", decSize, inputSize); + CHECK(memcmp(inBuf.src, verifBuf, inputSize) != 0, "regenerated data different from original"); + } } DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() with ZSTD_c_stableInBuffer: context size : ", testNb++); From af3d9c506e5bbfdbf78ce35fe62501c4ae0e19a7 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 26 Jan 2022 10:30:33 -0800 Subject: [PATCH 09/13] added streaming test starting from non-0 pos --- tests/zstreamtest.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index e084924b2..a88ba7ac8 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -878,10 +878,11 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableInBuffer, 1) ); { ZSTD_inBuffer inBuf; ZSTD_outBuffer outBuf; + const size_t nonZeroStartPos = 18; const size_t inputSize = 500; inBuf.src = CNBuffer; inBuf.size = 100; - inBuf.pos = 0; + inBuf.pos = nonZeroStartPos; outBuf.dst = (char*)(compressedBuffer)+cSize; outBuf.size = ZSTD_compressBound(inputSize); outBuf.pos = 0; @@ -889,14 +890,15 @@ static int basicUnitTests(U32 seed, double compressibility) inBuf.size = 200; CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); CHECK_Z( ZSTD_flushStream(cctx, &outBuf) ); - inBuf.size = inputSize; + inBuf.size = nonZeroStartPos + inputSize; CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); CHECK(ZSTD_endStream(cctx, &outBuf) != 0, "compression should be successful and fully flushed"); - { void* const verifBuf = (char*)outBuf.dst + outBuf.pos; + { const void* const realSrcStart = (const char*)inBuf.src + nonZeroStartPos; + void* const verifBuf = (char*)outBuf.dst + outBuf.pos; const size_t decSize = ZSTD_decompress(verifBuf, inputSize, outBuf.dst, outBuf.pos); CHECK_Z(decSize); CHECK(decSize != inputSize, "regenerated %zu bytes, instead of %zu", decSize, inputSize); - CHECK(memcmp(inBuf.src, verifBuf, inputSize) != 0, "regenerated data different from original"); + CHECK(memcmp(realSrcStart, verifBuf, inputSize) != 0, "regenerated data different from original"); } } DISPLAYLEVEL(3, "OK \n"); From b99ece96b99b7015169d5e11d45078f21bfa6bd4 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 26 Jan 2022 10:43:50 -0800 Subject: [PATCH 10/13] converted checks into user validation generating error codes had to create a new error code for this condition, none of the existing ones were fitting enough. --- lib/common/error_private.c | 1 + lib/compress/zstd_compress.c | 4 ++-- lib/zstd_errors.h | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/common/error_private.c b/lib/common/error_private.c index 6d1135f8c..f171e0dce 100644 --- a/lib/common/error_private.c +++ b/lib/common/error_private.c @@ -38,6 +38,7 @@ const char* ERR_getErrorString(ERR_enum code) case PREFIX(tableLog_tooLarge): return "tableLog requires too much memory : unsupported"; case PREFIX(maxSymbolValue_tooLarge): return "Unsupported max Symbol Value : too large"; case PREFIX(maxSymbolValue_tooSmall): return "Specified maxSymbolValue is too small"; + case PREFIX(stabilityCondition_notRespected): return "pledged buffer stability condition is not respected"; case PREFIX(dictionary_corrupted): return "Dictionary is corrupted"; case PREFIX(dictionary_wrong): return "Dictionary mismatch"; case PREFIX(dictionaryCreation_failed): return "Cannot create Dictionary from provided samples"; diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index e2a6d9675..57f41a3b3 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5678,8 +5678,8 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, && (totalInputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ if (cctx->stableIn_notConsumed) { /* not the first time */ /* check stable source guarantees */ - assert(input->src == cctx->expectedInBuffer.src); - assert(input->pos == cctx->expectedInBuffer.size); + RETURN_ERROR_IF(input->src != cctx->expectedInBuffer.src, stabilityCondition_notRespected, "stableInBuffer condition not respected: wrong src pointer"); + RETURN_ERROR_IF(input->pos != cctx->expectedInBuffer.size, stabilityCondition_notRespected, "stableInBuffer condition not respected: externally modified pos"); } /* pretend input was consumed, to give a sense forward progress */ input->pos = input->size; diff --git a/lib/zstd_errors.h b/lib/zstd_errors.h index fa3686b77..2ec0b0ab1 100644 --- a/lib/zstd_errors.h +++ b/lib/zstd_errors.h @@ -66,6 +66,7 @@ typedef enum { ZSTD_error_tableLog_tooLarge = 44, ZSTD_error_maxSymbolValue_tooLarge = 46, ZSTD_error_maxSymbolValue_tooSmall = 48, + ZSTD_error_stabilityCondition_notRespected = 50, ZSTD_error_stage_wrong = 60, ZSTD_error_init_missing = 62, ZSTD_error_memory_allocation = 64, From cbff372d105cb0593af24aba780481c83ef8d2ab Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 26 Jan 2022 11:05:57 -0800 Subject: [PATCH 11/13] added helper function inBuffer_forEndFlush() --- lib/compress/zstd_compress.c | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 57f41a3b3..6a34beea9 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -6207,13 +6207,18 @@ size_t ZSTD_compressSequences(ZSTD_CCtx* const cctx, void* dst, size_t dstCapaci /*====== Finalize ======*/ +static ZSTD_inBuffer inBuffer_forEndFlush(const ZSTD_CStream* zcs) +{ + const ZSTD_inBuffer nullInput = { NULL, 0, 0 }; + const int stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); + return stableInput ? zcs->expectedInBuffer : nullInput; +} + /*! ZSTD_flushStream() : * @return : amount of data remaining to flush */ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { - ZSTD_inBuffer const nullInput = { NULL, 0, 0 }; - int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); - ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput; + ZSTD_inBuffer input = inBuffer_forEndFlush(zcs); input.size = input.pos; /* do not ingest more input during flush */ return ZSTD_compressStream2(zcs, output, &input, ZSTD_e_flush); } @@ -6221,11 +6226,9 @@ size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { - ZSTD_inBuffer const nullInput = { NULL, 0, 0 }; - int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); - ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput; + ZSTD_inBuffer input = inBuffer_forEndFlush(zcs); size_t const remainingToFlush = ZSTD_compressStream2(zcs, output, &input, ZSTD_e_end); - FORWARD_IF_ERROR( remainingToFlush , "ZSTD_compressStream2 failed"); + FORWARD_IF_ERROR(remainingToFlush , "ZSTD_compressStream2(,,ZSTD_e_end) failed"); if (zcs->appliedParams.nbWorkers > 0) return remainingToFlush; /* minimal estimation */ /* single thread mode : attempt to calculate remaining to flush more precisely */ { size_t const lastBlockSize = zcs->frameEnded ? 0 : ZSTD_BLOCKHEADERSIZE; From dda4c10f0710d24685ee8490e9715d18300665f1 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 26 Jan 2022 13:33:04 -0800 Subject: [PATCH 12/13] added ZSTD_compressStream2() + ZSTD_c_stableInBuffer test --- tests/zstreamtest.c | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index a88ba7ac8..dbe2b907a 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -902,6 +902,36 @@ static int basicUnitTests(U32 seed, double compressibility) } } DISPLAYLEVEL(3, "OK \n"); + /* stableSrc + streaming */ + DISPLAYLEVEL(3, "test%3i : ZSTD_c_stableInBuffer compatibility with compressStream2, using different end directives : ", testNb++); + CHECK_Z( ZSTD_initCStream(cctx, 1) ); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_c_stableInBuffer, 1) ); + { ZSTD_inBuffer inBuf; + ZSTD_outBuffer outBuf; + const size_t nonZeroStartPos = 18; + const size_t inputSize = 500; + inBuf.src = CNBuffer; + inBuf.size = 100; + inBuf.pos = nonZeroStartPos; + outBuf.dst = (char*)(compressedBuffer)+cSize; + outBuf.size = ZSTD_compressBound(inputSize); + outBuf.pos = 0; + CHECK_Z( ZSTD_compressStream2(cctx, &outBuf, &inBuf, ZSTD_e_continue) ); + inBuf.size = 200; + CHECK_Z( ZSTD_compressStream2(cctx, &outBuf, &inBuf, ZSTD_e_continue) ); + CHECK_Z( ZSTD_compressStream2(cctx, &outBuf, &inBuf, ZSTD_e_flush) ); + inBuf.size = nonZeroStartPos + inputSize; + CHECK_Z( ZSTD_compressStream2(cctx, &outBuf, &inBuf, ZSTD_e_continue) ); + CHECK( ZSTD_compressStream2(cctx, &outBuf, &inBuf, ZSTD_e_end) != 0, "compression should be successful and fully flushed"); + { const void* const realSrcStart = (const char*)inBuf.src + nonZeroStartPos; + void* const verifBuf = (char*)outBuf.dst + outBuf.pos; + const size_t decSize = ZSTD_decompress(verifBuf, inputSize, outBuf.dst, outBuf.pos); + CHECK_Z(decSize); + CHECK(decSize != inputSize, "regenerated %zu bytes, instead of %zu", decSize, inputSize); + CHECK(memcmp(realSrcStart, verifBuf, inputSize) != 0, "regenerated data different from original"); + } } + DISPLAYLEVEL(3, "OK \n"); + DISPLAYLEVEL(3, "test%3i : ZSTD_compressStream2() with ZSTD_c_stableInBuffer: context size : ", testNb++); { size_t const cctxSize = ZSTD_sizeof_CCtx(cctx); DISPLAYLEVEL(4, "cctxSize1=%zu; cctxSize=%zu; cctxSize2=%zu : ", cctxSize1, cctxSize, cctxSize2); From f2d9652ad82c5ead0665bea428215eaf027de933 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 26 Jan 2022 18:04:52 -0800 Subject: [PATCH 13/13] more usage of new error code stabilityCondition_notRespected as suggested by @terrelln --- lib/compress/zstd_compress.c | 4 ++-- tests/zstreamtest.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 6a34beea9..4a073cf10 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -5558,13 +5558,13 @@ static size_t ZSTD_checkBufferStability(ZSTD_CCtx const* cctx, if (cctx->appliedParams.inBufferMode == ZSTD_bm_stable) { ZSTD_inBuffer const expect = cctx->expectedInBuffer; if (expect.src != input->src || expect.pos != input->pos) - RETURN_ERROR(srcBuffer_wrong, "ZSTD_c_stableInBuffer enabled but input differs!"); + RETURN_ERROR(stabilityCondition_notRespected, "ZSTD_c_stableInBuffer enabled but input differs!"); } (void)endOp; if (cctx->appliedParams.outBufferMode == ZSTD_bm_stable) { size_t const outBufferSize = output->size - output->pos; if (cctx->expectedOutBufferSize != outBufferSize) - RETURN_ERROR(dstBuffer_wrong, "ZSTD_c_stableOutBuffer enabled but output size differs!"); + RETURN_ERROR(stabilityCondition_notRespected, "ZSTD_c_stableOutBuffer enabled but output size differs!"); } return 0; } diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index dbe2b907a..20a05a75c 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -868,7 +868,7 @@ static int basicUnitTests(U32 seed, double compressibility) in.pos = 0; { size_t const ret = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end); CHECK(!ZSTD_isError(ret), "Must error"); - CHECK(!(ZSTD_getErrorCode(ret) == ZSTD_error_srcBuffer_wrong), "Must be this error"); + CHECK(!(ZSTD_getErrorCode(ret) == ZSTD_error_stabilityCondition_notRespected), "Must be this error"); } DISPLAYLEVEL(3, "OK \n"); @@ -963,7 +963,7 @@ static int basicUnitTests(U32 seed, double compressibility) in.pos = out.pos = 0; { size_t const ret = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_continue); CHECK(!ZSTD_isError(ret), "Must have errored"); - CHECK(!(ZSTD_getErrorCode(ret) == ZSTD_error_dstBuffer_wrong), "Must be this error"); + CHECK(!(ZSTD_getErrorCode(ret) == ZSTD_error_stabilityCondition_notRespected), "Must be this error"); } DISPLAYLEVEL(3, "OK \n");