1
0
mirror of https://github.com/facebook/zstd.git synced 2025-07-30 22:23:13 +03:00

Changed nbThreads for nbWorkers

This makes it easier to explain that nbWorkers=0 --> single-threaded mode,
while nbWorkers=1 --> asynchronous mode (one mode thread on top of the "main" caller thread).
No need for an additional asynchronous mode flag.
nbWorkers>=2 works the same as nbThreads>=2 previously.
This commit is contained in:
Yann Collet
2018-02-01 19:29:30 -08:00
parent 4b6a94f0cc
commit 209df52ba2
15 changed files with 165 additions and 183 deletions

View File

@ -416,7 +416,7 @@ size_t ZSTD_estimateDCtxSize(void);
It will also consider src size to be arbitrarily "large", which is worst case. It will also consider src size to be arbitrarily "large", which is worst case.
If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation. If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation.
ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1. ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
Note : CCtx size estimation is only correct for single-threaded compression. Note : CCtx size estimation is only correct for single-threaded compression.
</p></pre><BR> </p></pre><BR>
@ -429,7 +429,7 @@ size_t ZSTD_estimateDStreamSize_fromFrame(const void* src, size_t srcSize);
It will also consider src size to be arbitrarily "large", which is worst case. It will also consider src size to be arbitrarily "large", which is worst case.
If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation. If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation.
ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1. ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
Note : CStream size estimation is only correct for single-threaded compression. Note : CStream size estimation is only correct for single-threaded compression.
ZSTD_DStream memory budget depends on window Size. ZSTD_DStream memory budget depends on window Size.
This information can be passed manually, using ZSTD_estimateDStreamSize, This information can be passed manually, using ZSTD_estimateDStreamSize,
@ -800,18 +800,11 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
</b>/* multi-threading parameters */<b> </b>/* multi-threading parameters */<b>
</b>/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).<b> </b>/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).<b>
* They return an error otherwise. */ * They return an error otherwise. */
ZSTD_p_nbThreads=400, </b>/* Select how many threads a compression job can spawn (default:1)<b> ZSTD_p_nbWorkers=400, </b>/* Select how many threads will be spawned to compress in parallel.<b>
* Triggers asynchronous mode, even with nbWorkers = 1.
* Can only be set to a value >= 1 if ZSTD_MULTITHREAD is enabled.
* More threads improve speed, but also increase memory usage. * More threads improve speed, but also increase memory usage.
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Default value is `0`, aka "blocking mode" : no worker is spawned, compression is performed inside Caller's thread */
* Special: value 0 means "do not change nbThreads" */
ZSTD_p_nonBlockingMode, </b>/* Single thread mode is by default "blocking" :<b>
* it finishes its job as much as possible, and only then gives back control to caller.
* In contrast, multi-thread is by default "non-blocking" :
* it takes some input, flush some output if available, and immediately gives back control to caller.
* Compression work is performed in parallel, within worker threads.
* (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
* Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
* It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, </b>/* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.<b> ZSTD_p_jobSize, </b>/* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.<b>
* Each compression job is completed in parallel, so indirectly controls the nb of active threads. * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters. * 0 means default, which is dynamically determined based on compression parameters.
@ -823,7 +816,7 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
</b>/* advanced parameters - may not remain available after API update */<b> </b>/* advanced parameters - may not remain available after API update */<b>
ZSTD_p_forceMaxWindow=1100, </b>/* Force back-reference distances to remain < windowSize,<b> ZSTD_p_forceMaxWindow=1100, </b>/* Force back-reference distances to remain < windowSize,<b>
* even when referencing into Dictionary content (default:0) */ * even when referencing into Dictionary content (default:0) */
ZSTD_p_enableLongDistanceMatching=1200, </b>/* Enable long distance matching.<b> ZSTD_p_enableLongDistanceMatching=1200, </b>/* Enable long distance matching.<b>
* This parameter is designed to improve the compression * This parameter is designed to improve the compression
* ratio for large inputs with long distance matches. * ratio for large inputs with long distance matches.
* This increases the memory usage as well as window size. * This increases the memory usage as well as window size.
@ -833,25 +826,29 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
* other LDM parameters. Setting the compression level * other LDM parameters. Setting the compression level
* after this parameter overrides the window log, though LDM * after this parameter overrides the window log, though LDM
* will remain enabled until explicitly disabled. */ * will remain enabled until explicitly disabled. */
ZSTD_p_ldmHashLog, </b>/* Size of the table for long distance matching, as a power of 2.<b> ZSTD_p_ldmHashLog, </b>/* Size of the table for long distance matching, as a power of 2.<b>
* Larger values increase memory usage and compression ratio, but decrease * Larger values increase memory usage and compression ratio, but decrease
* compression speed. * compression speed.
* Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX
* (default: windowlog - 7). */ * (default: windowlog - 7).
ZSTD_p_ldmMinMatch, </b>/* Minimum size of searched matches for long distance matcher.<b> * Special: value 0 means "do not change ldmHashLog". */
* Larger/too small values usually decrease compression ratio. ZSTD_p_ldmMinMatch, </b>/* Minimum size of searched matches for long distance matcher.<b>
* Must be clamped between ZSTD_LDM_MINMATCH_MIN * Larger/too small values usually decrease compression ratio.
* and ZSTD_LDM_MINMATCH_MAX (default: 64). */ * Must be clamped between ZSTD_LDM_MINMATCH_MIN
ZSTD_p_ldmBucketSizeLog, </b>/* Log size of each bucket in the LDM hash table for collision resolution.<b> * and ZSTD_LDM_MINMATCH_MAX (default: 64).
* Larger values usually improve collision resolution but may decrease * Special: value 0 means "do not change ldmMinMatch". */
* compression speed. ZSTD_p_ldmBucketSizeLog, </b>/* Log size of each bucket in the LDM hash table for collision resolution.<b>
* The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). */ * Larger values usually improve collision resolution but may decrease
* compression speed.
* The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3).
* note : 0 is a valid value */
ZSTD_p_ldmHashEveryLog, </b>/* Frequency of inserting/looking up entries in the LDM hash table.<b> ZSTD_p_ldmHashEveryLog, </b>/* Frequency of inserting/looking up entries in the LDM hash table.<b>
* The default is MAX(0, (windowLog - ldmHashLog)) to * The default is MAX(0, (windowLog - ldmHashLog)) to
* optimize hash table usage. * optimize hash table usage.
* Larger values improve compression speed. Deviating far from the * Larger values improve compression speed. Deviating far from the
* default value will likely result in a decrease in compression ratio. * default value will likely result in a decrease in compression ratio.
* Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. */ * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN.
* note : 0 is a valid value */
} ZSTD_cParameter; } ZSTD_cParameter;
</b></pre><BR> </b></pre><BR>
@ -1000,7 +997,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params); <pre><b>size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params);
</b><p> Reset params to default, with the default compression level. </b><p> Reset params to default values.
</p></pre><BR> </p></pre><BR>
@ -1030,7 +1027,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
</b><p> Apply a set of ZSTD_CCtx_params to the compression context. </b><p> Apply a set of ZSTD_CCtx_params to the compression context.
This must be done before the dictionary is loaded. This must be done before the dictionary is loaded.
The pledgedSrcSize is treated as unknown. The pledgedSrcSize is treated as unknown.
Multithreading parameters are applied only if nbThreads > 1. Multithreading parameters are applied only if nbWorkers >= 1.
</p></pre><BR> </p></pre><BR>

