diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 18ed74417..135d274f8 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -172,7 +172,10 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads) ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*)); if (!cctxPool) return NULL; cctxPool->totalCCtx = nbThreads; - cctxPool->availCCtx = 0; + cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ + cctxPool->cctx[0] = ZSTD_createCCtx(); + if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } + DEBUGLOG(1, "cctxPool created, with %u threads", nbThreads); return cctxPool; } @@ -278,6 +281,7 @@ struct ZSTDMT_CCtx_s { unsigned allJobsCompleted; unsigned long long frameContentSize; ZSTD_CDict* cdict; + ZSTD_CStream* cstream; ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ }; @@ -287,7 +291,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) U32 const minNbJobs = nbThreads + 2; U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; U32 const nbJobs = 1 << nbJobsLog2; - DEBUGLOG(4, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", + DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", nbThreads, minNbJobs, nbJobsLog2, nbJobs); if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription)); @@ -302,8 +306,14 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) ZSTDMT_freeCCtx(cctx); return NULL; } + if (nbThreads==1) { + cctx->cstream = ZSTD_createCStream(); + if (!cctx->cstream) { + ZSTDMT_freeCCtx(cctx); return NULL; + } } pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); /* Todo : check init function return */ pthread_cond_init(&cctx->jobCompleted_cond, NULL); + DEBUGLOG(4, "mt_cctx created, for %u threads \n", nbThreads); return cctx; } @@ -329,11 +339,12 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) { if (mtctx==NULL) return 0; /* compatible with free on NULL */ - ZSTD_freeCDict(mtctx->cdict); POOL_free(mtctx->factory); if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */ ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ ZSTDMT_freeCCtxPool(mtctx->cctxPool); + ZSTD_freeCDict(mtctx->cdict); + ZSTD_freeCStream(mtctx->cstream); pthread_mutex_destroy(&mtctx->jobCompleted_mutex); pthread_cond_destroy(&mtctx->jobCompleted_cond); free(mtctx); @@ -361,12 +372,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, params.fParams.contentSizeFlag = 1; if (nbChunks==1) { /* fallback to single-thread mode */ - size_t result; - ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool); - if (!cctx) return ERROR(memory_allocation); - result = ZSTD_compressCCtx(mtctx->cctxPool->cctx[0], dst, dstCapacity, src, srcSize, compressionLevel); - ZSTDMT_releaseCCtx(mtctx->cctxPool, cctx); - return result; + ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; + return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel); } { unsigned u; @@ -461,6 +468,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, ZSTD_parameters params, unsigned long long pledgedSrcSize) { ZSTD_customMem const cmem = { NULL, NULL, NULL }; + if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize); if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_releaseAllJobResources(zcs); @@ -498,6 +506,7 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, * pledgedSrcSize is optional and can be zero == unknown */ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) { + if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize); return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize); } @@ -510,6 +519,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ + if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input); /* fill input buffer */ { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); @@ -708,10 +718,12 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { + if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output); return ZSTDMT_flushStream_internal(zcs, output, 0); } size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { + if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output); return ZSTDMT_flushStream_internal(zcs, output, 1); }