diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 8de54d4a6..4e09a2082 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -541,8 +541,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu /* check if there is any data available to flush */ { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; - ZSTDMT_jobDescription job = zcs->jobs[jobID]; - if (job.jobCompleted) { /* job completed : output can be flushed */ + unsigned jobCompleted; + pthread_mutex_lock(&zcs->jobCompleted_mutex); + jobCompleted = zcs->jobs[jobID].jobCompleted; + pthread_mutex_unlock(&zcs->jobCompleted_mutex); + if (jobCompleted) { + ZSTDMT_jobDescription const job = zcs->jobs[jobID]; size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); @@ -556,15 +560,13 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu } memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; - job.dstFlushed += toWrite; + zcs->jobs[jobID].dstFlushed += toWrite; DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed)); - if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */ + if (zcs->jobs[jobID].dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[jobID].dstBuff = g_nullBuffer; zcs->jobs[jobID].jobCompleted = 0; zcs->doneJobID++; - } else { - zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */ } } } /* recommended next input size : fill current input buffer */