1
0
mirror of https://github.com/facebook/zstd.git synced 2025-08-07 06:23:00 +03:00

[pzstd] Put ErrorHolder into SharedState

This commit is contained in:
Nick Terrell
2016-10-12 15:18:16 -07:00
parent 9b603ee284
commit 48294b57c3
2 changed files with 50 additions and 42 deletions

View File

@@ -59,7 +59,7 @@ static std::uint64_t handleOneInput(const Options &options,
FILE* inputFd, FILE* inputFd,
const std::string &outputFile, const std::string &outputFile,
FILE* outputFd, FILE* outputFd,
ErrorHolder &errorHolder) { SharedState& state) {
auto inputSize = fileSizeOrZero(inputFile); auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so in the case of error we are certain // WorkQueue outlives ThreadPool so in the case of error we are certain
// we don't accidently try to call push() on it after it is destroyed // we don't accidently try to call push() on it after it is destroyed
@@ -74,10 +74,9 @@ static std::uint64_t handleOneInput(const Options &options,
if (!options.decompress) { if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs // Add a job that reads the input and starts all the compression jobs
readExecutor.add( readExecutor.add(
[&errorHolder, &outs, &executor, inputFd, inputSize, &options, [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
&bytesRead] {
bytesRead = asyncCompressChunks( bytesRead = asyncCompressChunks(
errorHolder, state,
outs, outs,
executor, executor,
inputFd, inputFd,
@@ -86,19 +85,19 @@ static std::uint64_t handleOneInput(const Options &options,
options.determineParameters()); options.determineParameters());
}); });
// Start writing // Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, bytesWritten = writeFile(state, outs, outputFd, options.decompress,
options.verbosity); options.verbosity);
} else { } else {
// Add a job that reads the input and starts all the decompression jobs // Add a job that reads the input and starts all the decompression jobs
readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
}); });
// Start writing // Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, bytesWritten = writeFile(state, outs, outputFd, options.decompress,
options.verbosity); options.verbosity);
} }
} }
if (options.verbosity > 1 && !errorHolder.hasError()) { if (options.verbosity > 1 && !state.errorHolder.hasError()) {
std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
std::string outputFileName = outputFile == "-" ? "stdout" : outputFile; std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
if (!options.decompress) { if (!options.decompress) {
@@ -176,53 +175,53 @@ int pzstdMain(const Options &options) {
int returnCode = 0; int returnCode = 0;
for (const auto& input : options.inputFiles) { for (const auto& input : options.inputFiles) {
// Setup the error holder // Setup the error holder
ErrorHolder errorHolder; SharedState state;
auto printErrorGuard = makeScopeGuard([&] { auto printErrorGuard = makeScopeGuard([&] {
if (errorHolder.hasError()) { if (state.errorHolder.hasError()) {
returnCode = 1; returnCode = 1;
if (options.verbosity > 0) { if (options.verbosity > 0) {
std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(), std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
errorHolder.getError().c_str()); state.errorHolder.getError().c_str());
} }
} }
}); });
// Open the input file // Open the input file
auto inputFd = openInputFile(input, errorHolder); auto inputFd = openInputFile(input, state.errorHolder);
if (inputFd == nullptr) { if (inputFd == nullptr) {
continue; continue;
} }
auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); }); auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
// Open the output file // Open the output file
auto outputFile = options.getOutputFile(input); auto outputFile = options.getOutputFile(input);
if (!errorHolder.check(outputFile != "", if (!state.errorHolder.check(outputFile != "",
"Input file does not have extension .zst")) { "Input file does not have extension .zst")) {
continue; continue;
} }
auto outputFd = openOutputFile(options, outputFile, errorHolder); auto outputFd = openOutputFile(options, outputFile, state.errorHolder);
if (outputFd == nullptr) { if (outputFd == nullptr) {
continue; continue;
} }
auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
// (de)compress the file // (de)compress the file
handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder); handleOneInput(options, input, inputFd, outputFile, outputFd, state);
if (errorHolder.hasError()) { if (state.errorHolder.hasError()) {
continue; continue;
} }
// Delete the input file if necessary // Delete the input file if necessary
if (!options.keepSource) { if (!options.keepSource) {
// Be sure that we are done and have written everything before we delete // Be sure that we are done and have written everything before we delete
if (!errorHolder.check(std::fclose(inputFd) == 0, if (!state.errorHolder.check(std::fclose(inputFd) == 0,
"Failed to close input file")) { "Failed to close input file")) {
continue; continue;
} }
closeInputGuard.dismiss(); closeInputGuard.dismiss();
if (!errorHolder.check(std::fclose(outputFd) == 0, if (!state.errorHolder.check(std::fclose(outputFd) == 0,
"Failed to close output file")) { "Failed to close output file")) {
continue; continue;
} }
closeOutputGuard.dismiss(); closeOutputGuard.dismiss();
if (std::remove(input.c_str()) != 0) { if (std::remove(input.c_str()) != 0) {
errorHolder.setError("Failed to remove input file"); state.errorHolder.setError("Failed to remove input file");
continue; continue;
} }
} }
@@ -268,18 +267,19 @@ Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
/** /**
* Stream chunks of input from `in`, compress it, and stream it out to `out`. * Stream chunks of input from `in`, compress it, and stream it out to `out`.
* *
* @param errorHolder Used to report errors and check if an error occured * @param state The shared state
* @param in Queue that we `pop()` input buffers from * @param in Queue that we `pop()` input buffers from
* @param out Queue that we `push()` compressed output buffers to * @param out Queue that we `push()` compressed output buffers to
* @param maxInputSize An upper bound on the size of the input * @param maxInputSize An upper bound on the size of the input
* @param parameters The zstd parameters to use for compression * @param parameters The zstd parameters to use for compression
*/ */
static void compress( static void compress(
ErrorHolder& errorHolder, SharedState& state,
std::shared_ptr<BufferWorkQueue> in, std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out, std::shared_ptr<BufferWorkQueue> out,
size_t maxInputSize, size_t maxInputSize,
ZSTD_parameters parameters) { ZSTD_parameters parameters) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); }); auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the CCtx // Initialize the CCtx
std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx( std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx(
@@ -395,7 +395,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
} }
std::uint64_t asyncCompressChunks( std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder, SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor, ThreadPool& executor,
FILE* fd, FILE* fd,
@@ -409,23 +409,23 @@ std::uint64_t asyncCompressChunks(
// independently. // independently.
size_t step = calculateStep(size, numThreads, params); size_t step = calculateStep(size, numThreads, params);
auto status = FileStatus::Continue; auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !errorHolder.hasError()) { while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the chunk's input data into. // Make a new input queue that we will put the chunk's input data into.
auto in = std::make_shared<BufferWorkQueue>(); auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); }); auto inGuard = makeScopeGuard([&] { in->finish(); });
// Make a new output queue that compress will put the compressed data into. // Make a new output queue that compress will put the compressed data into.
auto out = std::make_shared<BufferWorkQueue>(); auto out = std::make_shared<BufferWorkQueue>();
// Start compression in the thread pool // Start compression in the thread pool
executor.add([&errorHolder, in, out, step, params] { executor.add([&state, in, out, step, params] {
return compress( return compress(
errorHolder, std::move(in), std::move(out), step, params); state, std::move(in), std::move(out), step, params);
}); });
// Pass the output queue to the writer thread. // Pass the output queue to the writer thread.
chunks.push(std::move(out)); chunks.push(std::move(out));
// Fill the input queue for the compression job we just started // Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
} }
errorHolder.check(status != FileStatus::Error, "Error reading input"); state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return bytesRead; return bytesRead;
} }
@@ -433,15 +433,16 @@ std::uint64_t asyncCompressChunks(
* Decompress a frame, whose data is streamed into `in`, and stream the output * Decompress a frame, whose data is streamed into `in`, and stream the output
* to `out`. * to `out`.
* *
* @param errorHolder Used to report errors and check if an error occured * @param state The shared state
* @param in Queue that we `pop()` input buffers from. It contains * @param in Queue that we `pop()` input buffers from. It contains
* exactly one compressed frame. * exactly one compressed frame.
* @param out Queue that we `push()` decompressed output buffers to * @param out Queue that we `push()` decompressed output buffers to
*/ */
static void decompress( static void decompress(
ErrorHolder& errorHolder, SharedState& state,
std::shared_ptr<BufferWorkQueue> in, std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out) { std::shared_ptr<BufferWorkQueue> out) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); }); auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the DCtx // Initialize the DCtx
std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx( std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx(
@@ -508,7 +509,7 @@ static void decompress(
} }
std::uint64_t asyncDecompressFrames( std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder, SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor, ThreadPool& executor,
FILE* fd) { FILE* fd) {
@@ -521,7 +522,7 @@ std::uint64_t asyncDecompressFrames(
// Otherwise, we will decompress using only one decompression task. // Otherwise, we will decompress using only one decompression task.
const size_t chunkSize = ZSTD_DStreamInSize(); const size_t chunkSize = ZSTD_DStreamInSize();
auto status = FileStatus::Continue; auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !errorHolder.hasError()) { while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the frames's bytes into. // Make a new input queue that we will put the frames's bytes into.
auto in = std::make_shared<BufferWorkQueue>(); auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); }); auto inGuard = makeScopeGuard([&] { in->finish(); });
@@ -550,15 +551,15 @@ std::uint64_t asyncDecompressFrames(
out->setMaxSize(64); out->setMaxSize(64);
} }
// Start decompression in the thread pool // Start decompression in the thread pool
executor.add([&errorHolder, in, out] { executor.add([&state, in, out] {
return decompress(errorHolder, std::move(in), std::move(out)); return decompress(state, std::move(in), std::move(out));
}); });
// Pass the output queue to the writer thread // Pass the output queue to the writer thread
frames.push(std::move(out)); frames.push(std::move(out));
if (frameSize == 0) { if (frameSize == 0) {
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task // Pass the rest of the source to this decompression task
while (status == FileStatus::Continue && !errorHolder.hasError()) { while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
} }
break; break;
@@ -566,7 +567,7 @@ std::uint64_t asyncDecompressFrames(
// Fill the input queue for the decompression job we just started // Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
} }
errorHolder.check(status != FileStatus::Error, "Error reading input"); state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return totalBytesRead; return totalBytesRead;
} }
@@ -598,11 +599,12 @@ void updateWritten(int verbosity, std::uint64_t bytesWritten) {
} }
std::uint64_t writeFile( std::uint64_t writeFile(
ErrorHolder& errorHolder, SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd, FILE* outputFd,
bool decompress, bool decompress,
int verbosity) { int verbosity) {
auto& errorHolder = state.errorHolder;
auto lineClearGuard = makeScopeGuard([verbosity] { auto lineClearGuard = makeScopeGuard([verbosity] {
if (verbosity > 1) { if (verbosity > 1) {
std::fprintf(stderr, "\r%79s\r", ""); std::fprintf(stderr, "\r%79s\r", "");

View File

@@ -12,6 +12,7 @@
#include "Options.h" #include "Options.h"
#include "utils/Buffer.h" #include "utils/Buffer.h"
#include "utils/Range.h" #include "utils/Range.h"
#include "utils/ResourcePool.h"
#include "utils/ThreadPool.h" #include "utils/ThreadPool.h"
#include "utils/WorkQueue.h" #include "utils/WorkQueue.h"
#define ZSTD_STATIC_LINKING_ONLY #define ZSTD_STATIC_LINKING_ONLY
@@ -32,12 +33,17 @@ namespace pzstd {
*/ */
int pzstdMain(const Options& options); int pzstdMain(const Options& options);
class SharedState {
public:
ErrorHolder errorHolder;
};
/** /**
* Streams input from `fd`, breaks input up into chunks, and compresses each * Streams input from `fd`, breaks input up into chunks, and compresses each
* chunk independently. Output of each chunk gets streamed to a queue, and * chunk independently. Output of each chunk gets streamed to a queue, and
* the output queues get put into `chunks` in order. * the output queues get put into `chunks` in order.
* *
* @param errorHolder Used to report errors and coordinate early shutdown * @param state The shared state
* @param chunks Each compression jobs output queue gets `pushed()` here * @param chunks Each compression jobs output queue gets `pushed()` here
* as soon as it is available * as soon as it is available
* @param executor The thread pool to run compression jobs in * @param executor The thread pool to run compression jobs in
@@ -48,7 +54,7 @@ int pzstdMain(const Options& options);
* @returns The number of bytes read from the file * @returns The number of bytes read from the file
*/ */
std::uint64_t asyncCompressChunks( std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder, SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor, ThreadPool& executor,
FILE* fd, FILE* fd,
@@ -62,7 +68,7 @@ std::uint64_t asyncCompressChunks(
* decompression job. Output of each frame gets streamed to a queue, and * decompression job. Output of each frame gets streamed to a queue, and
* the output queues get put into `frames` in order. * the output queues get put into `frames` in order.
* *
* @param errorHolder Used to report errors and coordinate early shutdown * @param state The shared state
* @param frames Each decompression jobs output queue gets `pushed()` here * @param frames Each decompression jobs output queue gets `pushed()` here
* as soon as it is available * as soon as it is available
* @param executor The thread pool to run compression jobs in * @param executor The thread pool to run compression jobs in
@@ -70,7 +76,7 @@ std::uint64_t asyncCompressChunks(
* @returns The number of bytes read from the file * @returns The number of bytes read from the file
*/ */
std::uint64_t asyncDecompressFrames( std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder, SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor, ThreadPool& executor,
FILE* fd); FILE* fd);
@@ -79,7 +85,7 @@ std::uint64_t asyncDecompressFrames(
* Streams input in from each queue in `outs` in order, and writes the data to * Streams input in from each queue in `outs` in order, and writes the data to
* `outputFd`. * `outputFd`.
* *
* @param errorHolder Used to report errors and coordinate early exit * @param state The shared state
* @param outs A queue of output queues, one for each * @param outs A queue of output queues, one for each
* (de)compression job. * (de)compression job.
* @param outputFd The file descriptor to write to * @param outputFd The file descriptor to write to
@@ -88,7 +94,7 @@ std::uint64_t asyncDecompressFrames(
* @returns The number of bytes written * @returns The number of bytes written
*/ */
std::uint64_t writeFile( std::uint64_t writeFile(
ErrorHolder& errorHolder, SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd, FILE* outputFd,
bool decompress, bool decompress,