From ad66faf16a7093c0cc02b86da2d4c583a5bb7dbf Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Tue, 18 Jul 2017 15:23:11 -0700 Subject: [PATCH] added progress check for filewriting, put important shared data behind mutex when being read from/written to --- contrib/adaptive-compression/adapt.c | 78 +++++++++++++++++++++------- 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 404db6d94..b7f4dccf2 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -88,7 +88,9 @@ typedef struct { unsigned allJobsCompleted; unsigned adaptParam; unsigned compressionCompletionMeasured; + unsigned writeCompletionMeasured; double compressionCompletion; + double writeCompletion; mutex_t jobCompressed_mutex; cond_t jobCompressed_cond; mutex_t jobReady_mutex; @@ -97,6 +99,8 @@ typedef struct { cond_t allJobsCompleted_cond; mutex_t jobWrite_mutex; cond_t jobWrite_cond; + mutex_t completion_mutex; + mutex_t stats_mutex; size_t lastDictSize; inBuff_t input; cStat_t stats; @@ -156,6 +160,8 @@ static int freeCCtx(adaptCCtx* ctx) error |= destroyCond(&ctx->allJobsCompleted_cond); error |= destroyMutex(&ctx->jobWrite_mutex); error |= destroyCond(&ctx->jobWrite_cond); + error |= destroyMutex(&ctx->completion_mutex); + error |= destroyMutex(&ctx->stats_mutex); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); free(ctx->input.buffer.start); if (ctx->jobs){ @@ -200,6 +206,8 @@ static adaptCCtx* createCCtx(unsigned numJobs) pthreadError |= initCond(&ctx->allJobsCompleted_cond); pthreadError |= initMutex(&ctx->jobWrite_mutex); pthreadError |= initCond(&ctx->jobWrite_cond); + pthreadError |= initMutex(&ctx->completion_mutex); + pthreadError |= initMutex(&ctx->stats_mutex); if (pthreadError) return NULL; } ctx->numJobs = numJobs; @@ -315,24 +323,44 @@ static void adaptCompressionLevel(adaptCCtx* ctx) } else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel); - ctx->compressionLevel++; - reset = 1; + double completion; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + completion = ctx->writeCompletion; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + { + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; + unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; + DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change); + DEBUG(2, "write completion: %f\n", completion); + ctx->compressionLevel += change; + reset = 1; + } } else if (compressSlow && ctx->compressionLevel > 1) { - double const completion = ctx->compressionCompletion; - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; - unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); - DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); - DEBUG(3, "completion: %f\n", completion); - ctx->compressionLevel -= change; - reset = 1; + double completion; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + completion = ctx->compressionCompletion; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + { + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; + unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); + DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); + DEBUG(3, "completion: %f\n", completion); + ctx->compressionLevel -= change; + reset = 1; + } } if (reset) { ctx->stats.readyCounter = 0; ctx->stats.writeCounter = 0; ctx->stats.compressedCounter = 0; + + pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletion = 1; ctx->compressionCompletionMeasured = 0; + ctx->writeCompletion = 1; + ctx->writeCompletionMeasured = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } } @@ -455,12 +483,14 @@ static void* outputThread(void* arg) ctx->stats.waitCompressed++; ctx->stats.compressedCounter++; reduceCounters(ctx); + pthread_mutex_lock(&ctx->completion_mutex.pMutex); if (!ctx->compressionCompletionMeasured) { ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx); ctx->compressionCompletionMeasured = 1; + DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion); } + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); adaptCompressionLevel(ctx); - DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } @@ -484,6 +514,14 @@ static void* outputThread(void* arg) if (ret != writeSize) break; pos += ret; remaining -= ret; + + /* update completion variable for writing */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + if (!ctx->writeCompletionMeasured) { + ctx->writeCompletion = 1 - (double)remaining/compressedSize; + } + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + if (remaining == 0) break; } if (pos != compressedSize) { @@ -528,12 +566,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) ctx->stats.waitWrite++; ctx->stats.writeCounter++; reduceCounters(ctx); - if (!ctx->compressionCompletion) { - ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx); - ctx->compressionCompletionMeasured = 1; - } + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->writeCompletionMeasured = 1; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); adaptCompressionLevel(ctx); - DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); } @@ -552,10 +588,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) ctx->input.buffer.start = copy; } job->dictSize = ctx->lastDictSize; - pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - ctx->jobReadyID++; - pthread_cond_signal(&ctx->jobReady_cond.pCond); - pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + DEBUG(3, "finished job creation %u\n", nextJob); ctx->nextJobID++; DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize); @@ -567,6 +600,13 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) ctx->lastDictSize = srcSize; ctx->input.filled = srcSize; } + + /* signal job ready */ + pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); + ctx->jobReadyID++; + pthread_cond_signal(&ctx->jobReady_cond.pCond); + pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + return 0; }