diff --git a/build/.gitignore b/build/.gitignore index 7ceb958ea..86ed710bd 100644 --- a/build/.gitignore +++ b/build/.gitignore @@ -1,7 +1,7 @@ *Copy # Visual C++ -build/ +bin/ VS2005/ VS2008/ VS2010/ diff --git a/contrib/pzstd/Options.cpp b/contrib/pzstd/Options.cpp index b503def15..ece8c0782 100644 --- a/contrib/pzstd/Options.cpp +++ b/contrib/pzstd/Options.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index ccd4f6266..e0826b9d8 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -14,6 +14,7 @@ #include "utils/ThreadPool.h" #include "utils/WorkQueue.h" +#include #include #include #include @@ -52,16 +53,18 @@ static std::uintmax_t fileSizeOrZero(const std::string &file) { return size; } -static size_t handleOneInput(const Options &options, +static std::uint64_t handleOneInput(const Options &options, const std::string &inputFile, FILE* inputFd, + const std::string &outputFile, FILE* outputFd, ErrorHolder &errorHolder) { auto inputSize = fileSizeOrZero(inputFile); // WorkQueue outlives ThreadPool so in the case of error we are certain // we don't accidently try to call push() on it after it is destroyed. WorkQueue> outs{options.numThreads + 1}; - size_t bytesWritten; + std::uint64_t bytesRead; + std::uint64_t bytesWritten; { // Initialize the thread pool with numThreads + 1 // We add one because the read thread spends most of its time waiting. 
@@ -71,8 +74,9 @@ static size_t handleOneInput(const Options &options, if (!options.decompress) { // Add a job that reads the input and starts all the compression jobs executor.add( - [&errorHolder, &outs, &executor, inputFd, inputSize, &options] { - asyncCompressChunks( + [&errorHolder, &outs, &executor, inputFd, inputSize, &options, + &bytesRead] { + bytesRead = asyncCompressChunks( errorHolder, outs, executor, @@ -82,14 +86,30 @@ static size_t handleOneInput(const Options &options, options.determineParameters()); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); + bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + options.verbosity); } else { // Add a job that reads the input and starts all the decompression jobs - executor.add([&errorHolder, &outs, &executor, inputFd] { - asyncDecompressFrames(errorHolder, outs, executor, inputFd); + executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { + bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); + bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + options.verbosity); + } + } + if (options.verbosity > 1 && !errorHolder.hasError()) { + std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; + std::string outputFileName = outputFile == "-" ? 
"stdout" : outputFile; + if (!options.decompress) { + double ratio = static_cast(bytesWritten) / + static_cast(bytesRead + !bytesRead); + std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n", + inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, + outputFileName.c_str()); + } else { + std::fprintf(stderr, "%-20s: %llu bytes \n", + inputFileName.c_str(),bytesWritten); } } return bytesWritten; @@ -185,7 +205,7 @@ int pzstdMain(const Options &options) { } auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); // (de)compress the file - handleOneInput(options, input, inputFd, outputFd, errorHolder); + handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder); if (errorHolder.hasError()) { continue; } @@ -359,11 +379,13 @@ FileStatus fileStatus(FILE* fd) { * Returns the status of the file after all of the reads have occurred. */ static FileStatus -readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { +readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd, + std::uint64_t *totalBytesRead) { Buffer buffer(size); while (!buffer.empty()) { auto bytesRead = std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); + *totalBytesRead += bytesRead; queue.push(buffer.splitAt(bytesRead)); auto status = fileStatus(fd); if (status != FileStatus::Continue) { @@ -373,7 +395,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { return FileStatus::Continue; } -void asyncCompressChunks( +std::uint64_t asyncCompressChunks( ErrorHolder& errorHolder, WorkQueue>& chunks, ThreadPool& executor, @@ -382,6 +404,7 @@ void asyncCompressChunks( size_t numThreads, ZSTD_parameters params) { auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); + std::uint64_t bytesRead = 0; // Break the input up into chunks of size `step` and compress each chunk // independently. @@ -401,9 +424,10 @@ void asyncCompressChunks( // Pass the output queue to the writer thread. 
chunks.push(std::move(out)); // Fill the input queue for the compression job we just started - status = readData(*in, ZSTD_CStreamInSize(), step, fd); + status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); } errorHolder.check(status != FileStatus::Error, "Error reading input"); + return bytesRead; } /** @@ -484,12 +508,14 @@ static void decompress( } } -void asyncDecompressFrames( +std::uint64_t asyncDecompressFrames( ErrorHolder& errorHolder, WorkQueue>& frames, ThreadPool& executor, FILE* fd) { auto framesGuard = makeScopeGuard([&] { frames.finish(); }); + std::uint64_t totalBytesRead = 0; + // Split the source up into its component frames. // If we find our recognized skippable frame we know the next frames size // which means that we can decompress each standard frame in independently. @@ -509,6 +535,7 @@ void asyncDecompressFrames( // frameSize is 0 if the frame info can't be decoded. Buffer buffer(SkippableFrame::kSize); auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); + totalBytesRead += bytesRead; status = fileStatus(fd); if (bytesRead == 0 && status != FileStatus::Continue) { break; @@ -533,14 +560,15 @@ void asyncDecompressFrames( // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // Pass the rest of the source to this decompression task while (status == FileStatus::Continue && !errorHolder.hasError()) { - status = readData(*in, chunkSize, chunkSize, fd); + status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); } break; } // Fill the input queue for the decompression job we just started - status = readData(*in, chunkSize, frameSize, fd); + status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); } errorHolder.check(status != FileStatus::Error, "Error reading input"); + return totalBytesRead; } /// Write `data` to `fd`, returns true iff success. 
@@ -554,12 +582,34 @@ static bool writeData(ByteRange data, FILE* fd) { return true; } -size_t writeFile( +void updateWritten(int verbosity, std::uint64_t bytesWritten) { + if (verbosity <= 1) { + return; + } + using Clock = std::chrono::system_clock; + static Clock::time_point then; + constexpr std::chrono::milliseconds refreshRate{150}; + + auto now = Clock::now(); + if (now - then > refreshRate) { + then = now; + std::fprintf(stderr, "\rWritten: %u MB ", + static_cast(bytesWritten >> 20)); + } +} + +std::uint64_t writeFile( ErrorHolder& errorHolder, WorkQueue>& outs, FILE* outputFd, - bool decompress) { - size_t bytesWritten = 0; + bool decompress, + int verbosity) { + auto lineClearGuard = makeScopeGuard([verbosity] { + if (verbosity > 1) { + std::fprintf(stderr, "\r%79s\r", ""); + } + }); + std::uint64_t bytesWritten = 0; std::shared_ptr out; // Grab the output queue for each decompression job (in order). while (outs.pop(out) && !errorHolder.hasError()) { @@ -583,6 +633,7 @@ size_t writeFile( return bytesWritten; } bytesWritten += buffer.size(); + updateWritten(verbosity, bytesWritten); } } return bytesWritten; diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index 0c21d1352..fe44ccfde 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -45,8 +45,9 @@ int pzstdMain(const Options& options); * @param size The size of the input file if known, 0 otherwise * @param numThreads The number of threads in the thread pool * @param parameters The zstd parameters to use for compression + * @returns The number of bytes read from the file */ -void asyncCompressChunks( +std::uint64_t asyncCompressChunks( ErrorHolder& errorHolder, WorkQueue>& chunks, ThreadPool& executor, @@ -66,8 +67,9 @@ void asyncCompressChunks( * as soon as it is available * @param executor The thread pool to run compression jobs in * @param fd The input file descriptor + * @returns The number of bytes read from the file */ -void asyncDecompressFrames( +std::uint64_t 
asyncDecompressFrames( ErrorHolder& errorHolder, WorkQueue>& frames, ThreadPool& executor, @@ -82,11 +84,13 @@ void asyncDecompressFrames( * (de)compression job. * @param outputFd The file descriptor to write to * @param decompress Are we decompressing? + * @param verbosity The verbosity level to log at * @returns The number of bytes written */ -std::size_t writeFile( +std::uint64_t writeFile( ErrorHolder& errorHolder, WorkQueue>& outs, FILE* outputFd, - bool decompress); + bool decompress, + int verbosity); } diff --git a/contrib/pzstd/README.md b/contrib/pzstd/README.md index eba64085a..05ceb5599 100644 --- a/contrib/pzstd/README.md +++ b/contrib/pzstd/README.md @@ -4,24 +4,31 @@ Parallel Zstandard is a Pigz-like tool for Zstandard. It provides Zstandard format compatible compression and decompression that is able to utilize multiple cores. It breaks the input up into equal sized chunks and compresses each chunk independently into a Zstandard frame. It then concatenates the frames together to produce the final compressed output. -Optionally, with the `-p` option, PZstandard will write a 12 byte header for each frame that is a skippable frame in the Zstandard format, which tells PZstandard the size of the next compressed frame. -When `-p` is specified for compression, PZstandard can decompress the output in parallel. +Pzstandard will write a 12 byte header for each frame that is a skippable frame in the Zstandard format, which tells PZstandard the size of the next compressed frame. +PZstandard supports parallel decompression of files compressed with PZstandard. +When decompressing files compressed with Zstandard, PZstandard does IO in one thread, and decompression in another. ## Usage +PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads. +Dictionary mode is not currently supported. 
+ Basic usage - pzstd input-file -o output-file -n num-threads [ -p ] -# # Compression - pzstd -d input-file -o output-file -n num-threads # Decompression + pzstd input-file -o output-file -p num-threads -# # Compression + pzstd -d input-file -o output-file -p num-threads # Decompression PZstandard also supports piping and fifo pipes - cat input-file | pzstd -n num-threads [ -p ] -# -c > /dev/null + cat input-file | pzstd -p num-threads -# -c > /dev/null For more options pzstd --help +PZstandard tries to pick a smart default number of threads if not specified (displayed in `pzstd --help`). +If this number is not suitable, during compilation you can define `PZSTD_NUM_THREADS` to the number of threads you prefer. + ## Benchmarks As a reference, PZstandard and Pigz were compared on an Intel Core i7 @ 3.1 GHz, each using 4 threads, with the [Silesia compression corpus](http://sun.aei.polsl.pl/~sdeor/index.php?page=silesia). @@ -32,8 +39,8 @@ Compression Speed vs Ratio with 4 Threads | Decompression Speed with 4 Threads The test procedure was to run each of the following commands 2 times for each compression level, and take the minimum time. 
- time pzstd -# -n 4 -p -c silesia.tar > silesia.tar.zst - time pzstd -d -n 4 -c silesia.tar.zst > /dev/null + time pzstd -# -p 4 -c silesia.tar > silesia.tar.zst + time pzstd -d -p 4 -c silesia.tar.zst > /dev/null time pigz -# -p 4 -k -c silesia.tar > silesia.tar.gz time pigz -d -p 4 -k -c silesia.tar.gz > /dev/null diff --git a/contrib/pzstd/test/PzstdTest.cpp b/contrib/pzstd/test/PzstdTest.cpp index 64bcf9cab..c85f73a39 100644 --- a/contrib/pzstd/test/PzstdTest.cpp +++ b/contrib/pzstd/test/PzstdTest.cpp @@ -54,6 +54,7 @@ TEST(Pzstd, SmallSizes) { options.inputFiles = {inputFile}; options.numThreads = numThreads; options.compressionLevel = level; + options.verbosity = 1; ASSERT_TRUE(roundTrip(options)); errorGuard.dismiss(); } @@ -91,6 +92,7 @@ TEST(Pzstd, LargeSizes) { options.inputFiles = {inputFile}; options.numThreads = std::min(numThreads, options.numThreads); options.compressionLevel = level; + options.verbosity = 1; ASSERT_TRUE(roundTrip(options)); errorGuard.dismiss(); } diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 298278c99..94f4b5a25 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -142,21 +142,8 @@ size_t ZSTD_checkCParams(ZSTD_compressionParameters cParams) } -/** ZSTD_checkCParams_advanced() : - temporary work-around, while the compressor compatibility remains limited regarding windowLog < 18 */ -size_t ZSTD_checkCParams_advanced(ZSTD_compressionParameters cParams, U64 srcSize) -{ - if (srcSize > (1ULL << ZSTD_WINDOWLOG_MIN)) return ZSTD_checkCParams(cParams); - if (cParams.windowLog < ZSTD_WINDOWLOG_ABSOLUTEMIN) return ERROR(compressionParameter_unsupported); - if (srcSize <= (1ULL << cParams.windowLog)) cParams.windowLog = ZSTD_WINDOWLOG_MIN; /* fake value - temporary work around */ - if (srcSize <= (1ULL << cParams.chainLog)) cParams.chainLog = ZSTD_CHAINLOG_MIN; /* fake value - temporary work around */ - if ((srcSize <= (1ULL << cParams.hashLog)) & ((U32)cParams.strategy < 
(U32)ZSTD_btlazy2)) cParams.hashLog = ZSTD_HASHLOG_MIN; /* fake value - temporary work around */ - return ZSTD_checkCParams(cParams); -} - - /** ZSTD_adjustCParams() : - optimize cPar for a given input (`srcSize` and `dictSize`). + optimize `cPar` for a given input (`srcSize` and `dictSize`). mostly downsizing to reduce memory consumption and initialization. Both `srcSize` and `dictSize` are optional (use 0 if unknown), but if both are 0, no optimization can be done. @@ -169,7 +156,7 @@ ZSTD_compressionParameters ZSTD_adjustCParams(ZSTD_compressionParameters cPar, u { U32 const minSrcSize = (srcSize==0) ? 500 : 0; U64 const rSize = srcSize + dictSize + minSrcSize; if (rSize < ((U64)1< srcLog) cPar.windowLog = srcLog; } } if (cPar.hashLog > cPar.windowLog) cPar.hashLog = cPar.windowLog; @@ -178,7 +165,6 @@ ZSTD_compressionParameters ZSTD_adjustCParams(ZSTD_compressionParameters cPar, u if (cPar.chainLog > maxChainLog) cPar.chainLog = maxChainLog; } /* <= ZSTD_CHAINLOG_MAX */ if (cPar.windowLog < ZSTD_WINDOWLOG_ABSOLUTEMIN) cPar.windowLog = ZSTD_WINDOWLOG_ABSOLUTEMIN; /* required for frame header */ - if ((cPar.hashLog < ZSTD_HASHLOG_MIN) & ((U32)cPar.strategy >= (U32)ZSTD_btlazy2)) cPar.hashLog = ZSTD_HASHLOG_MIN; /* required to ensure collision resistance in bt */ return cPar; } @@ -2556,7 +2542,7 @@ size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, ZSTD_parameters params, unsigned long long pledgedSrcSize) { /* compression parameters verification and optimization */ - CHECK_F(ZSTD_checkCParams_advanced(params.cParams, pledgedSrcSize)); + CHECK_F(ZSTD_checkCParams(params.cParams)); return ZSTD_compressBegin_internal(cctx, dict, dictSize, params, pledgedSrcSize); } @@ -2644,7 +2630,7 @@ size_t ZSTD_compress_advanced (ZSTD_CCtx* ctx, const void* dict,size_t dictSize, ZSTD_parameters params) { - CHECK_F(ZSTD_checkCParams_advanced(params.cParams, srcSize)); + CHECK_F(ZSTD_checkCParams(params.cParams)); return ZSTD_compress_internal(ctx, dst, dstCapacity, src, 
srcSize, dict, dictSize, params); } @@ -2851,7 +2837,7 @@ size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, { size_t const neededInBuffSize = (size_t)1 << params.cParams.windowLog; if (zcs->inBuffSize < neededInBuffSize) { zcs->inBuffSize = neededInBuffSize; - ZSTD_free(zcs->inBuff, zcs->customMem); /* should not be necessary */ + ZSTD_free(zcs->inBuff, zcs->customMem); zcs->inBuff = (char*) ZSTD_malloc(neededInBuffSize, zcs->customMem); if (zcs->inBuff == NULL) return ERROR(memory_allocation); } @@ -2859,7 +2845,7 @@ size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, } if (zcs->outBuffSize < ZSTD_compressBound(zcs->blockSize)+1) { zcs->outBuffSize = ZSTD_compressBound(zcs->blockSize)+1; - ZSTD_free(zcs->outBuff, zcs->customMem); /* should not be necessary */ + ZSTD_free(zcs->outBuff, zcs->customMem); zcs->outBuff = (char*) ZSTD_malloc(zcs->outBuffSize, zcs->customMem); if (zcs->outBuff == NULL) return ERROR(memory_allocation); } diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c index 3410bbc0a..47b5f42c7 100644 --- a/lib/decompress/zstd_decompress.c +++ b/lib/decompress/zstd_decompress.c @@ -1554,6 +1554,7 @@ size_t ZSTD_initDStream(ZSTD_DStream* zds) size_t ZSTD_resetDStream(ZSTD_DStream* zds) { + if (zds->ddict == NULL) return ERROR(stage_wrong); /* must be init at least once */ zds->stage = zdss_loadHeader; zds->lhSize = zds->inPos = zds->outStart = zds->outEnd = 0; zds->legacyVersion = 0; diff --git a/lib/dictBuilder/zdict.c b/lib/dictBuilder/zdict.c index 8a38aadeb..874351ebf 100644 --- a/lib/dictBuilder/zdict.c +++ b/lib/dictBuilder/zdict.c @@ -505,7 +505,8 @@ static size_t ZDICT_trainBuffer(dictItem* dictList, U32 dictListSize, { size_t pos; for (pos=0; pos < bufferSize; pos++) reverseSuffix[suffix[pos]] = (U32)pos; - /* build file pos */ + /* note filePos tracks borders between samples. 
+ It's not used at this stage, but planned to become useful in a later update */ filePos[0] = 0; for (pos=1; pos= srcBufferSize) maxTestSize = srcBufferSize-1; + { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize; + size_t const resetError = ZSTD_resetCStream(zc, pledgedSrcSize); + CHECK(ZSTD_isError(resetError), "ZSTD_resetCStream error : %s", ZSTD_getErrorName(resetError)); + } + } else { + U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; U32 const cLevel = (FUZ_rand(&lseed) % (ZSTD_maxCLevel() - (testLog/3))) + 1; maxTestSize = FUZ_rLogLength(&lseed, testLog); - dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_randomLength(&lseed, maxSampleLog) : 0; + oldTestLog = testLog; /* random dictionary selection */ + dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_randomLength(&lseed, maxSampleLog) : 0; { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); dict = srcBuffer + dictStart; } - { ZSTD_parameters params = ZSTD_getParams(cLevel, 0, dictSize); + { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize; + ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; - { size_t const initError = ZSTD_initCStream_advanced(zc, dict, dictSize, params, 0); + { size_t const initError = ZSTD_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize); CHECK (ZSTD_isError(initError),"ZSTD_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); } } } /* multi-segments compression test */ XXH64_reset(&xxhState, 0); - nbChunks = (FUZ_rand(&lseed) & 127) + 2; { ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ; - for (n=0, cSize=0, totalTestSize=0 ; (n