From 01fc7c42441ef75ead51f4289bdf13a799993ad9 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Mon, 10 Jul 2017 16:27:58 -0700 Subject: [PATCH] changed how the detection of the last job works --- contrib/adaptive-compression/adapt.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index e1ac0aaf3..8f3acd5c9 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -50,6 +50,7 @@ typedef struct { buffer_t dst; unsigned compressionLevel; unsigned jobID; + unsigned lastJob; size_t compressedSize; } jobDescription; @@ -57,7 +58,6 @@ typedef struct { unsigned compressionLevel; unsigned numActiveThreads; unsigned numJobs; - unsigned lastJobID; unsigned nextJobID; unsigned threadError; unsigned jobReadyID; @@ -134,15 +134,15 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) ctx->jobReadyID = 0; ctx->jobCompressedID = 0; ctx->jobWriteID = 0; - ctx->lastJobID = -1; /* intentional underflow */ ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); - /* allocating buffers for jobs */ + /* initializing jobs */ { unsigned jobNum; for (jobNum=0; jobNumjobs[jobNum]; job->src.start = malloc(FILE_CHUNK_SIZE); job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE)); + job->lastJob = 0; if (!job->src.start || !job->dst.start) { DISPLAY("Could not allocate buffers for jobs\n"); freeCCtx(ctx); @@ -265,7 +265,7 @@ static void* compressionThread(void* arg) pthread_mutex_unlock(&ctx->jobCompressed_mutex); DEBUGLOG(2, "finished job compression %u\n", currJob); currJob++; - if (currJob >= ctx->lastJobID || ctx->threadError) { + if (job->lastJob || ctx->threadError) { /* finished compressing all jobs */ DEBUGLOG(2, "all jobs finished compressing\n"); break; @@ -327,7 +327,7 @@ static void* outputThread(void* arg) } DEBUGLOG(2, "finished job write %u\n", currJob); currJob++; - displayProgress(currJob, ctx->compressionLevel, currJob >= ctx->lastJobID); + displayProgress(currJob, ctx->compressionLevel, job->lastJob); DEBUGLOG(2, "locking job write mutex\n"); pthread_mutex_lock(&ctx->jobWrite_mutex); ctx->jobWriteID++; @@ -335,8 +335,7 @@ static void* outputThread(void* arg) pthread_mutex_unlock(&ctx->jobWrite_mutex); DEBUGLOG(2, "unlocking job write mutex\n"); - DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID); - if (currJob >= ctx->lastJobID || ctx->threadError) { + if (job->lastJob || ctx->threadError) { /* finished with all jobs */ DEBUGLOG(2, "all jobs finished writing\n"); pthread_mutex_lock(&ctx->allJobsCompleted_mutex); @@ -349,7 +348,7 @@ static void* outputThread(void* arg) return arg; } -static int createCompressionJob(adaptCCtx* ctx, size_t srcSize) +static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) { unsigned const nextJob = ctx->nextJobID; unsigned const nextJobIndex = nextJob % ctx->numJobs; @@ -371,6 +370,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize) job->src.size = srcSize; job->dst.size = ZSTD_compressBound(srcSize); job->jobID = nextJob; + job->lastJob = last; memcpy(job->src.start, ctx->input.buffer.start, srcSize); pthread_mutex_lock(&ctx->jobReady_mutex); ctx->jobReadyID++; @@ -457,7 +457,8 @@ static int compressFilename(const char* const srcFilename, const char* const dst g_streamedSize += readSize; /* reading was fine, now create the compression job */ { - int const error = createCompressionJob(ctx, readSize); + int const last = feof(srcFile); + int const error = createCompressionJob(ctx, readSize, last); if (error != 0) { ret = error; ctx->threadError = 1; @@ -466,7 +467,6 @@ static int compressFilename(const char* const srcFilename, const char* const dst } if (feof(srcFile)) { DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); - ctx->lastJobID = ctx->nextJobID; break; } }