1
0
mirror of synced 2025-12-18 16:34:09 +03:00

Implement SSEClient (#2308)

* Implement SSEClient

* Fix Windows problem
This commit is contained in:
yhirose
2025-12-15 00:00:42 -05:00
committed by GitHub
parent 7eb03e81fc
commit 51b704b902
6 changed files with 1626 additions and 382 deletions

182
README-sse.md Normal file
View File

@@ -0,0 +1,182 @@
# SSEClient - Server-Sent Events Client
A simple, EventSource-like SSE client for C++11.
## Features
- **Auto-reconnect**: Automatically reconnects on connection loss
- **Last-Event-ID**: Sends last received ID on reconnect for resumption
- **retry field**: Respects server's reconnect interval
- **Event types**: Supports custom event types via `on_event()`
- **Async support**: Run in background thread with `start_async()`
- **C++11 compatible**: No C++14/17/20 features required
## Quick Start
```cpp
httplib::Client cli("http://localhost:8080");
httplib::sse::SSEClient sse(cli, "/events");
sse.on_message([](const httplib::sse::SSEMessage &msg) {
std::cout << "Event: " << msg.event << std::endl;
std::cout << "Data: " << msg.data << std::endl;
});
sse.start(); // Blocking, with auto-reconnect
```
## API Reference
### SSEMessage
```cpp
struct SSEMessage {
std::string event; // Event type (default: "message")
std::string data; // Event payload
std::string id; // Event ID
};
```
### SSEClient
#### Constructor
```cpp
// Basic
SSEClient(Client &client, const std::string &path);
// With custom headers
SSEClient(Client &client, const std::string &path, const Headers &headers);
```
#### Event Handlers
```cpp
// Called for all events (or events without a specific handler)
sse.on_message([](const SSEMessage &msg) { });
// Called for specific event types
sse.on_event("update", [](const SSEMessage &msg) { });
sse.on_event("delete", [](const SSEMessage &msg) { });
// Called when connection is established
sse.on_open([]() { });
// Called on connection errors
sse.on_error([](httplib::Error err) { });
```
#### Configuration
```cpp
// Set reconnect interval (default: 3000ms)
sse.set_reconnect_interval(5000);
// Set max reconnect attempts (default: 0 = unlimited)
sse.set_max_reconnect_attempts(10);
```
#### Control
```cpp
// Blocking start with auto-reconnect
sse.start();
// Non-blocking start (runs in background thread)
sse.start_async();
// Stop the client (thread-safe)
sse.stop();
```
#### State
```cpp
bool connected = sse.is_connected();
const std::string &id = sse.last_event_id();
```
## Examples
### Basic Usage
```cpp
httplib::Client cli("http://localhost:8080");
httplib::sse::SSEClient sse(cli, "/events");
sse.on_message([](const httplib::sse::SSEMessage &msg) {
std::cout << msg.data << std::endl;
});
sse.start();
```
### With Custom Event Types
```cpp
httplib::sse::SSEClient sse(cli, "/events");
sse.on_event("notification", [](const httplib::sse::SSEMessage &msg) {
std::cout << "Notification: " << msg.data << std::endl;
});
sse.on_event("update", [](const httplib::sse::SSEMessage &msg) {
std::cout << "Update: " << msg.data << std::endl;
});
sse.start();
```
### Async with Stop
```cpp
httplib::sse::SSEClient sse(cli, "/events");
sse.on_message([](const httplib::sse::SSEMessage &msg) {
std::cout << msg.data << std::endl;
});
sse.start_async(); // Returns immediately
// ... do other work ...
sse.stop(); // Stop when done
```
### With Custom Headers (e.g., Authentication)
```cpp
httplib::Headers headers = {
{"Authorization", "Bearer token123"}
};
httplib::sse::SSEClient sse(cli, "/events", headers);
sse.start();
```
### Error Handling
```cpp
sse.on_error([](httplib::Error err) {
std::cerr << "Error: " << httplib::to_string(err) << std::endl;
});
sse.set_reconnect_interval(1000);
sse.set_max_reconnect_attempts(5);
sse.start();
```
## SSE Protocol
The client parses SSE format according to the [W3C specification](https://html.spec.whatwg.org/multipage/server-sent-events.html):
```
event: custom-type
id: 123
data: {"message": "hello"}
data: simple message
: this is a comment (ignored)
```

View File

@@ -1206,8 +1206,8 @@ std::string decoded_component = httplib::decode_uri_component(encoded_component)
Use `encode_uri()` for full URLs and `encode_uri_component()` for individual query parameters or path segments.
Streaming API
-------------
Stream API
----------
Process large responses without loading everything into memory.
@@ -1234,6 +1234,28 @@ All HTTP methods are supported: `stream::Get`, `Post`, `Put`, `Patch`, `Delete`,
See [README-stream.md](README-stream.md) for more details.
SSE Client
----------
```cpp
#include <httplib.h>
int main() {
httplib::Client cli("http://localhost:8080");
httplib::sse::SSEClient sse(cli, "/events");
sse.on_message([](const httplib::sse::SSEMessage &msg) {
std::cout << "Event: " << msg.event << std::endl;
std::cout << "Data: " << msg.data << std::endl;
});
sse.start(); // Blocking, with auto-reconnect
return 0;
}
```
See [README-sse.md](README-sse.md) for more details.
Split httplib.h into .h and .cc
-------------------------------

View File

@@ -18,7 +18,7 @@ ZLIB_SUPPORT = -DCPPHTTPLIB_ZLIB_SUPPORT -lz
BROTLI_DIR = $(PREFIX)/opt/brotli
BROTLI_SUPPORT = -DCPPHTTPLIB_BROTLI_SUPPORT -I$(BROTLI_DIR)/include -L$(BROTLI_DIR)/lib -lbrotlicommon -lbrotlienc -lbrotlidec
all: server client hello simplecli simplesvr upload redirect ssesvr ssecli ssecli-stream benchmark one_time_request server_and_client accept_header
all: server client hello simplecli simplesvr upload redirect ssesvr ssecli benchmark one_time_request server_and_client accept_header
server : server.cc ../httplib.h Makefile
$(CXX) -o server $(CXXFLAGS) server.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT)
@@ -47,9 +47,6 @@ ssesvr : ssesvr.cc ../httplib.h Makefile
ssecli : ssecli.cc ../httplib.h Makefile
$(CXX) -o ssecli $(CXXFLAGS) ssecli.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT)
ssecli-stream : ssecli-stream.cc ../httplib.h ../httplib.h Makefile
$(CXX) -o ssecli-stream $(CXXFLAGS) ssecli-stream.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT)
benchmark : benchmark.cc ../httplib.h Makefile
$(CXX) -o benchmark $(CXXFLAGS) benchmark.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT)
@@ -67,4 +64,4 @@ pem:
openssl req -new -key key.pem | openssl x509 -days 3650 -req -signkey key.pem > cert.pem
clean:
rm server client hello simplecli simplesvr upload redirect ssesvr ssecli ssecli-stream benchmark one_time_request server_and_client accept_header *.pem
rm server client hello simplecli simplesvr upload redirect ssesvr ssecli benchmark one_time_request server_and_client accept_header *.pem