View File

@ -281,13 +281,12 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
* default : 0 when using a CDict, 1 when using a Prefix */ * default : 0 when using a CDict, 1 when using a Prefix */
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_nbThreads: case ZSTD_p_nbWorkers:
if ((value > 1) && cctx->staticSize) { if ((value>0) && cctx->staticSize) {
return ERROR(parameter_unsupported); /* MT not compatible with static alloc */ return ERROR(parameter_unsupported); /* MT not compatible with static alloc */
} }
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_nonBlockingMode:
case ZSTD_p_jobSize: case ZSTD_p_jobSize:
case ZSTD_p_overlapSizeLog: case ZSTD_p_overlapSizeLog:
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
@ -403,21 +402,12 @@ size_t ZSTD_CCtxParam_setParameter(
CCtxParams->forceWindow = (value > 0); CCtxParams->forceWindow = (value > 0);
return CCtxParams->forceWindow; return CCtxParams->forceWindow;
case ZSTD_p_nbThreads : case ZSTD_p_nbWorkers :
if (value == 0) return CCtxParams->nbThreads;
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
if (value > 1) return ERROR(parameter_unsupported); if (value > 0) return ERROR(parameter_unsupported);
return 1; return 0;
#else #else
return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); return ZSTDMT_CCtxParam_setNbWorkers(CCtxParams, value);
#endif
case ZSTD_p_nonBlockingMode :
#ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported);
#else
CCtxParams->nonBlockingMode = (value>0);
return CCtxParams->nonBlockingMode;
#endif #endif
case ZSTD_p_jobSize : case ZSTD_p_jobSize :
@ -489,7 +479,7 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams(
cctx->requestedParams = *params; cctx->requestedParams = *params;
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if (cctx->mtctx) if (cctx->mtctx)
ZSTDMT_MTCtx_setParametersUsingCCtxParams(cctx->mtctx, params); ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(cctx->mtctx, params);
#endif #endif
return 0; return 0;
@ -687,7 +677,7 @@ static size_t ZSTD_sizeof_matchState(ZSTD_compressionParameters const* cParams,
size_t ZSTD_estimateCCtxSize_usingCCtxParams(const ZSTD_CCtx_params* params) size_t ZSTD_estimateCCtxSize_usingCCtxParams(const ZSTD_CCtx_params* params)
{ {
/* Estimate CCtx size is supported for single-threaded compression only. */ /* Estimate CCtx size is supported for single-threaded compression only. */
if (params->nbThreads > 1) { return ERROR(GENERIC); } if (params->nbWorkers > 0) { return ERROR(GENERIC); }
{ ZSTD_compressionParameters const cParams = { ZSTD_compressionParameters const cParams =
ZSTD_getCParamsFromCCtxParams(*params, 0, 0); ZSTD_getCParamsFromCCtxParams(*params, 0, 0);
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << cParams.windowLog); size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << cParams.windowLog);
@ -736,7 +726,7 @@ size_t ZSTD_estimateCCtxSize(int compressionLevel)
size_t ZSTD_estimateCStreamSize_usingCCtxParams(const ZSTD_CCtx_params* params) size_t ZSTD_estimateCStreamSize_usingCCtxParams(const ZSTD_CCtx_params* params)
{ {
if (params->nbThreads > 1) { return ERROR(GENERIC); } if (params->nbWorkers > 0) { return ERROR(GENERIC); }
{ size_t const CCtxSize = ZSTD_estimateCCtxSize_usingCCtxParams(params); { size_t const CCtxSize = ZSTD_estimateCCtxSize_usingCCtxParams(params);
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params->cParams.windowLog); size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params->cParams.windowLog);
size_t const inBuffSize = ((size_t)1 << params->cParams.windowLog) + blockSize; size_t const inBuffSize = ((size_t)1 << params->cParams.windowLog) + blockSize;
@ -775,7 +765,7 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) {
ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx) ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
{ {
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->appliedParams.nbThreads > 1) || (cctx->appliedParams.nonBlockingMode)) { if (cctx->appliedParams.nbWorkers > 0) {
return ZSTDMT_getFrameProgression(cctx->mtctx); return ZSTDMT_getFrameProgression(cctx->mtctx);
} }
#endif #endif
@ -3166,28 +3156,26 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) { if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */
params.nonBlockingMode = 0;
} }
if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) { if (params.nbWorkers > 0) {
if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { if (cctx->mtctx == NULL || (params.nbWorkers != ZSTDMT_getNbWorkers(cctx->mtctx))) {
DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u", DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbWorkers=%u",
params.nbThreads); params.nbWorkers);
if (cctx->mtctx != NULL) if (cctx->mtctx != NULL)
DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u", DEBUGLOG(4, "ZSTD_compress_generic: previous nbWorkers was %u",
ZSTDMT_getNbThreads(cctx->mtctx)); ZSTDMT_getNbWorkers(cctx->mtctx));
ZSTDMT_freeCCtx(cctx->mtctx); ZSTDMT_freeCCtx(cctx->mtctx);
cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbWorkers, cctx->customMem);
if (cctx->mtctx == NULL) return ERROR(memory_allocation); if (cctx->mtctx == NULL) return ERROR(memory_allocation);
} }
DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbThreads=%u", params.nbThreads); DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers);
CHECK_F( ZSTDMT_initCStream_internal( CHECK_F( ZSTDMT_initCStream_internal(
cctx->mtctx, cctx->mtctx,
prefixDict.dict, prefixDict.dictSize, ZSTD_dm_rawContent, prefixDict.dict, prefixDict.dictSize, ZSTD_dm_rawContent,
cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
cctx->streamStage = zcss_load; cctx->streamStage = zcss_load;
cctx->appliedParams.nbThreads = params.nbThreads; cctx->appliedParams.nbWorkers = params.nbWorkers;
cctx->appliedParams.nonBlockingMode = params.nonBlockingMode;
} else } else
#endif #endif
{ CHECK_F( ZSTD_resetCStream_internal( { CHECK_F( ZSTD_resetCStream_internal(
@ -3195,12 +3183,12 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
prefixDict.dictMode, cctx->cdict, params, prefixDict.dictMode, cctx->cdict, params,
cctx->pledgedSrcSizePlusOne-1) ); cctx->pledgedSrcSizePlusOne-1) );
assert(cctx->streamStage == zcss_load); assert(cctx->streamStage == zcss_load);
assert(cctx->appliedParams.nbThreads <= 1); assert(cctx->appliedParams.nbWorkers == 0);
} } } }
/* compression stage */ /* compression stage */
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) { if (cctx->appliedParams.nbWorkers > 0) {
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
if ( ZSTD_isError(flushMin) if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */

View File

@ -145,12 +145,11 @@ struct ZSTD_CCtx_params_s {
ZSTD_frameParameters fParams; ZSTD_frameParameters fParams;
int compressionLevel; int compressionLevel;
U32 forceWindow; /* force back-references to respect limit of int forceWindow; /* force back-references to respect limit of
* 1<<wLog, even for dictionary */ * 1<<wLog, even for dictionary */
/* Multithreading: used to pass parameters to mtctx */ /* Multithreading: used to pass parameters to mtctx */
U32 nbThreads; unsigned nbWorkers;
int nonBlockingMode; /* will trigger ZSTDMT even with nbThreads==1 */
unsigned jobSize; unsigned jobSize;
unsigned overlapSizeLog; unsigned overlapSizeLog;

View File

@ -10,7 +10,7 @@
/* ====== Tuning parameters ====== */ /* ====== Tuning parameters ====== */
#define ZSTDMT_NBTHREADS_MAX 200 #define ZSTDMT_NBWORKERS_MAX 200
#define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (2 GB)) /* note : limited by `jobSize` type, which is `unsigned` */ #define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (2 GB)) /* note : limited by `jobSize` type, which is `unsigned` */
#define ZSTDMT_OVERLAPLOG_DEFAULT 6 #define ZSTDMT_OVERLAPLOG_DEFAULT 6
@ -97,9 +97,9 @@ typedef struct ZSTDMT_bufferPool_s {
buffer_t bTable[1]; /* variable size */ buffer_t bTable[1]; /* variable size */
} ZSTDMT_bufferPool; } ZSTDMT_bufferPool;
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_customMem cMem) static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
{ {
unsigned const maxNbBuffers = 2*nbThreads + 3; unsigned const maxNbBuffers = 2*nbWorkers + 3;
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc( ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem); sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL; if (bufPool==NULL) return NULL;
@ -236,23 +236,24 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
} }
/* ZSTDMT_createCCtxPool() : /* ZSTDMT_createCCtxPool() :
* implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx() */ * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads, static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers,
ZSTD_customMem cMem) ZSTD_customMem cMem)
{ {
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem); sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem);
assert(nbWorkers > 0);
if (!cctxPool) return NULL; if (!cctxPool) return NULL;
if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) { if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
ZSTD_free(cctxPool, cMem); ZSTD_free(cctxPool, cMem);
return NULL; return NULL;
} }
cctxPool->cMem = cMem; cctxPool->cMem = cMem;
cctxPool->totalCCtx = nbThreads; cctxPool->totalCCtx = nbWorkers;
cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */
cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads); DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);
return cctxPool; return cctxPool;
} }
@ -260,15 +261,16 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{ {
ZSTD_pthread_mutex_lock(&cctxPool->poolMutex); ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
{ unsigned const nbThreads = cctxPool->totalCCtx; { unsigned const nbWorkers = cctxPool->totalCCtx;
size_t const poolSize = sizeof(*cctxPool) size_t const poolSize = sizeof(*cctxPool)
+ (nbThreads-1)*sizeof(ZSTD_CCtx*); + (nbWorkers-1) * sizeof(ZSTD_CCtx*);
unsigned u; unsigned u;
size_t totalCCtxSize = 0; size_t totalCCtxSize = 0;
for (u=0; u<nbThreads; u++) { for (u=0; u<nbWorkers; u++) {
totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]); totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
} }
ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex); ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
assert(nbWorkers > 0);
return poolSize + totalCCtxSize; return poolSize + totalCCtxSize;
} }
} }
@ -295,8 +297,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
if (pool->availCCtx < pool->totalCCtx) if (pool->availCCtx < pool->totalCCtx)
pool->cctx[pool->availCCtx++] = cctx; pool->cctx[pool->availCCtx++] = cctx;
else { else {
/* pool overflow : should not happen, since totalCCtx==nbThreads */ /* pool overflow : should not happen, since totalCCtx==nbWorkers */
DEBUGLOG(5, "CCtx pool overflow : free cctx"); DEBUGLOG(4, "CCtx pool overflow : free cctx");
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
} }
ZSTD_pthread_mutex_unlock(&pool->poolMutex); ZSTD_pthread_mutex_unlock(&pool->poolMutex);
@ -502,52 +504,52 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
return jobTable; return jobTable;
} }
/* ZSTDMT_CCtxParam_setNbThreads(): /* ZSTDMT_CCtxParam_setNbWorkers():
* Internal use only */ * Internal use only */
size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads) size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{ {
if (nbThreads > ZSTDMT_NBTHREADS_MAX) nbThreads = ZSTDMT_NBTHREADS_MAX; if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX;
if (nbThreads < 1) nbThreads = 1; if (nbWorkers < 1) nbWorkers = 1;
params->nbThreads = nbThreads; params->nbWorkers = nbWorkers;
params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT; params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
params->jobSize = 0; params->jobSize = 0;
return nbThreads; return nbWorkers;
} }
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
{ {
ZSTDMT_CCtx* mtctx; ZSTDMT_CCtx* mtctx;
U32 nbJobs = nbThreads + 2; U32 nbJobs = nbWorkers + 2;
DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbThreads = %u)", nbThreads); DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers);
if (nbThreads < 1) return NULL; if (nbWorkers < 1) return NULL;
nbThreads = MIN(nbThreads , ZSTDMT_NBTHREADS_MAX); nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX);
if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL)) if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
/* invalid custom allocator */ /* invalid custom allocator */
return NULL; return NULL;
mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem); mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
if (!mtctx) return NULL; if (!mtctx) return NULL;
ZSTDMT_CCtxParam_setNbThreads(&mtctx->params, nbThreads); ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
mtctx->cMem = cMem; mtctx->cMem = cMem;
mtctx->allJobsCompleted = 1; mtctx->allJobsCompleted = 1;
mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1; mtctx->jobIDMask = nbJobs - 1;
mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) { if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
ZSTDMT_freeCCtx(mtctx); ZSTDMT_freeCCtx(mtctx);
return NULL; return NULL;
} }
DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads); DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
return mtctx; return mtctx;
} }
ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads) ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
{ {
return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem); return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
} }
@ -649,8 +651,8 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
} }
} }
/* Sets parameters relevant to the compression job, initializing others to /* Sets parameters relevant to the compression job,
* default values. Notably, nbThreads should probably be zero. */ * initializing others to default values. */
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
{ {
ZSTD_CCtx_params jobParams; ZSTD_CCtx_params jobParams;
@ -664,7 +666,7 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
return jobParams; return jobParams;
} }
/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() : /*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() :
* Apply a ZSTD_CCtx_params to the compression context. * Apply a ZSTD_CCtx_params to the compression context.
* This entry point is accessed while compression is ongoing, * This entry point is accessed while compression is ongoing,
* new parameters will be applied to next compression job. * new parameters will be applied to next compression job.
@ -675,21 +677,23 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
* - job size * - job size
* - overlap size * - overlap size
*/ */
void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params) void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params)
{ {
U32 const wlog = mtctx->params.cParams.windowLog; U32 const wlog = mtctx->params.cParams.windowLog;
U32 const nbWorkers = mtctx->params.nbWorkers;
mtctx->params = *params; mtctx->params = *params;
mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! */ mtctx->params.cParams.windowLog = wlog; /* Do not modify windowLog ! Frame must keep same wlog during the whole process ! */
mtctx->params.nbWorkers = nbWorkers; /* Do not modify nbWorkers, it must remain synchronized with CCtx Pool ! */
/* note : other parameters not updated are simply not used beyond initialization */ /* note : other parameters not updated are simply not used beyond initialization */
} }
/* ZSTDMT_getNbThreads(): /* ZSTDMT_getNbWorkers():
* @return nb threads currently active in mtctx. * @return nb threads currently active in mtctx.
* mtctx must be valid */ * mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
{ {
assert(mtctx != NULL); assert(mtctx != NULL);
return mtctx->params.nbThreads; return mtctx->params.nbWorkers;
} }
/* ZSTDMT_getFrameProgression(): /* ZSTDMT_getFrameProgression():
@ -728,15 +732,15 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
/* ===== Multi-threaded compression ===== */ /* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */ /* ------------------------------------------ */
static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) { static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbWorkers) {
assert(nbThreads>0); assert(nbWorkers>0);
{ size_t const jobSizeTarget = (size_t)1 << (windowLog + 2); { size_t const jobSizeTarget = (size_t)1 << (windowLog + 2);
size_t const jobMaxSize = jobSizeTarget << 2; size_t const jobMaxSize = jobSizeTarget << 2;
size_t const passSizeMax = jobMaxSize * nbThreads; size_t const passSizeMax = jobMaxSize * nbWorkers;
unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
unsigned const nbJobsLarge = multiplier * nbThreads; unsigned const nbJobsLarge = multiplier * nbWorkers;
unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads); unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers);
return (multiplier>1) ? nbJobsLarge : nbJobsSmall; return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
} } } }
@ -753,7 +757,7 @@ static size_t ZSTDMT_compress_advanced_internal(
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads); unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbWorkers);
size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */
const char* const srcStart = (const char*)src; const char* const srcStart = (const char*)src;
@ -761,13 +765,13 @@ static size_t ZSTDMT_compress_advanced_internal(
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0; size_t frameStartPos = 0, dstBufferPos = 0;
XXH64_state_t xxh64; XXH64_state_t xxh64;
assert(jobParams.nbThreads == 0); assert(jobParams.nbWorkers == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbThreads); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbJobs, (U32)proposedJobSize, (U32)avgJobSize); nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
@ -911,11 +915,12 @@ size_t ZSTDMT_initCStream_internal(
const ZSTD_CDict* cdict, ZSTD_CCtx_params params, const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize) unsigned long long pledgedSrcSize)
{ {
DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u)", (U32)pledgedSrcSize); DEBUGLOG(2, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
(U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
/* params are supposed to be fully validated at this point */ /* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(!((dict) && (cdict))); /* either dict or cdict, not both */
assert(mtctx->cctxPool->totalCCtx == params.nbThreads); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
if (params.jobSize == 0) { if (params.jobSize == 0) {
if (params.cParams.windowLog >= 29) if (params.cParams.windowLog >= 29)
@ -928,12 +933,12 @@ size_t ZSTDMT_initCStream_internal(
if (mtctx->singleBlockingThread) { if (mtctx->singleBlockingThread) {
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
assert(singleThreadParams.nbThreads == 0); assert(singleThreadParams.nbWorkers == 0);
return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
dict, dictSize, cdict, dict, dictSize, cdict,
singleThreadParams, pledgedSrcSize); singleThreadParams, pledgedSrcSize);
} }
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads); DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);
if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */
ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_waitForAllJobsCompleted(mtctx);
@ -1012,8 +1017,6 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
{ {
if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
if (mtctx->params.nbThreads==1)
return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params, return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params,
pledgedSrcSize); pledgedSrcSize);
} }

