From 9785cd47f20eed14f9cbfaa8833727f51dfaec01 Mon Sep 17 00:00:00 2001 From: yhirose Date: Sat, 3 Aug 2019 16:18:15 +0900 Subject: [PATCH] Thread pool support --- httplib.h | 118 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 104 insertions(+), 14 deletions(-) diff --git a/httplib.h b/httplib.h index da5c565..711558f 100644 --- a/httplib.h +++ b/httplib.h @@ -61,12 +61,14 @@ typedef int socket_t; #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -101,6 +103,7 @@ inline const unsigned char *ASN1_STRING_get0_data(const ASN1_STRING *asn1) { #define CPPHTTPLIB_REQUEST_URI_MAX_LENGTH 8192 #define CPPHTTPLIB_PAYLOAD_MAX_LENGTH (std::numeric_limits::max)() #define CPPHTTPLIB_RECV_BUFSIZ size_t(4096u) +#define CPPHTTPLIB_THREAD_POOL_COUNT 8 namespace httplib { @@ -269,16 +272,101 @@ class TaskQueue { public: TaskQueue() {} virtual ~TaskQueue() {} - virtual void enque(std::function fn) = 0; + virtual void enqueue(std::function fn) = 0; virtual void shutdown() = 0; }; -class ThreadsTaskQueue : public TaskQueue { +#if CPPHTTPLIB_THREAD_POOL_COUNT > 0 +class ThreadPool : public TaskQueue { public: - ThreadsTaskQueue() : running_threads_(0) {} - virtual ~ThreadsTaskQueue() {} + ThreadPool(size_t n) : shutdown_(false), remaining_(0) { + while (n) { + auto t = std::make_shared(worker(*this)); + threads_.push_back(t); + n--; + } + } - virtual void enque(std::function fn) override { + ThreadPool(const ThreadPool &) = delete; + virtual ~ThreadPool() {} + + virtual void enqueue(std::function fn) override { + std::unique_lock lock(mutex_); + jobs_.push_back(fn); + cond_.notify_one(); + } + + virtual void shutdown() override { + // Handle all remaining jobs... + for (;;) { + std::unique_lock lock(mutex_); + if (jobs_.empty()) break; + cond_.notify_one(); + } + + // Stop all worker threads... + { + std::unique_lock lock(mutex_); + shutdown_ = true; + remaining_ = threads_.size(); + } + + for (;;) { + std::unique_lock lock(mutex_); + if (!remaining_) break; + cond_.notify_all(); + } + + // Join... + for (auto t : threads_) { + t->join(); + } + } + +private: + struct worker { + worker(ThreadPool &pool) : pool_(pool) {} + + void operator()() { + for (;;) { + std::unique_lock lock(pool_.mutex_); + + pool_.cond_.wait( + lock, [&] { return !pool_.jobs_.empty() || pool_.shutdown_; }); + + if (pool_.shutdown_) { break; } + + auto fn = pool_.jobs_.front(); + pool_.jobs_.pop_front(); + + assert(true == (bool)fn); + fn(); + } + + std::unique_lock lock(pool_.mutex_); + pool_.remaining_--; + } + + ThreadPool &pool_; + }; + friend struct worker; + + std::vector> threads_; + std::list> jobs_; + + bool shutdown_; + size_t remaining_; + + std::condition_variable cond_; + std::mutex mutex_; +}; +#else +class Threads : public TaskQueue { +public: + Threads() : running_threads_(0) {} + virtual ~Threads() {} + + virtual void enqueue(std::function fn) override { std::thread([=]() { { std::lock_guard guard(running_threads_mutex_); @@ -306,6 +394,7 @@ private: std::mutex running_threads_mutex_; int running_threads_; }; +#endif class Server { public: @@ -342,7 +431,7 @@ public: bool is_running() const; void stop(); - std::function new_task_queue; + std::function new_task_queue; protected: bool process_request(Stream &strm, bool last_connection, @@ -1934,6 +2023,13 @@ inline Server::Server() #ifndef _WIN32 signal(SIGPIPE, SIG_IGN); #endif + new_task_queue = [] { +#if CPPHTTPLIB_THREAD_POOL_COUNT > 0 + return new ThreadPool(CPPHTTPLIB_THREAD_POOL_COUNT); +#else + return new Threads(); +#endif + }; } inline Server::~Server() {} @@ -2235,13 +2331,7 @@ inline bool Server::listen_internal() { is_running_ = true; { - std::unique_ptr task_queue; - - if (new_task_queue) { - task_queue.reset(new_task_queue()); - } else { - task_queue.reset(new ThreadsTaskQueue()); - } + std::unique_ptr task_queue(new_task_queue()); for (;;) { if (svr_sock_ == INVALID_SOCKET) { @@ -2267,7 +2357,7 @@ inline bool Server::listen_internal() { break; } - task_queue->enque([=]() { read_and_close_socket(sock); }); + task_queue->enqueue([=]() { read_and_close_socket(sock); }); } task_queue->shutdown();