diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 7af7e7881..06a3f101e 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3788,8 +3788,15 @@ size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel) /*====== Compression ======*/ -MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, - const void* src, size_t srcSize) +static size_t ZSTD_nextInputSizeHint(const ZSTD_CCtx* cctx) +{ + size_t hintInSize = cctx->inBuffTarget - cctx->inBuffPos; + if (hintInSize==0) hintInSize = cctx->blockSize; + return hintInSize; +} + +static size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, + const void* src, size_t srcSize) { size_t const length = MIN(dstCapacity, srcSize); if (length) memcpy(dst, src, length); @@ -3797,7 +3804,7 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, } /** ZSTD_compressStream_generic(): - * internal function for all *compressStream*() variants and *compress_generic() + * internal function for all *compressStream*() variants * non-static, because can be called from zstdmt_compress.c * @return : hint size for next input */ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, @@ -3937,19 +3944,25 @@ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, input->pos = ip - istart; output->pos = op - ostart; if (zcs->frameEnded) return 0; - { size_t hintInSize = zcs->inBuffTarget - zcs->inBuffPos; - if (hintInSize==0) hintInSize = zcs->blockSize; - return hintInSize; + return ZSTD_nextInputSizeHint(zcs); +} + +static size_t ZSTD_nextInputSizeHint_MTorST(const ZSTD_CCtx* cctx) +{ +#ifdef ZSTD_MULTITHREAD + if (cctx->appliedParams.nbWorkers >= 1) { + assert(cctx->mtctx != NULL); + return ZSTDMT_nextInputSizeHint(cctx->mtctx); } +#endif + return ZSTD_nextInputSizeHint(cctx); + } size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { - /* check conditions */ - if (output->pos > output->size) return ERROR(GENERIC); - if (input->pos > input->size) return ERROR(GENERIC); - - return ZSTD_compressStream_generic(zcs, output, input, ZSTD_e_continue); + CHECK_F( ZSTD_compressStream2(zcs, output, input, ZSTD_e_continue) ); + return ZSTD_nextInputSizeHint_MTorST(zcs); } @@ -4005,6 +4018,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, assert(cctx->streamStage == zcss_load); assert(cctx->appliedParams.nbWorkers == 0); } } + /* end of transparent initialization stage */ /* compression stage */ #ifdef ZSTD_MULTITHREAD @@ -4054,6 +4068,10 @@ size_t ZSTD_compress2(ZSTD_CCtx* cctx, ZSTD_e_end); assert(iPos == srcSize); if (ZSTD_isError(result)) return result; + if (result != 0) { /* compression not completed, due to lack of output space */ + assert(oPos == dstCapacity); + return ERROR(dstSize_tooSmall); + } return oPos; } @@ -4064,20 +4082,20 @@ size_t ZSTD_compress2(ZSTD_CCtx* cctx, size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { ZSTD_inBuffer input = { NULL, 0, 0 }; - if (output->pos > output->size) return ERROR(GENERIC); - CHECK_F( ZSTD_compressStream_generic(zcs, output, &input, ZSTD_e_flush) ); - return zcs->outBuffContentSize - zcs->outBuffFlushedSize; /* remaining to flush */ + return ZSTD_compressStream2(zcs, output, &input, ZSTD_e_flush); } size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) { ZSTD_inBuffer input = { NULL, 0, 0 }; - if (output->pos > output->size) return ERROR(GENERIC); - CHECK_F( ZSTD_compressStream_generic(zcs, output, &input, ZSTD_e_end) ); + size_t const remainingToFlush = ZSTD_compressStream2(zcs, output, &input, ZSTD_e_end); + CHECK_F( remainingToFlush ); + if (zcs->appliedParams.nbWorkers > 0) return remainingToFlush; /* minimal estimation */ + /* single thread mode : attempt to calculate remaining to flush more precisely */ { size_t const lastBlockSize = zcs->frameEnded ? 0 : ZSTD_BLOCKHEADERSIZE; size_t const checksumSize = zcs->frameEnded ? 0 : zcs->appliedParams.fParams.checksumFlag * 4; - size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize + lastBlockSize + checksumSize; + size_t const toFlush = remainingToFlush + lastBlockSize + checksumSize; DEBUGLOG(4, "ZSTD_endStream : remaining to flush : %u", (U32)toFlush); return toFlush; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 465ab1e44..4a9a626cc 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1844,7 +1844,9 @@ typedef struct { * Otherwise, we will load as many bytes as possible and instruct the caller * to continue as normal. */ -static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) { +static syncPoint_t +findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) +{ BYTE const* const istart = (BYTE const*)input.src + input.pos; U64 const primePower = mtctx->rsync.primePower; U64 const hitMask = mtctx->rsync.hitMask; @@ -1908,6 +1910,13 @@ static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuf return syncPoint; } +size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx) +{ + size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled; + if (hintInSize==0) hintInSize = mtctx->targetSectionSize; + return hintInSize; +} + /** ZSTDMT_compressStream_generic() : * internal use only - exposed to be invoked from zstd_compress.c * assumption : output and input are valid (pos <= size) diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 666b506e8..c60ec8322 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -60,6 +60,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */ +ZSTDLIB_API size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx); ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ diff --git a/lib/zstd.h b/lib/zstd.h index ff4897090..d5bd738f7 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -289,13 +289,17 @@ typedef struct ZSTD_outBuffer_s { * A ZSTD_CStream object is required to track streaming operation. * Use ZSTD_createCStream() and ZSTD_freeCStream() to create/release resources. * ZSTD_CStream objects can be reused multiple times on consecutive compression operations. -* It is recommended to re-use ZSTD_CStream in situations where many streaming operations will be achieved consecutively, -* since it will play nicer with system's memory, by re-using already allocated memory. -* Use one separate ZSTD_CStream per thread for parallel execution. +* It is recommended to re-use ZSTD_CStream since it will play nicer with system's memory, by re-using already allocated memory. * -* Start a new compression by initializing ZSTD_CStream context. -* Use ZSTD_initCStream() to start a new compression operation. -* Use variants ZSTD_initCStream_usingDict() or ZSTD_initCStream_usingCDict() for streaming with dictionary (experimental section) +* For parallel execution, use one separate ZSTD_CStream per thread. +* +* note : since v1.3.0, ZSTD_CStream and ZSTD_CCtx are the same thing. +* +* Parameters are sticky : when starting a new compression on the same context, +* it will re-use the same sticky parameters as previous compression session. +* It's recommended to initialize the context before every usage. +* Use ZSTD_initCStream() to set the parameter to a selected compression level. +* Use advanced API (ZSTD_CCtx_setParameter(), etc.) to set more detailed parameters. * * Use ZSTD_compressStream() as many times as necessary to consume input stream. * The function will automatically update both `pos` fields within `input` and `output`. @@ -306,10 +310,10 @@ typedef struct ZSTD_outBuffer_s { * If not, the caller must make some room to receive more compressed data, * typically by emptying output buffer, or allocating a new output buffer, * and then present again remaining input data. -* @return : a size hint, preferred nb of bytes to use as input for next function call -* or an error code, which can be tested using ZSTD_isError(). -* Note 1 : it's just a hint, to help latency a little, any other value will work fine. -* Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize() +* @return : a size hint, preferred nb of bytes to use as input for next function call +* or an error code, which can be tested using ZSTD_isError(). +* Note 1 : it's just a hint, to help latency a little, any value will work fine. +* Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize() * * At any moment, it's possible to flush whatever data might remain stuck within internal buffer, * using ZSTD_flushStream(). `output->pos` will be updated. @@ -757,21 +761,22 @@ typedef enum { ZSTD_e_flush=1, /* flush any data provided so far, * it creates (at least) one new block, that can be decoded immediately on reception; * frame will continue: any future data can still reference previously compressed data, improving compression. */ - ZSTD_e_end=2 /* flush any remaining data and close current frame. - * any additional data starts a new frame. - * each frame is independent (does not reference any content from previous frame). */ + ZSTD_e_end=2 /* flush any remaining data _and_ close current frame. + * note that frame is only closed after compressed data is fully flushed (return value == 0). + * After that point, any additional data starts a new frame. + * note : each frame is independent (does not reference any content from previous frame). */ } ZSTD_EndDirective; /*! ZSTD_compressStream2() : - * Behave about the same as ZSTD_compressStream, with additional control on end directive. + * Behaves about the same as ZSTD_compressStream, with additional control on end directive. * - Compression parameters are pushed into CCtx before starting compression, using ZSTD_CCtx_set*() * - Compression parameters cannot be changed once compression is started (save a list of exceptions in multi-threading mode) * - outpot->pos must be <= dstCapacity, input->pos must be <= srcSize * - outpot->pos and input->pos will be updated. They are guaranteed to remain below their respective limit. - * - In single-thread mode (default), function is blocking : it completes its job before returning to caller. - * - In multi-thread mode, function is non-blocking : it just acquires a copy of input, and distribute job to internal worker threads, - * and then immediately returns, just indicating that there is some data remaining to be flushed. - * The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. + * - When nbWorkers==0 (default), function is blocking : it completes its job before returning to caller. + * - When nbWorkers>=1, function is non-blocking : it just acquires a copy of input, and distributes jobs to internal worker threads, + * and then immediately returns, just indicating that there is some data remaining to be flushed. + * The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. * - Exception : if the first call requests a ZSTD_e_end directive, the function delegates to ZSTD_compress2() which is always blocking. * - @return provides a minimum amount of data remaining to be flushed from internal buffers * or an error code, which can be tested using ZSTD_isError(). diff --git a/tests/fuzzer.c b/tests/fuzzer.c index 0afa21a3a..dd6e260c2 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -233,11 +233,9 @@ static int FUZ_mallocTests_internal(unsigned seed, double compressibility, unsig mallocCounter_t malcount = INIT_MALLOC_COUNTER; ZSTD_customMem const cMem = { FUZ_mallocDebug, FUZ_freeDebug, &malcount }; ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); - ZSTD_outBuffer out = { outBuffer, outSize, 0 }; - ZSTD_inBuffer in = { inBuffer, inSize, 0 }; CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, compressionLevel) ); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) ); - while ( ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end) ) {} + CHECK_Z( ZSTD_compress2(cctx, outBuffer, outSize, inBuffer, inSize) ); ZSTD_freeCCtx(cctx); DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ", nbThreads, compressionLevel); @@ -1253,12 +1251,12 @@ static int basicUnitTests(U32 seed, double compressibility) if (ZSTD_isError(cSize_1pass)) goto _output_error; CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, compressionLevel) ); - { ZSTD_inBuffer in = { CNBuffer, srcSize, 0 }; - ZSTD_outBuffer out = { compressedBuffer, compressedBufferSize, 0 }; - size_t const compressionResult = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end); - DISPLAYLEVEL(5, "simple=%zu vs %zu=advanced : ", cSize_1pass, out.pos); + { size_t const compressionResult = ZSTD_compress2(cctx, + compressedBuffer, compressedBufferSize, + CNBuffer, srcSize); + DISPLAYLEVEL(5, "simple=%zu vs %zu=advanced : ", cSize_1pass, compressionResult); if (ZSTD_isError(compressionResult)) goto _output_error; - if (out.pos != cSize_1pass) goto _output_error; + if (compressionResult != cSize_1pass) goto _output_error; } } ZSTD_freeCCtx(cctx); } @@ -1274,13 +1272,12 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 2) ); CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_enableLongDistanceMatching, 1) ); CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_windowLog, 18) ); - { ZSTD_inBuffer in = { CNBuffer, inputSize, 0 }; - ZSTD_outBuffer out = { compressedBuffer, ZSTD_compressBound(inputSize), 0 }; - size_t const result = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end); - if (result != 0) goto _output_error; - if (in.pos != in.size) goto _output_error; - cSize = out.pos; - xxh64 = XXH64(out.dst, out.pos, 0); + { size_t const compressedSize = ZSTD_compress2(cctx, + compressedBuffer, ZSTD_compressBound(inputSize), + CNBuffer, inputSize); + CHECK(compressedSize); + cSize = compressedSize; + xxh64 = XXH64(compressedBuffer, compressedSize, 0); } DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)cSize); ZSTD_freeCCtx(cctx); @@ -1291,14 +1288,13 @@ static int basicUnitTests(U32 seed, double compressibility) CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_windowLog, 18) ); CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_enableLongDistanceMatching, 1) ); CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 2) ); - { ZSTD_inBuffer in = { CNBuffer, inputSize, 0 }; - ZSTD_outBuffer out = { compressedBuffer, ZSTD_compressBound(inputSize), 0 }; - size_t const result = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end); - if (result != 0) goto _output_error; - if (in.pos != in.size) goto _output_error; - if (out.pos != cSize) goto _output_error; /* must result in same compressed result, hence same size */ - if (XXH64(out.dst, out.pos, 0) != xxh64) goto _output_error; /* must result in exactly same content, hence same hash */ - DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)out.pos); + { size_t const result = ZSTD_compress2(cctx, + compressedBuffer, ZSTD_compressBound(inputSize), + CNBuffer, inputSize); + CHECK(result); + if (result != cSize) goto _output_error; /* must result in same compressed result, hence same size */ + if (XXH64(compressedBuffer, result, 0) != xxh64) goto _output_error; /* must result in exactly same content, hence same hash */ + DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)result); } ZSTD_freeCCtx(cctx); }