diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 67b941991..048a006b3 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -269,7 +269,10 @@ static void compress( std::shared_ptr out, size_t maxInputSize) { auto& errorHolder = state.errorHolder; - auto guard = makeScopeGuard([&] { out->finish(); }); + auto guard = makeScopeGuard([&] { + in->finish(); + out->finish(); + }); // Initialize the CCtx auto ctx = state.cStreamPool->get(); if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) { @@ -431,7 +434,10 @@ static void decompress( std::shared_ptr in, std::shared_ptr out) { auto& errorHolder = state.errorHolder; - auto guard = makeScopeGuard([&] { out->finish(); }); + auto guard = makeScopeGuard([&] { + in->finish(); + out->finish(); + }); // Initialize the DCtx auto ctx = state.dStreamPool->get(); if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) { @@ -578,6 +584,7 @@ std::uint64_t writeFile( FILE* outputFd, bool decompress) { auto& errorHolder = state.errorHolder; + auto outsFinishGuard = makeScopeGuard([&outs] { outs.finish(); }); auto lineClearGuard = makeScopeGuard([&state] { state.log.clear(kLogInfo); }); @@ -585,6 +592,7 @@ std::uint64_t writeFile( std::shared_ptr out; // Grab the output queue for each decompression job (in order). while (outs.pop(out)) { + auto outFinishGuard = makeScopeGuard([&out] { out->finish(); }); if (errorHolder.hasError()) { continue; } diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h index d7947b814..07842e598 100644 --- a/contrib/pzstd/utils/WorkQueue.h +++ b/contrib/pzstd/utils/WorkQueue.h @@ -115,13 +115,14 @@ class WorkQueue { } /** - * Promise that `push()` won't be called again, so once the queue is empty - * there will never any more work. + * Promise that either the reader side or the writer side is done. + * If the writer is done, `push()` won't be called again, so once the queue + * is empty there will never be any more work. If the reader is done, `pop()` + * won't be called again, so further items pushed will just be ignored. */ void finish() { { std::lock_guard lock(mutex_); - assert(!done_); done_ = true; } readerCv_.notify_all();