View File

@ -30,8 +30,8 @@
/* === Memory management === */ /* === Memory management === */
typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads); ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers);
ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers,
ZSTD_customMem cMem); ZSTD_customMem cMem);
ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx); ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx);
@ -116,12 +116,12 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
/* ZSTDMT_CCtxParam_setNbThreads() /* ZSTDMT_CCtxParam_setNbWorkers()
* Set nbThreads, and clamp it. * Set nbWorkers, and clamp it.
* Also reset jobSize and overlapLog */ * Also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads); size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() : /*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() :
* Apply a ZSTD_CCtx_params to the compression context. * Apply a ZSTD_CCtx_params to the compression context.
* This works even during compression, and will be applied to next compression job. * This works even during compression, and will be applied to next compression job.
* However, the following parameters will NOT be updated after compression has been started : * However, the following parameters will NOT be updated after compression has been started :
@ -131,12 +131,12 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
* - job size * - job size
* - overlap size * - overlap size
*/ */
void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params); void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params);
/* ZSTDMT_getNbThreads(): /* ZSTDMT_getNbWorkers():
* @return nb threads currently active in mtctx. * @return nb threads currently active in mtctx.
* mtctx must be valid */ * mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx);
/* ZSTDMT_getFrameProgression(): /* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame. * tells how much data has been consumed (input) and produced (output) for current frame.

View File

@ -506,7 +506,7 @@ ZSTDLIB_API size_t ZSTD_sizeof_DDict(const ZSTD_DDict* ddict);
* It will also consider src size to be arbitrarily "large", which is worst case. * It will also consider src size to be arbitrarily "large", which is worst case.
* If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation. * If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation.
* ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. * ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
* ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1. * ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
* Note : CCtx size estimation is only correct for single-threaded compression. */ * Note : CCtx size estimation is only correct for single-threaded compression. */
ZSTDLIB_API size_t ZSTD_estimateCCtxSize(int compressionLevel); ZSTDLIB_API size_t ZSTD_estimateCCtxSize(int compressionLevel);
ZSTDLIB_API size_t ZSTD_estimateCCtxSize_usingCParams(ZSTD_compressionParameters cParams); ZSTDLIB_API size_t ZSTD_estimateCCtxSize_usingCParams(ZSTD_compressionParameters cParams);
@ -518,7 +518,7 @@ ZSTDLIB_API size_t ZSTD_estimateDCtxSize(void);
* It will also consider src size to be arbitrarily "large", which is worst case. * It will also consider src size to be arbitrarily "large", which is worst case.
* If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation. * If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation.
* ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. * ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
* ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1. * ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
* Note : CStream size estimation is only correct for single-threaded compression. * Note : CStream size estimation is only correct for single-threaded compression.
* ZSTD_DStream memory budget depends on window Size. * ZSTD_DStream memory budget depends on window Size.
* This information can be passed manually, using ZSTD_estimateDStreamSize, * This information can be passed manually, using ZSTD_estimateDStreamSize,
@ -992,18 +992,13 @@ typedef enum {
/* multi-threading parameters */ /* multi-threading parameters */
/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
* They return an error otherwise. */ * They return an error otherwise. */
ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) ZSTD_p_nbWorkers=400, /* Select how many threads will be spawned to compress in parallel.
* More threads improve speed, but also increase memory usage. * When nbWorkers >= 1, triggers asynchronous mode :
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * ZSTD_compress_generic() consumes some input, flush some output if possible, and immediately gives back control to caller,
* Special: value 0 means "do not change nbThreads" */ * while compression work is performed in parallel, within worker threads.
ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : * (note : a strong exception to this rule is when first invocation sets ZSTD_e_end : it becomes a blocking call).
* it finishes its job as much as possible, and only then gives back control to caller. * More workers improve speed, but also increase memory usage.
* In contrast, multi-thread is by default "non-blocking" : * Default value is `0`, aka "single-threaded mode" : no worker is spawned, compression is performed inside Caller's thread, all invocations are blocking */
* it takes some input, flush some output if available, and immediately gives back control to caller.
* Compression work is performed in parallel, within worker threads.
* (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
* Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
* It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
* Each compression job is completed in parallel, so indirectly controls the nb of active threads. * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters. * 0 means default, which is dynamically determined based on compression parameters.
@ -1231,9 +1226,10 @@ ZSTDLIB_API size_t ZSTD_CCtxParam_setParameter(ZSTD_CCtx_params* params, ZSTD_cP
/*! ZSTD_CCtx_setParametersUsingCCtxParams() : /*! ZSTD_CCtx_setParametersUsingCCtxParams() :
* Apply a set of ZSTD_CCtx_params to the compression context. * Apply a set of ZSTD_CCtx_params to the compression context.
* This must be done before the dictionary is loaded. * This can be done even after compression is started,
* The pledgedSrcSize is treated as unknown. * if nbWorkers==0, this will have no impact until a new compression is started.
* Multithreading parameters are applied only if nbThreads > 1. * if nbWorkers>=1, new parameters will be picked up at next job,
* with a few restrictions (windowLog, pledgedSrcSize, nbWorkers, jobSize, and overlapLog are not updated).
*/ */
ZSTDLIB_API size_t ZSTD_CCtx_setParametersUsingCCtxParams( ZSTDLIB_API size_t ZSTD_CCtx_setParametersUsingCCtxParams(
ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params); ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params);

View File

@ -122,12 +122,12 @@ void BMK_setBlockSize(size_t blockSize)
void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); } void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
static U32 g_nbThreads = 1; static U32 g_nbWorkers = 0;
void BMK_setNbThreads(unsigned nbThreads) { void BMK_setNbWorkers(unsigned nbWorkers) {
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
#endif #endif
g_nbThreads = nbThreads; g_nbWorkers = nbWorkers;
} }
static U32 g_realTime = 0; static U32 g_realTime = 0;
@ -295,7 +295,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
if (!cCompleted) { /* still some time to do compression tests */ if (!cCompleted) { /* still some time to do compression tests */
U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1; U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1;
U32 nbLoops = 0; U32 nbLoops = 0;
ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbThreads, g_nbThreads); ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbWorkers, g_nbWorkers);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel); ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag); ZSTD_CCtx_setParameter(ctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch); ZSTD_CCtx_setParameter(ctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch);

