1
0
mirror of https://github.com/facebook/zstd.git synced 2025-08-08 17:22:10 +03:00

[pzstd] Run the reading thread separately

This commit is contained in:
Nick Terrell
2016-10-07 15:04:34 -07:00
parent 4cb5e90a5c
commit 9b603ee284

View File

@@ -62,19 +62,18 @@ static std::uint64_t handleOneInput(const Options &options,
ErrorHolder &errorHolder) { ErrorHolder &errorHolder) {
auto inputSize = fileSizeOrZero(inputFile); auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so in the case of error we are certain // WorkQueue outlives ThreadPool so in the case of error we are certain
// we don't accidently try to call push() on it after it is destroyed. // we don't accidently try to call push() on it after it is destroyed
WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1}; WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
std::uint64_t bytesRead; std::uint64_t bytesRead;
std::uint64_t bytesWritten; std::uint64_t bytesWritten;
{ {
// Initialize the thread pool with numThreads + 1 // Initialize the (de)compression thread pool with numThreads
// We add one because the read thread spends most of its time waiting. ThreadPool executor(options.numThreads);
// This also sets the minimum number of threads to 2, so the algorithm // Run the reader thread on an extra thread
// doesn't deadlock. ThreadPool readExecutor(1);
ThreadPool executor(options.numThreads + 1);
if (!options.decompress) { if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs // Add a job that reads the input and starts all the compression jobs
executor.add( readExecutor.add(
[&errorHolder, &outs, &executor, inputFd, inputSize, &options, [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
&bytesRead] { &bytesRead] {
bytesRead = asyncCompressChunks( bytesRead = asyncCompressChunks(
@@ -91,7 +90,7 @@ static std::uint64_t handleOneInput(const Options &options,
options.verbosity); options.verbosity);
} else { } else {
// Add a job that reads the input and starts all the decompression jobs // Add a job that reads the input and starts all the decompression jobs
executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
}); });
// Start writing // Start writing