diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp
index ccd4f6266..5de90e8b6 100644
--- a/contrib/pzstd/Pzstd.cpp
+++ b/contrib/pzstd/Pzstd.cpp
@@ -52,16 +52,18 @@ static std::uintmax_t fileSizeOrZero(const std::string &file) {
   return size;
 }
 
-static size_t handleOneInput(const Options &options,
+static std::uint64_t handleOneInput(const Options &options,
                              const std::string &inputFile,
                              FILE* inputFd,
+                             const std::string &outputFile,
                              FILE* outputFd,
                              ErrorHolder &errorHolder) {
   auto inputSize = fileSizeOrZero(inputFile);
   // WorkQueue outlives ThreadPool so in the case of error we are certain
   // we don't accidently try to call push() on it after it is destroyed.
   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
-  size_t bytesWritten;
+  std::uint64_t bytesRead;
+  std::uint64_t bytesWritten;
   {
     // Initialize the thread pool with numThreads + 1
     // We add one because the read thread spends most of its time waiting.
@@ -71,8 +73,9 @@ static size_t handleOneInput(const Options &options,
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
       executor.add(
-          [&errorHolder, &outs, &executor, inputFd, inputSize, &options] {
-            asyncCompressChunks(
+          [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
+           &bytesRead] {
+            bytesRead = asyncCompressChunks(
                 errorHolder,
                 outs,
                 executor,
@@ -85,13 +88,27 @@ static size_t handleOneInput(const Options &options,
       bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress);
     } else {
       // Add a job that reads the input and starts all the decompression jobs
-      executor.add([&errorHolder, &outs, &executor, inputFd] {
-        asyncDecompressFrames(errorHolder, outs, executor, inputFd);
+      executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
+        bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
       });
       // Start writing
       bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress);
     }
   }
+  if (options.verbosity > 1 && !errorHolder.hasError()) {
+    std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
+    std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
+    if (!options.decompress) {
+      double ratio = static_cast<double>(bytesWritten) /
+                     static_cast<double>(bytesRead + !bytesRead);
+      std::fprintf(stderr, "%-20s :%6.2f%%   (%6llu => %6llu bytes, %s)\n",
+                   inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
+                   outputFileName.c_str());
+    } else {
+      std::fprintf(stderr, "%-20s: %llu bytes \n",
+                   inputFileName.c_str(),bytesWritten);
+    }
+  }
   return bytesWritten;
 }
 
@@ -185,7 +202,7 @@ int pzstdMain(const Options &options) {
     }
     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
     // (de)compress the file
-    handleOneInput(options, input, inputFd, outputFd, errorHolder);
+    handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
     if (errorHolder.hasError()) {
       continue;
     }
@@ -359,11 +376,13 @@ FileStatus fileStatus(FILE* fd) {
  * Returns the status of the file after all of the reads have occurred.
  */
 static FileStatus
-readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
+readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
+         std::uint64_t *totalBytesRead) {
   Buffer buffer(size);
   while (!buffer.empty()) {
     auto bytesRead =
         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
+    *totalBytesRead += bytesRead;
     queue.push(buffer.splitAt(bytesRead));
     auto status = fileStatus(fd);
     if (status != FileStatus::Continue) {
@@ -373,7 +392,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
   return FileStatus::Continue;
 }
 
-void asyncCompressChunks(
+std::uint64_t asyncCompressChunks(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
@@ -382,6 +401,7 @@
     size_t numThreads,
     ZSTD_parameters params) {
   auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
+  std::uint64_t bytesRead = 0;
 
   // Break the input up into chunks of size `step` and compress each chunk
   // independently.
@@ -401,9 +421,10 @@ void asyncCompressChunks(
     // Pass the output queue to the writer thread.
     chunks.push(std::move(out));
     // Fill the input queue for the compression job we just started
-    status = readData(*in, ZSTD_CStreamInSize(), step, fd);
+    status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
   }
   errorHolder.check(status != FileStatus::Error, "Error reading input");
+  return bytesRead;
 }
 
 /**
@@ -484,12 +505,14 @@ static void decompress(
   }
 }
 
-void asyncDecompressFrames(
+std::uint64_t asyncDecompressFrames(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
     FILE* fd) {
   auto framesGuard = makeScopeGuard([&] { frames.finish(); });
+  std::uint64_t totalBytesRead = 0;
+
   // Split the source up into its component frames.
   // If we find our recognized skippable frame we know the next frames size
   // which means that we can decompress each standard frame in independently.
@@ -509,6 +532,7 @@ void asyncDecompressFrames(
     // frameSize is 0 if the frame info can't be decoded.
     Buffer buffer(SkippableFrame::kSize);
     auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+    totalBytesRead += bytesRead;
     status = fileStatus(fd);
     if (bytesRead == 0 && status != FileStatus::Continue) {
       break;
@@ -533,14 +557,15 @@
       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
       // Pass the rest of the source to this decompression task
       while (status == FileStatus::Continue && !errorHolder.hasError()) {
-        status = readData(*in, chunkSize, chunkSize, fd);
+        status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
       }
       break;
     }
     // Fill the input queue for the decompression job we just started
-    status = readData(*in, chunkSize, frameSize, fd);
+    status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
   }
   errorHolder.check(status != FileStatus::Error, "Error reading input");
+  return totalBytesRead;
 }
 
 /// Write `data` to `fd`, returns true iff success.
@@ -554,12 +579,12 @@ static bool writeData(ByteRange data, FILE* fd) {
   return true;
 }
 
-size_t writeFile(
+std::uint64_t writeFile(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
     bool decompress) {
-  size_t bytesWritten = 0;
+  std::uint64_t bytesWritten = 0;
   std::shared_ptr<BufferWorkQueue> out;
   // Grab the output queue for each decompression job (in order).
   while (outs.pop(out) && !errorHolder.hasError()) {
diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h
index 0c21d1352..c3b2926b6 100644
--- a/contrib/pzstd/Pzstd.h
+++ b/contrib/pzstd/Pzstd.h
@@ -45,8 +45,9 @@ int pzstdMain(const Options& options);
  * @param size        The size of the input file if known, 0 otherwise
  * @param numThreads  The number of threads in the thread pool
  * @param parameters  The zstd parameters to use for compression
+ * @returns           The number of bytes read from the file
  */
-void asyncCompressChunks(
+std::uint64_t asyncCompressChunks(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
@@ -66,8 +67,9 @@ void asyncCompressChunks(
  *                      as soon as it is available
  * @param executor    The thread pool to run compression jobs in
  * @param fd          The input file descriptor
+ * @returns           The number of bytes read from the file
  */
-void asyncDecompressFrames(
+std::uint64_t asyncDecompressFrames(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
@@ -84,7 +86,7 @@ void asyncDecompressFrames(
  * @param decompress  Are we decompressing?
  * @returns           The number of bytes written
  */
-std::size_t writeFile(
+std::uint64_t writeFile(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
diff --git a/contrib/pzstd/test/PzstdTest.cpp b/contrib/pzstd/test/PzstdTest.cpp
index 64bcf9cab..c85f73a39 100644
--- a/contrib/pzstd/test/PzstdTest.cpp
+++ b/contrib/pzstd/test/PzstdTest.cpp
@@ -54,6 +54,7 @@ TEST(Pzstd, SmallSizes) {
         options.inputFiles = {inputFile};
         options.numThreads = numThreads;
         options.compressionLevel = level;
+        options.verbosity = 1;
         ASSERT_TRUE(roundTrip(options));
         errorGuard.dismiss();
       }
@@ -91,6 +92,7 @@ TEST(Pzstd, LargeSizes) {
         options.inputFiles = {inputFile};
         options.numThreads = std::min(numThreads, options.numThreads);
         options.compressionLevel = level;
+        options.verbosity = 1;
         ASSERT_TRUE(roundTrip(options));
         errorGuard.dismiss();
       }
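
For illustration only, and not part of the patch above: a minimal standalone sketch of the verbose summary line that the new reporting in handleOneInput produces when compressing. printSummary and the example file names and byte counts are hypothetical; only the format string and the `bytesRead + !bytesRead` guard mirror the lines added in the diff.

// Standalone illustration of the compression summary line added above.
// printSummary and the sample values are hypothetical, for demonstration only.
#include <cstdint>
#include <cstdio>
#include <string>

static void printSummary(const std::string &inputFileName,
                         const std::string &outputFileName,
                         std::uint64_t bytesRead, std::uint64_t bytesWritten) {
  // When bytesRead == 0, `bytesRead + !bytesRead` evaluates to 1, so an empty
  // input never divides by zero.
  double ratio = static_cast<double>(bytesWritten) /
                 static_cast<double>(bytesRead + !bytesRead);
  std::fprintf(stderr, "%-20s :%6.2f%%   (%6llu => %6llu bytes, %s)\n",
               inputFileName.c_str(), ratio * 100,
               static_cast<unsigned long long>(bytesRead),
               static_cast<unsigned long long>(bytesWritten),
               outputFileName.c_str());
}

int main() {
  printSummary("example.txt", "example.txt.zst", 100000, 35210); // ~35.21%
  printSummary("empty.txt", "empty.txt.zst", 0, 13);             // empty input
  return 0;
}

Built with any C++11 compiler, this writes the two summary lines to stderr in the same format the patch uses.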