mirror of
				https://github.com/facebook/zstd.git
				synced 2025-10-30 10:25:39 +03:00 
			
		
		
		
	Fedora now enables the -Werror=format-security compiler option by default,
resulting in the following build failure:
Logging.h: In instantiation of
'void pzstd::Logger::operator()(int, const char*, Args ...)
Pzstd.cpp:413:48:   required from here
Logging.h:46:17: error: format not a string literal and no format arguments
[-Werror=format-security]
     std::fprintf(out_, fmt, args...);
     ~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~
		
	
		
			
				
	
	
		
			616 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			616 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2016-present, Facebook, Inc.
 | |
|  * All rights reserved.
 | |
|  *
 | |
|  * This source code is licensed under the BSD-style license found in the
 | |
|  * LICENSE file in the root directory of this source tree. An additional grant
 | |
|  * of patent rights can be found in the PATENTS file in the same directory.
 | |
|  */
 | |
| #include "Pzstd.h"
 | |
| #include "SkippableFrame.h"
 | |
| #include "utils/FileSystem.h"
 | |
| #include "utils/Range.h"
 | |
| #include "utils/ScopeGuard.h"
 | |
| #include "utils/ThreadPool.h"
 | |
| #include "utils/WorkQueue.h"
 | |
| 
 | |
| #include <chrono>
 | |
| #include <cinttypes>
 | |
| #include <cstddef>
 | |
| #include <cstdio>
 | |
| #include <memory>
 | |
| #include <string>
 | |
| 
 | |
| #if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(_WIN32) || defined(__CYGWIN__)
 | |
| #  include <fcntl.h>    /* _O_BINARY */
 | |
| #  include <io.h>       /* _setmode, _isatty */
 | |
| #  define SET_BINARY_MODE(file) { if (_setmode(_fileno(file), _O_BINARY) == -1) perror("Cannot set _O_BINARY"); }
 | |
| #else
 | |
| #  include <unistd.h>   /* isatty */
 | |
| #  define SET_BINARY_MODE(file)
 | |
| #endif
 | |
| 
 | |
| namespace pzstd {
 | |
| 
 | |
namespace {
// Name of the output path that openOutputFile() exempts from its
// "already exists" check; it differs between Windows and other platforms.
#ifdef _WIN32
const std::string nullOutput{"nul"};
#else
const std::string nullOutput{"/dev/null"};
#endif
} // anonymous namespace
 | |
| 
 | |
| using std::size_t;
 | |
| 
 | |
| static std::uintmax_t fileSizeOrZero(const std::string &file) {
 | |
|   if (file == "-") {
 | |
|     return 0;
 | |
|   }
 | |
|   std::error_code ec;
 | |
|   auto size = file_size(file, ec);
 | |
|   if (ec) {
 | |
|     size = 0;
 | |
|   }
 | |
|   return size;
 | |
| }
 | |
| 
 | |
| static std::uint64_t handleOneInput(const Options &options,
 | |
|                              const std::string &inputFile,
 | |
|                              FILE* inputFd,
 | |
|                              const std::string &outputFile,
 | |
|                              FILE* outputFd,
 | |
|                              SharedState& state) {
 | |
|   auto inputSize = fileSizeOrZero(inputFile);
 | |
|   // WorkQueue outlives ThreadPool so in the case of error we are certain
 | |
|   // we don't accidently try to call push() on it after it is destroyed
 | |
|   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
 | |
|   std::uint64_t bytesRead;
 | |
|   std::uint64_t bytesWritten;
 | |
|   {
 | |
|     // Initialize the (de)compression thread pool with numThreads
 | |
|     ThreadPool executor(options.numThreads);
 | |
|     // Run the reader thread on an extra thread
 | |
|     ThreadPool readExecutor(1);
 | |
|     if (!options.decompress) {
 | |
|       // Add a job that reads the input and starts all the compression jobs
 | |
|       readExecutor.add(
 | |
|           [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
 | |
|             bytesRead = asyncCompressChunks(
 | |
|                 state,
 | |
|                 outs,
 | |
|                 executor,
 | |
|                 inputFd,
 | |
|                 inputSize,
 | |
|                 options.numThreads,
 | |
|                 options.determineParameters());
 | |
|           });
 | |
|       // Start writing
 | |
|       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
 | |
|     } else {
 | |
|       // Add a job that reads the input and starts all the decompression jobs
 | |
|       readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
 | |
|         bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
 | |
|       });
 | |
|       // Start writing
 | |
|       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
 | |
|     }
 | |
|   }
 | |
