From e70912c72bcafdf4f8b43a25017eb579a85b97f6 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 29 Dec 2016 01:24:01 +0100 Subject: [PATCH] Changed : input divided into roughly equal parts. Debug : can measure time waiting for mutexes to unlock. --- lib/compress/zstdmt_compress.c | 60 ++++++++++++++++++++++++++-------- programs/bench.c | 4 +-- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0f14dbf31..c86be8701 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -5,12 +5,41 @@ #if 0 # include - static unsigned g_debugLevel = 4; +# include +# include + static unsigned g_debugLevel = 2; # define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); } + +static unsigned long long GetCurrentClockTimeMicroseconds() +{ + static clock_t _ticksPerSecond = 0; + if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK); + + struct tms junk; clock_t newTicks = (clock_t) times(&junk); + return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); +} + +#define MUTEX_WAIT_TIME_DLEVEL 5 +#define PTHREAD_MUTEX_LOCK(mutex) \ +if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ + unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \ + pthread_mutex_lock(mutex); \ + unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \ + unsigned long long elapsedTime = (afterTime-beforeTime); \ + if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ + DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \ + (long int) pthread_self(), elapsedTime, #mutex); \ + } \ +} else pthread_mutex_lock(mutex); + #else + # define DEBUGLOG(l, ...) /* disabled */ +# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m) + #endif + #define ZSTDMT_NBTHREADS_MAX 128 #define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX) @@ -38,8 +67,9 @@ static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t d dbm.out.pos = 0; dbm.frameIDToWrite = 0; pthread_mutex_init(&dbm.frameTable_mutex, NULL); - pthread_mutex_init(&dbm.allFramesWritten_mutex, NULL); - pthread_mutex_lock(&dbm.allFramesWritten_mutex); /* maybe could be merged into init ? */ + pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex; + pthread_mutex_init(allFramesWritten_mutex, NULL); + PTHREAD_MUTEX_LOCK(allFramesWritten_mutex); /* maybe could be merged into init ? */ dbm.nbStackedFrames = 0; return dbm; } @@ -89,7 +119,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager, /* check if correct frame ordering; stack otherwise */ DEBUGLOG(5, "considering writing frame %u ", frameID); - pthread_mutex_lock(&dstBufferManager->frameTable_mutex); + PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); if (frameID != dstBufferManager->frameIDToWrite) { DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite); frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame }; @@ -112,7 +142,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager, lastFrameWritten = isLastFrame; /* check if more frames are stacked */ - pthread_mutex_lock(&dstBufferManager->frameTable_mutex); + PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); unsigned frameWritten = dstBufferManager->nbStackedFrames>0; while (frameWritten) { unsigned u; @@ -127,7 +157,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager, lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame; dstBufferManager->frameIDToWrite = frameID+1; /* remove frame from stack */ - pthread_mutex_lock(&dstBufferManager->frameTable_mutex); + PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1]; dstBufferManager->nbStackedFrames -= 1; frameWritten = dstBufferManager->nbStackedFrames>0; @@ -166,7 +196,7 @@ typedef struct ZSTDMT_jobAgency_s { static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job) { DEBUGLOG(5, "starting job posting "); - pthread_mutex_lock(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */ + PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */ DEBUGLOG(5, "job posting mutex acquired "); jobAgency->jobAnnounce = job; /* post job */ pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex); /* announce */ @@ -175,7 +205,7 @@ static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription jo static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency) { - pthread_mutex_lock(&jobAgency->jobAnnounce_mutex); /* should check return code */ + PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex); /* should check return code */ ZSTDMT_jobDescription const job = jobAgency->jobAnnounce; pthread_mutex_unlock(&jobAgency->jobApply_mutex); return job; @@ -192,7 +222,7 @@ typedef struct ZSTDMT_bufferPool_s { static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) { - pthread_mutex_lock(&pool->bufferPool_mutex); + PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex); if (pool->nbBuffers) { /* try to use an existing buffer */ pool->nbBuffers--; buffer_t const buf = pool->bTable[pool->nbBuffers]; @@ -213,7 +243,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) /* effectively store buffer for later re-use, up to pool capacity */ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) { - pthread_mutex_lock(&pool->bufferPool_mutex); + PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex); if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) { pthread_mutex_unlock(&pool->bufferPool_mutex); free(buf.start); @@ -240,6 +270,7 @@ static void* ZSTDMT_compressionThread(void* arg) ZSTDMT_bufferPool* const pool = &mtctx->bufferPool; ZSTD_CCtx* const cctx = ZSTD_createCCtx(); if (cctx==NULL) return NULL; /* allocation failure : thread not started */ + DEBUGLOG(3, "thread %li created ", (long int)pthread_self()); for (;;) { ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency); if (job.src == NULL) { @@ -254,7 +285,7 @@ static void* ZSTDMT_compressionThread(void* arg) DEBUGLOG(4, "start compressing frame %u", job.frameNumber); //size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel); size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel); - if (ZSTD_isError(cSize)) return (void*)(cSize); /* error */ + if (ZSTD_isError(cSize)) return (void*)(cSize); /* error - find a better way */ size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame); /* pas clair */ if (ZSTD_isError(writeError)) return (void*)writeError; if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer); @@ -269,7 +300,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) /* init jobAgency */ pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL); /* check return value ? */ pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL); - pthread_mutex_lock(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */ + PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */ /* init bufferPool */ pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL); /* start all workers */ @@ -299,7 +330,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency; ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0); size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); - unsigned const nbFrames = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */; + unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */; + unsigned const nbFrames = MIN(nbFramesMax, cctx->nbThreads); size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames; size_t remainingSrcSize = srcSize; const char* const srcStart = (const char*)src; @@ -320,7 +352,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, remainingSrcSize -= frameSize; } } - pthread_mutex_lock(&dbm.allFramesWritten_mutex); + PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex); DEBUGLOG(4, "compressed size : %u ", (U32)dbm.out.pos); return dbm.out.pos; } diff --git a/programs/bench.c b/programs/bench.c index 6009ebc7b..b5cc77eed 100644 --- a/programs/bench.c +++ b/programs/bench.c @@ -159,8 +159,6 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, U32 nbBlocks; UTIL_time_t ticksPerSecond; - ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(g_nbThreads); - /* checks */ if (!compressedBuffer || !resultBuffer || !blockTable || !ctx || !dctx) EXM_THROW(31, "allocation error : not enough memory"); @@ -228,6 +226,8 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, const char* const marks[NB_MARKS] = { " |", " /", " =", "\\" }; U32 markNb = 0; + ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(g_nbThreads); + UTIL_getTime(&coolTime); DISPLAYLEVEL(2, "\r%79s\r", ""); while (!cCompleted || !dCompleted) {