From b35c468cca9bf8ef3357a18e522f676857c1b5b8 Mon Sep 17 00:00:00 2001 From: yhirose Date: Sun, 14 Dec 2025 19:10:34 -0500 Subject: [PATCH] Implement SSEClient --- README-sse.md | 182 ++++++++ README.md | 26 +- example/Makefile | 7 +- example/ssecli.cc | 48 ++- httplib.h | 1019 +++++++++++++++++++++++++++++---------------- test/test.cc | 721 ++++++++++++++++++++++++++++++++ 6 files changed, 1621 insertions(+), 382 deletions(-) create mode 100644 README-sse.md diff --git a/README-sse.md b/README-sse.md new file mode 100644 index 0000000..62dee37 --- /dev/null +++ b/README-sse.md @@ -0,0 +1,182 @@ +# SSEClient - Server-Sent Events Client + +A simple, EventSource-like SSE client for C++11. + +## Features + +- **Auto-reconnect**: Automatically reconnects on connection loss +- **Last-Event-ID**: Sends last received ID on reconnect for resumption +- **retry field**: Respects server's reconnect interval +- **Event types**: Supports custom event types via `on_event()` +- **Async support**: Run in background thread with `start_async()` +- **C++11 compatible**: No C++14/17/20 features required + +## Quick Start + +```cpp +httplib::Client cli("http://localhost:8080"); +httplib::sse::SSEClient sse(cli, "/events"); + +sse.on_message([](const httplib::sse::SSEMessage &msg) { + std::cout << "Event: " << msg.event << std::endl; + std::cout << "Data: " << msg.data << std::endl; +}); + +sse.start(); // Blocking, with auto-reconnect +``` + +## API Reference + +### SSEMessage + +```cpp +struct SSEMessage { + std::string event; // Event type (default: "message") + std::string data; // Event payload + std::string id; // Event ID +}; +``` + +### SSEClient + +#### Constructor + +```cpp +// Basic +SSEClient(Client &client, const std::string &path); + +// With custom headers +SSEClient(Client &client, const std::string &path, const Headers &headers); +``` + +#### Event Handlers + +```cpp +// Called for all events (or events without a specific handler) +sse.on_message([](const SSEMessage &msg) { }); + +// Called for specific event types +sse.on_event("update", [](const SSEMessage &msg) { }); +sse.on_event("delete", [](const SSEMessage &msg) { }); + +// Called when connection is established +sse.on_open([]() { }); + +// Called on connection errors +sse.on_error([](httplib::Error err) { }); +``` + +#### Configuration + +```cpp +// Set reconnect interval (default: 3000ms) +sse.set_reconnect_interval(5000); + +// Set max reconnect attempts (default: 0 = unlimited) +sse.set_max_reconnect_attempts(10); +``` + +#### Control + +```cpp +// Blocking start with auto-reconnect +sse.start(); + +// Non-blocking start (runs in background thread) +sse.start_async(); + +// Stop the client (thread-safe) +sse.stop(); +``` + +#### State + +```cpp +bool connected = sse.is_connected(); +const std::string &id = sse.last_event_id(); +``` + +## Examples + +### Basic Usage + +```cpp +httplib::Client cli("http://localhost:8080"); +httplib::sse::SSEClient sse(cli, "/events"); + +sse.on_message([](const httplib::sse::SSEMessage &msg) { + std::cout << msg.data << std::endl; +}); + +sse.start(); +``` + +### With Custom Event Types + +```cpp +httplib::sse::SSEClient sse(cli, "/events"); + +sse.on_event("notification", [](const httplib::sse::SSEMessage &msg) { + std::cout << "Notification: " << msg.data << std::endl; +}); + +sse.on_event("update", [](const httplib::sse::SSEMessage &msg) { + std::cout << "Update: " << msg.data << std::endl; +}); + +sse.start(); +``` + +### Async with Stop + +```cpp +httplib::sse::SSEClient sse(cli, "/events"); + +sse.on_message([](const httplib::sse::SSEMessage &msg) { + std::cout << msg.data << std::endl; +}); + +sse.start_async(); // Returns immediately + +// ... do other work ... + +sse.stop(); // Stop when done +``` + +### With Custom Headers (e.g., Authentication) + +```cpp +httplib::Headers headers = { + {"Authorization", "Bearer token123"} +}; + +httplib::sse::SSEClient sse(cli, "/events", headers); +sse.start(); +``` + +### Error Handling + +```cpp +sse.on_error([](httplib::Error err) { + std::cerr << "Error: " << httplib::to_string(err) << std::endl; +}); + +sse.set_reconnect_interval(1000); +sse.set_max_reconnect_attempts(5); + +sse.start(); +``` + +## SSE Protocol + +The client parses SSE format according to the [W3C specification](https://html.spec.whatwg.org/multipage/server-sent-events.html): + +``` +event: custom-type +id: 123 +data: {"message": "hello"} + +data: simple message + +: this is a comment (ignored) +``` diff --git a/README.md b/README.md index 9c64a17..71309ef 100644 --- a/README.md +++ b/README.md @@ -1206,8 +1206,8 @@ std::string decoded_component = httplib::decode_uri_component(encoded_component) Use `encode_uri()` for full URLs and `encode_uri_component()` for individual query parameters or path segments. -Streaming API -------------- +Stream API +---------- Process large responses without loading everything into memory. @@ -1234,6 +1234,28 @@ All HTTP methods are supported: `stream::Get`, `Post`, `Put`, `Patch`, `Delete`, See [README-stream.md](README-stream.md) for more details. +SSE Client +---------- + +```cpp +#include + +int main() { + httplib::Client cli("http://localhost:8080"); + httplib::sse::SSEClient sse(cli, "/events"); + + sse.on_message([](const httplib::sse::SSEMessage &msg) { + std::cout << "Event: " << msg.event << std::endl; + std::cout << "Data: " << msg.data << std::endl; + }); + + sse.start(); // Blocking, with auto-reconnect + return 0; +} +``` + +See [README-sse.md](README-sse.md) for more details. + Split httplib.h into .h and .cc ------------------------------- diff --git a/example/Makefile b/example/Makefile index 0d3c46d..3082b88 100644 --- a/example/Makefile +++ b/example/Makefile @@ -18,7 +18,7 @@ ZLIB_SUPPORT = -DCPPHTTPLIB_ZLIB_SUPPORT -lz BROTLI_DIR = $(PREFIX)/opt/brotli BROTLI_SUPPORT = -DCPPHTTPLIB_BROTLI_SUPPORT -I$(BROTLI_DIR)/include -L$(BROTLI_DIR)/lib -lbrotlicommon -lbrotlienc -lbrotlidec -all: server client hello simplecli simplesvr upload redirect ssesvr ssecli ssecli-stream benchmark one_time_request server_and_client accept_header +all: server client hello simplecli simplesvr upload redirect ssesvr ssecli benchmark one_time_request server_and_client accept_header server : server.cc ../httplib.h Makefile $(CXX) -o server $(CXXFLAGS) server.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) @@ -47,9 +47,6 @@ ssesvr : ssesvr.cc ../httplib.h Makefile ssecli : ssecli.cc ../httplib.h Makefile $(CXX) -o ssecli $(CXXFLAGS) ssecli.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) -ssecli-stream : ssecli-stream.cc ../httplib.h ../httplib.h Makefile - $(CXX) -o ssecli-stream $(CXXFLAGS) ssecli-stream.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) - benchmark : benchmark.cc ../httplib.h Makefile $(CXX) -o benchmark $(CXXFLAGS) benchmark.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) @@ -67,4 +64,4 @@ pem: openssl req -new -key key.pem | openssl x509 -days 3650 -req -signkey key.pem > cert.pem clean: - rm server client hello simplecli simplesvr upload redirect ssesvr ssecli ssecli-stream benchmark one_time_request server_and_client accept_header *.pem + rm server client hello simplecli simplesvr upload redirect ssesvr ssecli benchmark one_time_request server_and_client accept_header *.pem diff --git a/example/ssecli.cc b/example/ssecli.cc index 2c93822..f7b9ed5 100644 --- a/example/ssecli.cc +++ b/example/ssecli.cc @@ -6,16 +6,52 @@ // #include + +#include #include using namespace std; -int main(void) { - httplib::Client("http://localhost:1234") - .Get("/event1", [&](const char *data, size_t data_length) { - std::cout << string(data, data_length); - return true; - }); +// Global SSEClient pointer for signal handling +httplib::sse::SSEClient *g_sse = nullptr; +void signal_handler(int) { + if (g_sse) { g_sse->stop(); } +} + +int main(void) { + // Configuration + const string host = "http://localhost:1234"; + const string path = "/event1"; + + cout << "SSE Client using httplib::sse::SSEClient\n"; + cout << "Connecting to: " << host << path << "\n"; + cout << "Press Ctrl+C to exit\n\n"; + + httplib::Client cli(host); + httplib::sse::SSEClient sse(cli, path); + + // Set up signal handler for graceful shutdown + g_sse = &sse; + signal(SIGINT, signal_handler); + + // Event handlers + sse.on_open([]() { cout << "[Connected]\n\n"; }); + + sse.on_message([](const httplib::sse::SSEMessage &msg) { + cout << "Event: " << msg.event << "\n"; + cout << "Data: " << msg.data << "\n"; + if (!msg.id.empty()) { cout << "ID: " << msg.id << "\n"; } + cout << "\n"; + }); + + sse.on_error([](httplib::Error err) { + cerr << "[Error] " << httplib::to_string(err) << "\n"; + }); + + // Start with auto-reconnect (blocking) + sse.start(); + + cout << "\n[Disconnected]\n"; return 0; } diff --git a/httplib.h b/httplib.h index 5e34915..114d60b 100644 --- a/httplib.h +++ b/httplib.h @@ -2756,6 +2756,656 @@ bool is_field_value(const std::string &s); } // namespace detail +namespace stream { + +class Result { +public: + Result() : chunk_size_(8192) {} + + explicit Result(ClientImpl::StreamHandle &&handle, size_t chunk_size = 8192) + : handle_(std::move(handle)), chunk_size_(chunk_size) {} + + Result(Result &&other) noexcept + : handle_(std::move(other.handle_)), buffer_(std::move(other.buffer_)), + current_size_(other.current_size_), chunk_size_(other.chunk_size_), + finished_(other.finished_) { + other.current_size_ = 0; + other.finished_ = true; + } + + Result &operator=(Result &&other) noexcept { + if (this != &other) { + handle_ = std::move(other.handle_); + buffer_ = std::move(other.buffer_); + current_size_ = other.current_size_; + chunk_size_ = other.chunk_size_; + finished_ = other.finished_; + other.current_size_ = 0; + other.finished_ = true; + } + return *this; + } + + Result(const Result &) = delete; + Result &operator=(const Result &) = delete; + + // Check if the result is valid (connection succeeded and response received) + bool is_valid() const { return handle_.is_valid(); } + explicit operator bool() const { return is_valid(); } + + // Response status code + int status() const { + return handle_.response ? handle_.response->status : -1; + } + + // Response headers + const Headers &headers() const { + static const Headers empty_headers; + return handle_.response ? handle_.response->headers : empty_headers; + } + + std::string get_header_value(const std::string &key, + const char *def = "") const { + return handle_.response ? handle_.response->get_header_value(key, def) + : def; + } + + bool has_header(const std::string &key) const { + return handle_.response ? handle_.response->has_header(key) : false; + } + + // Error information + Error error() const { return handle_.error; } + Error read_error() const { return handle_.get_read_error(); } + bool has_read_error() const { return handle_.has_read_error(); } + + // Streaming iteration API + // Call next() to read the next chunk, then access data via data()/size() + // Returns true if data was read, false when stream is exhausted + bool next() { + if (!handle_.is_valid() || finished_) { return false; } + + if (buffer_.size() < chunk_size_) { buffer_.resize(chunk_size_); } + + ssize_t n = handle_.read(&buffer_[0], chunk_size_); + if (n > 0) { + current_size_ = static_cast(n); + return true; + } + + current_size_ = 0; + finished_ = true; + return false; + } + + // Pointer to current chunk data (valid after next() returns true) + const char *data() const { return buffer_.data(); } + + // Size of current chunk (valid after next() returns true) + size_t size() const { return current_size_; } + + // Convenience method: read all remaining data into a string + std::string read_all() { + std::string result; + while (next()) { + result.append(data(), size()); + } + return result; + } + +private: + ClientImpl::StreamHandle handle_; + std::string buffer_; + size_t current_size_ = 0; + size_t chunk_size_; + bool finished_ = false; +}; + +// GET +template +inline Result Get(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path), chunk_size}; +} + +template +inline Result Get(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path, {}, headers), chunk_size}; +} + +template +inline Result Get(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path, params), chunk_size}; +} + +template +inline Result Get(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path, params, headers), chunk_size}; +} + +// POST +template +inline Result Post(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("POST", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Post(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("POST", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Post(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("POST", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Post(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("POST", path, params, headers, body, content_type), + chunk_size}; +} + +// PUT +template +inline Result Put(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("PUT", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Put(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PUT", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Put(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PUT", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Put(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("PUT", path, params, headers, body, content_type), + chunk_size}; +} + +// PATCH +template +inline Result Patch(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("PATCH", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Patch(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PATCH", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Patch(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PATCH", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Patch(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("PATCH", path, params, headers, body, content_type), + chunk_size}; +} + +// DELETE +template +inline Result Delete(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, {}, headers), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("DELETE", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, params), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, params, headers), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("DELETE", path, params, headers, body, content_type), + chunk_size}; +} + +// HEAD +template +inline Result Head(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path), chunk_size}; +} + +template +inline Result Head(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path, {}, headers), chunk_size}; +} + +template +inline Result Head(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path, params), chunk_size}; +} + +template +inline Result Head(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path, params, headers), chunk_size}; +} + +// OPTIONS +template +inline Result Options(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path), chunk_size}; +} + +template +inline Result Options(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path, {}, headers), chunk_size}; +} + +template +inline Result Options(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path, params), chunk_size}; +} + +template +inline Result Options(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path, params, headers), chunk_size}; +} + +} // namespace stream + +namespace sse { + +struct SSEMessage { + std::string event; // Event type (default: "message") + std::string data; // Event payload + std::string id; // Event ID for Last-Event-ID header + + SSEMessage() : event("message") {} + + void clear() { + event = "message"; + data.clear(); + id.clear(); + } +}; + +class SSEClient { +public: + using MessageHandler = std::function; + using ErrorHandler = std::function; + using OpenHandler = std::function; + + SSEClient(Client &client, const std::string &path) + : client_(client), path_(path) {} + + SSEClient(Client &client, const std::string &path, const Headers &headers) + : client_(client), path_(path), headers_(headers) {} + + ~SSEClient() { stop(); } + + SSEClient(const SSEClient &) = delete; + SSEClient &operator=(const SSEClient &) = delete; + + // Event handlers + SSEClient &on_message(MessageHandler handler) { + on_message_ = std::move(handler); + return *this; + } + + SSEClient &on_event(const std::string &type, MessageHandler handler) { + event_handlers_[type] = std::move(handler); + return *this; + } + + SSEClient &on_open(OpenHandler handler) { + on_open_ = std::move(handler); + return *this; + } + + SSEClient &on_error(ErrorHandler handler) { + on_error_ = std::move(handler); + return *this; + } + + SSEClient &set_reconnect_interval(int ms) { + reconnect_interval_ms_ = ms; + return *this; + } + + SSEClient &set_max_reconnect_attempts(int n) { + max_reconnect_attempts_ = n; + return *this; + } + + // State accessors + bool is_connected() const { return connected_.load(); } + const std::string &last_event_id() const { return last_event_id_; } + + // Blocking start - runs event loop with auto-reconnect + void start() { + running_.store(true); + run_event_loop(); + } + + // Non-blocking start - runs in background thread + void start_async() { + running_.store(true); + async_thread_ = std::thread([this]() { run_event_loop(); }); + } + + // Stop the client (thread-safe) + void stop() { + running_.store(false); + client_.stop(); // Cancel any pending operations + if (async_thread_.joinable()) { async_thread_.join(); } + } + +private: + // Parse a single SSE field line + // Returns true if this line ends an event (blank line) + bool parse_sse_line(const std::string &line, SSEMessage &msg, int &retry_ms) { + // Blank line signals end of event + if (line.empty() || line == "\r") { return true; } + + // Lines starting with ':' are comments (ignored) + if (!line.empty() && line[0] == ':') { return false; } + + // Find the colon separator + auto colon_pos = line.find(':'); + if (colon_pos == std::string::npos) { + // Line with no colon is treated as field name with empty value + return false; + } + + auto field = line.substr(0, colon_pos); + std::string value; + + // Value starts after colon, skip optional single space + if (colon_pos + 1 < line.size()) { + auto value_start = colon_pos + 1; + if (line[value_start] == ' ') { value_start++; } + value = line.substr(value_start); + // Remove trailing \r if present + if (!value.empty() && value.back() == '\r') { value.pop_back(); } + } + + // Handle known fields + if (field == "event") { + msg.event = value; + } else if (field == "data") { + // Multiple data lines are concatenated with newlines + if (!msg.data.empty()) { msg.data += "\n"; } + msg.data += value; + } else if (field == "id") { + // Empty id is valid (clears the last event ID) + msg.id = value; + } else if (field == "retry") { + // Parse retry interval in milliseconds + try { + retry_ms = std::stoi(value); + } catch (...) { + // Invalid retry value, ignore + } + } + // Unknown fields are ignored per SSE spec + + return false; + } + + // Main event loop with auto-reconnect + void run_event_loop() { + auto reconnect_count = 0; + + while (running_.load()) { + // Build headers, including Last-Event-ID if we have one + auto request_headers = headers_; + if (!last_event_id_.empty()) { + request_headers.emplace("Last-Event-ID", last_event_id_); + } + + // Open streaming connection + auto result = stream::Get(client_, path_, request_headers); + + // Connection error handling + if (!result) { + connected_.store(false); + if (on_error_) { on_error_(result.error()); } + + if (!should_reconnect(reconnect_count)) { break; } + wait_for_reconnect(); + reconnect_count++; + continue; + } + + if (result.status() != 200) { + connected_.store(false); + // For certain errors, don't reconnect + if (result.status() == 204 || // No Content - server wants us to stop + result.status() == 404 || // Not Found + result.status() == 401 || // Unauthorized + result.status() == 403) { // Forbidden + if (on_error_) { on_error_(Error::Connection); } + break; + } + + if (on_error_) { on_error_(Error::Connection); } + + if (!should_reconnect(reconnect_count)) { break; } + wait_for_reconnect(); + reconnect_count++; + continue; + } + + // Connection successful + connected_.store(true); + reconnect_count = 0; + if (on_open_) { on_open_(); } + + // Event receiving loop + std::string buffer; + SSEMessage current_msg; + + while (running_.load() && result.next()) { + buffer.append(result.data(), result.size()); + + // Process complete lines in the buffer + size_t line_start = 0; + size_t newline_pos; + + while ((newline_pos = buffer.find('\n', line_start)) != + std::string::npos) { + auto line = buffer.substr(line_start, newline_pos - line_start); + line_start = newline_pos + 1; + + // Parse the line and check if event is complete + auto event_complete = + parse_sse_line(line, current_msg, reconnect_interval_ms_); + + if (event_complete && !current_msg.data.empty()) { + // Update last_event_id for reconnection + if (!current_msg.id.empty()) { last_event_id_ = current_msg.id; } + + // Dispatch event to appropriate handler + dispatch_event(current_msg); + + current_msg.clear(); + } + } + + // Keep unprocessed data in buffer + buffer.erase(0, line_start); + } + + // Connection ended + connected_.store(false); + + if (!running_.load()) { break; } + + // Check for read errors + if (result.has_read_error()) { + if (on_error_) { on_error_(result.read_error()); } + } + + if (!should_reconnect(reconnect_count)) { break; } + wait_for_reconnect(); + reconnect_count++; + } + + connected_.store(false); + } + + // Dispatch event to appropriate handler + void dispatch_event(const SSEMessage &msg) { + // Check for specific event type handler first + auto it = event_handlers_.find(msg.event); + if (it != event_handlers_.end()) { + it->second(msg); + return; + } + + // Fall back to generic message handler + if (on_message_) { on_message_(msg); } + } + + // Check if we should attempt to reconnect + bool should_reconnect(int count) const { + if (!running_.load()) { return false; } + if (max_reconnect_attempts_ == 0) { return true; } // unlimited + return count < max_reconnect_attempts_; + } + + // Wait for reconnect interval + void wait_for_reconnect() { + // Use small increments to check running_ flag frequently + auto waited = 0; + while (running_.load() && waited < reconnect_interval_ms_) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + waited += 100; + } + } + + // Client and path + Client &client_; + std::string path_; + Headers headers_; + + // Callbacks + MessageHandler on_message_; + std::map event_handlers_; + OpenHandler on_open_; + ErrorHandler on_error_; + + // Configuration + int reconnect_interval_ms_ = 3000; + int max_reconnect_attempts_ = 0; // 0 = unlimited + + // State + std::atomic running_{false}; + std::atomic connected_{false}; + std::string last_event_id_; + + // Async support + std::thread async_thread_; +}; + +} // namespace sse + // ---------------------------------------------------------------------------- /* @@ -13249,375 +13899,6 @@ inline SSL_CTX *Client::ssl_context() const { // ---------------------------------------------------------------------------- -/* - * C++11/14/17 Streaming API - * - * This section provides iterator-style streaming functionality for C++11/14/17. - * For C++20 and later, a coroutine-based API with range-for syntax is - * available. - * - * Usage: - * httplib::Client cli("example.com"); - * auto result = httplib::stream::Get(cli, "/large-file"); - * if (result) { - * while (result.next()) { - * process(result.data(), result.size()); - * } - * } - */ - -namespace stream { - -class Result { -public: - Result() : chunk_size_(8192) {} - - explicit Result(ClientImpl::StreamHandle &&handle, size_t chunk_size = 8192) - : handle_(std::move(handle)), chunk_size_(chunk_size) {} - - Result(Result &&other) noexcept - : handle_(std::move(other.handle_)), buffer_(std::move(other.buffer_)), - current_size_(other.current_size_), chunk_size_(other.chunk_size_), - finished_(other.finished_) { - other.current_size_ = 0; - other.finished_ = true; - } - - Result &operator=(Result &&other) noexcept { - if (this != &other) { - handle_ = std::move(other.handle_); - buffer_ = std::move(other.buffer_); - current_size_ = other.current_size_; - chunk_size_ = other.chunk_size_; - finished_ = other.finished_; - other.current_size_ = 0; - other.finished_ = true; - } - return *this; - } - - Result(const Result &) = delete; - Result &operator=(const Result &) = delete; - - // Check if the result is valid (connection succeeded and response received) - bool is_valid() const { return handle_.is_valid(); } - explicit operator bool() const { return is_valid(); } - - // Response status code - int status() const { - return handle_.response ? handle_.response->status : -1; - } - - // Response headers - const Headers &headers() const { - static const Headers empty_headers; - return handle_.response ? handle_.response->headers : empty_headers; - } - - std::string get_header_value(const std::string &key, - const char *def = "") const { - return handle_.response ? handle_.response->get_header_value(key, def) - : def; - } - - bool has_header(const std::string &key) const { - return handle_.response ? handle_.response->has_header(key) : false; - } - - // Error information - Error error() const { return handle_.error; } - Error read_error() const { return handle_.get_read_error(); } - bool has_read_error() const { return handle_.has_read_error(); } - - // Streaming iteration API - // Call next() to read the next chunk, then access data via data()/size() - // Returns true if data was read, false when stream is exhausted - bool next() { - if (!handle_.is_valid() || finished_) { return false; } - - if (buffer_.size() < chunk_size_) { buffer_.resize(chunk_size_); } - - ssize_t n = handle_.read(&buffer_[0], chunk_size_); - if (n > 0) { - current_size_ = static_cast(n); - return true; - } - - current_size_ = 0; - finished_ = true; - return false; - } - - // Pointer to current chunk data (valid after next() returns true) - const char *data() const { return buffer_.data(); } - - // Size of current chunk (valid after next() returns true) - size_t size() const { return current_size_; } - - // Convenience method: read all remaining data into a string - std::string read_all() { - std::string result; - while (next()) { - result.append(data(), size()); - } - return result; - } - -private: - ClientImpl::StreamHandle handle_; - std::string buffer_; - size_t current_size_ = 0; - size_t chunk_size_; - bool finished_ = false; -}; - -// GET -template -inline Result Get(ClientType &cli, const std::string &path, - size_t chunk_size = 8192) { - return Result{cli.open_stream("GET", path), chunk_size}; -} - -template -inline Result Get(ClientType &cli, const std::string &path, - const Headers &headers, size_t chunk_size = 8192) { - return Result{cli.open_stream("GET", path, {}, headers), chunk_size}; -} - -template -inline Result Get(ClientType &cli, const std::string &path, - const Params ¶ms, size_t chunk_size = 8192) { - return Result{cli.open_stream("GET", path, params), chunk_size}; -} - -template -inline Result Get(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - size_t chunk_size = 8192) { - return Result{cli.open_stream("GET", path, params, headers), chunk_size}; -} - -// POST -template -inline Result Post(ClientType &cli, const std::string &path, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{cli.open_stream("POST", path, {}, {}, body, content_type), - chunk_size}; -} - -template -inline Result Post(ClientType &cli, const std::string &path, - const Headers &headers, const std::string &body, - const std::string &content_type, size_t chunk_size = 8192) { - return Result{cli.open_stream("POST", path, {}, headers, body, content_type), - chunk_size}; -} - -template -inline Result Post(ClientType &cli, const std::string &path, - const Params ¶ms, const std::string &body, - const std::string &content_type, size_t chunk_size = 8192) { - return Result{cli.open_stream("POST", path, params, {}, body, content_type), - chunk_size}; -} - -template -inline Result Post(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{ - cli.open_stream("POST", path, params, headers, body, content_type), - chunk_size}; -} - -// PUT -template -inline Result Put(ClientType &cli, const std::string &path, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{cli.open_stream("PUT", path, {}, {}, body, content_type), - chunk_size}; -} - -template -inline Result Put(ClientType &cli, const std::string &path, - const Headers &headers, const std::string &body, - const std::string &content_type, size_t chunk_size = 8192) { - return Result{cli.open_stream("PUT", path, {}, headers, body, content_type), - chunk_size}; -} - -template -inline Result Put(ClientType &cli, const std::string &path, - const Params ¶ms, const std::string &body, - const std::string &content_type, size_t chunk_size = 8192) { - return Result{cli.open_stream("PUT", path, params, {}, body, content_type), - chunk_size}; -} - -template -inline Result Put(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{ - cli.open_stream("PUT", path, params, headers, body, content_type), - chunk_size}; -} - -// PATCH -template -inline Result Patch(ClientType &cli, const std::string &path, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{cli.open_stream("PATCH", path, {}, {}, body, content_type), - chunk_size}; -} - -template -inline Result Patch(ClientType &cli, const std::string &path, - const Headers &headers, const std::string &body, - const std::string &content_type, size_t chunk_size = 8192) { - return Result{cli.open_stream("PATCH", path, {}, headers, body, content_type), - chunk_size}; -} - -template -inline Result Patch(ClientType &cli, const std::string &path, - const Params ¶ms, const std::string &body, - const std::string &content_type, size_t chunk_size = 8192) { - return Result{cli.open_stream("PATCH", path, params, {}, body, content_type), - chunk_size}; -} - -template -inline Result Patch(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{ - cli.open_stream("PATCH", path, params, headers, body, content_type), - chunk_size}; -} - -// DELETE -template -inline Result Delete(ClientType &cli, const std::string &path, - size_t chunk_size = 8192) { - return Result{cli.open_stream("DELETE", path), chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const Headers &headers, size_t chunk_size = 8192) { - return Result{cli.open_stream("DELETE", path, {}, headers), chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{cli.open_stream("DELETE", path, {}, {}, body, content_type), - chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const Headers &headers, const std::string &body, - const std::string &content_type, - size_t chunk_size = 8192) { - return Result{ - cli.open_stream("DELETE", path, {}, headers, body, content_type), - chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const Params ¶ms, size_t chunk_size = 8192) { - return Result{cli.open_stream("DELETE", path, params), chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - size_t chunk_size = 8192) { - return Result{cli.open_stream("DELETE", path, params, headers), chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const Params ¶ms, const std::string &body, - const std::string &content_type, - size_t chunk_size = 8192) { - return Result{cli.open_stream("DELETE", path, params, {}, body, content_type), - chunk_size}; -} - -template -inline Result Delete(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - const std::string &body, const std::string &content_type, - size_t chunk_size = 8192) { - return Result{ - cli.open_stream("DELETE", path, params, headers, body, content_type), - chunk_size}; -} - -// HEAD -template -inline Result Head(ClientType &cli, const std::string &path, - size_t chunk_size = 8192) { - return Result{cli.open_stream("HEAD", path), chunk_size}; -} - -template -inline Result Head(ClientType &cli, const std::string &path, - const Headers &headers, size_t chunk_size = 8192) { - return Result{cli.open_stream("HEAD", path, {}, headers), chunk_size}; -} - -template -inline Result Head(ClientType &cli, const std::string &path, - const Params ¶ms, size_t chunk_size = 8192) { - return Result{cli.open_stream("HEAD", path, params), chunk_size}; -} - -template -inline Result Head(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - size_t chunk_size = 8192) { - return Result{cli.open_stream("HEAD", path, params, headers), chunk_size}; -} - -// OPTIONS -template -inline Result Options(ClientType &cli, const std::string &path, - size_t chunk_size = 8192) { - return Result{cli.open_stream("OPTIONS", path), chunk_size}; -} - -template -inline Result Options(ClientType &cli, const std::string &path, - const Headers &headers, size_t chunk_size = 8192) { - return Result{cli.open_stream("OPTIONS", path, {}, headers), chunk_size}; -} - -template -inline Result Options(ClientType &cli, const std::string &path, - const Params ¶ms, size_t chunk_size = 8192) { - return Result{cli.open_stream("OPTIONS", path, params), chunk_size}; -} - -template -inline Result Options(ClientType &cli, const std::string &path, - const Params ¶ms, const Headers &headers, - size_t chunk_size = 8192) { - return Result{cli.open_stream("OPTIONS", path, params, headers), chunk_size}; -} - -} // namespace stream } // namespace httplib #endif // CPPHTTPLIB_HTTPLIB_H diff --git a/test/test.cc b/test/test.cc index 035f050..7907ccd 100644 --- a/test/test.cc +++ b/test/test.cc @@ -13255,3 +13255,724 @@ TEST(ETagTest, NegativeFileModificationTime) { t.join(); std::remove(fname); } + +//============================================================================== +// SSE Parsing Tests +//============================================================================== + +class SSEParsingTest : public ::testing::Test { +protected: + // Test helper that mimics SSE parsing behavior + static bool parse_sse_line(const std::string &line, sse::SSEMessage &msg, + int &retry_ms) { + // Blank line signals end of event + if (line.empty() || line == "\r") { return true; } + + // Lines starting with ':' are comments (ignored) + if (!line.empty() && line[0] == ':') { return false; } + + // Find the colon separator + auto colon_pos = line.find(':'); + if (colon_pos == std::string::npos) { + // Line with no colon is treated as field name with empty value + return false; + } + + std::string field = line.substr(0, colon_pos); + std::string value; + + // Value starts after colon, skip optional single space + if (colon_pos + 1 < line.size()) { + size_t value_start = colon_pos + 1; + if (line[value_start] == ' ') { value_start++; } + value = line.substr(value_start); + // Remove trailing \r if present + if (!value.empty() && value.back() == '\r') { value.pop_back(); } + } + + // Handle known fields + if (field == "event") { + msg.event = value; + } else if (field == "data") { + // Multiple data lines are concatenated with newlines + if (!msg.data.empty()) { msg.data += "\n"; } + msg.data += value; + } else if (field == "id") { + // Empty id is valid (clears the last event ID) + msg.id = value; + } else if (field == "retry") { + // Parse retry interval in milliseconds + try { + retry_ms = std::stoi(value); + } catch (...) { + // Invalid retry value, ignore + } + } + // Unknown fields are ignored per SSE spec + + return false; + } +}; + +// Test: Single-line data +TEST_F(SSEParsingTest, SingleLineData) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("data: hello", msg, retry_ms)); + EXPECT_EQ(msg.data, "hello"); + EXPECT_EQ(msg.event, "message"); + + // Blank line ends event + EXPECT_TRUE(parse_sse_line("", msg, retry_ms)); +} + +// Test: Multi-line data +TEST_F(SSEParsingTest, MultiLineData) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("data: line1", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("data: line2", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("data: line3", msg, retry_ms)); + EXPECT_EQ(msg.data, "line1\nline2\nline3"); +} + +// Test: Custom event types +TEST_F(SSEParsingTest, CustomEventType) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("event: update", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("data: payload", msg, retry_ms)); + EXPECT_EQ(msg.event, "update"); + EXPECT_EQ(msg.data, "payload"); +} + +// Test: Event ID handling +TEST_F(SSEParsingTest, EventIdHandling) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("id: 12345", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("data: test", msg, retry_ms)); + EXPECT_EQ(msg.id, "12345"); +} + +// Test: Empty event ID (clears last event ID) +TEST_F(SSEParsingTest, EmptyEventId) { + sse::SSEMessage msg; + msg.id = "previous"; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("id:", msg, retry_ms)); + EXPECT_EQ(msg.id, ""); +} + +// Test: Retry field parsing +TEST_F(SSEParsingTest, RetryFieldParsing) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("retry: 5000", msg, retry_ms)); + EXPECT_EQ(retry_ms, 5000); +} + +// Test: Invalid retry value +TEST_F(SSEParsingTest, InvalidRetryValue) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("retry: invalid", msg, retry_ms)); + EXPECT_EQ(retry_ms, 3000); // Unchanged +} + +// Test: Comments (lines starting with :) +TEST_F(SSEParsingTest, CommentsIgnored) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line(": this is a comment", msg, retry_ms)); + EXPECT_EQ(msg.data, ""); + EXPECT_EQ(msg.event, "message"); +} + +// Test: Colon in value +TEST_F(SSEParsingTest, ColonInValue) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("data: hello:world:test", msg, retry_ms)); + EXPECT_EQ(msg.data, "hello:world:test"); +} + +// Test: Line with no colon (field name only) +TEST_F(SSEParsingTest, FieldNameOnly) { + sse::SSEMessage msg; + int retry_ms = 3000; + + // According to SSE spec, this is treated as field name with empty value + EXPECT_FALSE(parse_sse_line("data", msg, retry_ms)); + // Since we don't recognize "data" without colon, data should be empty + EXPECT_EQ(msg.data, ""); +} + +// Test: Trailing \r handling +TEST_F(SSEParsingTest, TrailingCarriageReturn) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("data: hello\r", msg, retry_ms)); + EXPECT_EQ(msg.data, "hello"); +} + +// Test: Unknown fields ignored +TEST_F(SSEParsingTest, UnknownFieldsIgnored) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("unknown: value", msg, retry_ms)); + EXPECT_EQ(msg.data, ""); + EXPECT_EQ(msg.event, "message"); +} + +// Test: Space after colon is optional +TEST_F(SSEParsingTest, SpaceAfterColonOptional) { + sse::SSEMessage msg1, msg2; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("data: hello", msg1, retry_ms)); + EXPECT_FALSE(parse_sse_line("data:hello", msg2, retry_ms)); + EXPECT_EQ(msg1.data, "hello"); + EXPECT_EQ(msg2.data, "hello"); +} + +// Test: SSEMessage clear +TEST_F(SSEParsingTest, MessageClear) { + sse::SSEMessage msg; + msg.event = "custom"; + msg.data = "some data"; + msg.id = "123"; + + msg.clear(); + + EXPECT_EQ(msg.event, "message"); + EXPECT_EQ(msg.data, ""); + EXPECT_EQ(msg.id, ""); +} + +// Test: Complete event parsing +TEST_F(SSEParsingTest, CompleteEventParsing) { + sse::SSEMessage msg; + int retry_ms = 3000; + + EXPECT_FALSE(parse_sse_line("event: notification", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("id: evt-42", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("data: {\"type\":\"alert\"}", msg, retry_ms)); + EXPECT_FALSE(parse_sse_line("retry: 1000", msg, retry_ms)); + + // Blank line ends event + EXPECT_TRUE(parse_sse_line("", msg, retry_ms)); + + EXPECT_EQ(msg.event, "notification"); + EXPECT_EQ(msg.id, "evt-42"); + EXPECT_EQ(msg.data, "{\"type\":\"alert\"}"); + EXPECT_EQ(retry_ms, 1000); +} + +//============================================================================== +// Integration Tests with Server +//============================================================================== + +class SSEIntegrationTest : public ::testing::Test { +protected: + void SetUp() override { + stop_server_.store(false); + events_.clear(); + server_ = httplib::detail::make_unique(); + setup_server(); + start_server(); + } + + void TearDown() override { + stop_server_.store(true); + event_cv_.notify_all(); + server_->stop(); + if (server_thread_.joinable()) { server_thread_.join(); } + } + + void setup_server() { + // Simple SSE endpoint + server_->Get("/events", [this](const Request &req, Response &res) { + auto last_id = req.get_header_value("Last-Event-ID"); + if (!last_id.empty()) { last_received_event_id_ = last_id; } + + res.set_chunked_content_provider( + "text/event-stream", [this](size_t /*offset*/, DataSink &sink) { + std::unique_lock lock(event_mutex_); + if (event_cv_.wait_for( + lock, std::chrono::milliseconds(200), [this] { + return !events_.empty() || stop_server_.load(); + })) { + if (stop_server_.load()) { return false; } + if (!events_.empty()) { + std::string event = events_.front(); + events_.erase(events_.begin()); + sink.write(event.data(), event.size()); + return true; + } + } + return !stop_server_.load(); + }); + }); + + // Endpoint that returns error + server_->Get("/error-endpoint", [](const Request &, Response &res) { + res.status = 500; + res.set_content("Internal Server Error", "text/plain"); + }); + + // Endpoint for custom event types + server_->Get("/custom-events", [](const Request &, Response &res) { + res.set_chunked_content_provider( + "text/event-stream", [](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = "event: update\ndata: updated\n\n" + "event: delete\ndata: deleted\n\n"; + sink.write(event.data(), event.size()); + } + return false; // End stream after sending + }); + }); + } + + void start_server() { + port_ = server_->bind_to_any_port("localhost"); + server_thread_ = std::thread([this]() { server_->listen_after_bind(); }); + + // Wait for server to start + while (!server_->is_running()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + int get_port() const { return port_; } + + void send_event(const std::string &event) { + std::lock_guard lock(event_mutex_); + events_.push_back(event); + event_cv_.notify_all(); + } + + std::unique_ptr server_; + std::thread server_thread_; + std::mutex event_mutex_; + std::condition_variable event_cv_; + std::vector events_; + std::atomic stop_server_{false}; + std::string last_received_event_id_; + int port_ = 0; +}; + +// Test: Successful connection and on_open callback +TEST_F(SSEIntegrationTest, SuccessfulConnection) { + // Add a simple endpoint that sends one event and closes + server_->Get("/simple-event", [](const Request &, Response &res) { + res.set_chunked_content_provider( + "text/event-stream", [](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = "data: hello\n\n"; + sink.write(event.data(), event.size()); + } + return false; // Close stream after sending + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/simple-event"); + + std::atomic open_called{false}; + std::atomic message_received{false}; + + sse.on_open([&open_called]() { open_called.store(true); }); + + sse.on_message([&message_received](const sse::SSEMessage &msg) { + if (msg.data == "hello") { message_received.store(true); } + }); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(1); + + // Start async + sse.start_async(); + + // Wait for message to be processed + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + sse.stop(); + + EXPECT_TRUE(open_called.load()); + EXPECT_TRUE(message_received.load()); +} + +// Test: on_message callback +TEST_F(SSEIntegrationTest, OnMessageCallback) { + // Endpoint that sends multiple events then closes + server_->Get("/multi-event", [](const Request &, Response &res) { + res.set_chunked_content_provider( + "text/event-stream", [](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string events = "data: message1\n\ndata: message2\n\n"; + sink.write(events.data(), events.size()); + } + return false; + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/multi-event"); + + std::vector received_messages; + std::mutex messages_mutex; + + sse.on_message([&](const sse::SSEMessage &msg) { + std::lock_guard lock(messages_mutex); + received_messages.push_back(msg.data); + }); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(1); + sse.start_async(); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + sse.stop(); + + std::lock_guard lock(messages_mutex); + EXPECT_GE(received_messages.size(), 2u); + if (received_messages.size() >= 2) { + EXPECT_EQ(received_messages[0], "message1"); + EXPECT_EQ(received_messages[1], "message2"); + } +} + +// Test: on_event for specific types +TEST_F(SSEIntegrationTest, OnEventForSpecificTypes) { + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/custom-events"); + + std::atomic update_received{false}; + std::atomic delete_received{false}; + + sse.on_event("update", [&update_received](const sse::SSEMessage &msg) { + if (msg.data == "updated") { update_received.store(true); } + }); + + sse.on_event("delete", [&delete_received](const sse::SSEMessage &msg) { + if (msg.data == "deleted") { delete_received.store(true); } + }); + + sse.set_max_reconnect_attempts(1); + sse.start_async(); + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + sse.stop(); + + EXPECT_TRUE(update_received.load()); + EXPECT_TRUE(delete_received.load()); +} + +// Test: on_error callback on connection failure +TEST_F(SSEIntegrationTest, OnErrorCallback) { + // Connect to a non-existent port + Client client("localhost", 59999); + sse::SSEClient sse(client, "/events"); + + std::atomic error_called{false}; + Error received_error = Error::Success; + + sse.on_error([&](Error err) { + error_called.store(true); + received_error = err; + }); + + sse.set_reconnect_interval(50); + sse.set_max_reconnect_attempts(1); + + sse.start_async(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sse.stop(); + + EXPECT_TRUE(error_called.load()); + EXPECT_NE(received_error, Error::Success); +} + +// Test: Last-Event-ID header sent on reconnect +TEST_F(SSEIntegrationTest, LastEventIdHeader) { + // Endpoint that sends event with ID + server_->Get("/event-with-id", [](const Request &, Response &res) { + res.set_chunked_content_provider( + "text/event-stream", [](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = "id: evt-123\ndata: test\n\n"; + sink.write(event.data(), event.size()); + } + return false; + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/event-with-id"); + + std::atomic id_received{false}; + + sse.on_message([&](const sse::SSEMessage &msg) { + if (!msg.id.empty()) { id_received.store(true); } + }); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(1); + sse.start_async(); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + sse.stop(); + + EXPECT_TRUE(id_received.load()); + EXPECT_EQ(sse.last_event_id(), "evt-123"); +} + +// Test: Manual stop +TEST_F(SSEIntegrationTest, ManualStop) { + // Endpoint that sends one event and stays open briefly + std::atomic handler_running{true}; + + server_->Get("/stay-open", [&handler_running](const Request &, + Response &res) { + res.set_chunked_content_provider( + "text/event-stream", [&handler_running](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = "data: connected\n\n"; + sink.write(event.data(), event.size()); + } + // Keep connection open while handler_running is true + for (int i = 0; i < 10 && handler_running.load(); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + return false; + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/stay-open"); + + std::atomic connected{false}; + sse.on_open([&connected]() { connected.store(true); }); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(1); + sse.start_async(); + + // Wait for connection to establish + for (int i = 0; i < 20 && !connected.load(); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + EXPECT_TRUE(connected.load()); + EXPECT_TRUE(sse.is_connected()); + + // Signal handler to stop + handler_running.store(false); + + // Stop SSE client + sse.stop(); + EXPECT_FALSE(sse.is_connected()); +} + +// Test: SSEClient with custom headers +TEST_F(SSEIntegrationTest, CustomHeaders) { + // Setup a server endpoint that checks for custom header + std::atomic header_received{false}; + + server_->Get("/header-check", [&](const Request &req, Response &res) { + if (req.get_header_value("X-Custom-Header") == "custom-value") { + header_received.store(true); + } + res.set_chunked_content_provider("text/event-stream", + [](size_t, DataSink &) { return false; }); + }); + + Client client("localhost", get_port()); + Headers headers = {{"X-Custom-Header", "custom-value"}}; + sse::SSEClient sse(client, "/header-check", headers); + + sse.set_max_reconnect_attempts(1); + sse.start_async(); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sse.stop(); + + EXPECT_TRUE(header_received.load()); +} + +// Test: Reconnect interval configuration +TEST_F(SSEIntegrationTest, ReconnectIntervalConfiguration) { + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/events"); + + auto &result = sse.set_reconnect_interval(500); + // Builder pattern should return reference to self + EXPECT_EQ(&result, &sse); +} + +// Test: Max reconnect attempts +TEST_F(SSEIntegrationTest, MaxReconnectAttempts) { + // Connect to non-existent port to force reconnects + Client client("localhost", 59998); + sse::SSEClient sse(client, "/events"); + + std::atomic error_count{0}; + + sse.on_error([&](Error) { error_count.fetch_add(1); }); + + sse.set_reconnect_interval(50); + sse.set_max_reconnect_attempts(2); + + auto start = std::chrono::steady_clock::now(); + sse.start(); // Blocking call + auto end = std::chrono::steady_clock::now(); + + // Should have stopped after 2 failed attempts + EXPECT_GE(error_count.load(), 2); + + // Should not have taken too long (max 2 attempts * 50ms + overhead) + auto duration = + std::chrono::duration_cast(end - start); + EXPECT_LT(duration.count(), 1000); +} + +// Test: Multi-line data in integration +TEST_F(SSEIntegrationTest, MultiLineDataIntegration) { + // Endpoint with multi-line data + server_->Get("/multiline-data", [](const Request &, Response &res) { + res.set_chunked_content_provider( + "text/event-stream", [](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = "data: line1\ndata: line2\ndata: line3\n\n"; + sink.write(event.data(), event.size()); + } + return false; + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/multiline-data"); + + std::string received_data; + std::mutex data_mutex; + + sse.on_message([&](const sse::SSEMessage &msg) { + std::lock_guard lock(data_mutex); + received_data = msg.data; + }); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(1); + sse.start_async(); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + sse.stop(); + + std::lock_guard lock(data_mutex); + EXPECT_EQ(received_data, "line1\nline2\nline3"); +} + +// Test: Auto-reconnect after server disconnection +TEST_F(SSEIntegrationTest, AutoReconnectAfterDisconnect) { + std::atomic connection_count{0}; + std::atomic message_count{0}; + + // Endpoint that sends one event and closes, forcing reconnect + server_->Get("/reconnect-test", + [&connection_count](const Request &, Response &res) { + connection_count.fetch_add(1); + res.set_chunked_content_provider( + "text/event-stream", [](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = "data: hello\n\n"; + sink.write(event.data(), event.size()); + } + return false; // Close connection after sending + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/reconnect-test"); + + sse.on_message([&message_count](const sse::SSEMessage &) { + message_count.fetch_add(1); + }); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(3); + sse.start_async(); + + // Wait long enough for multiple reconnects + std::this_thread::sleep_for(std::chrono::milliseconds(800)); + sse.stop(); + + // Should have connected multiple times (initial + reconnects) + EXPECT_GE(connection_count.load(), 2); + // Should have received messages from multiple connections + EXPECT_GE(message_count.load(), 2); +} + +// Test: Last-Event-ID sent on reconnect +TEST_F(SSEIntegrationTest, LastEventIdSentOnReconnect) { + std::atomic connection_count{0}; + std::vector received_last_event_ids; + std::mutex id_mutex; + + // Endpoint that checks Last-Event-ID header and sends event with ID + server_->Get("/reconnect-with-id", [&](const Request &req, Response &res) { + int conn = connection_count.fetch_add(1); + + // Capture the Last-Event-ID header from each connection + { + std::lock_guard lock(id_mutex); + received_last_event_ids.push_back(req.get_header_value("Last-Event-ID")); + } + + res.set_chunked_content_provider( + "text/event-stream", [conn](size_t offset, DataSink &sink) { + if (offset == 0) { + std::string event = + "id: event-" + std::to_string(conn) + "\ndata: msg\n\n"; + sink.write(event.data(), event.size()); + } + return false; + }); + }); + + Client client("localhost", get_port()); + sse::SSEClient sse(client, "/reconnect-with-id"); + + sse.set_reconnect_interval(100); + sse.set_max_reconnect_attempts(3); + sse.start_async(); + + // Wait for at least 2 connections + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + sse.stop(); + + // Verify behavior + std::lock_guard lock(id_mutex); + EXPECT_GE(received_last_event_ids.size(), 2u); + + // First connection should have no Last-Event-ID + if (!received_last_event_ids.empty()) { + EXPECT_EQ(received_last_event_ids[0], ""); + } + + // Second connection should have Last-Event-ID from first connection + if (received_last_event_ids.size() >= 2) { + EXPECT_EQ(received_last_event_ids[1], "event-0"); + } +}