View File

@@ -6,16 +6,52 @@
//
#include <httplib.h>
#include <csignal>
#include <iostream>
using namespace std;
int main(void) {
httplib::Client("http://localhost:1234")
.Get("/event1", [&](const char *data, size_t data_length) {
std::cout << string(data, data_length);
return true;
});
// Global SSEClient pointer for signal handling
httplib::sse::SSEClient *g_sse = nullptr;
void signal_handler(int) {
if (g_sse) { g_sse->stop(); }
}
int main(void) {
// Configuration
const string host = "http://localhost:1234";
const string path = "/event1";
cout << "SSE Client using httplib::sse::SSEClient\n";
cout << "Connecting to: " << host << path << "\n";
cout << "Press Ctrl+C to exit\n\n";
httplib::Client cli(host);
httplib::sse::SSEClient sse(cli, path);
// Set up signal handler for graceful shutdown
g_sse = &sse;
signal(SIGINT, signal_handler);
// Event handlers
sse.on_open([]() { cout << "[Connected]\n\n"; });
sse.on_message([](const httplib::sse::SSEMessage &msg) {
cout << "Event: " << msg.event << "\n";
cout << "Data: " << msg.data << "\n";
if (!msg.id.empty()) { cout << "ID: " << msg.id << "\n"; }
cout << "\n";
});
sse.on_error([](httplib::Error err) {
cerr << "[Error] " << httplib::to_string(err) << "\n";
});
// Start with auto-reconnect (blocking)
sse.start();
cout << "\n[Disconnected]\n";
return 0;
}

1019
httplib.h

File diff suppressed because it is too large Load Diff

View File

@@ -13255,3 +13255,729 @@ TEST(ETagTest, NegativeFileModificationTime) {
t.join();
std::remove(fname);
}
//==============================================================================
// SSE Parsing Tests
//==============================================================================
class SSEParsingTest : public ::testing::Test {
protected:
// Test helper that mimics SSE parsing behavior
static bool parse_sse_line(const std::string &line, sse::SSEMessage &msg,
int &retry_ms) {
// Blank line signals end of event
if (line.empty() || line == "\r") { return true; }
// Lines starting with ':' are comments (ignored)
if (!line.empty() && line[0] == ':') { return false; }
// Find the colon separator
auto colon_pos = line.find(':');
if (colon_pos == std::string::npos) {
// Line with no colon is treated as field name with empty value
return false;
}
std::string field = line.substr(0, colon_pos);
std::string value;
// Value starts after colon, skip optional single space
if (colon_pos + 1 < line.size()) {
size_t value_start = colon_pos + 1;
if (line[value_start] == ' ') { value_start++; }
value = line.substr(value_start);
// Remove trailing \r if present
if (!value.empty() && value.back() == '\r') { value.pop_back(); }
}
// Handle known fields
if (field == "event") {
msg.event = value;
} else if (field == "data") {
// Multiple data lines are concatenated with newlines
if (!msg.data.empty()) { msg.data += "\n"; }
msg.data += value;
} else if (field == "id") {
// Empty id is valid (clears the last event ID)
msg.id = value;
} else if (field == "retry") {
// Parse retry interval in milliseconds
try {
retry_ms = std::stoi(value);
} catch (...) {
// Invalid retry value, ignore
}
}
// Unknown fields are ignored per SSE spec
return false;
}
};
// Test: Single-line data
TEST_F(SSEParsingTest, SingleLineData) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("data: hello", msg, retry_ms));
EXPECT_EQ(msg.data, "hello");
EXPECT_EQ(msg.event, "message");
// Blank line ends event
EXPECT_TRUE(parse_sse_line("", msg, retry_ms));
}
// Test: Multi-line data
TEST_F(SSEParsingTest, MultiLineData) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("data: line1", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("data: line2", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("data: line3", msg, retry_ms));
EXPECT_EQ(msg.data, "line1\nline2\nline3");
}
// Test: Custom event types
TEST_F(SSEParsingTest, CustomEventType) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("event: update", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("data: payload", msg, retry_ms));
EXPECT_EQ(msg.event, "update");
EXPECT_EQ(msg.data, "payload");
}
// Test: Event ID handling
TEST_F(SSEParsingTest, EventIdHandling) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("id: 12345", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("data: test", msg, retry_ms));
EXPECT_EQ(msg.id, "12345");
}
// Test: Empty event ID (clears last event ID)
TEST_F(SSEParsingTest, EmptyEventId) {
sse::SSEMessage msg;
msg.id = "previous";
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("id:", msg, retry_ms));
EXPECT_EQ(msg.id, "");
}
// Test: Retry field parsing
TEST_F(SSEParsingTest, RetryFieldParsing) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("retry: 5000", msg, retry_ms));
EXPECT_EQ(retry_ms, 5000);
}
// Test: Invalid retry value
TEST_F(SSEParsingTest, InvalidRetryValue) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("retry: invalid", msg, retry_ms));
EXPECT_EQ(retry_ms, 3000); // Unchanged
}
// Test: Comments (lines starting with :)
TEST_F(SSEParsingTest, CommentsIgnored) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line(": this is a comment", msg, retry_ms));
EXPECT_EQ(msg.data, "");
EXPECT_EQ(msg.event, "message");
}
// Test: Colon in value
TEST_F(SSEParsingTest, ColonInValue) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("data: hello:world:test", msg, retry_ms));
EXPECT_EQ(msg.data, "hello:world:test");
}
// Test: Line with no colon (field name only)
TEST_F(SSEParsingTest, FieldNameOnly) {
sse::SSEMessage msg;
int retry_ms = 3000;
// According to SSE spec, this is treated as field name with empty value
EXPECT_FALSE(parse_sse_line("data", msg, retry_ms));
// Since we don't recognize "data" without colon, data should be empty
EXPECT_EQ(msg.data, "");
}
// Test: Trailing \r handling
TEST_F(SSEParsingTest, TrailingCarriageReturn) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("data: hello\r", msg, retry_ms));
EXPECT_EQ(msg.data, "hello");
}
// Test: Unknown fields ignored
TEST_F(SSEParsingTest, UnknownFieldsIgnored) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("unknown: value", msg, retry_ms));
EXPECT_EQ(msg.data, "");
EXPECT_EQ(msg.event, "message");
}
// Test: Space after colon is optional
TEST_F(SSEParsingTest, SpaceAfterColonOptional) {
sse::SSEMessage msg1, msg2;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("data: hello", msg1, retry_ms));
EXPECT_FALSE(parse_sse_line("data:hello", msg2, retry_ms));
EXPECT_EQ(msg1.data, "hello");
EXPECT_EQ(msg2.data, "hello");
}
// Test: SSEMessage clear
TEST_F(SSEParsingTest, MessageClear) {
sse::SSEMessage msg;
msg.event = "custom";
msg.data = "some data";
msg.id = "123";
msg.clear();
EXPECT_EQ(msg.event, "message");
EXPECT_EQ(msg.data, "");
EXPECT_EQ(msg.id, "");
}
// Test: Complete event parsing
TEST_F(SSEParsingTest, CompleteEventParsing) {
sse::SSEMessage msg;
int retry_ms = 3000;
EXPECT_FALSE(parse_sse_line("event: notification", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("id: evt-42", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("data: {\"type\":\"alert\"}", msg, retry_ms));
EXPECT_FALSE(parse_sse_line("retry: 1000", msg, retry_ms));
// Blank line ends event
EXPECT_TRUE(parse_sse_line("", msg, retry_ms));
EXPECT_EQ(msg.event, "notification");
EXPECT_EQ(msg.id, "evt-42");
EXPECT_EQ(msg.data, "{\"type\":\"alert\"}");
EXPECT_EQ(retry_ms, 1000);
}
//==============================================================================
// Integration Tests with Server
//==============================================================================
class SSEIntegrationTest : public ::testing::Test {
protected:
void SetUp() override {
stop_server_.store(false);
events_.clear();
server_ = httplib::detail::make_unique<Server>();
setup_server();
start_server();
}
void TearDown() override {
stop_server_.store(true);
event_cv_.notify_all();
server_->stop();
if (server_thread_.joinable()) { server_thread_.join(); }
}
void setup_server() {
// Simple SSE endpoint
server_->Get("/events", [this](const Request &req, Response &res) {
auto last_id = req.get_header_value("Last-Event-ID");
if (!last_id.empty()) { last_received_event_id_ = last_id; }
res.set_chunked_content_provider(
"text/event-stream", [this](size_t /*offset*/, DataSink &sink) {
std::unique_lock<std::mutex> lock(event_mutex_);
if (event_cv_.wait_for(
lock, std::chrono::milliseconds(200), [this] {
return !events_.empty() || stop_server_.load();
})) {
if (stop_server_.load()) { return false; }
if (!events_.empty()) {
std::string event = events_.front();
events_.erase(events_.begin());
sink.write(event.data(), event.size());
return true;
}
}
return !stop_server_.load();
});
});
// Endpoint that returns error
server_->Get("/error-endpoint", [](const Request &, Response &res) {
res.status = 500;
res.set_content("Internal Server Error", "text/plain");
});
// Endpoint for custom event types
server_->Get("/custom-events", [](const Request &, Response &res) {
res.set_chunked_content_provider(
"text/event-stream", [](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event = "event: update\ndata: updated\n\n"
"event: delete\ndata: deleted\n\n";
sink.write(event.data(), event.size());
}
return false; // End stream after sending
});
});
}
void start_server() {
port_ = server_->bind_to_any_port("localhost");
server_thread_ = std::thread([this]() { server_->listen_after_bind(); });
// Wait for server to start
while (!server_->is_running()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
int get_port() const { return port_; }
void send_event(const std::string &event) {
std::lock_guard<std::mutex> lock(event_mutex_);
events_.push_back(event);
event_cv_.notify_all();
}
std::unique_ptr<Server> server_;
std::thread server_thread_;
std::mutex event_mutex_;
std::condition_variable event_cv_;
std::vector<std::string> events_;
std::atomic<bool> stop_server_{false};
std::string last_received_event_id_;
int port_ = 0;
};
// Test: Successful connection and on_open callback
TEST_F(SSEIntegrationTest, SuccessfulConnection) {
// Add a simple endpoint that sends one event and closes
server_->Get("/simple-event", [](const Request &, Response &res) {
res.set_chunked_content_provider(
"text/event-stream", [](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event = "data: hello\n\n";
sink.write(event.data(), event.size());
}
return false; // Close stream after sending
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/simple-event");
std::atomic<bool> open_called{false};
std::atomic<bool> message_received{false};
sse.on_open([&open_called]() { open_called.store(true); });
sse.on_message([&message_received](const sse::SSEMessage &msg) {
if (msg.data == "hello") { message_received.store(true); }
});
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(1);
// Start async
sse.start_async();
// Wait for message to be processed
std::this_thread::sleep_for(std::chrono::milliseconds(500));
sse.stop();
EXPECT_TRUE(open_called.load());
EXPECT_TRUE(message_received.load());
}
// Test: on_message callback
TEST_F(SSEIntegrationTest, OnMessageCallback) {
// Endpoint that sends multiple events then closes
server_->Get("/multi-event", [](const Request &, Response &res) {
res.set_chunked_content_provider(
"text/event-stream", [](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string events = "data: message1\n\ndata: message2\n\n";
sink.write(events.data(), events.size());
}
return false;
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/multi-event");
std::vector<std::string> received_messages;
std::mutex messages_mutex;
sse.on_message([&](const sse::SSEMessage &msg) {
std::lock_guard<std::mutex> lock(messages_mutex);
received_messages.push_back(msg.data);
});
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(1);
sse.start_async();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
sse.stop();
std::lock_guard<std::mutex> lock(messages_mutex);
EXPECT_GE(received_messages.size(), 2u);
if (received_messages.size() >= 2) {
EXPECT_EQ(received_messages[0], "message1");
EXPECT_EQ(received_messages[1], "message2");
}
}
// Test: on_event for specific types
TEST_F(SSEIntegrationTest, OnEventForSpecificTypes) {
Client client("localhost", get_port());
sse::SSEClient sse(client, "/custom-events");
std::atomic<bool> update_received{false};
std::atomic<bool> delete_received{false};
sse.on_event("update", [&update_received](const sse::SSEMessage &msg) {
if (msg.data == "updated") { update_received.store(true); }
});
sse.on_event("delete", [&delete_received](const sse::SSEMessage &msg) {
if (msg.data == "deleted") { delete_received.store(true); }
});
sse.set_max_reconnect_attempts(1);
sse.start_async();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
sse.stop();
EXPECT_TRUE(update_received.load());
EXPECT_TRUE(delete_received.load());
}
// Test: on_error callback on connection failure
TEST_F(SSEIntegrationTest, OnErrorCallback) {
// Connect to a non-existent port
Client client("localhost", 59999);
sse::SSEClient sse(client, "/events");
std::atomic<bool> error_called{false};
Error received_error = Error::Success;
sse.on_error([&](Error err) {
error_called.store(true);
received_error = err;
});
sse.set_reconnect_interval(50);
sse.set_max_reconnect_attempts(1);
sse.start_async();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sse.stop();
EXPECT_TRUE(error_called.load());
EXPECT_NE(received_error, Error::Success);
}
// Test: Last-Event-ID header sent on reconnect
TEST_F(SSEIntegrationTest, LastEventIdHeader) {
// Endpoint that sends event with ID
server_->Get("/event-with-id", [](const Request &, Response &res) {
res.set_chunked_content_provider(
"text/event-stream", [](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event = "id: evt-123\ndata: test\n\n";
sink.write(event.data(), event.size());
}
return false;
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/event-with-id");
std::atomic<bool> id_received{false};
sse.on_message([&](const sse::SSEMessage &msg) {
if (!msg.id.empty()) { id_received.store(true); }
});
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(1);
sse.start_async();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
sse.stop();
EXPECT_TRUE(id_received.load());
EXPECT_EQ(sse.last_event_id(), "evt-123");
}
// Test: Manual stop
TEST_F(SSEIntegrationTest, ManualStop) {
// Endpoint that sends one event and stays open briefly
std::atomic<bool> handler_running{true};
server_->Get("/stay-open", [&handler_running](const Request &,
Response &res) {
res.set_chunked_content_provider(
"text/event-stream", [&handler_running](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event = "data: connected\n\n";
sink.write(event.data(), event.size());
}
// Keep connection open while handler_running is true
for (int i = 0; i < 10 && handler_running.load(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
return false;
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/stay-open");
std::atomic<bool> connected{false};
sse.on_open([&connected]() { connected.store(true); });
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(1);
sse.start_async();
// Wait for connection to establish
for (int i = 0; i < 20 && !connected.load(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
EXPECT_TRUE(connected.load());
EXPECT_TRUE(sse.is_connected());
// Signal handler to stop
handler_running.store(false);
// Stop SSE client
sse.stop();
EXPECT_FALSE(sse.is_connected());
}
// Test: SSEClient with custom headers
TEST_F(SSEIntegrationTest, CustomHeaders) {
// Setup a server endpoint that checks for custom header
std::atomic<bool> header_received{false};
server_->Get("/header-check", [&](const Request &req, Response &res) {
if (req.get_header_value("X-Custom-Header") == "custom-value") {
header_received.store(true);
}
res.set_chunked_content_provider("text/event-stream",
[](size_t, DataSink &) { return false; });
});
Client client("localhost", get_port());
Headers headers = {{"X-Custom-Header", "custom-value"}};
sse::SSEClient sse(client, "/header-check", headers);
sse.set_max_reconnect_attempts(1);
sse.start_async();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sse.stop();
EXPECT_TRUE(header_received.load());
}
// Test: Reconnect interval configuration
TEST_F(SSEIntegrationTest, ReconnectIntervalConfiguration) {
Client client("localhost", get_port());
sse::SSEClient sse(client, "/events");
auto &result = sse.set_reconnect_interval(500);
// Builder pattern should return reference to self
EXPECT_EQ(&result, &sse);
}
// Test: Max reconnect attempts
TEST_F(SSEIntegrationTest, MaxReconnectAttempts) {
// Connect to non-existent port to force reconnects
Client client("localhost", 59998);
sse::SSEClient sse(client, "/events");
std::atomic<int> error_count{0};
sse.on_error([&](Error) { error_count.fetch_add(1); });
sse.set_reconnect_interval(50);
sse.set_max_reconnect_attempts(2);
auto start = std::chrono::steady_clock::now();
sse.start(); // Blocking call
auto end = std::chrono::steady_clock::now();
// Should have stopped after 2 failed attempts
EXPECT_GE(error_count.load(), 2);
// Should not have taken too long (max 2 attempts * 50ms + overhead)
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
#ifdef _WIN32
// Windows is much slower for socket connection failures
EXPECT_LT(duration.count(), 7000);
#else
EXPECT_LT(duration.count(), 1000);
#endif
}
// Test: Multi-line data in integration
TEST_F(SSEIntegrationTest, MultiLineDataIntegration) {
// Endpoint with multi-line data
server_->Get("/multiline-data", [](const Request &, Response &res) {
res.set_chunked_content_provider(
"text/event-stream", [](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event = "data: line1\ndata: line2\ndata: line3\n\n";
sink.write(event.data(), event.size());
}
return false;
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/multiline-data");
std::string received_data;
std::mutex data_mutex;
sse.on_message([&](const sse::SSEMessage &msg) {
std::lock_guard<std::mutex> lock(data_mutex);
received_data = msg.data;
});
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(1);
sse.start_async();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
sse.stop();
std::lock_guard<std::mutex> lock(data_mutex);
EXPECT_EQ(received_data, "line1\nline2\nline3");
}
// Test: Auto-reconnect after server disconnection
TEST_F(SSEIntegrationTest, AutoReconnectAfterDisconnect) {
std::atomic<int> connection_count{0};
std::atomic<int> message_count{0};
// Endpoint that sends one event and closes, forcing reconnect
server_->Get("/reconnect-test",
[&connection_count](const Request &, Response &res) {
connection_count.fetch_add(1);
res.set_chunked_content_provider(
"text/event-stream", [](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event = "data: hello\n\n";
sink.write(event.data(), event.size());
}
return false; // Close connection after sending
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/reconnect-test");
sse.on_message([&message_count](const sse::SSEMessage &) {
message_count.fetch_add(1);
});
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(3);
sse.start_async();
// Wait long enough for multiple reconnects
std::this_thread::sleep_for(std::chrono::milliseconds(800));
sse.stop();
// Should have connected multiple times (initial + reconnects)
EXPECT_GE(connection_count.load(), 2);
// Should have received messages from multiple connections
EXPECT_GE(message_count.load(), 2);
}
// Test: Last-Event-ID sent on reconnect
TEST_F(SSEIntegrationTest, LastEventIdSentOnReconnect) {
std::atomic<int> connection_count{0};
std::vector<std::string> received_last_event_ids;
std::mutex id_mutex;
// Endpoint that checks Last-Event-ID header and sends event with ID
server_->Get("/reconnect-with-id", [&](const Request &req, Response &res) {
int conn = connection_count.fetch_add(1);
// Capture the Last-Event-ID header from each connection
{
std::lock_guard<std::mutex> lock(id_mutex);
received_last_event_ids.push_back(req.get_header_value("Last-Event-ID"));
}
res.set_chunked_content_provider(
"text/event-stream", [conn](size_t offset, DataSink &sink) {
if (offset == 0) {
std::string event =
"id: event-" + std::to_string(conn) + "\ndata: msg\n\n";
sink.write(event.data(), event.size());
}
return false;
});
});
Client client("localhost", get_port());
sse::SSEClient sse(client, "/reconnect-with-id");
sse.set_reconnect_interval(100);
sse.set_max_reconnect_attempts(3);
sse.start_async();
// Wait for at least 2 connections
std::this_thread::sleep_for(std::chrono::milliseconds(500));
sse.stop();
// Verify behavior
std::lock_guard<std::mutex> lock(id_mutex);
EXPECT_GE(received_last_event_ids.size(), 2u);
// First connection should have no Last-Event-ID
if (!received_last_event_ids.empty()) {
EXPECT_EQ(received_last_event_ids[0], "");
}
// Second connection should have Last-Event-ID from first connection
if (received_last_event_ids.size() >= 2) {
EXPECT_EQ(received_last_event_ids[1], "event-0");
}
}