diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 1b925914a..97de6e645 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -42,51 +42,65 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ #define ZSTDMT_NBTHREADS_MAX 128 -#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX) + +/* === Buffer Pool === */ typedef struct buffer_s { void* start; size_t size; } buffer_t; -#define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX typedef struct ZSTDMT_bufferPool_s { - pthread_mutex_t bufferPool_mutex; - buffer_t bTable[ZSTDMT_NBBUFFERSPOOLED_MAX]; + unsigned totalBuffers;; unsigned nbBuffers; + buffer_t bTable[1]; /* variable size */ } ZSTDMT_bufferPool; +static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads) +{ + unsigned const maxNbBuffers = 2*nbThreads + 2; + ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)calloc(1, sizeof(ZSTDMT_bufferPool) + maxNbBuffers * sizeof(buffer_t)); + if (bufPool==NULL) return NULL; + bufPool->totalBuffers = maxNbBuffers; + return bufPool; +} + +static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) +{ + unsigned u; + if (!bufPool) return; /* compatibility with free on NULL */ + for (u=0; u<bufPool->totalBuffers; u++) + free(bufPool->bTable[u].start); + free(bufPool); +} + +/* note : invocation only from main thread ! 
*/ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) { - PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex); if (pool->nbBuffers) { /* try to use an existing buffer */ pool->nbBuffers--; buffer_t const buf = pool->bTable[pool->nbBuffers]; - pthread_mutex_unlock(&pool->bufferPool_mutex); size_t const availBufferSize = buf.size; if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) /* large enough, but not too much */ return buf; free(buf.start); /* size conditions not respected : create a new buffer */ } - pthread_mutex_unlock(&pool->bufferPool_mutex); /* create new buffer */ buffer_t buf; buf.size = bSize; - buf.start = calloc(1, bSize); + buf.start = malloc(bSize); return buf; } /* effectively store buffer for later re-use, up to pool capacity */ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) { - PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex); - if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) { - pthread_mutex_unlock(&pool->bufferPool_mutex); - free(buf.start); + if (pool->nbBuffers < pool->totalBuffers) { + pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */ return; } - pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */ - pthread_mutex_unlock(&pool->bufferPool_mutex); + /* Reached bufferPool capacity (should not happen) */ + free(buf.start); } @@ -118,7 +132,7 @@ void ZSTDMT_compressFrame(void* jobDescription) } -/* note : calls to CCtxPool only from main thread */ +/* === CCtx Pool === */ typedef struct { unsigned totalCCtx; @@ -126,6 +140,8 @@ typedef struct { ZSTD_CCtx* cctx[1]; /* variable size */ } ZSTDMT_CCtxPool; +/* note : CCtxPool invocation only from main thread */ + static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads) { ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + nbThreads*sizeof(ZSTD_CCtx*)); @@ -168,7 +184,7 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) struct ZSTDMT_CCtx_s { POOL_ctx* factory; - 
ZSTDMT_bufferPool buffPool; + ZSTDMT_bufferPool* buffPool; ZSTDMT_CCtxPool* cctxPool; unsigned nbThreads; pthread_mutex_t jobCompleted_mutex; @@ -182,7 +198,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) if (!cctx) return NULL; cctx->nbThreads = nbThreads; cctx->factory = POOL_create(nbThreads, 1); - pthread_mutex_init(&cctx->buffPool.bufferPool_mutex, NULL); + cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); return cctx; @@ -191,9 +207,9 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) /* incompleted ! */ { POOL_free(mtctx->factory); - /* free mutexes (if necessary) */ - /* free bufferPool */ + ZSTDMT_freeBufferPool(mtctx->buffPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); + pthread_mutex_destroy(&mtctx->jobCompleted_mutex); free(mtctx); return 0; } @@ -221,7 +237,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, for (u=0; ubuffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity }; + buffer_t const dstBuffer = u ? 
ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity }; ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool); mtctx->jobs[u].srcStart = srcStart + frameStartPos; @@ -252,13 +268,15 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, { size_t const cSize = mtctx->jobs[frameID].cSize; if (ZSTD_isError(cSize)) return cSize; if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall); - if (frameID) memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, mtctx->jobs[frameID].cSize); + if (frameID) { + memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, cSize); + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[frameID].dstBuff); + } dstPos += cSize ; } ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx); - ZSTDMT_releaseBuffer(&mtctx->buffPool, mtctx->jobs[frameID].dstBuff); } - DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); + DEBUGLOG(3, "compressed size : %u ", (U32)dstPos); return dstPos; }