You've already forked cpp-httplib
Implement SSEClient
This commit is contained in:
721
test/test.cc
721
test/test.cc
@@ -13255,3 +13255,724 @@ TEST(ETagTest, NegativeFileModificationTime) {
|
||||
t.join();
|
||||
std::remove(fname);
|
||||
}
|
||||
|
||||
//==============================================================================
|
||||
// SSE Parsing Tests
|
||||
//==============================================================================
|
||||
|
||||
class SSEParsingTest : public ::testing::Test {
|
||||
protected:
|
||||
// Test helper that mimics SSE parsing behavior
|
||||
static bool parse_sse_line(const std::string &line, sse::SSEMessage &msg,
|
||||
int &retry_ms) {
|
||||
// Blank line signals end of event
|
||||
if (line.empty() || line == "\r") { return true; }
|
||||
|
||||
// Lines starting with ':' are comments (ignored)
|
||||
if (!line.empty() && line[0] == ':') { return false; }
|
||||
|
||||
// Find the colon separator
|
||||
auto colon_pos = line.find(':');
|
||||
if (colon_pos == std::string::npos) {
|
||||
// Line with no colon is treated as field name with empty value
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string field = line.substr(0, colon_pos);
|
||||
std::string value;
|
||||
|
||||
// Value starts after colon, skip optional single space
|
||||
if (colon_pos + 1 < line.size()) {
|
||||
size_t value_start = colon_pos + 1;
|
||||
if (line[value_start] == ' ') { value_start++; }
|
||||
value = line.substr(value_start);
|
||||
// Remove trailing \r if present
|
||||
if (!value.empty() && value.back() == '\r') { value.pop_back(); }
|
||||
}
|
||||
|
||||
// Handle known fields
|
||||
if (field == "event") {
|
||||
msg.event = value;
|
||||
} else if (field == "data") {
|
||||
// Multiple data lines are concatenated with newlines
|
||||
if (!msg.data.empty()) { msg.data += "\n"; }
|
||||
msg.data += value;
|
||||
} else if (field == "id") {
|
||||
// Empty id is valid (clears the last event ID)
|
||||
msg.id = value;
|
||||
} else if (field == "retry") {
|
||||
// Parse retry interval in milliseconds
|
||||
try {
|
||||
retry_ms = std::stoi(value);
|
||||
} catch (...) {
|
||||
// Invalid retry value, ignore
|
||||
}
|
||||
}
|
||||
// Unknown fields are ignored per SSE spec
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
// Test: Single-line data
|
||||
TEST_F(SSEParsingTest, SingleLineData) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("data: hello", msg, retry_ms));
|
||||
EXPECT_EQ(msg.data, "hello");
|
||||
EXPECT_EQ(msg.event, "message");
|
||||
|
||||
// Blank line ends event
|
||||
EXPECT_TRUE(parse_sse_line("", msg, retry_ms));
|
||||
}
|
||||
|
||||
// Test: Multi-line data
|
||||
TEST_F(SSEParsingTest, MultiLineData) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("data: line1", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("data: line2", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("data: line3", msg, retry_ms));
|
||||
EXPECT_EQ(msg.data, "line1\nline2\nline3");
|
||||
}
|
||||
|
||||
// Test: Custom event types
|
||||
TEST_F(SSEParsingTest, CustomEventType) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("event: update", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("data: payload", msg, retry_ms));
|
||||
EXPECT_EQ(msg.event, "update");
|
||||
EXPECT_EQ(msg.data, "payload");
|
||||
}
|
||||
|
||||
// Test: Event ID handling
|
||||
TEST_F(SSEParsingTest, EventIdHandling) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("id: 12345", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("data: test", msg, retry_ms));
|
||||
EXPECT_EQ(msg.id, "12345");
|
||||
}
|
||||
|
||||
// Test: Empty event ID (clears last event ID)
|
||||
TEST_F(SSEParsingTest, EmptyEventId) {
|
||||
sse::SSEMessage msg;
|
||||
msg.id = "previous";
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("id:", msg, retry_ms));
|
||||
EXPECT_EQ(msg.id, "");
|
||||
}
|
||||
|
||||
// Test: Retry field parsing
|
||||
TEST_F(SSEParsingTest, RetryFieldParsing) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("retry: 5000", msg, retry_ms));
|
||||
EXPECT_EQ(retry_ms, 5000);
|
||||
}
|
||||
|
||||
// Test: Invalid retry value
|
||||
TEST_F(SSEParsingTest, InvalidRetryValue) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("retry: invalid", msg, retry_ms));
|
||||
EXPECT_EQ(retry_ms, 3000); // Unchanged
|
||||
}
|
||||
|
||||
// Test: Comments (lines starting with :)
|
||||
TEST_F(SSEParsingTest, CommentsIgnored) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line(": this is a comment", msg, retry_ms));
|
||||
EXPECT_EQ(msg.data, "");
|
||||
EXPECT_EQ(msg.event, "message");
|
||||
}
|
||||
|
||||
// Test: Colon in value
|
||||
TEST_F(SSEParsingTest, ColonInValue) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("data: hello:world:test", msg, retry_ms));
|
||||
EXPECT_EQ(msg.data, "hello:world:test");
|
||||
}
|
||||
|
||||
// Test: Line with no colon (field name only)
|
||||
TEST_F(SSEParsingTest, FieldNameOnly) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
// According to SSE spec, this is treated as field name with empty value
|
||||
EXPECT_FALSE(parse_sse_line("data", msg, retry_ms));
|
||||
// Since we don't recognize "data" without colon, data should be empty
|
||||
EXPECT_EQ(msg.data, "");
|
||||
}
|
||||
|
||||
// Test: Trailing \r handling
|
||||
TEST_F(SSEParsingTest, TrailingCarriageReturn) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("data: hello\r", msg, retry_ms));
|
||||
EXPECT_EQ(msg.data, "hello");
|
||||
}
|
||||
|
||||
// Test: Unknown fields ignored
|
||||
TEST_F(SSEParsingTest, UnknownFieldsIgnored) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("unknown: value", msg, retry_ms));
|
||||
EXPECT_EQ(msg.data, "");
|
||||
EXPECT_EQ(msg.event, "message");
|
||||
}
|
||||
|
||||
// Test: Space after colon is optional
|
||||
TEST_F(SSEParsingTest, SpaceAfterColonOptional) {
|
||||
sse::SSEMessage msg1, msg2;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("data: hello", msg1, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("data:hello", msg2, retry_ms));
|
||||
EXPECT_EQ(msg1.data, "hello");
|
||||
EXPECT_EQ(msg2.data, "hello");
|
||||
}
|
||||
|
||||
// Test: SSEMessage clear
|
||||
TEST_F(SSEParsingTest, MessageClear) {
|
||||
sse::SSEMessage msg;
|
||||
msg.event = "custom";
|
||||
msg.data = "some data";
|
||||
msg.id = "123";
|
||||
|
||||
msg.clear();
|
||||
|
||||
EXPECT_EQ(msg.event, "message");
|
||||
EXPECT_EQ(msg.data, "");
|
||||
EXPECT_EQ(msg.id, "");
|
||||
}
|
||||
|
||||
// Test: Complete event parsing
|
||||
TEST_F(SSEParsingTest, CompleteEventParsing) {
|
||||
sse::SSEMessage msg;
|
||||
int retry_ms = 3000;
|
||||
|
||||
EXPECT_FALSE(parse_sse_line("event: notification", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("id: evt-42", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("data: {\"type\":\"alert\"}", msg, retry_ms));
|
||||
EXPECT_FALSE(parse_sse_line("retry: 1000", msg, retry_ms));
|
||||
|
||||
// Blank line ends event
|
||||
EXPECT_TRUE(parse_sse_line("", msg, retry_ms));
|
||||
|
||||
EXPECT_EQ(msg.event, "notification");
|
||||
EXPECT_EQ(msg.id, "evt-42");
|
||||
EXPECT_EQ(msg.data, "{\"type\":\"alert\"}");
|
||||
EXPECT_EQ(retry_ms, 1000);
|
||||
}
|
||||
|
||||
//==============================================================================
|
||||
// Integration Tests with Server
|
||||
//==============================================================================
|
||||
|
||||
class SSEIntegrationTest : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {
|
||||
stop_server_.store(false);
|
||||
events_.clear();
|
||||
server_ = httplib::detail::make_unique<Server>();
|
||||
setup_server();
|
||||
start_server();
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
stop_server_.store(true);
|
||||
event_cv_.notify_all();
|
||||
server_->stop();
|
||||
if (server_thread_.joinable()) { server_thread_.join(); }
|
||||
}
|
||||
|
||||
void setup_server() {
|
||||
// Simple SSE endpoint
|
||||
server_->Get("/events", [this](const Request &req, Response &res) {
|
||||
auto last_id = req.get_header_value("Last-Event-ID");
|
||||
if (!last_id.empty()) { last_received_event_id_ = last_id; }
|
||||
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [this](size_t /*offset*/, DataSink &sink) {
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
if (event_cv_.wait_for(
|
||||
lock, std::chrono::milliseconds(200), [this] {
|
||||
return !events_.empty() || stop_server_.load();
|
||||
})) {
|
||||
if (stop_server_.load()) { return false; }
|
||||
if (!events_.empty()) {
|
||||
std::string event = events_.front();
|
||||
events_.erase(events_.begin());
|
||||
sink.write(event.data(), event.size());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return !stop_server_.load();
|
||||
});
|
||||
});
|
||||
|
||||
// Endpoint that returns error
|
||||
server_->Get("/error-endpoint", [](const Request &, Response &res) {
|
||||
res.status = 500;
|
||||
res.set_content("Internal Server Error", "text/plain");
|
||||
});
|
||||
|
||||
// Endpoint for custom event types
|
||||
server_->Get("/custom-events", [](const Request &, Response &res) {
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event = "event: update\ndata: updated\n\n"
|
||||
"event: delete\ndata: deleted\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
return false; // End stream after sending
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void start_server() {
|
||||
port_ = server_->bind_to_any_port("localhost");
|
||||
server_thread_ = std::thread([this]() { server_->listen_after_bind(); });
|
||||
|
||||
// Wait for server to start
|
||||
while (!server_->is_running()) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
}
|
||||
|
||||
int get_port() const { return port_; }
|
||||
|
||||
void send_event(const std::string &event) {
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
events_.push_back(event);
|
||||
event_cv_.notify_all();
|
||||
}
|
||||
|
||||
std::unique_ptr<Server> server_;
|
||||
std::thread server_thread_;
|
||||
std::mutex event_mutex_;
|
||||
std::condition_variable event_cv_;
|
||||
std::vector<std::string> events_;
|
||||
std::atomic<bool> stop_server_{false};
|
||||
std::string last_received_event_id_;
|
||||
int port_ = 0;
|
||||
};
|
||||
|
||||
// Test: Successful connection and on_open callback
|
||||
TEST_F(SSEIntegrationTest, SuccessfulConnection) {
|
||||
// Add a simple endpoint that sends one event and closes
|
||||
server_->Get("/simple-event", [](const Request &, Response &res) {
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event = "data: hello\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
return false; // Close stream after sending
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/simple-event");
|
||||
|
||||
std::atomic<bool> open_called{false};
|
||||
std::atomic<bool> message_received{false};
|
||||
|
||||
sse.on_open([&open_called]() { open_called.store(true); });
|
||||
|
||||
sse.on_message([&message_received](const sse::SSEMessage &msg) {
|
||||
if (msg.data == "hello") { message_received.store(true); }
|
||||
});
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
|
||||
// Start async
|
||||
sse.start_async();
|
||||
|
||||
// Wait for message to be processed
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
|
||||
sse.stop();
|
||||
|
||||
EXPECT_TRUE(open_called.load());
|
||||
EXPECT_TRUE(message_received.load());
|
||||
}
|
||||
|
||||
// Test: on_message callback
|
||||
TEST_F(SSEIntegrationTest, OnMessageCallback) {
|
||||
// Endpoint that sends multiple events then closes
|
||||
server_->Get("/multi-event", [](const Request &, Response &res) {
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string events = "data: message1\n\ndata: message2\n\n";
|
||||
sink.write(events.data(), events.size());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/multi-event");
|
||||
|
||||
std::vector<std::string> received_messages;
|
||||
std::mutex messages_mutex;
|
||||
|
||||
sse.on_message([&](const sse::SSEMessage &msg) {
|
||||
std::lock_guard<std::mutex> lock(messages_mutex);
|
||||
received_messages.push_back(msg.data);
|
||||
});
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
sse.start_async();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
sse.stop();
|
||||
|
||||
std::lock_guard<std::mutex> lock(messages_mutex);
|
||||
EXPECT_GE(received_messages.size(), 2u);
|
||||
if (received_messages.size() >= 2) {
|
||||
EXPECT_EQ(received_messages[0], "message1");
|
||||
EXPECT_EQ(received_messages[1], "message2");
|
||||
}
|
||||
}
|
||||
|
||||
// Test: on_event for specific types
|
||||
TEST_F(SSEIntegrationTest, OnEventForSpecificTypes) {
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/custom-events");
|
||||
|
||||
std::atomic<bool> update_received{false};
|
||||
std::atomic<bool> delete_received{false};
|
||||
|
||||
sse.on_event("update", [&update_received](const sse::SSEMessage &msg) {
|
||||
if (msg.data == "updated") { update_received.store(true); }
|
||||
});
|
||||
|
||||
sse.on_event("delete", [&delete_received](const sse::SSEMessage &msg) {
|
||||
if (msg.data == "deleted") { delete_received.store(true); }
|
||||
});
|
||||
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
sse.start_async();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
sse.stop();
|
||||
|
||||
EXPECT_TRUE(update_received.load());
|
||||
EXPECT_TRUE(delete_received.load());
|
||||
}
|
||||
|
||||
// Test: on_error callback on connection failure
|
||||
TEST_F(SSEIntegrationTest, OnErrorCallback) {
|
||||
// Connect to a non-existent port
|
||||
Client client("localhost", 59999);
|
||||
sse::SSEClient sse(client, "/events");
|
||||
|
||||
std::atomic<bool> error_called{false};
|
||||
Error received_error = Error::Success;
|
||||
|
||||
sse.on_error([&](Error err) {
|
||||
error_called.store(true);
|
||||
received_error = err;
|
||||
});
|
||||
|
||||
sse.set_reconnect_interval(50);
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
|
||||
sse.start_async();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
sse.stop();
|
||||
|
||||
EXPECT_TRUE(error_called.load());
|
||||
EXPECT_NE(received_error, Error::Success);
|
||||
}
|
||||
|
||||
// Test: Last-Event-ID header sent on reconnect
|
||||
TEST_F(SSEIntegrationTest, LastEventIdHeader) {
|
||||
// Endpoint that sends event with ID
|
||||
server_->Get("/event-with-id", [](const Request &, Response &res) {
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event = "id: evt-123\ndata: test\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/event-with-id");
|
||||
|
||||
std::atomic<bool> id_received{false};
|
||||
|
||||
sse.on_message([&](const sse::SSEMessage &msg) {
|
||||
if (!msg.id.empty()) { id_received.store(true); }
|
||||
});
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
sse.start_async();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
sse.stop();
|
||||
|
||||
EXPECT_TRUE(id_received.load());
|
||||
EXPECT_EQ(sse.last_event_id(), "evt-123");
|
||||
}
|
||||
|
||||
// Test: Manual stop
|
||||
TEST_F(SSEIntegrationTest, ManualStop) {
|
||||
// Endpoint that sends one event and stays open briefly
|
||||
std::atomic<bool> handler_running{true};
|
||||
|
||||
server_->Get("/stay-open", [&handler_running](const Request &,
|
||||
Response &res) {
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [&handler_running](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event = "data: connected\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
// Keep connection open while handler_running is true
|
||||
for (int i = 0; i < 10 && handler_running.load(); ++i) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/stay-open");
|
||||
|
||||
std::atomic<bool> connected{false};
|
||||
sse.on_open([&connected]() { connected.store(true); });
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
sse.start_async();
|
||||
|
||||
// Wait for connection to establish
|
||||
for (int i = 0; i < 20 && !connected.load(); ++i) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
EXPECT_TRUE(connected.load());
|
||||
EXPECT_TRUE(sse.is_connected());
|
||||
|
||||
// Signal handler to stop
|
||||
handler_running.store(false);
|
||||
|
||||
// Stop SSE client
|
||||
sse.stop();
|
||||
EXPECT_FALSE(sse.is_connected());
|
||||
}
|
||||
|
||||
// Test: SSEClient with custom headers
|
||||
TEST_F(SSEIntegrationTest, CustomHeaders) {
|
||||
// Setup a server endpoint that checks for custom header
|
||||
std::atomic<bool> header_received{false};
|
||||
|
||||
server_->Get("/header-check", [&](const Request &req, Response &res) {
|
||||
if (req.get_header_value("X-Custom-Header") == "custom-value") {
|
||||
header_received.store(true);
|
||||
}
|
||||
res.set_chunked_content_provider("text/event-stream",
|
||||
[](size_t, DataSink &) { return false; });
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
Headers headers = {{"X-Custom-Header", "custom-value"}};
|
||||
sse::SSEClient sse(client, "/header-check", headers);
|
||||
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
sse.start_async();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
sse.stop();
|
||||
|
||||
EXPECT_TRUE(header_received.load());
|
||||
}
|
||||
|
||||
// Test: Reconnect interval configuration
|
||||
TEST_F(SSEIntegrationTest, ReconnectIntervalConfiguration) {
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/events");
|
||||
|
||||
auto &result = sse.set_reconnect_interval(500);
|
||||
// Builder pattern should return reference to self
|
||||
EXPECT_EQ(&result, &sse);
|
||||
}
|
||||
|
||||
// Test: Max reconnect attempts
|
||||
TEST_F(SSEIntegrationTest, MaxReconnectAttempts) {
|
||||
// Connect to non-existent port to force reconnects
|
||||
Client client("localhost", 59998);
|
||||
sse::SSEClient sse(client, "/events");
|
||||
|
||||
std::atomic<int> error_count{0};
|
||||
|
||||
sse.on_error([&](Error) { error_count.fetch_add(1); });
|
||||
|
||||
sse.set_reconnect_interval(50);
|
||||
sse.set_max_reconnect_attempts(2);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
sse.start(); // Blocking call
|
||||
auto end = std::chrono::steady_clock::now();
|
||||
|
||||
// Should have stopped after 2 failed attempts
|
||||
EXPECT_GE(error_count.load(), 2);
|
||||
|
||||
// Should not have taken too long (max 2 attempts * 50ms + overhead)
|
||||
auto duration =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
|
||||
EXPECT_LT(duration.count(), 1000);
|
||||
}
|
||||
|
||||
// Test: Multi-line data in integration
|
||||
TEST_F(SSEIntegrationTest, MultiLineDataIntegration) {
|
||||
// Endpoint with multi-line data
|
||||
server_->Get("/multiline-data", [](const Request &, Response &res) {
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event = "data: line1\ndata: line2\ndata: line3\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/multiline-data");
|
||||
|
||||
std::string received_data;
|
||||
std::mutex data_mutex;
|
||||
|
||||
sse.on_message([&](const sse::SSEMessage &msg) {
|
||||
std::lock_guard<std::mutex> lock(data_mutex);
|
||||
received_data = msg.data;
|
||||
});
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(1);
|
||||
sse.start_async();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
sse.stop();
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_mutex);
|
||||
EXPECT_EQ(received_data, "line1\nline2\nline3");
|
||||
}
|
||||
|
||||
// Test: Auto-reconnect after server disconnection
|
||||
TEST_F(SSEIntegrationTest, AutoReconnectAfterDisconnect) {
|
||||
std::atomic<int> connection_count{0};
|
||||
std::atomic<int> message_count{0};
|
||||
|
||||
// Endpoint that sends one event and closes, forcing reconnect
|
||||
server_->Get("/reconnect-test",
|
||||
[&connection_count](const Request &, Response &res) {
|
||||
connection_count.fetch_add(1);
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event = "data: hello\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
return false; // Close connection after sending
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/reconnect-test");
|
||||
|
||||
sse.on_message([&message_count](const sse::SSEMessage &) {
|
||||
message_count.fetch_add(1);
|
||||
});
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(3);
|
||||
sse.start_async();
|
||||
|
||||
// Wait long enough for multiple reconnects
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(800));
|
||||
sse.stop();
|
||||
|
||||
// Should have connected multiple times (initial + reconnects)
|
||||
EXPECT_GE(connection_count.load(), 2);
|
||||
// Should have received messages from multiple connections
|
||||
EXPECT_GE(message_count.load(), 2);
|
||||
}
|
||||
|
||||
// Test: Last-Event-ID sent on reconnect
|
||||
TEST_F(SSEIntegrationTest, LastEventIdSentOnReconnect) {
|
||||
std::atomic<int> connection_count{0};
|
||||
std::vector<std::string> received_last_event_ids;
|
||||
std::mutex id_mutex;
|
||||
|
||||
// Endpoint that checks Last-Event-ID header and sends event with ID
|
||||
server_->Get("/reconnect-with-id", [&](const Request &req, Response &res) {
|
||||
int conn = connection_count.fetch_add(1);
|
||||
|
||||
// Capture the Last-Event-ID header from each connection
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(id_mutex);
|
||||
received_last_event_ids.push_back(req.get_header_value("Last-Event-ID"));
|
||||
}
|
||||
|
||||
res.set_chunked_content_provider(
|
||||
"text/event-stream", [conn](size_t offset, DataSink &sink) {
|
||||
if (offset == 0) {
|
||||
std::string event =
|
||||
"id: event-" + std::to_string(conn) + "\ndata: msg\n\n";
|
||||
sink.write(event.data(), event.size());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
Client client("localhost", get_port());
|
||||
sse::SSEClient sse(client, "/reconnect-with-id");
|
||||
|
||||
sse.set_reconnect_interval(100);
|
||||
sse.set_max_reconnect_attempts(3);
|
||||
sse.start_async();
|
||||
|
||||
// Wait for at least 2 connections
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
sse.stop();
|
||||
|
||||
// Verify behavior
|
||||
std::lock_guard<std::mutex> lock(id_mutex);
|
||||
EXPECT_GE(received_last_event_ids.size(), 2u);
|
||||
|
||||
// First connection should have no Last-Event-ID
|
||||
if (!received_last_event_ids.empty()) {
|
||||
EXPECT_EQ(received_last_event_ids[0], "");
|
||||
}
|
||||
|
||||
// Second connection should have Last-Event-ID from first connection
|
||||
if (received_last_event_ids.size() >= 2) {
|
||||
EXPECT_EQ(received_last_event_ids[1], "event-0");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user