diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0b91ad4ea..1baccf0fc 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -9,10 +9,6 @@ /* ====== Tuning parameters ====== */ -#ifndef ZSTDMT_SECTION_LOGSIZE_MIN -# define ZSTDMT_SECTION_LOGSIZE_MIN 20 /* minimum size for a full compression job (20==2^20==1 MB) */ -#endif - #define ZSTDMT_NBTHREADS_MAX 128 @@ -285,6 +281,7 @@ struct ZSTDMT_CCtx_s { unsigned frameEnded; unsigned allJobsCompleted; unsigned long long frameContentSize; + size_t sectionSize; ZSTD_CDict* cdict; ZSTD_CStream* cstream; ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ @@ -304,6 +301,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) cctx->nbThreads = nbThreads; cctx->jobIDMask = nbJobs - 1; cctx->allJobsCompleted = 1; + cctx->sectionSize = 0; cctx->factory = POOL_create(nbThreads, 1); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); @@ -356,6 +354,22 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) return 0; } +unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) +{ + switch(parameter) + { + case ZSTDMT_p_sectionSize : + mtctx->sectionSize = value; + return 0; + default : + return ERROR(compressionParameter_unsupported); + } +} + + +/* ------------------------------------------ */ +/* ===== Multi-threaded compression ===== */ +/* ------------------------------------------ */ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, @@ -487,7 +501,8 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, if (zcs->cdict == NULL) return ERROR(memory_allocation); } } zcs->frameContentSize = pledgedSrcSize; - zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2)); + zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); + zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 84d25f738..c00782e9d 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -7,6 +7,10 @@ * of patent rights can be found in the PATENTS file in the same directory. */ + +/* Note : All prototypes defined in this file shall be considered experimental. + * There is no guarantee of API continuity (yet) on any of these prototypes */ + /* === Dependencies === */ #include /* size_t */ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */ @@ -27,12 +31,32 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, /* === Streaming functions === */ -ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); -ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ -ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */ - ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ +ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); +ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ -ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); +ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); -ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ -ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ + + +/* === Advanced functions and parameters === */ + +#ifndef ZSTDMT_SECTION_SIZE_MIN +# define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */ +#endif + +ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */ + ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ + +/* ZSDTMT_parameter : + * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ +typedef enum { ZSTDMT_p_sectionSize /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */ + } ZSDTMT_parameter; + +/* ZSTDMT_setMTCtxParameter() : + * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter. + * The function must be called typically after ZSTD_createCCtx(). + * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions. + * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ +ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value); diff --git a/programs/fileio.c b/programs/fileio.c index 3864a5fab..86db12acb 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -113,6 +113,16 @@ void FIO_setNbThreads(unsigned nbThreads) { #endif g_nbThreads = nbThreads; } +static U32 g_blockSize = 0; +void FIO_setBlockSize(unsigned blockSize) { + if (blockSize && g_nbThreads==1) + DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n"); +#ifdef ZSTD_MULTITHREAD + if (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1) /* intentional underflow */ + DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10)); +#endif + g_blockSize = blockSize; +} /*-************************************* @@ -283,10 +293,12 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1); #ifdef ZSTD_MULTITHREAD { size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); + if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); + ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize); #else { size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); -#endif if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); +#endif } } free(dictBuffer); } diff --git a/programs/fileio.h b/programs/fileio.h index 9ef449294..19f09c33a 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -41,6 +41,7 @@ void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setRemoveSrcFile(unsigned flag); void FIO_setMemLimit(unsigned memLimit); void FIO_setNbThreads(unsigned nbThreads); +void FIO_setBlockSize(unsigned blockSize); /*-************************************* diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 785ecedee..549dad01a 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -118,6 +118,7 @@ static int usage_advanced(const char* programName) DISPLAY( "--[no-]check : integrity check (default:enabled) \n"); #ifdef ZSTD_MULTITHREAD DISPLAY( " -T# : use # threads for compression (default:1) \n"); + DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n"); #endif #endif #ifndef ZSTD_NODECOMPRESS @@ -625,6 +626,7 @@ int main(int argCount, const char* argv[]) if (operation==zom_compress) { #ifndef ZSTD_NOCOMPRESS FIO_setNbThreads(nbThreads); + FIO_setBlockSize((U32)blockSize); if ((filenameIdx==1) && outFileName) operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams); else