diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 7423d9db7..97d2b38ee 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -273,6 +273,7 @@ struct ZSTDMT_CCtx_s { pthread_mutex_t jobCompleted_mutex; pthread_cond_t jobCompleted_cond; size_t targetSectionSize; + size_t marginSize; size_t inBuffSize; size_t dictSize; size_t targetDictSize; @@ -285,7 +286,7 @@ struct ZSTDMT_CCtx_s { unsigned nextJobID; unsigned frameEnded; unsigned allJobsCompleted; - unsigned overlapWrLog; + unsigned overlapRLog; unsigned long long frameContentSize; size_t sectionSize; ZSTD_CDict* cdict; @@ -308,7 +309,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) cctx->jobIDMask = nbJobs - 1; cctx->allJobsCompleted = 1; cctx->sectionSize = 0; - cctx->overlapWrLog = 3; + cctx->overlapRLog = 3; cctx->factory = POOL_create(nbThreads, 1); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); @@ -368,8 +369,9 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, case ZSTDMT_p_sectionSize : mtctx->sectionSize = value; return 0; - case ZSTDMT_p_overlapSectionRLog : - mtctx->overlapWrLog = value; + case ZSTDMT_p_overlapSectionLog : + DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); + mtctx->overlapRLog = (value >= 9) ? 0 : 9 - value; return 0; default : return ERROR(compressionParameter_unsupported); @@ -512,10 +514,15 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, if (zcs->cdict == NULL) return ERROR(memory_allocation); } } zcs->frameContentSize = pledgedSrcSize; + zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog); + DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog); + DEBUGLOG(3, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); - zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0; - zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize; + zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); + DEBUGLOG(3, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); + zcs->marginSize = zcs->targetSectionSize >> 2; + zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize; zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); zcs->inBuff.filled = 0; @@ -680,6 +687,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { + size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input); @@ -690,7 +698,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->inBuff.filled += toLoad; } - if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */ + if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) ); } diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 3a26b93d7..27f78ee03 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -60,7 +60,7 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* d * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ typedef enum { ZSTDMT_p_sectionSize, /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */ - ZSTDMT_p_overlapSectionRLog /* reverse log of overlapped section; 0 == use a complete window, 3(default) == use 1/8th of window, values >=10 means no overlap */ + ZSTDMT_p_overlapSectionLog /* Log of overlapped section; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */ } ZSDTMT_parameter; /* ZSTDMT_setMTCtxParameter() : diff --git a/programs/fileio.c b/programs/fileio.c index 86da00131..960c6e3d9 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -78,14 +78,14 @@ * Macros ***************************************/ #define DISPLAY(...) fprintf(stderr, __VA_ARGS__) -#define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } +#define DISPLAYLEVEL(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } static U32 g_displayLevel = 2; /* 0 : no display; 1: errors; 2 : + result + interaction + warnings; 3 : + progression; 4 : + information */ void FIO_setNotificationLevel(unsigned level) { g_displayLevel=level; } -#define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \ +#define DISPLAYUPDATE(l, ...) { if (g_displayLevel>=l) { \ if ((clock() - g_time > refreshRate) || (g_displayLevel>=4)) \ { g_time = clock(); DISPLAY(__VA_ARGS__); \ - if (g_displayLevel>=4) fflush(stdout); } } + if (g_displayLevel>=4) fflush(stdout); } } } static const clock_t refreshRate = CLOCKS_PER_SEC * 15 / 100; static clock_t g_time = 0; @@ -124,6 +124,13 @@ void FIO_setBlockSize(unsigned blockSize) { #endif g_blockSize = blockSize; } +#define FIO_OVERLAP_LOG_NOTSET 9999 +static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET; +void FIO_setOverlapLog(unsigned overlapLog){ + if (overlapLog && g_nbThreads==1) + DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n"); + g_overlapLog = overlapLog; +} /*-************************************* @@ -272,8 +279,10 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, #ifdef ZSTD_MULTITHREAD ress.cctx = ZSTDMT_createCCtx(g_nbThreads); if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream"); - if (cLevel==ZSTD_maxCLevel()) - ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionRLog, 0); /* use complete window for overlap */ + if ((cLevel==ZSTD_maxCLevel()) && (g_overlapLog==FIO_OVERLAP_LOG_NOTSET)) + ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionLog, 9); /* use complete window for overlap */ + if (g_overlapLog != FIO_OVERLAP_LOG_NOTSET) + ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionLog, g_overlapLog); #else ress.cctx = ZSTD_createCStream(); if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream"); @@ -355,7 +364,6 @@ static int FIO_compressFilename_internal(cRess_t ress, size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); if (inSize==0) break; readsize += inSize; - DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20)); { ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */ @@ -373,7 +381,13 @@ static int FIO_compressFilename_internal(cRess_t ress, if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); compressedfilesize += outBuff.pos; } } } - DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100); +#ifdef ZSTD_MULTITHREAD + if (!fileSize) DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20)) + else DISPLAYUPDATE(2, "\rRead : %u / %u MB", (U32)(readsize>>20), (U32)(fileSize>>20)); +#else + if (!fileSize) DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%", (U32)(readsize>>20), (double)compressedfilesize/readsize*100) + else DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%%", (U32)(readsize>>20), (U32)(fileSize>>20), (double)compressedfilesize/readsize*100); +#endif } /* End of Frame */ diff --git a/programs/fileio.h b/programs/fileio.h index 11178bcca..daff0312e 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -43,6 +43,7 @@ void FIO_setRemoveSrcFile(unsigned flag); void FIO_setMemLimit(unsigned memLimit); void FIO_setNbThreads(unsigned nbThreads); void FIO_setBlockSize(unsigned blockSize); +void FIO_setOverlapLog(unsigned overlapLog); /*-************************************* diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 8b8a16b2b..9df2a4cb2 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -63,6 +63,8 @@ static const char* g_defaultDictName = "dictionary"; static const unsigned g_defaultMaxDictSize = 110 KB; static const int g_defaultDictCLevel = 3; static const unsigned g_defaultSelectivityLevel = 9; +#define OVERLAP_LOG_DEFAULT 9999 +static U32 g_overlapLog = OVERLAP_LOG_DEFAULT; /*-************************************ @@ -186,7 +188,7 @@ static unsigned readU32FromChar(const char** stringPtr) } /** longCommandWArg() : - * check is *stringPtr is the same as longCommand. + * check if *stringPtr is the same as longCommand. * If yes, @return 1 and advances *stringPtr to the position which immediately follows longCommand. * @return 0 and doesn't modify *stringPtr otherwise. */ @@ -220,6 +222,8 @@ static unsigned parseCoverParameters(const char* stringPtr, COVER_params_t *para return 1; } #endif + + /** parseCompressionParameters() : * reads compression parameters from *stringPtr (e.g. "--zstd=wlog=23,clog=23,hlog=22,slog=6,slen=3,tlen=48,strat=6") into *params * @return 1 means that compression parameters were correct @@ -235,6 +239,7 @@ static unsigned parseCompressionParameters(const char* stringPtr, ZSTD_compressi if (longCommandWArg(&stringPtr, "searchLength=") || longCommandWArg(&stringPtr, "slen=")) { params->searchLength = readU32FromChar(&stringPtr); if (stringPtr[0]==',') { stringPtr++; continue; } else break; } if (longCommandWArg(&stringPtr, "targetLength=") || longCommandWArg(&stringPtr, "tlen=")) { params->targetLength = readU32FromChar(&stringPtr); if (stringPtr[0]==',') { stringPtr++; continue; } else break; } if (longCommandWArg(&stringPtr, "strategy=") || longCommandWArg(&stringPtr, "strat=")) { params->strategy = (ZSTD_strategy)(1 + readU32FromChar(&stringPtr)); if (stringPtr[0]==',') { stringPtr++; continue; } else break; } + if (longCommandWArg(&stringPtr, "overlapLog=") || longCommandWArg(&stringPtr, "ovlog=")) { g_overlapLog = readU32FromChar(&stringPtr); if (stringPtr[0]==',') { stringPtr++; continue; } else break; } return 0; } @@ -370,6 +375,7 @@ int main(int argCount, const char* argv[]) if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; } + if (longCommandWArg(&argument, "--block-size=")) { blockSize = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--zstd=")) { if (!parseCompressionParameters(argument, &compressionParams)) CLEAN_RETURN(badusage(programName)); continue; } /* fall-through, will trigger bad_usage() later on */ } @@ -629,6 +635,7 @@ int main(int argCount, const char* argv[]) #ifndef ZSTD_NOCOMPRESS FIO_setNbThreads(nbThreads); FIO_setBlockSize((U32)blockSize); + if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog); if ((filenameIdx==1) && outFileName) operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams); else diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 0fdb1ee12..390a4fce5 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -837,9 +837,11 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp DISPLAYLEVEL(5, "Init with windowLog = %u \n", params.cParams.windowLog); params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; - { size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize); - CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); - } } } + { size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize); + CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); } + ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12); + ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_sectionSize, FUZ_rand(&lseed) % (2*maxTestSize+1)); + } } /* multi-segments compression test */ XXH64_reset(&xxhState, 0);