diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index dd1fd3452..de5032271 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -443,6 +443,13 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t di return 0; } +/* ZSTDMT_resetCStream() : + * pledgedSrcSize is optional and can be zero == unknown */ +size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) +{ + return ZSTDMT_initCStream_advanced(zcs, zcs->dict, zcs->dictSize, zcs->params, pledgedSrcSize); +} + size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); return ZSTDMT_initCStream_advanced(zcs, NULL, 0, params, 0); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index d1b01a79e..759906db1 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -20,6 +20,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, /* === Streaming functions === */ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); +size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown ; current limitation : no checksum */ diff --git a/programs/Makefile b/programs/Makefile index 02c924f39..f2a0ff26e 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -22,7 +22,7 @@ else ALIGN_LOOP = endif -CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder +CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder CFLAGS ?= -O3 CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \ -Wswitch-enum -Wdeclaration-after-statement 
-Wstrict-prototypes -Wundef \ diff --git a/programs/bench.c b/programs/bench.c index 5299b4712..74d26ca06 100644 --- a/programs/bench.c +++ b/programs/bench.c @@ -9,6 +9,14 @@ +/* ************************************** +* Tuning parameters +****************************************/ +#ifndef BMK_TIMETEST_DEFAULT_S /* default minimum time per test */ +#define BMK_TIMETEST_DEFAULT_S 3 +#endif + + /* ************************************** * Compiler Warnings ****************************************/ @@ -43,7 +51,6 @@ # define ZSTD_GIT_COMMIT_STRING ZSTD_EXPAND_AND_QUOTE(ZSTD_GIT_COMMIT) #endif -#define NBSECONDS 3 #define TIMELOOP_MICROSEC 1*1000000ULL /* 1 second */ #define ACTIVEPERIOD_MICROSEC 70*1000000ULL /* 70 seconds */ #define COOLPERIOD_SEC 10 @@ -109,31 +116,36 @@ static clock_us_t BMK_clockMicroSec(void) /* ************************************* * Benchmark Parameters ***************************************/ -static U32 g_nbSeconds = NBSECONDS; -static size_t g_blockSize = 0; static int g_additionalParam = 0; static U32 g_decodeOnly = 0; -static U32 g_nbThreads = 1; void BMK_setNotificationLevel(unsigned level) { g_displayLevel=level; } void BMK_setAdditionalParam(int additionalParam) { g_additionalParam=additionalParam; } -void BMK_SetNbSeconds(unsigned nbSeconds) +static U32 g_nbSeconds = BMK_TIMETEST_DEFAULT_S; +void BMK_setNbSeconds(unsigned nbSeconds) { g_nbSeconds = nbSeconds; - DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression -\n", g_nbSeconds); + DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression - \n", g_nbSeconds); } -void BMK_SetBlockSize(size_t blockSize) +static size_t g_blockSize = 0; +void BMK_setBlockSize(size_t blockSize) { g_blockSize = blockSize; - DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10)); + if (g_blockSize) DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10)); } void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); } -void 
BMK_SetNbThreads(unsigned nbThreads) { g_nbThreads = nbThreads; } +static U32 g_nbThreads = 1; +void BMK_setNbThreads(unsigned nbThreads) { +#ifndef ZSTD_MULTITHREAD + if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); +#endif + g_nbThreads = nbThreads; +} /* ******************************************************** diff --git a/programs/bench.h b/programs/bench.h index 87850bcc3..2918c02bf 100644 --- a/programs/bench.h +++ b/programs/bench.h @@ -19,9 +19,9 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dic int cLevel, int cLevelLast, ZSTD_compressionParameters* compressionParams); /* Set Parameters */ -void BMK_SetNbSeconds(unsigned nbLoops); -void BMK_SetBlockSize(size_t blockSize); -void BMK_SetNbThreads(unsigned nbThreads); +void BMK_setNbSeconds(unsigned nbLoops); +void BMK_setBlockSize(size_t blockSize); +void BMK_setNbThreads(unsigned nbThreads); void BMK_setNotificationLevel(unsigned level); void BMK_setAdditionalParam(int additionalParam); void BMK_setDecodeOnlyMode(unsigned decodeFlag); diff --git a/programs/fileio.c b/programs/fileio.c index a112cc049..3864a5fab 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -34,11 +34,14 @@ #include "fileio.h" #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #include "zstd.h" -#ifdef ZSTD_GZDECOMPRESS -#include "zlib.h" -#if !defined(z_const) - #define z_const +#ifdef ZSTD_MULTITHREAD +# include "zstdmt_compress.h" #endif +#ifdef ZSTD_GZDECOMPRESS +# include "zlib.h" +# if !defined(z_const) +# define z_const +# endif #endif @@ -103,7 +106,13 @@ static U32 g_removeSrcFile = 0; void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); } static U32 g_memLimit = 0; void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; } - +static U32 g_nbThreads = 1; +void FIO_setNbThreads(unsigned nbThreads) { +#ifndef ZSTD_MULTITHREAD + if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); 
+#endif + g_nbThreads = nbThreads; +} /*-************************************* @@ -226,22 +235,30 @@ static size_t FIO_loadFile(void** bufferPtr, const char* fileName) * Compression ************************************************************************/ typedef struct { + FILE* srcFile; + FILE* dstFile; void* srcBuffer; size_t srcBufferSize; void* dstBuffer; size_t dstBufferSize; +#ifdef ZSTD_MULTITHREAD + ZSTDMT_CCtx* cctx; +#else ZSTD_CStream* cctx; - FILE* dstFile; - FILE* srcFile; +#endif } cRess_t; -static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, +static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, U64 srcSize, ZSTD_compressionParameters* comprParams) { cRess_t ress; memset(&ress, 0, sizeof(ress)); +#ifdef ZSTD_MULTITHREAD + ress.cctx = ZSTDMT_createCCtx(g_nbThreads); +#else ress.cctx = ZSTD_createCStream(); +#endif if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream"); ress.srcBufferSize = ZSTD_CStreamInSize(); ress.srcBuffer = malloc(ress.srcBufferSize); @@ -264,7 +281,11 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, if (comprParams->searchLength) params.cParams.searchLength = comprParams->searchLength; if (comprParams->targetLength) params.cParams.targetLength = comprParams->targetLength; if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1); +#ifdef ZSTD_MULTITHREAD + { size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); +#else { size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); +#endif if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); } } free(dictBuffer); @@ -277,7 +298,11 @@ static void FIO_freeCResources(cRess_t ress) { free(ress.srcBuffer); free(ress.dstBuffer); +#ifdef ZSTD_MULTITHREAD + ZSTDMT_freeCCtx(ress.cctx); +#else 
ZSTD_freeCStream(ress.cctx);   /* never fails */
+#endif
 }
 
 
