From 2ec635a16236e53014ce9ee69a01cdbf8ca77836 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sun, 1 Jan 2017 17:31:33 +0100 Subject: [PATCH] use pthread_cond to send signals between threads --- lib/compress/zstdmt_compress.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 770f59758..b9cc81f67 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -115,6 +115,7 @@ typedef struct { size_t cSize; unsigned jobCompleted; pthread_mutex_t* jobCompleted_mutex; + pthread_cond_t* jobCompleted_cond; } ZSTDMT_jobDescription; /* ZSTDMT_compressFrame() : POOL_function type */ @@ -126,7 +127,9 @@ void ZSTDMT_compressFrame(void* jobDescription) job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel); DEBUGLOG(5, "compressed to %u bytes ", (unsigned)job->cSize); job->jobCompleted = 1; - DEBUGLOG(5, "unlocking mutex jobCompleted_mutex"); + DEBUGLOG(5, "sending jobCompleted signal"); + pthread_mutex_lock(job->jobCompleted_mutex); + pthread_cond_signal(job->jobCompleted_cond); pthread_mutex_unlock(job->jobCompleted_mutex); DEBUGLOG(5, "ZSTDMT_compressFrame completed"); } @@ -188,6 +191,7 @@ struct ZSTDMT_CCtx_s { ZSTDMT_CCtxPool* cctxPool; unsigned nbThreads; pthread_mutex_t jobCompleted_mutex; + pthread_cond_t jobCompleted_cond; ZSTDMT_jobDescription jobs[1]; /* variable size */ }; @@ -201,6 +205,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); + pthread_cond_init(&cctx->jobCompleted_cond, NULL); return cctx; } @@ -248,6 +253,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].frameID = u; mtctx->jobs[u].jobCompleted = 0; mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; + mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)frameSize); POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]); @@ -261,10 +267,14 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, size_t dstPos = 0; for (frameID=0; frameIDjobCompleted_mutex); while (mtctx->jobs[frameID].jobCompleted==0) { - DEBUGLOG(4, "waiting for signal jobCompleted_mutex") - pthread_mutex_lock(&mtctx->jobCompleted_mutex); + DEBUGLOG(4, "waiting for jobCompleted signal for frame %u", frameID); + pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); } + pthread_mutex_unlock(&mtctx->jobCompleted_mutex); + { size_t const cSize = mtctx->jobs[frameID].cSize; if (ZSTD_isError(cSize)) return cSize; if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);