diff --git a/httplib.h b/httplib.h index c7bf810..67fa1dd 100644 --- a/httplib.h +++ b/httplib.h @@ -549,8 +549,11 @@ public: ~ThreadPool() override = default; void enqueue(std::function fn) override { - std::unique_lock lock(mutex_); - jobs_.push_back(std::move(fn)); + { + std::unique_lock lock(mutex_); + jobs_.push_back(std::move(fn)); + } + cond_.notify_one(); } @@ -559,9 +562,10 @@ public: { std::unique_lock lock(mutex_); shutdown_ = true; - cond_.notify_all(); } + cond_.notify_all(); + // Join... for (auto &t : threads_) { t.join(); @@ -583,7 +587,7 @@ private: if (pool_.shutdown_ && pool_.jobs_.empty()) { break; } - fn = pool_.jobs_.front(); + fn = std::move(pool_.jobs_.front()); pool_.jobs_.pop_front(); } diff --git a/test/test.cc b/test/test.cc index 10bec8d..32eb127 100644 --- a/test/test.cc +++ b/test/test.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -5522,3 +5523,18 @@ TEST(SocketStream, is_writable_INET) { ASSERT_EQ(0, close(disconnected_svr_sock)); } #endif // #ifndef _WIN32 + +TEST(TaskQueueTest, IncreaseAtomicInteger) { + static constexpr unsigned int number_of_task{1000000}; + std::atomic_uint count{0}; + std::unique_ptr task_queue{ + new ThreadPool{CPPHTTPLIB_THREAD_POOL_COUNT}}; + + for (unsigned int i = 0; i < number_of_task; ++i) { + task_queue->enqueue( + [&count] { count.fetch_add(1, std::memory_order_relaxed); }); + } + + EXPECT_NO_THROW(task_queue->shutdown()); + EXPECT_EQ(number_of_task, count.load()); +}