diff --git a/contrib/pzstd/utils/ThreadPool.h b/contrib/pzstd/utils/ThreadPool.h index a1d1fc0b9..99b3ecfa5 100644 --- a/contrib/pzstd/utils/ThreadPool.h +++ b/contrib/pzstd/utils/ThreadPool.h @@ -27,7 +27,7 @@ class ThreadPool { explicit ThreadPool(std::size_t numThreads) { threads_.reserve(numThreads); for (std::size_t i = 0; i < numThreads; ++i) { - threads_.emplace_back([&] { + threads_.emplace_back([this] { std::function task; while (tasks_.pop(task)) { task(); diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h index c46e6cbcf..780e5360f 100644 --- a/contrib/pzstd/utils/WorkQueue.h +++ b/contrib/pzstd/utils/WorkQueue.h @@ -54,12 +54,13 @@ class WorkQueue { /** * Push an item onto the work queue. Notify a single thread that work is * available. If `finish()` has been called, do nothing and return false. + * If `push()` returns false, then `item` has not been moved from. * * @param item Item to push onto the queue. * @returns True upon success, false if `finish()` has been called. An * item was pushed iff `push()` returns true. */ - bool push(T item) { + bool push(T&& item) { { std::unique_lock lock(mutex_); while (full() && !done_) { diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp index ebf375a84..7f58ccb3f 100644 --- a/contrib/pzstd/utils/test/WorkQueueTest.cpp +++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp @@ -10,6 +10,7 @@ #include "utils/WorkQueue.h" #include +#include #include #include #include @@ -64,7 +65,7 @@ TEST(WorkQueue, SPSC) { const int max = 100; for (int i = 0; i < 10; ++i) { - queue.push(i); + queue.push(int{i}); } std::thread thread([ &queue, max ] { @@ -80,7 +81,7 @@ TEST(WorkQueue, SPSC) { std::this_thread::yield(); for (int i = 10; i < max; ++i) { - queue.push(i); + queue.push(int{i}); } queue.finish(); @@ -97,7 +98,7 @@ TEST(WorkQueue, SPMC) { } for (int i = 0; i < 50; ++i) { - queue.push(i); + queue.push(int{i}); } queue.finish(); @@ -126,7 +127,7 @@ TEST(WorkQueue, MPMC) { pusherThreads.emplace_back( [ &queue, min, max ] { for (int i = min; i < max; ++i) { - queue.push(i); + queue.push(int{i}); } }); } @@ -212,7 +213,7 @@ TEST(WorkQueue, BoundedSizeMPMC) { pusherThreads.emplace_back( [ &queue, min, max ] { for (int i = min; i < max; ++i) { - queue.push(i); + queue.push(int{i}); } }); } @@ -231,6 +232,18 @@ TEST(WorkQueue, BoundedSizeMPMC) { } } +TEST(WorkQueue, FailedPush) { + WorkQueue> queue; + std::unique_ptr x(new int{5}); + EXPECT_TRUE(queue.push(std::move(x))); + EXPECT_EQ(nullptr, x); + queue.finish(); + x.reset(new int{6}); + EXPECT_FALSE(queue.push(std::move(x))); + EXPECT_NE(nullptr, x); + EXPECT_EQ(6, *x); +} + TEST(BufferWorkQueue, SizeCalculatedCorrectly) { { BufferWorkQueue queue;