1
0
mirror of https://github.com/facebook/zstd.git synced 2025-11-30 11:21:33 +03:00

ZSTDMT_{flush,end}Stream() now block on next job completion when nothing to flush

The main issue was to avoid a caller to continually loop on {flush,end}Stream()
when there was nothing ready to be flushed but still some compression work ongoing in a worker thread.
The continuous loop would have resulted in wasted energy.
The new version makes call to {flush,end}Stream blocking when there is nothing ready to be flushed.
Of course, if all worker threads have exhausted job, it will return zero (all flush completed).

Note : There are still some remaining issues to report error codes
and properly collect back resources into pools when an error is triggered.
This commit is contained in:
Yann Collet
2017-01-17 16:15:18 -08:00
parent a73c412932
commit d0a1d45582

View File

@@ -388,10 +388,15 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */ if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */
size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize); size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */ buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
zcs->jobs[jobID].cSize = ERROR(memory_allocation); /* job result : how to collect that error ? */
zcs->jobs[jobID].jobCompleted = 1;
}
zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize; zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
@@ -426,17 +431,18 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite; output->pos += toWrite;
job.dstFlushed += toWrite; job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[jobID].dstBuff = (buffer_t) { NULL, 0 };
zcs->doneJobID++; zcs->doneJobID++;
} else } else {
zcs->jobs[jobID].dstFlushed = job.dstFlushed; zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */
} } } } }
/* recommended next input size : fill current input buffer */ /* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled; return zcs->inBuffSize - zcs->inBuff.filled;
} }
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame) static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
{ {
size_t const srcSize = zcs->inBuff.filled; size_t const srcSize = zcs->inBuff.filled;
@@ -469,14 +475,20 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
} }
DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize); DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
zcs->nextJobID++; zcs->nextJobID++;
} }
/* check if there is any data available to flush */ /* check if there is any data available to flush */
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
ZSTDMT_jobDescription job = zcs->jobs[wJobID]; PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
if (job.jobCompleted) { /* job completed : output can be flushed */ while (zcs->jobs[wJobID].jobCompleted==0) {
DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
}
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
{ /* job completed : output can be flushed */
ZSTDMT_jobDescription job = zcs->jobs[wJobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 }; ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
@@ -488,11 +500,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->doneJobID++; zcs->doneJobID++;
} else { } else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed; zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
} } }
/* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
return (zcs->doneJobID < zcs->nextJobID); return (zcs->doneJobID < zcs->nextJobID);
} } }
} }