|   if (!state.errorHolder.hasError()) {
 | |
|     std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
 | |
|     std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
 | |
|     if (!options.decompress) {
 | |
|       double ratio = static_cast<double>(bytesWritten) /
 | |
|                      static_cast<double>(bytesRead + !bytesRead);
 | |
|       state.log(INFO, "%-20s :%6.2f%%   (%6" PRIu64 " => %6" PRIu64
 | |
|                    " bytes, %s)\n",
 | |
|                    inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
 | |
|                    outputFileName.c_str());
 | |
|     } else {
 | |
|       state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
 | |
|                    inputFileName.c_str(),bytesWritten);
 | |
|     }
 | |
|   }
 | |
|   return bytesWritten;
 | |
| }
 | |
| 
 | |
| static FILE *openInputFile(const std::string &inputFile,
 | |
|                            ErrorHolder &errorHolder) {
 | |
|   if (inputFile == "-") {
 | |
|     SET_BINARY_MODE(stdin);
 | |
|     return stdin;
 | |
|   }
 | |
|   // Check if input file is a directory
 | |
|   {
 | |
|     std::error_code ec;
 | |
|     if (is_directory(inputFile, ec)) {
 | |
|       errorHolder.setError("Output file is a directory -- ignored");
 | |
|       return nullptr;
 | |
|     }
 | |
|   }
 | |
|   auto inputFd = std::fopen(inputFile.c_str(), "rb");
 | |
|   if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
 | |
|     return nullptr;
 | |
|   }
 | |
|   return inputFd;
 | |
| }
 | |
| 
 | |
