1
0
mirror of https://github.com/facebook/zstd.git synced 2025-12-24 17:21:03 +03:00

switched over to model where reading only waits on compression thread

This commit is contained in:
Paul Cruz
2017-07-21 17:49:39 -07:00
parent 6455ec482c
commit 95bef759b3

View File

@@ -51,7 +51,7 @@ typedef struct {
buffer_t dst;
unsigned compressionLevel;
unsigned jobID;
unsigned lastJob;
unsigned lastJobPlusOne;
size_t compressedSize;
size_t dictSize;
} jobDescription;
@@ -226,7 +226,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
jobDescription* job = &ctx->jobs[jobNum];
job->src.start = malloc(2 * FILE_CHUNK_SIZE);
job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
job->lastJob = 0;
job->lastJobPlusOne = 0;
if (!job->src.start || !job->dst.start) {
DISPLAY("Could not allocate buffers for jobs\n");
return 1;
@@ -400,22 +400,30 @@ static void* compressionThread(void* arg)
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n");
/* wait until job is ready */
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* compression thread is waiting, take measurements of write completion and read completion */
/* compression thread is waiting on creation thread, take measurement */
ctx->compressWaitCreateCompletion = ctx->createCompletion;
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
DEBUG(2, "compression thread waiting for nextJob: %u, compressWaitCreateCompletion %f, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
/* wait until job previously in this space is written */
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
/* reset compression completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0;
@@ -439,10 +447,8 @@ static void* compressionThread(void* arg)
size_t dstPos = 0;
DEBUG(3, "cLevel used: %u\n", cLevel);
DEBUG(3, "compression level used: %u\n", cLevel);
/* reset compressed size */
job->compressedSize = 0;
/* begin compression */
{
size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
@@ -474,10 +480,10 @@ static void* compressionThread(void* arg)
ZSTD_invalidateRepCodes(ctx->cctx);
}
{
DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize));
DEBUG(3, "lastJob %u\n", job->lastJob);
DEBUG(3, "write out ending: %d\n", (job->lastJobPlusOne == currJob + 1) && (remaining == actualBlockSize));
DEBUG(3, "lastJobPlusOne %u\n", job->lastJobPlusOne);
DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize);
size_t const ret = (job->lastJob && remaining == actualBlockSize) ?
size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
if (ZSTD_isError(ret)) {
@@ -503,15 +509,16 @@ static void* compressionThread(void* arg)
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
ctx->jobCompressedID++;
DEBUG(3, "signaling for job %u\n", currJob);
pthread_cond_signal(&ctx->jobCompressed_cond.pCond);
pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
DEBUG(3, "finished job compression %u\n", currJob);
currJob++;
if (job->lastJob || ctx->threadError) {
if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
/* finished compressing all jobs */
DEBUG(3, "all jobs finished compressing\n");
break;
}
currJob++;
}
return arg;
}
@@ -544,7 +551,6 @@ static void* outputThread(void* arg)
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "outputThread(): waiting on job compressed\n");
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
@@ -599,8 +605,7 @@ static void* outputThread(void* arg)
}
}
DEBUG(3, "finished job write %u\n", currJob);
currJob++;
displayProgress(currJob, ctx->compressionLevel, job->lastJob);
displayProgress(currJob, ctx->compressionLevel, job->lastJobPlusOne == currJob + 1);
DEBUG(3, "locking job write mutex\n");
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
ctx->jobWriteID++;
@@ -608,7 +613,7 @@ static void* outputThread(void* arg)
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "unlocking job write mutex\n");
if (job->lastJob || ctx->threadError) {
if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
/* finished with all jobs */
DEBUG(3, "all jobs finished writing\n");
pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
@@ -617,6 +622,7 @@ static void* outputThread(void* arg)
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
break;
}
currJob++;
}
return arg;
@@ -628,21 +634,22 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
unsigned const nextJobIndex = nextJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[nextJobIndex];
DEBUG(3, "createCompressionJob(): wait for job write\n");
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
/* wait until the job has been compressed */
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* creation thread is waiting, take measurement of compression completion */
/* creation thread is waiting, take measurement of completion */
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
ctx->createWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset create completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 0;
@@ -653,7 +660,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
job->compressionLevel = ctx->compressionLevel;
job->src.size = srcSize;
job->jobID = nextJob;
job->lastJob = last;
if (last) job->lastJobPlusOne = nextJob + 1;
{
/* swap buffer */
void* const copy = job->src.start;