View File

@ -22,7 +22,7 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles, const char* di
/* Set Parameters */ /* Set Parameters */
void BMK_setNbSeconds(unsigned nbLoops); void BMK_setNbSeconds(unsigned nbLoops);
void BMK_setBlockSize(size_t blockSize); void BMK_setBlockSize(size_t blockSize);
void BMK_setNbThreads(unsigned nbThreads); void BMK_setNbWorkers(unsigned nbWorkers);
void BMK_setRealTime(unsigned priority); void BMK_setRealTime(unsigned priority);
void BMK_setNotificationLevel(unsigned level); void BMK_setNotificationLevel(unsigned level);
void BMK_setSeparateFiles(unsigned separate); void BMK_setSeparateFiles(unsigned separate);

View File

@ -218,23 +218,23 @@ static U32 g_removeSrcFile = 0;
void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); } void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
static U32 g_memLimit = 0; static U32 g_memLimit = 0;
void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; } void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
static U32 g_nbThreads = 1; static U32 g_nbWorkers = 1;
void FIO_setNbThreads(unsigned nbThreads) { void FIO_setNbWorkers(unsigned nbWorkers) {
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
#endif #endif
g_nbThreads = nbThreads; g_nbWorkers = nbWorkers;
} }
static U32 g_blockSize = 0; static U32 g_blockSize = 0;
void FIO_setBlockSize(unsigned blockSize) { void FIO_setBlockSize(unsigned blockSize) {
if (blockSize && g_nbThreads==1) if (blockSize && g_nbWorkers==0)
DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n"); DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
g_blockSize = blockSize; g_blockSize = blockSize;
} }
#define FIO_OVERLAP_LOG_NOTSET 9999 #define FIO_OVERLAP_LOG_NOTSET 9999
static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET; static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET;
void FIO_setOverlapLog(unsigned overlapLog){ void FIO_setOverlapLog(unsigned overlapLog){
if (overlapLog && g_nbThreads==1) if (overlapLog && g_nbWorkers==0)
DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n"); DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n");
g_overlapLog = overlapLog; g_overlapLog = overlapLog;
} }
@ -461,9 +461,8 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) );
/* multi-threading */ /* multi-threading */
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbWorkers);
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) );
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) );
#endif #endif
/* dictionary */ /* dictionary */
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */

View File

@ -54,7 +54,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag);
void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setChecksumFlag(unsigned checksumFlag);
void FIO_setRemoveSrcFile(unsigned flag); void FIO_setRemoveSrcFile(unsigned flag);
void FIO_setMemLimit(unsigned memLimit); void FIO_setMemLimit(unsigned memLimit);
void FIO_setNbThreads(unsigned nbThreads); void FIO_setNbWorkers(unsigned nbWorkers);
void FIO_setBlockSize(unsigned blockSize); void FIO_setBlockSize(unsigned blockSize);
void FIO_setOverlapLog(unsigned overlapLog); void FIO_setOverlapLog(unsigned overlapLog);
void FIO_setLdmFlag(unsigned ldmFlag); void FIO_setLdmFlag(unsigned ldmFlag);

View File

@ -377,7 +377,7 @@ int main(int argCount, const char* argv[])
nextArgumentsAreFiles=0, nextArgumentsAreFiles=0,
ultra=0, ultra=0,
lastCommand = 0, lastCommand = 0,
nbThreads = 1, nbWorkers = 1,
setRealTimePrio = 0, setRealTimePrio = 0,
separateFiles = 0, separateFiles = 0,
ldmFlag = 0; ldmFlag = 0;
@ -422,7 +422,7 @@ int main(int argCount, const char* argv[])
programName = lastNameFromPath(programName); programName = lastNameFromPath(programName);
/* preset behaviors */ /* preset behaviors */
if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbThreads=0; if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbWorkers=0;
if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress; if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress;
if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* supports multiple formats */ if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* supports multiple formats */
if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* behave like zcat, also supports multiple formats */ if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* behave like zcat, also supports multiple formats */
@ -515,7 +515,7 @@ int main(int argCount, const char* argv[])
continue; continue;
} }
#endif #endif
if (longCommandWArg(&argument, "--threads=")) { nbThreads = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--threads=")) { nbWorkers = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; }
@ -648,7 +648,7 @@ int main(int argCount, const char* argv[])
/* nb of threads (hidden option) */ /* nb of threads (hidden option) */
case 'T': case 'T':
argument++; argument++;
nbThreads = readU32FromChar(&argument); nbWorkers = readU32FromChar(&argument);
break; break;
/* Dictionary Selection level */ /* Dictionary Selection level */
@ -716,10 +716,10 @@ int main(int argCount, const char* argv[])
/* Welcome message (if verbose) */ /* Welcome message (if verbose) */
DISPLAYLEVEL(3, WELCOME_MESSAGE); DISPLAYLEVEL(3, WELCOME_MESSAGE);
if (nbThreads == 0) { if (nbWorkers == 0) {
/* try to guess */ /* automatically set # workers based on # of reported cpus */
nbThreads = UTIL_countPhysicalCores(); nbWorkers = UTIL_countPhysicalCores();
DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbThreads); DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbWorkers);
} }
g_utilDisplayLevel = g_displayLevel; g_utilDisplayLevel = g_displayLevel;
@ -763,7 +763,7 @@ int main(int argCount, const char* argv[])
BMK_setNotificationLevel(g_displayLevel); BMK_setNotificationLevel(g_displayLevel);
BMK_setSeparateFiles(separateFiles); BMK_setSeparateFiles(separateFiles);
BMK_setBlockSize(blockSize); BMK_setBlockSize(blockSize);
BMK_setNbThreads(nbThreads); BMK_setNbWorkers(nbWorkers);
BMK_setRealTime(setRealTimePrio); BMK_setRealTime(setRealTimePrio);
BMK_setNbSeconds(bench_nbSeconds); BMK_setNbSeconds(bench_nbSeconds);
BMK_setLdmFlag(ldmFlag); BMK_setLdmFlag(ldmFlag);
@ -791,7 +791,7 @@ int main(int argCount, const char* argv[])
zParams.dictID = dictID; zParams.dictID = dictID;
if (cover) { if (cover) {
int const optimize = !coverParams.k || !coverParams.d; int const optimize = !coverParams.k || !coverParams.d;
coverParams.nbThreads = nbThreads; coverParams.nbThreads = nbWorkers;
coverParams.zParams = zParams; coverParams.zParams = zParams;
operationResult = DiB_trainFromFiles(outFileName, maxDictSize, filenameTable, filenameIdx, blockSize, NULL, &coverParams, optimize); operationResult = DiB_trainFromFiles(outFileName, maxDictSize, filenameTable, filenameIdx, blockSize, NULL, &coverParams, optimize);
} else { } else {
@ -835,7 +835,7 @@ int main(int argCount, const char* argv[])
FIO_setNotificationLevel(g_displayLevel); FIO_setNotificationLevel(g_displayLevel);
if (operation==zom_compress) { if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS #ifndef ZSTD_NOCOMPRESS
FIO_setNbThreads(nbThreads); FIO_setNbWorkers(nbWorkers);
FIO_setBlockSize((U32)blockSize); FIO_setBlockSize((U32)blockSize);
FIO_setLdmFlag(ldmFlag); FIO_setLdmFlag(ldmFlag);
FIO_setLdmHashLog(g_ldmHashLog); FIO_setLdmHashLog(g_ldmHashLog);

View File

@ -181,7 +181,7 @@ static size_t local_ZSTD_compress_generic_T2_end(void* dst, size_t dstCapacity,
ZSTD_inBuffer buffIn; ZSTD_inBuffer buffIn;
(void)buff2; (void)buff2;
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1);
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2);
buffOut.dst = dst; buffOut.dst = dst;
buffOut.size = dstCapacity; buffOut.size = dstCapacity;
buffOut.pos = 0; buffOut.pos = 0;
@ -198,7 +198,7 @@ static size_t local_ZSTD_compress_generic_T2_continue(void* dst, size_t dstCapac
ZSTD_inBuffer buffIn; ZSTD_inBuffer buffIn;
(void)buff2; (void)buff2;
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1);
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2);
buffOut.dst = dst; buffOut.dst = dst;
buffOut.size = dstCapacity; buffOut.size = dstCapacity;
buffOut.pos = 0; buffOut.pos = 0;

View File

@ -226,7 +226,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part)
ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_outBuffer out = { outBuffer, outSize, 0 };
ZSTD_inBuffer in = { inBuffer, inSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 };
CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) ); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) ); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {} while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ", DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ",
@ -246,7 +246,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part)
ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_outBuffer out = { outBuffer, outSize, 0 };
ZSTD_inBuffer in = { inBuffer, inSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 };
CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) ); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) ); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
CHECK_Z( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue) ); CHECK_Z( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue) );
while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {} while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);

