diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index fb9183f9e..57cc107f7 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -388,10 +388,15 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */ size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize); - buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */ - ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */ + buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + if ((cctx==NULL) || (dstBuffer.start==NULL)) { + zcs->jobs[jobID].cSize = ERROR(memory_allocation); /* job result : how to collect that error ? */ + zcs->jobs[jobID].jobCompleted = 1; + } + zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = zcs->targetSectionSize; @@ -426,17 +431,18 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; - if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ - ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); + if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */ + ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[jobID].dstBuff = (buffer_t) { NULL, 0 }; zcs->doneJobID++; - } else - zcs->jobs[jobID].dstFlushed = job.dstFlushed; - } } + } else { + zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */ + } } } /* recommended next input size : fill current input buffer */ return zcs->inBuffSize - zcs->inBuff.filled; } + static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame) { size_t const srcSize = zcs->inBuff.filled; @@ -469,14 +475,20 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp } DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize); - POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); + POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ zcs->nextJobID++; } /* check if there is any data available to flush */ { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - if (job.jobCompleted) { /* job completed : output can be flushed */ + PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); + while (zcs->jobs[wJobID].jobCompleted==0) { + DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ + pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); + } + pthread_mutex_unlock(&zcs->jobCompleted_mutex); + { /* job completed : output can be flushed */ + ZSTDMT_jobDescription job = zcs->jobs[wJobID]; size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 }; @@ -488,11 +500,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->doneJobID++; } else { zcs->jobs[wJobID].dstFlushed = job.dstFlushed; - } } - /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ - if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - return (zcs->doneJobID < zcs->nextJobID); - } + } + /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ + if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); + return (zcs->doneJobID < zcs->nextJobID); + } } }