1
0
mirror of synced 2025-11-03 19:53:13 +03:00

Fix EventDispatcher problem (#2257)

This commit is contained in:
yhirose
2025-10-27 18:10:52 -04:00
committed by GitHub
parent 6e0f211cff
commit 2da189f88c
2 changed files with 24 additions and 11 deletions

View File

@@ -14,11 +14,18 @@ class EventDispatcher {
public:
EventDispatcher() {}
void wait_event(DataSink *sink) {
bool wait_event(DataSink *sink) {
unique_lock<mutex> lk(m_);
int id = id_;
cv_.wait(lk, [&] { return cid_ == id; });
// Wait with timeout to prevent hanging if client disconnects
if (!cv_.wait_for(lk, std::chrono::seconds(5),
[&] { return cid_ == id; })) {
return false; // Timeout occurred
}
sink->write(message_.data(), message_.size());
return true;
}
void send_event(const string &message) {
@@ -71,8 +78,7 @@ int main(void) {
cout << "connected to event1..." << endl;
res.set_chunked_content_provider("text/event-stream",
[&](size_t /*offset*/, DataSink &sink) {
ed.wait_event(&sink);
return true;
return ed.wait_event(&sink);
});
});
@@ -80,8 +86,7 @@ int main(void) {
cout << "connected to event2..." << endl;
res.set_chunked_content_provider("text/event-stream",
[&](size_t /*offset*/, DataSink &sink) {
ed.wait_event(&sink);
return true;
return ed.wait_event(&sink);
});
});

View File

@@ -11043,11 +11043,18 @@ class EventDispatcher {
public:
EventDispatcher() {}
void wait_event(DataSink *sink) {
bool wait_event(DataSink *sink) {
unique_lock<mutex> lk(m_);
int id = id_;
cv_.wait(lk, [&] { return cid_ == id; });
// Wait with timeout to prevent hanging if client disconnects
if (!cv_.wait_for(lk, std::chrono::seconds(5),
[&] { return cid_ == id; })) {
return false; // Timeout occurred
}
sink->write(message_.data(), message_.size());
return true;
}
void send_event(const string &message) {
@@ -11072,8 +11079,7 @@ TEST(ClientInThreadTest, Issue2068) {
svr.Get("/event1", [&](const Request & /*req*/, Response &res) {
res.set_chunked_content_provider("text/event-stream",
[&](size_t /*offset*/, DataSink &sink) {
ed.wait_event(&sink);
return true;
return ed.wait_event(&sink);
});
});
@@ -11116,9 +11122,11 @@ TEST(ClientInThreadTest, Issue2068) {
std::this_thread::sleep_for(std::chrono::seconds(2));
stop = true;
client->stop();
client.reset();
t.join();
// Reset client after thread has finished
client.reset();
}
}