View File

@ -94,7 +94,7 @@ static size_t cctxParamRoundTripTest(void* resultBuff, size_t resultBuffCapacity
/* Set parameters */ /* Set parameters */
CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) ); CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) );
CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbThreads, 2) ); CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbWorkers, 2) );
CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) ); CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) );

View File

@ -753,9 +753,9 @@ static int basicUnitTests(U32 seed, double compressibility)
DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "OK \n");
/* Complex multithreading + dictionary test */ /* Complex multithreading + dictionary test */
{ U32 const nbThreads = 2; { U32 const nbWorkers = 2;
size_t const jobSize = 4 * 1 MB; size_t const jobSize = 4 * 1 MB;
size_t const srcSize = jobSize * nbThreads; /* we want each job to have predictable size */ size_t const srcSize = jobSize * nbWorkers; /* we want each job to have predictable size */
size_t const segLength = 2 KB; size_t const segLength = 2 KB;
size_t const offset = 600 KB; /* must be larger than window defined in cdict */ size_t const offset = 600 KB; /* must be larger than window defined in cdict */
size_t const start = jobSize + (offset-1); size_t const start = jobSize + (offset-1);
@ -763,7 +763,7 @@ static int basicUnitTests(U32 seed, double compressibility)
BYTE* const dst = (BYTE*)CNBuffer + start - offset; BYTE* const dst = (BYTE*)CNBuffer + start - offset;
DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads + dictionary : ", testNb++, (U32)srcSize); DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads + dictionary : ", testNb++, (U32)srcSize);
CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_compressionLevel, 3) ); CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_compressionLevel, 3) );
CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbThreads, 2) ); CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbWorkers, nbWorkers) );
CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_jobSize, jobSize) ); CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_jobSize, jobSize) );
assert(start > offset); assert(start > offset);
assert(start + segLength < COMPRESSIBLE_NOISE_LENGTH); assert(start + segLength < COMPRESSIBLE_NOISE_LENGTH);
@ -1672,7 +1672,7 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
U32 const nbThreadsAdjusted = (windowLogMalus < nbThreadsCandidate) ? nbThreadsCandidate - windowLogMalus : 1; U32 const nbThreadsAdjusted = (windowLogMalus < nbThreadsCandidate) ? nbThreadsCandidate - windowLogMalus : 1;
U32 const nbThreads = MIN(nbThreadsAdjusted, nbThreadsMax); U32 const nbThreads = MIN(nbThreadsAdjusted, nbThreadsMax);
DISPLAYLEVEL(5, "t%u: nbThreads : %u \n", testNb, nbThreads); DISPLAYLEVEL(5, "t%u: nbThreads : %u \n", testNb, nbThreads);
CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbThreads, nbThreads, useOpaqueAPI) ); CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbWorkers, nbThreads, useOpaqueAPI) );
if (nbThreads > 1) { if (nbThreads > 1) {
U32 const jobLog = FUZ_rand(&lseed) % (testLog+1); U32 const jobLog = FUZ_rand(&lseed) % (testLog+1);
CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_overlapSizeLog, FUZ_rand(&lseed) % 10, useOpaqueAPI) ); CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_overlapSizeLog, FUZ_rand(&lseed) % 10, useOpaqueAPI) );