diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index b7f4dccf2..35e26abb7 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -288,10 +288,12 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) /* this function normalizes counters when compression level is changing */ static void reduceCounters(adaptCCtx* ctx) { + pthread_mutex_lock(&ctx->stats_mutex.pMutex); unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter)); ctx->stats.writeCounter -= min; ctx->stats.compressedCounter -= min; ctx->stats.readyCounter -= min; + pthread_mutex_unlock(&ctx->stats_mutex.pMutex); } /* @@ -309,58 +311,68 @@ static void adaptCompressionLevel(adaptCCtx* ctx) } else { unsigned reset = 0; - unsigned const allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter; - unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter; - unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter; - unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter; - unsigned const writeSlow = (compressWaiting && createWaiting); - unsigned const compressSlow = (writeWaiting && createWaiting); - unsigned const createSlow = (compressWaiting && writeWaiting); - DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); - DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter); - if (allSlow) { - reset = 1; - } - else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { - DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel); - double completion; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = ctx->writeCompletion; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - { - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; - unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; - DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change); - DEBUG(2, "write completion: %f\n", completion); - ctx->compressionLevel += change; - reset = 1; - } - } - else if (compressSlow && ctx->compressionLevel > 1) { - double completion; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = ctx->compressionCompletion; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - { - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; - unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); - DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); - DEBUG(3, "completion: %f\n", completion); - ctx->compressionLevel -= change; - reset = 1; - } - } - if (reset) { - ctx->stats.readyCounter = 0; - ctx->stats.writeCounter = 0; - ctx->stats.compressedCounter = 0; + unsigned allSlow; + unsigned compressWaiting; + unsigned writeWaiting; + unsigned createWaiting; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressionCompletion = 1; - ctx->compressionCompletionMeasured = 0; - ctx->writeCompletion = 1; - ctx->writeCompletionMeasured = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->stats_mutex.pMutex); + allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter; + compressWaiting = ctx->adaptParam < ctx->stats.readyCounter; + writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter; + createWaiting = ctx->adaptParam < ctx->stats.writeCounter; + pthread_mutex_unlock(&ctx->stats_mutex.pMutex); + { + unsigned const writeSlow = (compressWaiting && createWaiting); + unsigned const compressSlow = (writeWaiting && createWaiting); + unsigned const createSlow = (compressWaiting && writeWaiting); + DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); + if (allSlow) { + reset = 1; + } + else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { + DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel); + double completion; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + completion = ctx->writeCompletion; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + { + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; + unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; + DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change); + DEBUG(2, "write completion: %f\n", completion); + ctx->compressionLevel += change; + reset = 1; + } + } + else if (compressSlow && ctx->compressionLevel > 1) { + double completion; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + completion = ctx->compressionCompletion; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + { + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; + unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); + DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); + DEBUG(3, "completion: %f\n", completion); + ctx->compressionLevel -= change; + reset = 1; + } + } + if (reset) { + pthread_mutex_lock(&ctx->stats_mutex.pMutex); + ctx->stats.readyCounter = 0; + ctx->stats.writeCounter = 0; + ctx->stats.compressedCounter = 0; + pthread_mutex_unlock(&ctx->stats_mutex.pMutex); + + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->compressionCompletion = 1; + ctx->compressionCompletionMeasured = 0; + ctx->writeCompletion = 1; + ctx->writeCompletionMeasured = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + } } } } @@ -383,8 +395,10 @@ static void* compressionThread(void* arg) DEBUG(3, "compressionThread(): waiting on job ready\n"); pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { + pthread_mutex_lock(&ctx->stats_mutex.pMutex); ctx->stats.waitReady++; ctx->stats.readyCounter++; + pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); adaptCompressionLevel(ctx); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); @@ -480,8 +494,10 @@ static void* outputThread(void* arg) DEBUG(3, "outputThread(): waiting on job compressed\n"); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { + pthread_mutex_lock(&ctx->stats_mutex.pMutex); ctx->stats.waitCompressed++; ctx->stats.compressedCounter++; + pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); if (!ctx->compressionCompletionMeasured) { @@ -563,8 +579,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs); while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { + pthread_mutex_lock(&ctx->stats_mutex.pMutex); ctx->stats.waitWrite++; ctx->stats.writeCounter++; + pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->writeCompletionMeasured = 1; @@ -719,7 +737,9 @@ static int freeFileCompressionResources(fcResources* fcr) { int ret = 0; waitUntilAllJobsCompleted(fcr->ctx); + pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex); if (g_displayStats) printStats(fcr->ctx->stats); + pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex); ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0; ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0; if (fcr->otArg) {