@@ -296,7 +321,11 @@ static int FIO_compressFilename_internal(cRess_t ress,
     U64 const fileSize = UTIL_getFileSize(srcFileName);
 
     /* init */
+#ifdef ZSTD_MULTITHREAD
+    {   size_t const resetError = ZSTDMT_resetCStream(ress.cctx, fileSize);
+#else
     {   size_t const resetError = ZSTD_resetCStream(ress.cctx, fileSize);
+#endif
         if (ZSTD_isError(resetError)) EXM_THROW(21, "Error initializing compression : %s", ZSTD_getErrorName(resetError));
     }
 
@@ -311,11 +340,23 @@ static int FIO_compressFilename_internal(cRess_t ress,
         /* Compress using buffered streaming */
         {   ZSTD_inBuffer  inBuff = { ress.srcBuffer, inSize, 0 };
             ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
-            {   size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
-                if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); }
-            if (inBuff.pos != inBuff.size)
-                /* inBuff should be entirely consumed since buffer sizes are recommended ones */
-                EXM_THROW(24, "Compression error : input block not fully consumed");
+            /* A single call may leave part of inBuff unconsumed : keep calling until it is drained */
+            while (inBuff.pos != inBuff.size) {
+#ifdef ZSTD_MULTITHREAD
+                size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
+#else
+                size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
+#endif
+                if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
+                if (inBuff.pos != inBuff.size) {
+                    /* Input is left over : write out what was produced and rewind outBuff,
+                     * otherwise a full outBuff would let this loop spin without making progress */
+                    size_t const flushed = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+                    if (flushed!=outBuff.pos) EXM_THROW(24, "Write error : cannot write compressed block into %s", dstFileName);
+                    compressedfilesize += outBuff.pos;   /* same accounting as the frame-end write below */
+                    outBuff.pos = 0;
+                }
+            }
 
             /* Write cBlock */
             {   size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
@@ -326,13 +367,19 @@ static int FIO_compressFilename_internal(cRess_t ress,
     }
 
     /* End of Frame */
-    {   ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
-        size_t const result = ZSTD_endStream(ress.cctx, &outBuff);
-        if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end");
-
-        {   size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-            if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
-        compressedfilesize += outBuff.pos;
+    {   size_t result = 1;
+        while (result!=0) {   /* note : is there any possibility of endless loop ? */
+            ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+#ifdef ZSTD_MULTITHREAD
+            result = ZSTDMT_endStream(ress.cctx, &outBuff);
+#else
+            result = ZSTD_endStream(ress.cctx, &outBuff);
+#endif
+            if (ZSTD_isError(result)) EXM_THROW(26, "Compression error during frame end : %s", ZSTD_getErrorName(result));
+            {   size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+                if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
+            compressedfilesize += outBuff.pos;
+        }
     }
 
     /* Status */
@@ -632,7 +679,7 @@ unsigned long long FIO_decompressFrame(dRess_t* ress,
         if (ZSTD_isError(readSizeHint)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(readSizeHint));
 
         /* Write block */
-        storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
+        storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
         frameSize += outBuff.pos;
         DISPLAYUPDATE(2, "\rDecoded : %u MB... 
", (U32)((alreadyDecoded+frameSize)>>20) ); diff --git a/programs/fileio.h b/programs/fileio.h index b71658334..9ef449294 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -40,6 +40,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag); void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setRemoveSrcFile(unsigned flag); void FIO_setMemLimit(unsigned memLimit); +void FIO_setNbThreads(unsigned nbThreads); /*-************************************* diff --git a/programs/zstdcli.c b/programs/zstdcli.c index c9d8100eb..de25d0f0a 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -110,12 +110,15 @@ static int usage_advanced(const char* programName) DISPLAY( " -q : suppress warnings; specify twice to suppress errors too\n"); DISPLAY( " -c : force write to standard output, even if it is the console\n"); #ifdef UTIL_HAS_CREATEFILELIST - DISPLAY( " -r : operate recursively on directories\n"); + DISPLAY( " -r : operate recursively on directories \n"); #endif #ifndef ZSTD_NOCOMPRESS DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel()); DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n"); - DISPLAY( "--[no-]check : integrity check (default:enabled)\n"); + DISPLAY( "--[no-]check : integrity check (default:enabled) \n"); +#ifdef ZSTD_MULTITHREAD + DISPLAY( " -T# : use # threads for compression (default:1) \n"); +#endif #endif #ifndef ZSTD_NODECOMPRESS DISPLAY( "--test : test compressed file integrity \n"); @@ -233,7 +236,10 @@ int main(int argCount, const char* argv[]) nextArgumentIsDictID=0, nextArgumentsAreFiles=0, ultra=0, - lastCommand = 0; + lastCommand = 0, + nbThreads = 1; + unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */ + size_t blockSize = 0; zstd_operation_mode operation = zom_compress; ZSTD_compressionParameters compressionParams; int cLevel = ZSTDCLI_CLEVEL_DEFAULT; @@ -396,39 +402,37 @@ int main(int argCount, 
const char* argv[]) #ifndef ZSTD_NOBENCH /* Benchmark */ - case 'b': operation=zom_bench; argument++; break; + case 'b': + operation=zom_bench; + argument++; + break; /* range bench (benchmark only) */ case 'e': - /* compression Level */ - argument++; - cLevelLast = readU32FromChar(&argument); - break; + /* compression Level */ + argument++; + cLevelLast = readU32FromChar(&argument); + break; /* Modify Nb Iterations (benchmark only) */ case 'i': argument++; - { U32 const iters = readU32FromChar(&argument); - BMK_setNotificationLevel(displayLevel); - BMK_SetNbSeconds(iters); - } + bench_nbSeconds = readU32FromChar(&argument); break; /* cut input into blocks (benchmark only) */ case 'B': argument++; - { size_t const bSize = readU32FromChar(&argument); - BMK_setNotificationLevel(displayLevel); - BMK_SetBlockSize(bSize); - } + blockSize = readU32FromChar(&argument); break; +#endif /* ZSTD_NOBENCH */ + /* nb of threads (hidden option) */ case 'T': argument++; - BMK_SetNbThreads(readU32FromChar(&argument)); + nbThreads = readU32FromChar(&argument); break; -#endif /* ZSTD_NOBENCH */ /* Dictionary Selection level */ case 's': @@ -518,6 +522,9 @@ int main(int argCount, const char* argv[]) if (operation==zom_bench) { #ifndef ZSTD_NOBENCH BMK_setNotificationLevel(displayLevel); + BMK_setBlockSize(blockSize); + BMK_setNbThreads(nbThreads); + BMK_setNbSeconds(bench_nbSeconds); BMK_benchFiles(filenameTable, filenameIdx, dictFileName, cLevel, cLevelLast, &compressionParams); #endif goto _end; @@ -569,6 +576,7 @@ int main(int argCount, const char* argv[]) FIO_setNotificationLevel(displayLevel); if (operation==zom_compress) { #ifndef ZSTD_NOCOMPRESS + FIO_setNbThreads(nbThreads); if ((filenameIdx==1) && outFileName) operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams); else