diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp
index 68f5bb977..db9b8c85b 100644
--- a/contrib/pzstd/Pzstd.cpp
+++ b/contrib/pzstd/Pzstd.cpp
@@ -62,19 +62,18 @@ static std::uint64_t handleOneInput(const Options &options,
                                     ErrorHolder &errorHolder) {
   auto inputSize = fileSizeOrZero(inputFile);
   // WorkQueue outlives ThreadPool so in the case of error we are certain
-  // we don't accidently try to call push() on it after it is destroyed.
+  // we don't accidentally try to call push() on it after it is destroyed.
   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
   std::uint64_t bytesRead;
   std::uint64_t bytesWritten;
   {
-    // Initialize the thread pool with numThreads + 1
-    // We add one because the read thread spends most of its time waiting.
-    // This also sets the minimum number of threads to 2, so the algorithm
-    // doesn't deadlock.
-    ThreadPool executor(options.numThreads + 1);
+    // Initialize the (de)compression thread pool with numThreads
+    ThreadPool executor(options.numThreads);
+    // Run the reader thread on an extra thread
+    ThreadPool readExecutor(1);
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
-      executor.add(
+      readExecutor.add(
          [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
            &bytesRead] {
            bytesRead = asyncCompressChunks(
@@ -91,7 +90,7 @@ static std::uint64_t handleOneInput(const Options &options,
           options.verbosity);
     } else {
       // Add a job that reads the input and starts all the decompression jobs
-      executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
+      readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
        bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
       });
       // Start writing