| static FILE *openOutputFile(const Options &options,
 | |
|                             const std::string &outputFile,
 | |
|                             SharedState& state) {
 | |
|   if (outputFile == "-") {
 | |
|     SET_BINARY_MODE(stdout);
 | |
|     return stdout;
 | |
|   }
 | |
|   // Check if the output file exists and then open it
 | |
|   if (!options.overwrite && outputFile != nullOutput) {
 | |
|     auto outputFd = std::fopen(outputFile.c_str(), "rb");
 | |
|     if (outputFd != nullptr) {
 | |
|       std::fclose(outputFd);
 | |
|       if (!state.log.logsAt(INFO)) {
 | |
|         state.errorHolder.setError("Output file exists");
 | |
|         return nullptr;
 | |
|       }
 | |
|       state.log(
 | |
|           INFO,
 | |
|           "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
 | |
|           outputFile.c_str());
 | |
|       int c = getchar();
 | |
|       if (c != 'y' && c != 'Y') {
 | |
|         state.errorHolder.setError("Not overwritten");
 | |
|         return nullptr;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   auto outputFd = std::fopen(outputFile.c_str(), "wb");
 | |
|   if (!state.errorHolder.check(
 | |
|           outputFd != nullptr, "Failed to open output file")) {
 | |
|     return nullptr;
 | |
|   }
 | |
|   return outputFd;
 | |
| }
 | |
| 
 | |
| int pzstdMain(const Options &options) {
 | |
|   int returnCode = 0;
 | |
|   SharedState state(options);
 | |
|   for (const auto& input : options.inputFiles) {
 | |
|     // Setup the shared state
 | |
|     auto printErrorGuard = makeScopeGuard([&] {
 | |
|       if (state.errorHolder.hasError()) {
 | |
|         returnCode = 1;
 | |
|         state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
 | |
|                   state.errorHolder.getError().c_str());
 | |
|       }
 | |
|     });
 | |
|     // Open the input file
 | |
|     auto inputFd = openInputFile(input, state.errorHolder);
 | |
|     if (inputFd == nullptr) {
 | |
|       continue;
 | |
|     }
 | |
|     auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
 | |
|     // Open the output file
 | |
|     auto outputFile = options.getOutputFile(input);
 | |
|     if (!state.errorHolder.check(outputFile != "",
 | |
|                            "Input file does not have extension .zst")) {
 | |
|       continue;
 | |
|     }
 | |
|     auto outputFd = openOutputFile(options, outputFile, state);
 | |
|     if (outputFd == nullptr) {
 | |
|       continue;
 | |
|     }
 | |
|     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
 | |
|     // (de)compress the file
 | |
|     handleOneInput(options, input, inputFd, outputFile, outputFd, state);
 | |
|     if (state.errorHolder.hasError()) {
 | |
|       continue;
 | |
|     }
 | |
|     // Delete the input file if necessary
 | |
|     if (!options.keepSource) {
 | |
|       // Be sure that we are done and have written everything before we delete
 | |
|       if (!state.errorHolder.check(std::fclose(inputFd) == 0,
 | |
|                              "Failed to close input file")) {
 | |
|         continue;
 | |
|       }
 | |
|       closeInputGuard.dismiss();
 | |
|       if (!state.errorHolder.check(std::fclose(outputFd) == 0,
 | |
|                              "Failed to close output file")) {
 | |
|         continue;
 | |
|       }
 | |
|       closeOutputGuard.dismiss();
 | |
|       if (std::remove(input.c_str()) != 0) {
 | |
|         state.errorHolder.setError("Failed to remove input file");
 | |
|         continue;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   // Returns 1 if any of the files failed to (de)compress.
 | |
|   return returnCode;
 | |
| }
 | |
| 
 | |
| /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
 | |
| static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
 | |
|   return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
 | |
|  * `inBuffer.pos`.
 | |
|  */
 | |
| void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
 | |
|   auto pos = inBuffer.pos;
 | |
|   inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
 | |
|   inBuffer.size -= pos;
 | |
|   inBuffer.pos = 0;
 | |
|   return buffer.advance(pos);
 | |
| }
 | |
| 
 | |
| /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
 | |
| static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
 | |
|   return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Split `buffer` and advance `outBuffer` by the amount of data written, as
 | |
|  * indicated by `outBuffer.pos`.
 | |
|  */
 | |
| Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
 | |
|   auto pos = outBuffer.pos;
 | |
|   outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
 | |
|   outBuffer.size -= pos;
 | |
|   outBuffer.pos = 0;
 | |
|   return buffer.splitAt(pos);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Stream chunks of input from `in`, compress it, and stream it out to `out`.
 | |
|  *
 | |
|  * @param state        The shared state
 | |
|  * @param in           Queue that we `pop()` input buffers from
 | |
|  * @param out          Queue that we `push()` compressed output buffers to
 | |
|  * @param maxInputSize An upper bound on the size of the input
 | |
|  */
 | |
| static void compress(
 | |
|     SharedState& state,
 | |
|     std::shared_ptr<BufferWorkQueue> in,
 | |
|     std::shared_ptr<BufferWorkQueue> out,
 | |
|     size_t maxInputSize) {
 | |
|   auto& errorHolder = state.errorHolder;
 | |
|   auto guard = makeScopeGuard([&] { out->finish(); });
 | |
|   // Initialize the CCtx
 | |
|   auto ctx = state.cStreamPool->get();
 | |
|   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
 | |
|     return;
 | |
|   }
 | |
|   {
 | |
|     auto err = ZSTD_resetCStream(ctx.get(), 0);
 | |
|     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Allocate space for the result
 | |
|   auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
 | |
|   auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
 | |
|   {
 | |
|     Buffer inBuffer;
 | |
|     // Read a buffer in from the input queue
 | |
|     while (in->pop(inBuffer) && !errorHolder.hasError()) {
 | |
|       auto zstdInBuffer = makeZstdInBuffer(inBuffer);
 | |
|       // Compress the whole buffer and send it to the output queue
 | |
|       while (!inBuffer.empty() && !errorHolder.hasError()) {
 | |
|         if (!errorHolder.check(
 | |
|                 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
 | |
|           return;
 | |
|         }
 | |
|         // Compress
 | |
|         auto err =
 | |
|             ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
 | |
|         if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
 | |
|           return;
 | |
|         }
 | |
|         // Split the compressed data off outBuffer and pass to the output queue
 | |
|         out->push(split(outBuffer, zstdOutBuffer));
 | |
|         // Forget about the data we already compressed
 | |
|         advance(inBuffer, zstdInBuffer);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   // Write the epilog
 | |
|   size_t bytesLeft;
 | |
|   do {
 | |
|     if (!errorHolder.check(
 | |
|             !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
 | |
|       return;
 | |
|     }
 | |
|     bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
 | |
|     if (!errorHolder.check(
 | |
|             !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
 | |
|       return;
 | |
|     }
 | |
|     out->push(split(outBuffer, zstdOutBuffer));
 | |
|   } while (bytesLeft != 0 && !errorHolder.hasError());
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Calculates how large each independently compressed frame should be.
 | |
|  *
 | |
|  * @param size       The size of the source if known, 0 otherwise
 | |
|  * @param numThreads The number of threads available to run compression jobs on
 | |
|  * @param params     The zstd parameters to be used for compression
 | |
|  */
 | |
| static size_t calculateStep(
 | |
|     std::uintmax_t size,
 | |
|     size_t numThreads,
 | |
|     const ZSTD_parameters ¶ms) {
 | |
|   (void)size;
 | |
|   (void)numThreads;
 | |
|   return size_t{1} << (params.cParams.windowLog + 2);
 | |
| }
 | |
| 
 | |
namespace {
/// State of a stream after a read: more data may follow, end-of-file was
/// reached, or an I/O error occurred.
enum class FileStatus { Continue, Done, Error };

/// Classifies `fd` from its end-of-file and error indicators. End-of-file is
/// checked first, so it wins when both indicators are set.
FileStatus fileStatus(FILE* fd) {
  if (std::feof(fd) != 0) {
    return FileStatus::Done;
  }
  if (std::ferror(fd) != 0) {
    return FileStatus::Error;
  }
  return FileStatus::Continue;
}
} // anonymous namespace
 | |
| 
 | |
| /**
 | |
|  * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
 | |
|  * Will read less if an error or EOF occurs.
 | |
|  * Returns the status of the file after all of the reads have occurred.
 | |
|  */
 | |
| static FileStatus
 | |
| readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
 | |
|          std::uint64_t *totalBytesRead) {
 | |
|   Buffer buffer(size);
 | |
|   while (!buffer.empty()) {
 | |
|     auto bytesRead =
 | |
|         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
 | |
|     *totalBytesRead += bytesRead;
 | |
|     queue.push(buffer.splitAt(bytesRead));
 | |
|     auto status = fileStatus(fd);
 | |
|     if (status != FileStatus::Continue) {
 | |
|       return status;
 | |
|     }
 | |
|   }
 | |
|   return FileStatus::Continue;
 | |
| }
 | |
| 
 | |
| std::uint64_t asyncCompressChunks(
 | |
|     SharedState& state,
 | |
|     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
 | |
|     ThreadPool& executor,
 | |
|     FILE* fd,
 | |
|     std::uintmax_t size,
 | |
|     size_t numThreads,
 | |
|     ZSTD_parameters params) {
 | |
|   auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
 | |
|   std::uint64_t bytesRead = 0;
 | |
| 
 | |
|   // Break the input up into chunks of size `step` and compress each chunk
 | |
|   // independently.
 | |
|   size_t step = calculateStep(size, numThreads, params);
 | |
|   state.log(DEBUG, "Chosen frame size: %zu\n", step);
 | |
|   auto status = FileStatus::Continue;
 | |
|   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
 | |
|     // Make a new input queue that we will put the chunk's input data into.
 | |
|     auto in = std::make_shared<BufferWorkQueue>();
 | |
|     auto inGuard = makeScopeGuard([&] { in->finish(); });
 | |
|     // Make a new output queue that compress will put the compressed data into.
 | |
|     auto out = std::make_shared<BufferWorkQueue>();
 | |
|     // Start compression in the thread pool
 | |
|     executor.add([&state, in, out, step] {
 | |
|       return compress(
 | |
|           state, std::move(in), std::move(out), step);
 | |
|     });
 | |
|     // Pass the output queue to the writer thread.
 | |
|     chunks.push(std::move(out));
 | |
|     state.log(VERBOSE, "%s\n", "Starting a new frame");
 | |
|     // Fill the input queue for the compression job we just started
 | |
|     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
 | |
|   }
 | |
|   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
 | |
|   return bytesRead;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Decompress a frame, whose data is streamed into `in`, and stream the output
 | |
|  * to `out`.
 | |
|  *
 | |
|  * @param state        The shared state
 | |
|  * @param in           Queue that we `pop()` input buffers from. It contains
 | |
|  *                      exactly one compressed frame.
 | |
|  * @param out          Queue that we `push()` decompressed output buffers to
 | |
|  */
 | |
| static void decompress(
 | |
|     SharedState& state,
 | |
|     std::shared_ptr<BufferWorkQueue> in,
 | |
|     std::shared_ptr<BufferWorkQueue> out) {
 | |
|   auto& errorHolder = state.errorHolder;
 | |
|   auto guard = makeScopeGuard([&] { out->finish(); });
 | |
|   // Initialize the DCtx
 | |
|   auto ctx = state.dStreamPool->get();
 | |
|   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
 | |
|     return;
 | |
|   }
 | |
|   {
 | |
|     auto err = ZSTD_resetDStream(ctx.get());
 | |
|     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   const size_t outSize = ZSTD_DStreamOutSize();
 | |
|   Buffer inBuffer;
 | |
|   size_t returnCode = 0;
 | |
|   // Read a buffer in from the input queue
 | |
|   while (in->pop(inBuffer) && !errorHolder.hasError()) {
 | |
|     auto zstdInBuffer = makeZstdInBuffer(inBuffer);
 | |
|     // Decompress the whole buffer and send it to the output queue
 | |
|     while (!inBuffer.empty() && !errorHolder.hasError()) {
 | |
|       // Allocate a buffer with at least outSize bytes.
 | |
|       Buffer outBuffer(outSize);
 | |
|       auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
 | |
|       // Decompress
 | |
|       returnCode =
 | |
|           ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
 | |
|       if (!errorHolder.check(
 | |
|               !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
 | |
|         return;
 | |
|       }
 | |
|       // Pass the buffer with the decompressed data to the output queue
 | |
|       out->push(split(outBuffer, zstdOutBuffer));
 | |
|       // Advance past the input we already read
 | |
|       advance(inBuffer, zstdInBuffer);
 | |
|       if (returnCode == 0) {
 | |
|         // The frame is over, prepare to (maybe) start a new frame
 | |
|         ZSTD_initDStream(ctx.get());
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
 | |
|     return;
 | |
|   }
 | |
|   // We've given ZSTD_decompressStream all of our data, but there may still
 | |
|   // be data to read.
 | |
|   while (returnCode == 1) {
 | |
|     // Allocate a buffer with at least outSize bytes.
 | |
|     Buffer outBuffer(outSize);
 | |
|     auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
 | |
|     // Pass in no input.
 | |
|     ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
 | |
|     // Decompress
 | |
|     returnCode =
 | |
|         ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
 | |
|     if (!errorHolder.check(
 | |
|             !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
 | |
|       return;
 | |
|     }
 | |
|     // Pass the buffer with the decompressed data to the output queue
 | |
|     out->push(split(outBuffer, zstdOutBuffer));
 | |
|   }
 | |
| }
 | |
| 
 | |
| std::uint64_t asyncDecompressFrames(
 | |
|     SharedState& state,
 | |
|     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
 | |
|     ThreadPool& executor,
 | |
|     FILE* fd) {
 | |
|   auto framesGuard = makeScopeGuard([&] { frames.finish(); });
 | |
|   std::uint64_t totalBytesRead = 0;
 | |
| 
 | |
|   // Split the source up into its component frames.
 | |
|   // If we find our recognized skippable frame we know the next frames size
 | |
|   // which means that we can decompress each standard frame in independently.
 | |
|   // Otherwise, we will decompress using only one decompression task.
 | |
|   const size_t chunkSize = ZSTD_DStreamInSize();
 | |
|   auto status = FileStatus::Continue;
 | |
|   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
 | |
|     // Make a new input queue that we will put the frames's bytes into.
 | |
|     auto in = std::make_shared<BufferWorkQueue>();
 | |
|     auto inGuard = makeScopeGuard([&] { in->finish(); });
 | |
|     // Make a output queue that decompress will put the decompressed data into
 | |
|     auto out = std::make_shared<BufferWorkQueue>();
 | |
| 
 | |
|     size_t frameSize;
 | |
|     {
 | |
|       // Calculate the size of the next frame.
 | |
|       // frameSize is 0 if the frame info can't be decoded.
 | |
|       Buffer buffer(SkippableFrame::kSize);
 | |
|       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
 | |
|       totalBytesRead += bytesRead;
 | |
|       status = fileStatus(fd);
 | |
|       if (bytesRead == 0 && status != FileStatus::Continue) {
 | |
|         break;
 | |
|       }
 | |
|       buffer.subtract(buffer.size() - bytesRead);
 | |
|       frameSize = SkippableFrame::tryRead(buffer.range());
 | |
|       in->push(std::move(buffer));
 | |
|     }
 | |
|     if (frameSize == 0) {
 | |
|       // We hit a non SkippableFrame, so this will be the last job.
 | |
|       // Make sure that we don't use too much memory
 | |
|       in->setMaxSize(64);
 | |
|       out->setMaxSize(64);
 | |
|     }
 | |
|     // Start decompression in the thread pool
 | |
|     executor.add([&state, in, out] {
 | |
|       return decompress(state, std::move(in), std::move(out));
 | |
|     });
 | |
|     // Pass the output queue to the writer thread
 | |
|     frames.push(std::move(out));
 | |
|     if (frameSize == 0) {
 | |
|       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
 | |
|       // Pass the rest of the source to this decompression task
 | |
|       state.log(VERBOSE, "%s\n",
 | |
|           "Input not in pzstd format, falling back to serial decompression");
 | |
|       while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
 | |
|         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
 | |
|       }
 | |
|       break;
 | |
|     }
 | |
|     state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize);
 | |
|     // Fill the input queue for the decompression job we just started
 | |
|     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
 | |
|   }
 | |
|   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
 | |
|   return totalBytesRead;
 | |
| }
 | |
| 
 | |
| /// Write `data` to `fd`, returns true iff success.
 | |
| static bool writeData(ByteRange data, FILE* fd) {
 | |
|   while (!data.empty()) {
 | |
|     data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
 | |
|     if (std::ferror(fd)) {
 | |
|       return false;
 | |
|     }
 | |
|   }
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| std::uint64_t writeFile(
 | |
|     SharedState& state,
 | |
|     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
 | |
|     FILE* outputFd,
 | |
|     bool decompress) {
 | |
|   auto& errorHolder = state.errorHolder;
 | |
|   auto lineClearGuard = makeScopeGuard([&state] {
 | |
|     state.log.clear(INFO);
 | |
|   });
 | |
|   std::uint64_t bytesWritten = 0;
 | |
|   std::shared_ptr<BufferWorkQueue> out;
 | |
|   // Grab the output queue for each decompression job (in order).
 | |
|   while (outs.pop(out) && !errorHolder.hasError()) {
 | |
|     if (!decompress) {
 | |
|       // If we are compressing and want to write skippable frames we can't
 | |
|       // start writing before compression is done because we need to know the
 | |
|       // compressed size.
 | |
|       // Wait for the compressed size to be available and write skippable frame
 | |
|       SkippableFrame frame(out->size());
 | |
|       if (!writeData(frame.data(), outputFd)) {
 | |
|         errorHolder.setError("Failed to write output");
 | |
|         return bytesWritten;
 | |
|       }
 | |
|       bytesWritten += frame.kSize;
 | |
|     }
 | |
|     // For each chunk of the frame: Pop it from the queue and write it
 | |
|     Buffer buffer;
 | |
|     while (out->pop(buffer) && !errorHolder.hasError()) {
 | |
|       if (!writeData(buffer.range(), outputFd)) {
 | |
|         errorHolder.setError("Failed to write output");
 | |
|         return bytesWritten;
 | |
|       }
 | |
|       bytesWritten += buffer.size();
 | |
|       state.log.update(INFO, "Written: %u MB   ",
 | |
|                 static_cast<std::uint32_t>(bytesWritten >> 20));
 | |
|     }
 | |
|   }
 | |
|   return bytesWritten;
 | |
| }
 | |
| }
 |