From 64c1c065cc48dfc7bdcbe06f3fd374a37903e934 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 2 Sep 2016 13:53:23 -0700 Subject: [PATCH] Add optional max size to work queue --- contrib/pzstd/Makefile | 2 +- contrib/pzstd/utils/WorkQueue.h | 59 ++++++++++++++------ contrib/pzstd/utils/test/Makefile | 2 +- contrib/pzstd/utils/test/WorkQueueTest.cpp | 65 ++++++++++++++++++++++ 4 files changed, 108 insertions(+), 20 deletions(-) diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile index 5338a5a9e..c59a6d107 100644 --- a/contrib/pzstd/Makefile +++ b/contrib/pzstd/Makefile @@ -70,5 +70,5 @@ clean: $(MAKE) -C $(ZSTDDIR) clean $(MAKE) -C utils/test clean $(MAKE) -C test clean - @$(RM) -rf googletest/ libzstd.a *.o pzstd$(EXT) + @$(RM) -rf libzstd.a *.o pzstd$(EXT) @echo Cleaning completed diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h index 3d926cc80..2fa417f41 100644 --- a/contrib/pzstd/utils/WorkQueue.h +++ b/contrib/pzstd/utils/WorkQueue.h @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -25,14 +26,29 @@ template class WorkQueue { // Protects all member variable access std::mutex mutex_; - std::condition_variable cv_; + std::condition_variable readerCv_; + std::condition_variable writerCv_; std::queue queue_; bool done_; + std::size_t maxSize_; + + // Must have lock to call this function + bool full() const { + if (maxSize_ == 0) { + return false; + } + return queue_.size() >= maxSize_; + } public: - /// Constructs an empty work queue. - WorkQueue() : done_(false) {} + /** + * Constructs an empty work queue with an optional max size. + * If `maxSize == 0` the queue size is unbounded. + * + * @param maxSize The maximum allowed size of the work queue. + */ + WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} /** * Push an item onto the work queue. Notify a single thread that work is @@ -44,13 +60,16 @@ class WorkQueue { */ bool push(T item) { { - std::lock_guard lock(mutex_); + std::unique_lock lock(mutex_); + while (full() && !done_) { + writerCv_.wait(lock); + } if (done_) { return false; } queue_.push(std::move(item)); } - cv_.notify_one(); + readerCv_.notify_one(); return true; } @@ -64,16 +83,19 @@ class WorkQueue { * `finish()` has been called. */ bool pop(T& item) { - std::unique_lock lock(mutex_); - while (queue_.empty() && !done_) { - cv_.wait(lock); + { + std::unique_lock lock(mutex_); + while (queue_.empty() && !done_) { + readerCv_.wait(lock); + } + if (queue_.empty()) { + assert(done_); + return false; + } + item = std::move(queue_.front()); + queue_.pop(); } - if (queue_.empty()) { - assert(done_); - return false; - } - item = std::move(queue_.front()); - queue_.pop(); + writerCv_.notify_one(); return true; } @@ -87,18 +109,19 @@ class WorkQueue { assert(!done_); done_ = true; } - cv_.notify_all(); + readerCv_.notify_all(); + writerCv_.notify_all(); } /// Blocks until `finish()` has been called (but the queue may not be empty). void waitUntilFinished() { std::unique_lock lock(mutex_); while (!done_) { - cv_.wait(lock); + readerCv_.wait(lock); // If we were woken by a push, we need to wake a thread waiting on pop(). if (!done_) { lock.unlock(); - cv_.notify_one(); + readerCv_.notify_one(); lock.lock(); } } @@ -111,7 +134,7 @@ class BufferWorkQueue { std::atomic size_; public: - BufferWorkQueue() : size_(0) {} + BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {} void push(Buffer buffer) { size_.fetch_add(buffer.size()); diff --git a/contrib/pzstd/utils/test/Makefile b/contrib/pzstd/utils/test/Makefile index 23f111e55..b9ea73e32 100644 --- a/contrib/pzstd/utils/test/Makefile +++ b/contrib/pzstd/utils/test/Makefile @@ -23,7 +23,7 @@ GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB) CXXFLAGS ?= -O3 CXXFLAGS += -std=c++11 -CFLAGS += $(MOREFLAGS) +CXXFLAGS += $(MOREFLAGS) FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) %: %.cpp diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp index 1b548d160..074891fda 100644 --- a/contrib/pzstd/utils/test/WorkQueueTest.cpp +++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp @@ -145,6 +145,71 @@ TEST(WorkQueue, MPMC) { } } +TEST(WorkQueue, BoundedSizeWorks) { + WorkQueue queue(1); + int result; + queue.push(5); + queue.pop(result); + queue.push(5); + queue.pop(result); + queue.push(5); + queue.finish(); + queue.pop(result); + EXPECT_EQ(5, result); +} + +TEST(WorkQueue, BoundedSizePushAfterFinish) { + WorkQueue queue(1); + int result; + queue.push(5); + std::thread pusher([&queue] { + queue.push(6); + }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, BoundedSizeMPMC) { + WorkQueue queue(100); + std::vector results(10000, -1); + std::mutex mutex; + std::vector popperThreads; + for (int i = 0; i < 10; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::vector pusherThreads; + for (int i = 0; i < 100; ++i) { + auto min = i * 100; + auto max = (i + 1) * 100; + pusherThreads.emplace_back( + [ &queue, min, max ] { + for (int i = min; i < max; ++i) { + queue.push(i); + } + }); + } + + for (auto& thread : pusherThreads) { + thread.join(); + } + queue.finish(); + + for (auto& thread : popperThreads) { + thread.join(); + } + + for (int i = 0; i < 10000; ++i) { + EXPECT_EQ(i, results[i]); + } +} + TEST(BufferWorkQueue, SizeCalculatedCorrectly) { { BufferWorkQueue queue;