1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Split stream state machine into send and receive state machines

Summary: This is step 1 for removing reset on reset, since the send side may need to transition to waiting for a reset ack while the read side is an any state.

Reviewed By: lnicco

Differential Revision: D15075849

fbshipit-source-id: 1e094942a8a1ca9a01d4161cd6309b4136a9cfbf
This commit is contained in:
Alan Frindell
2019-05-06 13:29:41 -07:00
committed by Facebook Github Bot
parent 5c9c0821ff
commit 90e5e1b3f1
32 changed files with 753 additions and 769 deletions

View File

@@ -1801,7 +1801,7 @@ folly::Expected<folly::Unit, LocalErrorCode> QuicTransportBase::resetStream(
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
} }
// Invoke state machine // Invoke state machine
invokeStreamStateMachine( invokeStreamSendStateMachine(
*conn_, *stream, StreamEvents::SendReset(errorCode)); *conn_, *stream, StreamEvents::SendReset(errorCode));
for (auto pendingResetIt = conn_->pendingEvents.resets.begin(); for (auto pendingResetIt = conn_->pendingEvents.resets.begin();
closeState_ == CloseState::OPEN && closeState_ == CloseState::OPEN &&

View File

@@ -281,7 +281,8 @@ class TestQuicTransport
void closeStream(StreamId id) { void closeStream(StreamId id) {
QuicStreamState* stream = conn_->streamManager->getStream(id); QuicStreamState* stream = conn_->streamManager->getStream(id);
stream->state = StreamStates::Closed{}; stream->send.state = StreamSendStates::Closed();
stream->recv.state = StreamReceiveStates::Closed();
conn_->streamManager->addClosed(id); conn_->streamManager->addClosed(id);
auto deliveryCb = deliveryCallbacks_.find(id); auto deliveryCb = deliveryCallbacks_.find(id);

View File

@@ -775,7 +775,7 @@ TEST_F(QuicTransportTest, RstStream) {
auto stream = auto stream =
transport_->getConnectionState().streamManager->findStream(streamId); transport_->getConnectionState().streamManager->findStream(streamId);
ASSERT_TRUE(stream); ASSERT_TRUE(stream);
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(*stream)); EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream->send));
EXPECT_TRUE(stream->retransmissionBuffer.empty()); EXPECT_TRUE(stream->retransmissionBuffer.empty());
EXPECT_TRUE(stream->writeBuffer.empty()); EXPECT_TRUE(stream->writeBuffer.empty());
EXPECT_FALSE(stream->writable()); EXPECT_FALSE(stream->writable());
@@ -1245,7 +1245,7 @@ TEST_F(QuicTransportTest, RstWrittenStream) {
} }
EXPECT_TRUE(foundReset); EXPECT_TRUE(foundReset);
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(*stream)); EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream->send));
EXPECT_TRUE(stream->retransmissionBuffer.empty()); EXPECT_TRUE(stream->retransmissionBuffer.empty());
EXPECT_TRUE(stream->writeBuffer.empty()); EXPECT_TRUE(stream->writeBuffer.empty());
EXPECT_FALSE(stream->writable()); EXPECT_FALSE(stream->writable());
@@ -1311,7 +1311,7 @@ TEST_F(QuicTransportTest, WriteAfterSendRst) {
transport_->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN); transport_->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN);
loopForWrites(); loopForWrites();
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(*stream)); EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream->send));
EXPECT_TRUE(stream->retransmissionBuffer.empty()); EXPECT_TRUE(stream->retransmissionBuffer.empty());
EXPECT_TRUE(stream->writeBuffer.empty()); EXPECT_TRUE(stream->writeBuffer.empty());
EXPECT_FALSE(stream->writable()); EXPECT_FALSE(stream->writable());

View File

@@ -355,7 +355,7 @@ void QuicClientTransport::processPacketData(
auto stream = auto stream =
conn_->streamManager->getStream(frame.streamId); conn_->streamManager->getStream(frame.streamId);
if (stream) { if (stream) {
invokeStreamStateMachine( invokeStreamSendStateMachine(
*conn_, *stream, StreamEvents::RstAck(frame)); *conn_, *stream, StreamEvents::RstAck(frame));
} }
}, },
@@ -368,7 +368,7 @@ void QuicClientTransport::processPacketData(
<< " closed=" << (ackedStream == nullptr) << " " << " closed=" << (ackedStream == nullptr) << " "
<< *this; << *this;
if (ackedStream) { if (ackedStream) {
invokeStreamStateMachine( invokeStreamSendStateMachine(
*conn_, *conn_,
*ackedStream, *ackedStream,
StreamEvents::AckStreamFrame(frame)); StreamEvents::AckStreamFrame(frame));
@@ -398,7 +398,7 @@ void QuicClientTransport::processPacketData(
if (!stream) { if (!stream) {
return; return;
} }
invokeStreamStateMachine(*conn_, *stream, std::move(frame)); invokeStreamReceiveStateMachine(*conn_, *stream, std::move(frame));
}, },
[&](ReadCryptoFrame& cryptoFrame) { [&](ReadCryptoFrame& cryptoFrame) {
pktHasRetransmittableData = true; pktHasRetransmittableData = true;
@@ -425,7 +425,7 @@ void QuicClientTransport::processPacketData(
<< *conn_; << *conn_;
return; return;
} }
invokeStreamStateMachine(*conn_, *stream, std::move(frame)); invokeStreamReceiveStateMachine(*conn_, *stream, std::move(frame));
}, },
[&](MaxDataFrame& connWindowUpdate) { [&](MaxDataFrame& connWindowUpdate) {
VLOG(10) << "Client received max data offset=" VLOG(10) << "Client received max data offset="

View File

@@ -82,6 +82,7 @@ void ClientInvalidStateHandler(QuicClientConnectionState& state);
struct QuicClientStateMachine { struct QuicClientStateMachine {
using StateData = QuicClientConnectionState; using StateData = QuicClientConnectionState;
using UserData = QuicClientConnectionState;
static constexpr auto InvalidEventHandler = &ClientInvalidStateHandler; static constexpr auto InvalidEventHandler = &ClientInvalidStateHandler;
}; };

View File

@@ -43,10 +43,8 @@ class PacketRebuilder {
PacketEvent cloneOutstandingPacket(OutstandingPacket& packet); PacketEvent cloneOutstandingPacket(OutstandingPacket& packet);
bool retransmittable(const QuicStreamState& stream) const { bool retransmittable(const QuicStreamState& stream) const {
return matchesStates< return matchesStates<StreamSendStateData, StreamSendStates::Open>(
StreamStateData, stream.send.state);
StreamStates::Open,
StreamStates::HalfClosedRemote>(stream.state);
} }
Buf cloneCryptoRetransmissionBuffer( Buf cloneCryptoRetransmissionBuffer(

View File

@@ -175,7 +175,7 @@ TEST_F(QuicPacketRebuilderTest, RebuildAfterResetStream) {
ASSERT_EQ(1, packet1.packet.frames.size()); ASSERT_EQ(1, packet1.packet.frames.size());
// Then we reset the stream // Then we reset the stream
invokeStreamStateMachine( invokeStreamSendStateMachine(
conn, conn,
*stream, *stream,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN));

View File

@@ -75,7 +75,8 @@ PacketNum rstStreamAndSendPacket(
auto aead = createNoOpAead(); auto aead = createNoOpAead();
auto headerCipher = createNoOpHeaderCipher(); auto headerCipher = createNoOpHeaderCipher();
auto version = conn.version.value_or(*conn.originalVersion); auto version = conn.version.value_or(*conn.originalVersion);
invokeStreamStateMachine(conn, stream, StreamEvents::SendReset(errorCode)); invokeStreamSendStateMachine(
conn, stream, StreamEvents::SendReset(errorCode));
writeQuicDataToSocket( writeQuicDataToSocket(
sock, sock,
conn, conn,

View File

@@ -262,7 +262,8 @@ TEST_F(QuicFlowControlTest, DontSendStreamWindowUpdateTwice) {
TEST_F(QuicFlowControlTest, DontSendStreamWindowUpdateOnRemoteHalfClosed) { TEST_F(QuicFlowControlTest, DontSendStreamWindowUpdateOnRemoteHalfClosed) {
StreamId id = 3; StreamId id = 3;
QuicStreamState stream(id, conn_); QuicStreamState stream(id, conn_);
stream.state = StreamStates::HalfClosedRemote(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Closed();
// Should not send window update // Should not send window update
maybeSendStreamWindowUpdate(stream, Clock::now()); maybeSendStreamWindowUpdate(stream, Clock::now());
EXPECT_FALSE(conn_.streamManager->pendingWindowUpdate(stream.id)); EXPECT_FALSE(conn_.streamManager->pendingWindowUpdate(stream.id));
@@ -476,7 +477,8 @@ TEST_F(QuicFlowControlTest, LostStreamWindowUpdateHalfClosedRemote) {
stream.currentReadOffset = 300; stream.currentReadOffset = 300;
stream.flowControlState.windowSize = 500; stream.flowControlState.windowSize = 500;
stream.flowControlState.advertisedMaxOffset = 400; stream.flowControlState.advertisedMaxOffset = 400;
stream.state = StreamStates::HalfClosedRemote(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Closed();
// Should not send window update // Should not send window update
onStreamWindowUpdateLost(stream); onStreamWindowUpdateLost(stream);

View File

@@ -443,7 +443,7 @@ TEST_F(QuicLossFunctionsTest, TestMarkPacketLossAfterStreamReset) {
*stream1, *stream1,
*buf, *buf,
true); true);
invokeStreamStateMachine( invokeStreamSendStateMachine(
*conn, *conn,
*stream1, *stream1,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN));

View File

@@ -690,7 +690,7 @@ void onServerReadDataFromOpen(
auto ackedStream = auto ackedStream =
conn.streamManager->getStream(frame.streamId); conn.streamManager->getStream(frame.streamId);
if (ackedStream) { if (ackedStream) {
invokeStreamStateMachine( invokeStreamSendStateMachine(
conn, conn,
*ackedStream, *ackedStream,
StreamEvents::AckStreamFrame(frame)); StreamEvents::AckStreamFrame(frame));
@@ -708,7 +708,7 @@ void onServerReadDataFromOpen(
auto stream = auto stream =
conn.streamManager->getStream(frame.streamId); conn.streamManager->getStream(frame.streamId);
if (stream) { if (stream) {
invokeStreamStateMachine( invokeStreamSendStateMachine(
conn, *stream, StreamEvents::RstAck(frame)); conn, *stream, StreamEvents::RstAck(frame));
} }
}, },
@@ -734,7 +734,7 @@ void onServerReadDataFromOpen(
if (!stream) { if (!stream) {
return; return;
} }
invokeStreamStateMachine(conn, *stream, frame); invokeStreamReceiveStateMachine(conn, *stream, frame);
}, },
[&](ReadCryptoFrame& cryptoFrame) { [&](ReadCryptoFrame& cryptoFrame) {
pktHasRetransmittableData = true; pktHasRetransmittableData = true;
@@ -763,7 +763,7 @@ void onServerReadDataFromOpen(
// Ignore data from closed streams that we don't have the // Ignore data from closed streams that we don't have the
// state for any more. // state for any more.
if (stream) { if (stream) {
invokeStreamStateMachine(conn, *stream, frame); invokeStreamReceiveStateMachine(conn, *stream, frame);
} }
}, },
[&](MaxDataFrame& connWindowUpdate) { [&](MaxDataFrame& connWindowUpdate) {

View File

@@ -177,4 +177,3 @@ void onConnectionMigration(
QuicServerConnectionState& conn, QuicServerConnectionState& conn,
const folly::SocketAddress& newPeerAddress); const folly::SocketAddress& newPeerAddress);
} // namespace quic } // namespace quic

View File

@@ -1079,7 +1079,8 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) {
EXPECT_EQ( EXPECT_EQ(
stream->retransmissionBuffer.size(), stream->retransmissionBuffer.size(),
originalRetransSize - buffersInPacket1); originalRetransSize - buffersInPacket1);
EXPECT_TRUE(isState<StreamStates::Open>(*stream)); EXPECT_TRUE(isState<StreamSendStates::Open>(stream->send));
EXPECT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
// Dup ack // Dup ack
auto packet2 = createAckPacket( auto packet2 = createAckPacket(
@@ -1092,7 +1093,8 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) {
EXPECT_EQ( EXPECT_EQ(
stream->retransmissionBuffer.size(), stream->retransmissionBuffer.size(),
originalRetransSize - buffersInPacket1); originalRetransSize - buffersInPacket1);
EXPECT_TRUE(isState<StreamStates::Open>(*stream)); EXPECT_TRUE(isState<StreamSendStates::Open>(stream->send));
EXPECT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
IntervalSet<PacketNum> acks2 = {{packetNum1, lastPacketNum}}; IntervalSet<PacketNum> acks2 = {{packetNum1, lastPacketNum}};
auto packet3 = createAckPacket( auto packet3 = createAckPacket(
@@ -1103,7 +1105,8 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) {
deliverData(packetToBuf(packet3)); deliverData(packetToBuf(packet3));
EXPECT_EQ(stream->retransmissionBuffer.size(), 0); EXPECT_EQ(stream->retransmissionBuffer.size(), 0);
EXPECT_TRUE(isState<StreamStates::Open>(*stream)); EXPECT_TRUE(isState<StreamSendStates::Open>(stream->send));
EXPECT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
auto empty = IOBuf::create(0); auto empty = IOBuf::create(0);
server->writeChain(streamId, std::move(empty), true, false); server->writeChain(streamId, std::move(empty), true, false);
@@ -1123,7 +1126,8 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) {
acks3, acks3,
PacketNumberSpace::AppData); PacketNumberSpace::AppData);
deliverData(packetToBuf(packet4)); deliverData(packetToBuf(packet4));
EXPECT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); EXPECT_TRUE(isState<StreamSendStates::Closed>(stream->send));
EXPECT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
} }
TEST_F(QuicServerTransportTest, RecvRstStreamFrameNonexistClientStream) { TEST_F(QuicServerTransportTest, RecvRstStreamFrameNonexistClientStream) {
@@ -3018,7 +3022,8 @@ TEST_P(
0 /* cipherOverhead */, 0 /* cipherOverhead */,
0 /* largestAcked */, 0 /* largestAcked */,
std::make_pair( std::make_pair(
LongHeader::Types::ZeroRtt, server->getConn().supportedVersions[0]))); LongHeader::Types::ZeroRtt, server->getConn().supportedVersions[0]),
false));
deliverData(std::move(packetData), false); deliverData(std::move(packetData), false);
if (GetParam().acceptZeroRtt) { if (GetParam().acceptZeroRtt) {
if (!GetParam().chloSync) { if (!GetParam().chloSync) {
@@ -3049,7 +3054,9 @@ TEST_P(
streamId, streamId,
*data, *data,
0 /* cipherOverhead */, 0 /* cipherOverhead */,
0 /* largestAcked */)); 0 /* largestAcked */,
folly::none,
false));
deliverData(std::move(packetData)); deliverData(std::move(packetData));
EXPECT_EQ(server->getConn().streamManager->streamCount(), 0); EXPECT_EQ(server->getConn().streamManager->streamCount(), 0);
EXPECT_EQ(server->getConn().pendingOneRttData->size(), 1); EXPECT_EQ(server->getConn().pendingOneRttData->size(), 1);
@@ -3076,7 +3083,8 @@ TEST_P(
0 /* cipherOverhead */, 0 /* cipherOverhead */,
0 /* largestAcked */, 0 /* largestAcked */,
std::make_pair( std::make_pair(
LongHeader::Types::ZeroRtt, server->getConn().supportedVersions[0]))); LongHeader::Types::ZeroRtt, server->getConn().supportedVersions[0]),
false));
deliverData(std::move(packetData), false); deliverData(std::move(packetData), false);
if (GetParam().acceptZeroRtt) { if (GetParam().acceptZeroRtt) {
if (!GetParam().chloSync) { if (!GetParam().chloSync) {

View File

@@ -113,10 +113,8 @@ void onRecvMinStreamDataFrame(
PacketNum packetNum) { PacketNum packetNum) {
if (isReceivingStream(stream->conn.nodeType, stream->id) || if (isReceivingStream(stream->conn.nodeType, stream->id) ||
(isSendingStream(stream->conn.nodeType, stream->id) && (isSendingStream(stream->conn.nodeType, stream->id) &&
!matchesStates< !matchesStates<StreamSendStateData, StreamSendStates::Open>(
StreamStateData, stream->send.state))) {
StreamStates::Open,
StreamStates::HalfClosedRemote>(stream->state))) {
throw QuicTransportException( throw QuicTransportException(
"MinStreamDataFrame on receiving-only stream or " "MinStreamDataFrame on receiving-only stream or "
"sending-only stream but not opened", "sending-only stream but not opened",

View File

@@ -36,11 +36,20 @@ void updateRtt(
std::chrono::microseconds ackDelay); std::chrono::microseconds ackDelay);
template <typename Event> template <typename Event>
void invokeStreamStateMachine( void invokeStreamSendStateMachine(
QuicConnectionStateBase&, QuicConnectionStateBase&,
QuicStreamState& stream, QuicStreamState& stream,
Event event) { Event event) {
invokeHandler<StreamStateMachine>(stream, std::move(event)); invokeHandler<StreamSendStateMachine>(stream.send, std::move(event), stream);
}
template <typename Event>
void invokeStreamReceiveStateMachine(
QuicConnectionStateBase&,
QuicStreamState& stream,
Event event) {
invokeHandler<StreamReceiveStateMachine>(
stream.recv, std::move(event), stream);
} }
bool isConnectionPaced(const QuicConnectionStateBase& conn) noexcept; bool isConnectionPaced(const QuicConnectionStateBase& conn) noexcept;

View File

@@ -305,9 +305,8 @@ void QuicStreamManager::removeClosedStream(StreamId streamId) {
return; return;
} }
VLOG(10) << "Removing closed stream=" << streamId; VLOG(10) << "Removing closed stream=" << streamId;
bool matchClosedState = bool inTerminalStates = it->second.inTerminalStates();
matchesStates<StreamStateData, StreamStates::Closed>(it->second.state); DCHECK(inTerminalStates);
DCHECK(matchClosedState);
readableStreams_.erase(streamId); readableStreams_.erase(streamId);
peekableStreams_.erase(streamId); peekableStreams_.erase(streamId);
writableStreams_.erase(streamId); writableStreams_.erase(streamId);

View File

@@ -129,7 +129,7 @@ bool updateSimpleFrameOnPacketReceived(
[&](const StopSendingFrame& frame) { [&](const StopSendingFrame& frame) {
auto stream = conn.streamManager->getStream(frame.streamId); auto stream = conn.streamManager->getStream(frame.streamId);
if (stream) { if (stream) {
invokeStreamStateMachine(conn, *stream, frame); invokeStreamSendStateMachine(conn, *stream, frame);
} }
return true; return true;
}, },

View File

@@ -32,6 +32,13 @@ QuicStreamState::QuicStreamState(StreamId idIn, QuicConnectionStateBase& connIn)
: isLocalStream(connIn.nodeType, idIn) : isLocalStream(connIn.nodeType, idIn)
? conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote ? conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote
: conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiLocal; : conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiLocal;
if (isUnidirectionalStream(idIn)) {
if (isLocalStream(connIn.nodeType, idIn)) {
recv.state = StreamReceiveStates::Invalid();
} else {
send.state = StreamSendStates::Invalid();
}
}
} }
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st) { std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st) {

View File

@@ -28,6 +28,7 @@ namespace quic {
* 2. Create a Machine type with an alias of StateData * 2. Create a Machine type with an alias of StateData
* struct Machine { * struct Machine {
* using StateData = StateDataType; * using StateData = StateDataType;
* using UserData = UserDataType;
* constexpr auto InvalidEventHandler = &InvalidEventHandler; * constexpr auto InvalidEventHandler = &InvalidEventHandler;
* } * }
* 3. Create types for each state * 3. Create types for each state
@@ -96,12 +97,15 @@ struct HandlerBase {
} }
}; };
#define QUIC_DECLARE_STATE_HANDLER(machine, state, event, ...) \ #define QUIC_DECLARE_STATE_HANDLER(machine, state, event, ...) \
template <> \ template <> \
class Handler<machine, state, event> \ class Handler<machine, state, event> \
: public HandlerBase<machine, state, event, ##__VA_ARGS__> { \ : public HandlerBase<machine, state, event, ##__VA_ARGS__> { \
public: \ public: \
static void handle(typename machine::StateData& data, event evt); \ static void handle( \
typename machine::StateData& data, \
event evt, \
typename machine::UserData& userData); \
}; };
#define QUIC_DECLARE_STATE_HANDLER_T(state, event, ...) \ #define QUIC_DECLARE_STATE_HANDLER_T(state, event, ...) \
@@ -109,7 +113,10 @@ struct HandlerBase {
class Handler<Machine, state, event> \ class Handler<Machine, state, event> \
: public HandlerBase<Machine, state, event, ##__VA_ARGS__> { \ : public HandlerBase<Machine, state, event, ##__VA_ARGS__> { \
public: \ public: \
static void handle(typename Machine::StateData& data, event evt); \ static void handle( \
typename Machine::StateData& data, \
event evt, \
typename Machine::UserData&); \
template <class NewState> \ template <class NewState> \
static void transit(typename Machine::StateData& data) { \ static void transit(typename Machine::StateData& data) { \
HandlerBase<Machine, state, event, ##__VA_ARGS__>::template transit< \ HandlerBase<Machine, state, event, ##__VA_ARGS__>::template transit< \
@@ -123,28 +130,39 @@ struct HandlerBase {
*/ */
template <class Machine, class State, class Event> template <class Machine, class State, class Event>
struct Handler : public HandlerBase<Machine, State, Event> { struct Handler : public HandlerBase<Machine, State, Event> {
static void handle(typename Machine::StateData& data, Event /*event*/) { static void handle(
Machine::InvalidEventHandler(data); typename Machine::StateData& /*state*/,
Event /*event*/,
typename Machine::UserData& userData) {
Machine::InvalidEventHandler(userData);
} }
}; };
template <class Machine, class Event> template <class Machine, class Event>
struct state_visitor : public boost::static_visitor<> { struct state_visitor : public boost::static_visitor<> {
explicit state_visitor(Event event, typename Machine::StateData& data) explicit state_visitor(
: event_(std::move(event)), data_(data) {} Event event,
typename Machine::StateData& data,
typename Machine::UserData& userData)
: event_(std::move(event)), data_(data), userData_(userData) {}
template <class State> template <class State>
void operator()(const State& /*state*/) { void operator()(const State& /*state*/) {
Handler<Machine, State, Event>::handle(data_, std::move(event_)); Handler<Machine, State, Event>::handle(data_, std::move(event_), userData_);
} }
Event event_; Event event_;
typename Machine::StateData& data_; typename Machine::StateData& data_;
typename Machine::UserData& userData_;
}; };
template <class Machine, class Event> template <class Machine, class Event>
void invokeHandler(typename Machine::StateData& data, Event event) { void invokeHandler(
auto visitor = state_visitor<Machine, Event>(std::move(event), data); typename Machine::StateData& data,
Event event,
typename Machine::UserData& userData) {
auto visitor =
state_visitor<Machine, Event>(std::move(event), data, userData);
return boost::apply_visitor(visitor, data.state); return boost::apply_visitor(visitor, data.state);
} }
} }

View File

@@ -81,38 +81,57 @@ struct QuicStreamLike {
struct QuicConnectionStateBase; struct QuicConnectionStateBase;
struct StreamStates { struct StreamSendStates {
// The stream is open // Ready, Send and FinSent are collapsed into Open
struct Open {}; struct Open {};
// The stream has closed its write // RST has been sent
struct HalfClosedLocal {}; struct ResetSent {};
// The stream has closed read. // All data acked or RST acked. Collapsed Data Received and Reset Received
struct HalfClosedRemote {};
// The stream is waiting for the ack of the reset stream
struct WaitingForRstAck {};
// The stream is now closed.
struct Closed {}; struct Closed {};
// Used for peer initiated unidirectional streams
struct Invalid {};
}; };
using StreamStateData = boost::variant< struct StreamReceiveStates {
StreamStates::Open, // Renamed Receive to Open
StreamStates::HalfClosedLocal, struct Open {};
StreamStates::HalfClosedRemote,
StreamStates::WaitingForRstAck,
StreamStates::Closed>;
inline std::string streamStateToString(const StreamStateData& state) { // All data or rst received
struct Closed {};
// Used for self-initiated unidirectional streams
struct Invalid {};
};
using StreamSendStateData = boost::variant<
StreamSendStates::Open,
StreamSendStates::ResetSent,
StreamSendStates::Closed,
StreamSendStates::Invalid>;
using StreamReceiveStateData = boost::variant<
StreamReceiveStates::Open,
StreamReceiveStates::Closed,
StreamReceiveStates::Invalid>;
inline std::string streamStateToString(const StreamSendStateData& state) {
return folly::variant_match( return folly::variant_match(
state, state,
[](const StreamStates::Open&) { return "Open"; }, [](const StreamSendStates::Open&) { return "Open"; },
[](const StreamStates::HalfClosedLocal&) { return "HalfClosedLocal"; }, [](const StreamSendStates::ResetSent&) { return "ResetSent"; },
[](const StreamStates::HalfClosedRemote&) { return "HalfClosedRemote"; }, [](const StreamSendStates::Closed&) { return "Closed"; },
[](const StreamStates::WaitingForRstAck&) { return "WaitingForRstAck"; }, [](const StreamSendStates::Invalid&) { return "Invalid"; });
[](const StreamStates::Closed&) { return "Closed"; }); }
inline std::string streamStateToString(const StreamReceiveStateData& state) {
return folly::variant_match(
state,
[](const StreamReceiveStates::Open&) { return "Open"; },
[](const StreamReceiveStates::Closed&) { return "Closed"; },
[](const StreamReceiveStates::Invalid&) { return "Invalid"; });
} }
struct QuicStreamState : public QuicStreamLike { struct QuicStreamState : public QuicStreamLike {
@@ -145,7 +164,12 @@ struct QuicStreamState : public QuicStreamLike {
folly::Optional<QuicErrorCode> streamWriteError; folly::Optional<QuicErrorCode> streamWriteError;
// State machine data // State machine data
StreamStateData state{StreamStates::Open()}; struct Send {
StreamSendStateData state{StreamSendStates::Open()};
} send;
struct Recv {
StreamReceiveStateData state{StreamReceiveStates::Open()};
} recv;
// The packet number of the latest packet that contains a MaxStreamDataFrame // The packet number of the latest packet that contains a MaxStreamDataFrame
// sent out by us. // sent out by us.
@@ -167,20 +191,29 @@ struct QuicStreamState : public QuicStreamLike {
// lastHolbTime indicates whether the stream is HOL blocked at the moment. // lastHolbTime indicates whether the stream is HOL blocked at the moment.
uint32_t holbCount{0}; uint32_t holbCount{0};
// Returns true if both send and receive state machines are in a terminal
// state
bool inTerminalStates() const {
return matchesStates<
StreamSendStateData,
StreamSendStates::Closed,
StreamSendStates::Invalid>(send.state) &&
matchesStates<
StreamReceiveStateData,
StreamReceiveStates::Closed,
StreamReceiveStates::Invalid>(recv.state);
}
// If the stream is still writable. // If the stream is still writable.
bool writable() const { bool writable() const {
return matchesStates< return matchesStates<StreamSendStateData, StreamSendStates::Open>(
StreamStateData, send.state) &&
StreamStates::Open,
StreamStates::HalfClosedRemote>(state) &&
!finalWriteOffset.hasValue(); !finalWriteOffset.hasValue();
} }
bool shouldSendFlowControl() const { bool shouldSendFlowControl() const {
return matchesStates< return matchesStates<StreamReceiveStateData, StreamReceiveStates::Open>(
StreamStateData, recv.state);
StreamStates::Open,
StreamStates::HalfClosedLocal>(state);
} }
bool hasWritableData() const { bool hasWritableData() const {

View File

@@ -12,16 +12,21 @@
#include <quic/state/stream/StreamStateFunctions.h> #include <quic/state/stream/StreamStateFunctions.h>
namespace quic { namespace quic {
template <typename Event>
inline void void invokeStreamSendStateMachine(
Handler<StreamStateMachine, StreamStates::Closed, ReadStreamFrame>::handle( QuicConnectionStateBase&,
QuicStreamState& stream, QuicStreamState& stream,
ReadStreamFrame frame) { Event event);
if (isSendingStream(stream.conn.nodeType, stream.id)) {
throw QuicTransportException( inline void Handler<
"ReadStreamFrame on unidirectional sending stream", StreamReceiveStateMachine,
TransportErrorCode::STREAM_STATE_ERROR); StreamReceiveStates::Closed,
} ReadStreamFrame>::
handle(
QuicStreamState::Recv& state,
ReadStreamFrame frame,
QuicStreamState& stream) {
CHECK(!isSendingStream(stream.conn.nodeType, stream.id));
VLOG_IF(10, frame.fin) << "Closed: Received data with fin" VLOG_IF(10, frame.fin) << "Closed: Received data with fin"
<< " stream=" << stream.id << " " << stream.conn; << " stream=" << stream.id << " " << stream.conn;
appendDataToReadBuffer( appendDataToReadBuffer(
@@ -29,64 +34,64 @@ Handler<StreamStateMachine, StreamStates::Closed, ReadStreamFrame>::handle(
} }
inline void inline void
Handler<StreamStateMachine, StreamStates::Closed, StopSendingFrame>::handle( Handler<StreamSendStateMachine, StreamSendStates::Closed, StopSendingFrame>::
QuicStreamState& stream, handle(
StopSendingFrame /* frame */) { QuicStreamState::Send& /*state*/,
if (isReceivingStream(stream.conn.nodeType, stream.id)) { StopSendingFrame /*frame*/,
throw QuicTransportException( QuicStreamState& /*stream*/) {
"StopSendingFrame on unidirectional receiving stream", // no-op, we're already done sending
TransportErrorCode::STREAM_STATE_ERROR);
}
} }
inline void inline void Handler<
Handler<StreamStateMachine, StreamStates::Closed, RstStreamFrame>::handle( StreamReceiveStateMachine,
QuicStreamState& stream, StreamReceiveStates::Closed,
RstStreamFrame rst) { RstStreamFrame>::
if (isSendingStream(stream.conn.nodeType, stream.id)) { handle(
throw QuicTransportException( QuicStreamState::Recv& state,
"RstStreamFrame on unidirectional sending stream", RstStreamFrame rst,
TransportErrorCode::STREAM_STATE_ERROR); QuicStreamState& stream) {
} // TODO: Remove
invokeStreamSendStateMachine(
stream.conn,
stream,
StreamEvents::SendReset(GenericApplicationErrorCode::NO_ERROR));
// This will check whether the reset is still consistent with the stream. // This will check whether the reset is still consistent with the stream.
onResetQuicStream(stream, std::move(rst)); onResetQuicStream(stream, std::move(rst));
} }
inline void Handler< inline void Handler<
StreamStateMachine, StreamSendStateMachine,
StreamStates::Closed, StreamSendStates::Closed,
StreamEvents::AckStreamFrame>:: StreamEvents::AckStreamFrame>::
handle(QuicStreamState& stream, StreamEvents::AckStreamFrame /*ack*/) { handle(
// do nothing here, we're done with handling our write data. QuicStreamState::Send& /*state*/,
if (isReceivingStream(stream.conn.nodeType, stream.id)) { StreamEvents::AckStreamFrame /*ack*/,
throw QuicTransportException( QuicStreamState& stream) {
"AckStreamFrame on unidirectional receiving stream",
TransportErrorCode::STREAM_STATE_ERROR);
}
DCHECK(stream.retransmissionBuffer.empty()); DCHECK(stream.retransmissionBuffer.empty());
DCHECK(stream.writeBuffer.empty()); DCHECK(stream.writeBuffer.empty());
} }
inline void inline void Handler<
Handler<StreamStateMachine, StreamStates::Closed, StreamEvents::RstAck>::handle( StreamSendStateMachine,
QuicStreamState& stream, StreamSendStates::Closed,
StreamEvents::RstAck /*ack*/) { StreamEvents::RstAck>::
if (isReceivingStream(stream.conn.nodeType, stream.id)) { handle(
throw QuicTransportException( QuicStreamState::Send& /*state*/,
"RstAck on unidirectional receiving stream", StreamEvents::RstAck /*ack*/,
TransportErrorCode::STREAM_STATE_ERROR); QuicStreamState& /*stream*/) {
}
// Just discard the ack if we are already in Closed state. // Just discard the ack if we are already in Closed state.
} }
inline void inline void Handler<
Handler<StreamStateMachine, StreamStates::Closed, StreamEvents::SendReset>:: StreamSendStateMachine,
handle(QuicStreamState& stream, StreamEvents::SendReset) { StreamSendStates::Closed,
if (isReceivingStream(stream.conn.nodeType, stream.id)) { StreamEvents::SendReset>::
throw QuicTransportException( handle(
"SendReset on unidirectional receiving stream", QuicStreamState::Send& state,
TransportErrorCode::STREAM_STATE_ERROR); StreamEvents::SendReset,
} QuicStreamState& stream) {
// TODO: remove this as a valid state transition
LOG(ERROR) << "Ignoring SendReset from closed state. This may be a bug";
// Discard the send reset. // Discard the send reset.
} }
} // namespace quic } // namespace quic

View File

@@ -1,72 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
// override-include-guard
#include <quic/state/stream/StreamStateFunctions.h>
namespace quic {
inline void
Handler<StreamStateMachine, StreamStates::HalfClosedLocal, RstStreamFrame>::
handle(QuicStreamState& stream, RstStreamFrame rst) {
// We transition before invoking onResetQuicStream so that reset stream
// can use the state to make decisions.
VLOG(10) << "HalfClosedLocal: Received reset, transition to closed"
<< " stream=" << stream.id << " " << stream.conn;
transit<StreamStates::Closed>(stream);
onResetQuicStream(stream, std::move(rst));
stream.conn.streamManager->addClosed(stream.id);
}
inline void
Handler<StreamStateMachine, StreamStates::HalfClosedLocal, ReadStreamFrame>::
handle(QuicStreamState& stream, ReadStreamFrame frame) {
VLOG_IF(10, frame.fin) << "HalfClosedLocal: Received data with fin"
<< " stream=" << stream.id
<< " offset=" << frame.offset
<< " readOffset=" << stream.currentReadOffset << " "
<< stream.conn;
appendDataToReadBuffer(
stream, StreamBuffer(std::move(frame.data), frame.offset, frame.fin));
if (isAllDataReceived(stream)) {
stream.conn.streamManager->addClosed(stream.id);
VLOG(10) << "HalfClosedLocal: Transition to closed"
<< " stream=" << stream.id << " " << stream.conn;
transit<StreamStates::Closed>(stream);
}
stream.conn.streamManager->updateReadableStreams(stream);
}
inline void Handler<
StreamStateMachine,
StreamStates::HalfClosedLocal,
StreamEvents::AckStreamFrame>::
handle(QuicStreamState& stream, StreamEvents::AckStreamFrame /*ack*/) {
// do nothing here, we already got acks for all the bytes till fin.
DCHECK(stream.retransmissionBuffer.empty());
DCHECK(stream.writeBuffer.empty());
}
inline void
Handler<StreamStateMachine, StreamStates::HalfClosedLocal, StopSendingFrame>::
handle(QuicStreamState& /*stream*/, StopSendingFrame /*frame*/) {}
inline void Handler<
StreamStateMachine,
StreamStates::HalfClosedLocal,
StreamEvents::SendReset>::
handle(QuicStreamState& stream, StreamEvents::SendReset rst) {
resetQuicStream(stream, rst.errorCode);
appendPendingStreamReset(stream.conn, stream, rst.errorCode);
// Move the state machine:
VLOG(10) << "HalfClosedLocal: Transition to WaitingForRstAck"
<< " stream=" << stream.id << " " << stream.conn;
transit<StreamStates::WaitingForRstAck>(stream);
}
} // namespace quic

View File

@@ -1,96 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
// override-include-guard
#include <quic/QuicException.h>
#include <quic/state/stream/StreamStateFunctions.h>
namespace quic {
inline void
Handler<StreamStateMachine, StreamStates::HalfClosedRemote, ReadStreamFrame>::
handle(QuicStreamState& stream, ReadStreamFrame frame) {
VLOG_IF(10, frame.fin) << "HalfClosedRemote: Received data with fin"
<< " stream=" << stream.id << " " << stream.conn;
appendDataToReadBuffer(
stream, StreamBuffer(std::move(frame.data), frame.offset, frame.fin));
}
inline void Handler<
StreamStateMachine,
StreamStates::HalfClosedRemote,
StreamEvents::AckStreamFrame>::
handle(QuicStreamState& stream, StreamEvents::AckStreamFrame ack) {
// Clean up the acked buffers from the retransmissionBuffer.
auto ackedBuffer = std::lower_bound(
stream.retransmissionBuffer.begin(),
stream.retransmissionBuffer.end(),
ack.ackedFrame.offset,
[](const auto& buffer, const auto& offset) {
return buffer.offset < offset;
});
if (ackedBuffer != stream.retransmissionBuffer.end()) {
// Since the StreamFrames that are ACKed are computed from the outstanding
// packets, we always know that the retransmission buffer corresponds to
// 1 buffer in the retranmission buffer.
CHECK_EQ(ackedBuffer->offset, ack.ackedFrame.offset);
CHECK_EQ(ackedBuffer->data.chainLength(), ack.ackedFrame.len);
VLOG(10) << "HalfClosedRemote: stream data acked stream=" << stream.id
<< " offset=" << ackedBuffer->offset
<< " len=" << ackedBuffer->data.chainLength()
<< " eof=" << ackedBuffer->eof << " " << stream.conn;
stream.retransmissionBuffer.erase(ackedBuffer);
} else {
VLOG(10) << __func__ << ": offset " << ack.ackedFrame.offset
<< " no longer in retransmissionBuffer";
}
// This stream may be able to invoke some deliveryCallbacks:
stream.conn.streamManager->addDeliverable(stream.id);
// Check for whether or not we have ACKed all bytes until our FIN.
if (allBytesTillFinAcked(stream)) {
stream.conn.streamManager->addClosed(stream.id);
VLOG(10) << "HalfClosedRemote: Transition to closed stream=" << stream.id
<< " " << stream.conn;
transit<StreamStates::Closed>(stream);
}
}
inline void
Handler<StreamStateMachine, StreamStates::HalfClosedRemote, StopSendingFrame>::
handle(QuicStreamState& stream, StopSendingFrame frame) {
stream.conn.streamManager->addStopSending(stream.id, frame.errorCode);
}
inline void
Handler<StreamStateMachine, StreamStates::HalfClosedRemote, RstStreamFrame>::
handle(QuicStreamState& stream, RstStreamFrame rst) {
// We transit before invoking onResetQuicStream because it will check the
// state of the stream for flow control.
transit<StreamStates::WaitingForRstAck>(stream);
onResetQuicStream(stream, std::move(rst));
// TODO: remove.
appendPendingStreamReset(
stream.conn, stream, GenericApplicationErrorCode::NO_ERROR);
}
inline void Handler<
StreamStateMachine,
StreamStates::HalfClosedRemote,
StreamEvents::SendReset>::
handle(QuicStreamState& stream, StreamEvents::SendReset rst) {
resetQuicStream(stream, rst.errorCode);
appendPendingStreamReset(stream.conn, stream, rst.errorCode);
// Move the state machine:
transit<StreamStates::WaitingForRstAck>(stream);
}
} // namespace quic

View File

@@ -9,95 +9,92 @@
// override-include-guard // override-include-guard
#include <quic/state/QuicStreamUtilities.h> #include <quic/state/QuicStreamUtilities.h>
#include <quic/state/stream/StreamStateFunctions.h>
namespace quic { namespace quic {
template <typename Event>
inline void void invokeStreamSendStateMachine(
Handler<StreamStateMachine, StreamStates::Open, ReadStreamFrame>::handle( QuicConnectionStateBase&,
QuicStreamState& stream, QuicStreamState& stream,
ReadStreamFrame frame) { Event event);
if (isSendingStream(stream.conn.nodeType, stream.id)) { inline void
throw QuicTransportException( Handler<StreamReceiveStateMachine, StreamReceiveStates::Open, ReadStreamFrame>::
"ReadStreamFrame on unidirectional sending stream", handle(
TransportErrorCode::STREAM_STATE_ERROR); QuicStreamState::Recv& state,
} ReadStreamFrame frame,
QuicStreamState& stream) {
VLOG_IF(10, frame.fin) << "Open: Received data with fin" VLOG_IF(10, frame.fin) << "Open: Received data with fin"
<< " stream=" << stream.id << " " << stream.conn; << " stream=" << stream.id << " " << stream.conn;
appendDataToReadBuffer( appendDataToReadBuffer(
stream, StreamBuffer(std::move(frame.data), frame.offset, frame.fin)); stream, StreamBuffer(std::move(frame.data), frame.offset, frame.fin));
if (isAllDataReceived(stream)) { if (isAllDataReceived(stream)) {
if (isUnidirectionalStream(stream.id)) { VLOG(10) << "Open: Transition to Closed"
VLOG(10) << "Open: Transition to Closed" << " stream=" << stream.id << " " << stream.conn;
<< " stream=" << stream.id << " " << stream.conn; transit<StreamReceiveStates::Closed>(state);
transit<StreamStates::Closed>(stream); if (stream.inTerminalStates()) {
} else { stream.conn.streamManager->addClosed(stream.id);
VLOG(10) << "Open: Transition to HalfClosedRemote"
<< " stream=" << stream.id << " " << stream.conn;
transit<StreamStates::HalfClosedRemote>(stream);
} }
} }
stream.conn.streamManager->updateReadableStreams(stream); stream.conn.streamManager->updateReadableStreams(stream);
} }
inline void inline void
Handler<StreamStateMachine, StreamStates::Open, StopSendingFrame>::handle( Handler<StreamSendStateMachine, StreamSendStates::Open, StopSendingFrame>::
QuicStreamState& stream, handle(
StopSendingFrame frame) { QuicStreamState::Send& state,
if (isBidirectionalStream(stream.id) || StopSendingFrame frame,
isSendingStream(stream.conn.nodeType, stream.id)) { QuicStreamState& stream) {
stream.conn.streamManager->addStopSending(stream.id, frame.errorCode); CHECK(
} else { isBidirectionalStream(stream.id) ||
throw QuicTransportException( isSendingStream(stream.conn.nodeType, stream.id));
"StopSendingFrame on unidirectional receiving stream", stream.conn.streamManager->addStopSending(stream.id, frame.errorCode);
TransportErrorCode::STREAM_STATE_ERROR);
}
} }
inline void inline void
Handler<StreamStateMachine, StreamStates::Open, RstStreamFrame>::handle( Handler<StreamReceiveStateMachine, StreamReceiveStates::Open, RstStreamFrame>::
QuicStreamState& stream, handle(
RstStreamFrame rst) { QuicStreamState::Recv& state,
if (isSendingStream(stream.conn.nodeType, stream.id)) { RstStreamFrame rst,
throw QuicTransportException( QuicStreamState& stream) {
"RstStreamFrame on unidirectional sending stream", if (matchesStates<StreamSendStateData, StreamSendStates::Open>(
TransportErrorCode::STREAM_STATE_ERROR); stream.send.state)) {
}
// We transit before invoking onResetQuicStream because it will check the
// state of the stream for flow control.
transit<StreamStates::WaitingForRstAck>(stream);
onResetQuicStream(stream, std::move(rst));
if (isBidirectionalStream(stream.id)) {
// TODO: remove. // TODO: remove.
appendPendingStreamReset( invokeStreamSendStateMachine(
stream.conn, stream, GenericApplicationErrorCode::NO_ERROR); stream.conn,
} else { stream,
transit<StreamStates::Closed>(stream); StreamEvents::SendReset(GenericApplicationErrorCode::NO_ERROR));
} }
// We transit the receive state machine to Closed before invoking
// onResetQuicStream because it will check the state of the stream for flow
// control.
transit<StreamReceiveStates::Closed>(state);
if (stream.inTerminalStates()) {
stream.conn.streamManager->addClosed(stream.id);
}
onResetQuicStream(stream, std::move(rst));
} }
inline void inline void Handler<
Handler<StreamStateMachine, StreamStates::Open, StreamEvents::SendReset>:: StreamSendStateMachine,
handle(QuicStreamState& stream, StreamEvents::SendReset rst) { StreamSendStates::Open,
if (isReceivingStream(stream.conn.nodeType, stream.id)) { StreamEvents::SendReset>::
throw QuicTransportException( handle(
"SendReset on unidirectional receiving stream", QuicStreamState::Send& state,
TransportErrorCode::STREAM_STATE_ERROR); StreamEvents::SendReset rst,
} QuicStreamState& stream) {
resetQuicStream(stream, rst.errorCode); resetQuicStream(stream, rst.errorCode);
appendPendingStreamReset(stream.conn, stream, rst.errorCode); appendPendingStreamReset(stream.conn, stream, rst.errorCode);
// Move the state machine: // Move the state machine:
transit<StreamStates::WaitingForRstAck>(stream); transit<StreamSendStates::ResetSent>(state);
} }
inline void inline void Handler<
Handler<StreamStateMachine, StreamStates::Open, StreamEvents::AckStreamFrame>:: StreamSendStateMachine,
handle(QuicStreamState& stream, StreamEvents::AckStreamFrame ack) { StreamSendStates::Open,
if (isReceivingStream(stream.conn.nodeType, stream.id)) { StreamEvents::AckStreamFrame>::
throw QuicTransportException( handle(
"AckStreamFrame on unidirectional receiving stream", QuicStreamState::Send& state,
TransportErrorCode::STREAM_STATE_ERROR); StreamEvents::AckStreamFrame ack,
} QuicStreamState& stream) {
// Clean up the acked buffers from the retransmissionBuffer. // Clean up the acked buffers from the retransmissionBuffer.
auto ackedBuffer = std::lower_bound( auto ackedBuffer = std::lower_bound(
@@ -129,10 +126,9 @@ Handler<StreamStateMachine, StreamStates::Open, StreamEvents::AckStreamFrame>::
// Check for whether or not we have ACKed all bytes until our FIN. // Check for whether or not we have ACKed all bytes until our FIN.
if (allBytesTillFinAcked(stream)) { if (allBytesTillFinAcked(stream)) {
if (isUnidirectionalStream(stream.id)) { transit<StreamSendStates::Closed>(state);
transit<StreamStates::Closed>(stream); if (stream.inTerminalStates()) {
} else { stream.conn.streamManager->addClosed(stream.id);
transit<StreamStates::HalfClosedLocal>(stream);
} }
} }
} }

View File

@@ -40,219 +40,171 @@ struct StreamEvents {
// Transition the stream to an error state if there is an invalid state // Transition the stream to an error state if there is an invalid state
// transition. // transition.
inline void StreamStateMachineInvalidHandler(const QuicStreamState& state) { inline void StreamSendStateMachineInvalidHandler(const QuicStreamState& state) {
throw QuicTransportException( throw QuicTransportException(
folly::to<std::string>( folly::to<std::string>(
"Invalid transition from state=", "Invalid transition from state=",
folly::variant_match( folly::variant_match(
state.state, state.send.state,
[](const StreamStates::Open&) { return "Open"; }, [](const StreamSendStates::Open&) { return "Open"; },
[](const StreamStates::HalfClosedLocal&) { [](const StreamSendStates::ResetSent&) { return "ResetSent"; },
return "HalfClosedLocal"; [](const StreamSendStates::Closed&) { return "Closed"; },
}, [](const StreamSendStates::Invalid&) { return "Invalid"; })),
[](const StreamStates::HalfClosedRemote&) {
return "HalfClosedRemote";
},
[](const StreamStates::WaitingForRstAck&) {
return "WaitingForRstAck";
},
[](const StreamStates::Closed&) { return "Closed"; })),
TransportErrorCode::STREAM_STATE_ERROR); TransportErrorCode::STREAM_STATE_ERROR);
} }
struct StreamStateMachine { struct StreamSendStateMachine {
using StateData = QuicStreamState; using StateData = QuicStreamState::Send;
static constexpr auto InvalidEventHandler = &StreamStateMachineInvalidHandler; using UserData = QuicStreamState;
static constexpr auto InvalidEventHandler =
&StreamSendStateMachineInvalidHandler;
};
// Transition the stream to an error state if there is an invalid state
// transition.
inline void StreamReceiveStateMachineInvalidHandler(
const QuicStreamState& state) {
throw QuicTransportException(
folly::to<std::string>(
"Invalid transition from state=",
folly::variant_match(
state.recv.state,
[](const StreamReceiveStates::Open&) { return "Open"; },
[](const StreamReceiveStates::Closed&) { return "Closed"; },
[](const StreamReceiveStates::Invalid&) { return "Invalid"; })),
TransportErrorCode::STREAM_STATE_ERROR);
}
struct StreamReceiveStateMachine {
using StateData = QuicStreamState::Recv;
using UserData = QuicStreamState;
static constexpr auto InvalidEventHandler =
&StreamReceiveStateMachineInvalidHandler;
}; };
/** /**
* Welcome to the stream state machine, we got fun and games. * Welcome to the stream state machine, we got fun and games.
* *
* ACK = Ack of stream frame. * This is a simplified version of the state machines defined in the transport
* specification. The "Invalid" state is used for unidirectional streams that
* do not have that half (eg: an ingress uni stream is in send state Invalid)
*
* Send State Machine
* ==================
*
* [ Initial State ]
* |
* | Send Stream
* |
* v
* Send::Open ---------------+
* | |
* | Ack all bytes | Send RST
* | ti FIN |
* v v
* Send::Closed <------ ResetSent
* RST
* Acked
* *
* *
* Stream / ACK Stream/ ACK * Receive State Machine
* |--| |----| * =====================
* | v All bytes till FIN | v All bytes till FIN *
* Open ---------------------> HalfClosedRemote ------------------------| * [ Initial State ]
* | | recv | acked | * |
* | | | | * | Stream
* | | SendReset / Reset | | * |
* | ----------------------------| | | * v
* | | | | * Receive::Open -----------+
* | All bytes till FIN acked | | | * | |
* | | | | * | Receive all | Receive RST
* | | | | * | bytes til FIN |
* | Stream / ACK | | SendReset / | * v |
* | |---| | | Reset | * Receive::Closed <---------+
* v | v SendReset v v RstAck v *
* HalfClosedLocal --------------> WaitingForRstAck---------------------|
* | | ^ |
* | |---| |
* | Reset / Stream / ACK / Reset / SendReset |
* | All bytes till FIN recv |
* |--------------------------------------------------------------> Closed
* | ^
* |---|
* Stream /
* Reset /
* RstAck /
* Ack /
* SendReset
*/ */
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamReceiveStateMachine,
StreamStates::Open, StreamReceiveStates::Open,
ReadStreamFrame, ReadStreamFrame,
StreamStates::HalfClosedRemote, StreamReceiveStates::Closed);
StreamStates::Closed);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamReceiveStateMachine,
StreamStates::Open, StreamReceiveStates::Open,
RstStreamFrame, RstStreamFrame,
StreamStates::WaitingForRstAck, StreamReceiveStates::Closed);
StreamStates::Closed);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Open, StreamSendStates::Open,
StreamEvents::SendReset, StreamEvents::SendReset,
StreamStates::WaitingForRstAck); StreamSendStates::ResetSent);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Open, StreamSendStates::Open,
StreamEvents::AckStreamFrame, StreamEvents::AckStreamFrame,
StreamStates::HalfClosedLocal, StreamSendStates::Closed);
StreamStates::Closed);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Open, StreamSendStates::Open,
StopSendingFrame); StopSendingFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::HalfClosedLocal, StreamSendStates::ResetSent,
RstStreamFrame,
StreamStates::Closed);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedLocal,
ReadStreamFrame,
StreamStates::Closed);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedLocal,
StreamEvents::AckStreamFrame); StreamEvents::AckStreamFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::HalfClosedLocal, StreamSendStates::ResetSent,
StopSendingFrame); StopSendingFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::HalfClosedLocal, StreamSendStates::ResetSent,
StreamEvents::SendReset,
StreamStates::WaitingForRstAck);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedRemote,
ReadStreamFrame);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedRemote,
StreamEvents::AckStreamFrame,
StreamStates::Closed);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedRemote,
StopSendingFrame);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedRemote,
StreamEvents::SendReset,
StreamStates::WaitingForRstAck);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::HalfClosedRemote,
RstStreamFrame,
StreamStates::WaitingForRstAck);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::WaitingForRstAck,
ReadStreamFrame);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::WaitingForRstAck,
StreamEvents::AckStreamFrame);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::WaitingForRstAck,
StopSendingFrame);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::WaitingForRstAck,
RstStreamFrame);
QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine,
StreamStates::WaitingForRstAck,
StreamEvents::RstAck, StreamEvents::RstAck,
StreamStates::Closed); StreamSendStates::Closed);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::WaitingForRstAck, StreamSendStates::ResetSent,
StreamEvents::SendReset); StreamEvents::SendReset);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamReceiveStateMachine,
StreamStates::Closed, StreamReceiveStates::Closed,
ReadStreamFrame); ReadStreamFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Closed, StreamSendStates::Closed,
StreamEvents::RstAck); StreamEvents::RstAck);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Closed, StreamSendStates::Closed,
StreamEvents::AckStreamFrame); StreamEvents::AckStreamFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Closed, StreamSendStates::Closed,
StopSendingFrame); StopSendingFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamReceiveStateMachine,
StreamStates::Closed, StreamReceiveStates::Closed,
RstStreamFrame); RstStreamFrame);
QUIC_DECLARE_STATE_HANDLER( QUIC_DECLARE_STATE_HANDLER(
StreamStateMachine, StreamSendStateMachine,
StreamStates::Closed, StreamSendStates::Closed,
StreamEvents::SendReset); StreamEvents::SendReset);
} // namespace quic } // namespace quic
#include <quic/state/stream/StreamClosedHandlers.h> #include <quic/state/stream/StreamClosedHandlers.h>
#include <quic/state/stream/StreamHalfClosedLocalHandlers.h>
#include <quic/state/stream/StreamHalfClosedRemoteHandlers.h>
#include <quic/state/stream/StreamOpenHandlers.h> #include <quic/state/stream/StreamOpenHandlers.h>
#include <quic/state/stream/StreamWaitingForRstAckHandlers.h> #include <quic/state/stream/StreamWaitingForRstAckHandlers.h>

View File

@@ -12,25 +12,20 @@
namespace quic { namespace quic {
inline void template <typename Event>
Handler<StreamStateMachine, StreamStates::WaitingForRstAck, ReadStreamFrame>:: void invokeStreamReceiveStateMachine(
handle(QuicStreamState& stream, ReadStreamFrame frame) { QuicConnectionStateBase&,
if (isSendingStream(stream.conn.nodeType, stream.id)) { QuicStreamState& stream,
throw QuicTransportException( Event event);
"ReadStreamFrame on unidirectional sending stream",
TransportErrorCode::STREAM_STATE_ERROR);
}
VLOG_IF(10, frame.fin) << "WaitingForRstAck: Received data with fin"
<< " stream=" << stream.id << " " << stream.conn;
appendDataToReadBuffer(
stream, StreamBuffer(std::move(frame.data), frame.offset, frame.fin));
}
inline void Handler< inline void Handler<
StreamStateMachine, StreamSendStateMachine,
StreamStates::WaitingForRstAck, StreamSendStates::ResetSent,
StreamEvents::AckStreamFrame>:: StreamEvents::AckStreamFrame>::
handle(QuicStreamState& stream, StreamEvents::AckStreamFrame /*ack*/) { handle(
QuicStreamState::Send& /*state*/,
StreamEvents::AckStreamFrame /*ack*/,
QuicStreamState& stream) {
// do nothing here. We should have already dumped the stream state before // do nothing here. We should have already dumped the stream state before
// we got here. // we got here.
DCHECK(stream.retransmissionBuffer.empty()); DCHECK(stream.retransmissionBuffer.empty());
@@ -38,43 +33,46 @@ inline void Handler<
} }
inline void inline void
Handler<StreamStateMachine, StreamStates::WaitingForRstAck, StopSendingFrame>:: Handler<StreamSendStateMachine, StreamSendStates::ResetSent, StopSendingFrame>::
handle(QuicStreamState& stream, StopSendingFrame /*frame*/) { handle(
if (isReceivingStream(stream.conn.nodeType, stream.id)) { QuicStreamState::Send& /*state*/,
throw QuicTransportException( StopSendingFrame /*frame*/,
"StopSendingFrame on unidirectional receiving stream", QuicStreamState& /*stream*/) {
TransportErrorCode::STREAM_STATE_ERROR); // no-op, we already sent a reset
}
}
inline void
Handler<StreamStateMachine, StreamStates::WaitingForRstAck, RstStreamFrame>::
handle(QuicStreamState& stream, RstStreamFrame rst) {
if (isSendingStream(stream.conn.nodeType, stream.id)) {
throw QuicTransportException(
"RstStreamFrame on unidirectional sending stream",
TransportErrorCode::STREAM_STATE_ERROR);
}
// This will make sure all the states are consistent between resets.
onResetQuicStream(stream, std::move(rst));
} }
inline void Handler< inline void Handler<
StreamStateMachine, StreamSendStateMachine,
StreamStates::WaitingForRstAck, StreamSendStates::ResetSent,
StreamEvents::RstAck>:: StreamEvents::RstAck>::
handle(QuicStreamState& stream, StreamEvents::RstAck /*ack*/) { handle(
stream.conn.streamManager->addClosed(stream.id); QuicStreamState::Send& state,
VLOG(10) << "WaitingForRstAck: Transition to closed stream=" << stream.id StreamEvents::RstAck /*ack*/,
<< " " << stream.conn; QuicStreamState& stream) {
transit<StreamStates::Closed>(stream); VLOG(10) << "ResetSent: Transition to closed stream=" << stream.id << " "
<< stream.conn;
transit<StreamSendStates::Closed>(state);
if (matchesStates<StreamReceiveStateData, StreamReceiveStates::Open>(
stream.recv.state)) {
// Terminate the ingress state machine until we remove rst on rst
invokeStreamReceiveStateMachine(
stream.conn,
stream,
RstStreamFrame(stream.id, GenericApplicationErrorCode::NO_ERROR, 0));
}
if (stream.inTerminalStates()) {
stream.conn.streamManager->addClosed(stream.id);
}
} }
inline void Handler< inline void Handler<
StreamStateMachine, StreamSendStateMachine,
StreamStates::WaitingForRstAck, StreamSendStates::ResetSent,
StreamEvents::SendReset>:: StreamEvents::SendReset>::
handle(QuicStreamState& /*stream*/, StreamEvents::SendReset) { handle(
QuicStreamState::Send& /*state*/,
StreamEvents::SendReset,
QuicStreamState& /*stream*/) {
// do nothing. // do nothing.
} }
} // namespace quic } // namespace quic

View File

@@ -36,8 +36,10 @@ TEST_F(StreamStateFunctionsTests, SanityTest) {
auto currentReadOffset = stream.currentReadOffset; auto currentReadOffset = stream.currentReadOffset;
EXPECT_TRUE(stream.writable()); EXPECT_TRUE(stream.writable());
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream);
// Something are cleared: // Something are cleared:
EXPECT_TRUE(stream.writeBuffer.empty()); EXPECT_TRUE(stream.writeBuffer.empty());
EXPECT_TRUE(stream.retransmissionBuffer.empty()); EXPECT_TRUE(stream.retransmissionBuffer.empty());

View File

@@ -27,10 +27,13 @@ void verifyStreamReset(
const QuicStreamState& stream, const QuicStreamState& stream,
uint64_t readOffsetExpected) { uint64_t readOffsetExpected) {
EXPECT_TRUE(stream.readBuffer.empty()); EXPECT_TRUE(stream.readBuffer.empty());
// AHF
EXPECT_TRUE(stream.retransmissionBuffer.empty()); EXPECT_TRUE(stream.retransmissionBuffer.empty());
EXPECT_TRUE(stream.writeBuffer.empty()); EXPECT_TRUE(stream.writeBuffer.empty());
EXPECT_TRUE(stream.finalReadOffset.hasValue()); EXPECT_TRUE(stream.finalReadOffset.hasValue());
EXPECT_EQ(readOffsetExpected, stream.finalReadOffset.value()); EXPECT_EQ(readOffsetExpected, stream.finalReadOffset.value());
// AHF
EXPECT_FALSE(stream.writable()); EXPECT_FALSE(stream.writable());
} }
@@ -65,10 +68,11 @@ TEST_F(QuicOpenStateTest, ReadStreamDataNotFin) {
bool fin = false; bool fin = false;
ReadStreamFrame frame(id, offset, fin); ReadStreamFrame frame(id, offset, fin);
frame.data = IOBuf::copyBuffer("hey"); frame.data = IOBuf::copyBuffer("hey");
invokeHandler<StreamStateMachine>(stream, std::move(frame)); invokeHandler<StreamReceiveStateMachine>(
stream.recv, std::move(frame), stream);
EXPECT_TRUE(stream.hasReadableData()); EXPECT_TRUE(stream.hasReadableData());
EXPECT_TRUE(stream.hasPeekableData()); EXPECT_TRUE(stream.hasPeekableData());
EXPECT_TRUE(isState<StreamStates::Open>(stream)); EXPECT_TRUE(isState<StreamReceiveStates::Open>(stream.recv));
} }
TEST_F(QuicOpenStateTest, ReadInvalidData) { TEST_F(QuicOpenStateTest, ReadInvalidData) {
@@ -81,15 +85,17 @@ TEST_F(QuicOpenStateTest, ReadInvalidData) {
// EOF in middle of stream // EOF in middle of stream
ReadStreamFrame frame1(id, offset1, fin1); ReadStreamFrame frame1(id, offset1, fin1);
frame1.data = IOBuf::copyBuffer("hey"); frame1.data = IOBuf::copyBuffer("hey");
invokeHandler<StreamStateMachine>(stream, std::move(frame1)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::Open>(stream)); stream.recv, std::move(frame1), stream);
EXPECT_TRUE(isState<StreamReceiveStates::Open>(stream.recv));
uint64_t offset2 = 1; uint64_t offset2 = 1;
bool fin2 = true; bool fin2 = true;
ReadStreamFrame frame2(id, offset2, fin2); ReadStreamFrame frame2(id, offset2, fin2);
frame2.data = IOBuf::copyBuffer("e"); frame2.data = IOBuf::copyBuffer("e");
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, std::move(frame2)), invokeHandler<StreamReceiveStateMachine>(
stream.recv, std::move(frame2), stream),
QuicTransportException); QuicTransportException);
} }
@@ -99,7 +105,8 @@ TEST_F(QuicOpenStateTest, InvalidEvent) {
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
RstStreamFrame frame(1, GenericApplicationErrorCode::UNKNOWN, 0); RstStreamFrame frame(1, GenericApplicationErrorCode::UNKNOWN, 0);
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, StreamEvents::RstAck(frame)), invokeHandler<StreamSendStateMachine>(
stream.send, StreamEvents::RstAck(frame), stream),
QuicTransportException); QuicTransportException);
} }
@@ -113,8 +120,9 @@ TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFIN) {
ReadStreamFrame receivedStreamFrame(stream->id, 100, true); ReadStreamFrame receivedStreamFrame(stream->id, 100, true);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(*stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
ASSERT_TRUE(isState<StreamStates::HalfClosedRemote>(*stream)); stream->recv, std::move(receivedStreamFrame), *stream);
ASSERT_TRUE(isState<StreamReceiveStates::Closed>(stream->recv));
} }
TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFINReadbuffHole) { TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFINReadbuffHole) {
@@ -127,8 +135,9 @@ TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithFINReadbuffHole) {
ReadStreamFrame receivedStreamFrame(stream->id, 200, true); ReadStreamFrame receivedStreamFrame(stream->id, 200, true);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(*stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
ASSERT_TRUE(isState<StreamStates::Open>(*stream)); stream->recv, std::move(receivedStreamFrame), *stream);
ASSERT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
} }
TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithoutFIN) { TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithoutFIN) {
@@ -141,28 +150,32 @@ TEST_F(QuicOpenStateTest, ReceiveStreamFrameWithoutFIN) {
ReadStreamFrame receivedStreamFrame(stream->id, 100, false); ReadStreamFrame receivedStreamFrame(stream->id, 100, false);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(*stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
ASSERT_TRUE(isState<StreamStates::Open>(*stream)); stream->recv, std::move(receivedStreamFrame), *stream);
ASSERT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
} }
class QuicWaitingForRstAckStateTest : public Test {}; class QuicResetSentStateTest : public Test {};
TEST_F(QuicWaitingForRstAckStateTest, RstAck) { TEST_F(QuicResetSentStateTest, RstAck) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
stream.currentReadOffset = 0xABCD; stream.currentReadOffset = 0xABCD;
stream.finalWriteOffset = 0xACDC; stream.finalWriteOffset = 0xACDC;
stream.readBuffer.emplace_back( stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
invokeHandler<StreamStateMachine>(stream, StreamEvents::RstAck(frame)); invokeHandler<StreamSendStateMachine>(
stream.send, StreamEvents::RstAck(frame), stream);
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
EXPECT_FALSE(stream.finalReadOffset); // AHF
EXPECT_FALSE(stream.readBuffer.empty()); EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
// EXPECT_FALSE(stream.finalReadOffset);
// EXPECT_FALSE(stream.readBuffer.empty());
} }
class QuicClosedStateTest : public Test {}; class QuicClosedStateTest : public Test {};
@@ -171,10 +184,11 @@ TEST_F(QuicClosedStateTest, RstAck) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Closed();
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
invokeHandler<StreamStateMachine>(stream, StreamEvents::RstAck(frame)); invokeHandler<StreamSendStateMachine>(
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); stream.send, StreamEvents::RstAck(frame), stream);
EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
} }
TEST_F(QuicOpenStateTest, AckStream) { TEST_F(QuicOpenStateTest, AckStream) {
@@ -204,11 +218,11 @@ TEST_F(QuicOpenStateTest, AckStream) {
->packet.frames.front()); ->packet.frames.front());
StreamEvents::AckStreamFrame ack(streamFrame); StreamEvents::AckStreamFrame ack(streamFrame);
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
} }
TEST_F(QuicOpenStateTest, AckStreamAfterSkip) { TEST_F(QuicOpenStateTest, AckStreamAfterSkip) {
@@ -245,11 +259,13 @@ TEST_F(QuicOpenStateTest, AckStreamAfterSkip) {
EXPECT_TRUE(stream->retransmissionBuffer.empty()); EXPECT_TRUE(stream->retransmissionBuffer.empty());
StreamEvents::AckStreamFrame ack(streamFrame); StreamEvents::AckStreamFrame ack(streamFrame);
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
} }
class QuicHalfClosedLocalStateTest : public Test {}; class QuicHalfClosedLocalStateTest : public Test {};
@@ -258,45 +274,54 @@ TEST_F(QuicHalfClosedLocalStateTest, ReceiveStreamFrameWithFIN) {
auto conn = createConn(); auto conn = createConn();
auto stream = conn->streamManager->createNextBidirectionalStream().value(); auto stream = conn->streamManager->createNextBidirectionalStream().value();
stream->state = StreamStates::HalfClosedLocal(); stream->send.state = StreamSendStates::Closed();
stream->recv.state = StreamReceiveStates::Open();
stream->currentReadOffset = 100; stream->currentReadOffset = 100;
// We received FIN and everything: // We received FIN and everything:
ReadStreamFrame receivedStreamFrame(stream->id, 100, true); ReadStreamFrame receivedStreamFrame(stream->id, 100, true);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(*stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
ASSERT_TRUE(isState<StreamStates::Closed>(*stream)); stream->recv, std::move(receivedStreamFrame), *stream);
ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Closed>(stream->recv));
} }
TEST_F(QuicHalfClosedLocalStateTest, ReceiveStreamFrameWithFINReadbuffHole) { TEST_F(QuicHalfClosedLocalStateTest, ReceiveStreamFrameWithFINReadbuffHole) {
auto conn = createConn(); auto conn = createConn();
auto stream = conn->streamManager->createNextBidirectionalStream().value(); auto stream = conn->streamManager->createNextBidirectionalStream().value();
stream->state = StreamStates::HalfClosedLocal(); stream->send.state = StreamSendStates::Closed();
stream->recv.state = StreamReceiveStates::Open();
stream->currentReadOffset = 100; stream->currentReadOffset = 100;
// We received FIN, but we havn't received anything between 100 and 200: // We received FIN, but we havn't received anything between 100 and 200:
ReadStreamFrame receivedStreamFrame(stream->id, 200, true); ReadStreamFrame receivedStreamFrame(stream->id, 200, true);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(*stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
ASSERT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); stream->recv, std::move(receivedStreamFrame), *stream);
ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
} }
TEST_F(QuicHalfClosedLocalStateTest, ReceiveStreamFrameWithoutFIN) { TEST_F(QuicHalfClosedLocalStateTest, ReceiveStreamFrameWithoutFIN) {
auto conn = createConn(); auto conn = createConn();
auto stream = conn->streamManager->createNextBidirectionalStream().value(); auto stream = conn->streamManager->createNextBidirectionalStream().value();
stream->state = StreamStates::HalfClosedLocal(); stream->send.state = StreamSendStates::Closed();
stream->recv.state = StreamReceiveStates::Open();
stream->currentReadOffset = 100; stream->currentReadOffset = 100;
// We haven't received FIN: // We haven't received FIN:
ReadStreamFrame receivedStreamFrame(stream->id, 100, false); ReadStreamFrame receivedStreamFrame(stream->id, 100, false);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(*stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
ASSERT_TRUE(isState<StreamStates::HalfClosedLocal>(*stream)); stream->recv, std::move(receivedStreamFrame), *stream);
ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Open>(stream->recv));
} }
class QuicHalfClosedRemoteStateTest : public Test {}; class QuicHalfClosedRemoteStateTest : public Test {};
@@ -310,7 +335,8 @@ TEST_F(QuicHalfClosedRemoteStateTest, AckStream) {
folly::Optional<ConnectionId> serverChosenConnId = folly::Optional<ConnectionId> serverChosenConnId =
connIdAlgo->encodeConnectionId(params); connIdAlgo->encodeConnectionId(params);
auto stream = conn->streamManager->createNextBidirectionalStream().value(); auto stream = conn->streamManager->createNextBidirectionalStream().value();
stream->state = StreamStates::HalfClosedRemote(); stream->send.state = StreamSendStates::Open();
stream->recv.state = StreamReceiveStates::Closed();
EventBase evb; EventBase evb;
auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb); auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
@@ -333,11 +359,11 @@ TEST_F(QuicHalfClosedRemoteStateTest, AckStream) {
->packet.frames.front()); ->packet.frames.front());
StreamEvents::AckStreamFrame ack(streamFrame); StreamEvents::AckStreamFrame ack(streamFrame);
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::Closed>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::Closed>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
} }
TEST_F(QuicHalfClosedRemoteStateTest, AckStreamAfterSkip) { TEST_F(QuicHalfClosedRemoteStateTest, AckStreamAfterSkip) {
@@ -349,7 +375,8 @@ TEST_F(QuicHalfClosedRemoteStateTest, AckStreamAfterSkip) {
folly::Optional<ConnectionId> serverChosenConnId = folly::Optional<ConnectionId> serverChosenConnId =
connIdAlgo->encodeConnectionId(params); connIdAlgo->encodeConnectionId(params);
auto stream = conn->streamManager->createNextBidirectionalStream().value(); auto stream = conn->streamManager->createNextBidirectionalStream().value();
stream->state = StreamStates::HalfClosedRemote(); stream->send.state = StreamSendStates::Open();
stream->recv.state = StreamReceiveStates::Closed();
EventBase evb; EventBase evb;
auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb); auto sock = std::make_unique<folly::test::MockAsyncUDPSocket>(&evb);
@@ -379,11 +406,13 @@ TEST_F(QuicHalfClosedRemoteStateTest, AckStreamAfterSkip) {
EXPECT_TRUE(stream->retransmissionBuffer.empty()); EXPECT_TRUE(stream->retransmissionBuffer.empty());
StreamEvents::AckStreamFrame ack(streamFrame); StreamEvents::AckStreamFrame ack(streamFrame);
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::Closed>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Closed>(stream->recv));
invokeHandler<StreamStateMachine>(*stream, ack); invokeHandler<StreamSendStateMachine>(stream->send, ack, *stream);
ASSERT_TRUE(isState<StreamStates::Closed>(*stream)); ASSERT_TRUE(isState<StreamSendStates::Closed>(stream->send));
ASSERT_TRUE(isState<StreamReceiveStates::Closed>(stream->recv));
} }
class QuicSendResetTest : public Test {}; class QuicSendResetTest : public Test {};
@@ -392,47 +421,61 @@ TEST_F(QuicSendResetTest, FromOpen) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); stream.send,
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream);
EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
} }
TEST_F(QuicSendResetTest, FromHalfCloseRemote) { TEST_F(QuicSendResetTest, FromHalfCloseRemote) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedRemote(); stream.send.state = StreamSendStates::Open();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Closed();
stream, StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); invokeHandler<StreamSendStateMachine>(
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream);
EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
} }
TEST_F(QuicSendResetTest, FromHalfCloseLocal) { TEST_F(QuicSendResetTest, FromHalfCloseLocal) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedLocal(); stream.send.state = StreamSendStates::Closed();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Open();
stream, StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); invokeHandler<StreamSendStateMachine>(
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream);
// You cannot send a reset after FIN has been acked
EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
} }
TEST_F(QuicSendResetTest, FromClosed) { TEST_F(QuicSendResetTest, FromClosed) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Closed();
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream);
} }
TEST_F(QuicSendResetTest, FromWaitingForRstAck) { TEST_F(QuicSendResetTest, FromResetSent) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)); stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream);
} }
class QuicRecvResetTest : public Test {}; class QuicRecvResetTest : public Test {};
@@ -443,8 +486,9 @@ TEST_F(QuicRecvResetTest, FromOpen) {
StreamId rstStream = 1; StreamId rstStream = 1;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
RstStreamFrame rst(rstStream, GenericApplicationErrorCode::UNKNOWN, 100); RstStreamFrame rst(rstStream, GenericApplicationErrorCode::UNKNOWN, 100);
invokeHandler<StreamStateMachine>(stream, std::move(rst)); invokeHandler<StreamReceiveStateMachine>(stream.recv, std::move(rst), stream);
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 100); verifyStreamReset(stream, 100);
} }
@@ -456,7 +500,8 @@ TEST_F(QuicRecvResetTest, FromOpenReadEOFMismatch) {
RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100); RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100);
stream.finalReadOffset = 1024; stream.finalReadOffset = 1024;
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, std::move(rst)), invokeHandler<StreamReceiveStateMachine>(
stream.recv, std::move(rst), stream),
QuicTransportException); QuicTransportException);
} }
@@ -464,10 +509,14 @@ TEST_F(QuicRecvResetTest, FromHalfClosedRemoteNoReadOffsetYet) {
StreamId id = 5; StreamId id = 5;
auto conn = createConn(); auto conn = createConn();
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedRemote(); stream.send.state = StreamSendStates::Open();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Closed();
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 100)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 100),
stream);
EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 100); verifyStreamReset(stream, 100);
} }
@@ -475,11 +524,15 @@ TEST_F(QuicRecvResetTest, FromHalfClosedRemoteReadOffsetMatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedRemote(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Closed();
stream.finalReadOffset = 1024; stream.finalReadOffset = 1024;
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1024)); stream.recv,
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1024),
stream);
EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 1024); verifyStreamReset(stream, 1024);
} }
@@ -487,11 +540,14 @@ TEST_F(QuicRecvResetTest, FromHalfClosedRemoteReadOffsetMismatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedRemote(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Closed();
stream.finalReadOffset = 1024; stream.finalReadOffset = 1024;
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 100)), stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 100),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -499,10 +555,14 @@ TEST_F(QuicRecvResetTest, FromHalfClosedLocal) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedLocal(); stream.send.state = StreamSendStates::Closed();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Open();
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200),
stream);
EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 200); verifyStreamReset(stream, 200);
} }
@@ -510,46 +570,60 @@ TEST_F(QuicRecvResetTest, FromHalfClosedLocalReadEOFMismatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::HalfClosedLocal(); stream.send.state = StreamSendStates::Closed();
stream.recv.state = StreamReceiveStates::Open();
stream.finalReadOffset = 2014; stream.finalReadOffset = 2014;
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200)), stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200),
stream),
QuicTransportException); QuicTransportException);
} }
TEST_F(QuicRecvResetTest, FromWaitingForRstAckNoReadOffsetYet) { TEST_F(QuicRecvResetTest, FromResetSentNoReadOffsetYet) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Open();
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200),
stream);
EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 200); verifyStreamReset(stream, 200);
} }
TEST_F(QuicRecvResetTest, FromWaitingForRstAckOffsetMatch) { TEST_F(QuicRecvResetTest, FromResetSentOffsetMatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
stream.recv.state = StreamReceiveStates::Open();
stream.finalReadOffset = 200; stream.finalReadOffset = 200;
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200)); stream.recv,
EXPECT_TRUE(isState<StreamStates::WaitingForRstAck>(stream)); RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200),
stream);
EXPECT_TRUE(isState<StreamSendStates::ResetSent>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 200); verifyStreamReset(stream, 200);
} }
TEST_F(QuicRecvResetTest, FromWaitingForRstAckOffsetMismatch) { TEST_F(QuicRecvResetTest, FromResetSentOffsetMismatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
stream.recv.state = StreamReceiveStates::Open();
stream.finalReadOffset = 300; stream.finalReadOffset = 300;
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200)), stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -557,10 +631,14 @@ TEST_F(QuicRecvResetTest, FromClosedNoReadOffsetYet) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Closed();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Closed();
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 200),
stream);
EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 200); verifyStreamReset(stream, 200);
} }
@@ -568,11 +646,15 @@ TEST_F(QuicRecvResetTest, FromClosedOffsetMatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Closed();
stream.recv.state = StreamReceiveStates::Closed();
stream.finalReadOffset = 1234; stream.finalReadOffset = 1234;
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234)); stream.recv,
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234),
stream);
EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
verifyStreamReset(stream, 1234); verifyStreamReset(stream, 1234);
} }
@@ -580,12 +662,14 @@ TEST_F(QuicRecvResetTest, FromClosedOffsetMismatch) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 5; StreamId id = 5;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Closed();
stream.recv.state = StreamReceiveStates::Closed();
stream.finalReadOffset = 123; stream.finalReadOffset = 123;
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234)), RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -595,9 +679,11 @@ TEST_F(QuicUnidirectionalStreamTest, OpenInvalidReadStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Invalid();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, ReadStreamFrame(id, 1, false)), invokeHandler<StreamReceiveStateMachine>(
stream.recv, ReadStreamFrame(id, 1, false), stream),
QuicTransportException); QuicTransportException);
} }
@@ -605,11 +691,13 @@ TEST_F(QuicUnidirectionalStreamTest, OpenInvalidRstStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Invalid();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234)), RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -617,11 +705,13 @@ TEST_F(QuicUnidirectionalStreamTest, OpenInvalidSendReset) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Open();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)), StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -629,20 +719,25 @@ TEST_F(QuicUnidirectionalStreamTest, OpenInvalidAckStreamFrame) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Open();
StreamEvents::AckStreamFrame ack(WriteStreamFrame(id, 0, 0, false)); StreamEvents::AckStreamFrame ack(WriteStreamFrame(id, 0, 0, false));
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, ack), QuicTransportException); invokeHandler<StreamSendStateMachine>(stream.send, ack, stream),
QuicTransportException);
} }
TEST_F(QuicUnidirectionalStreamTest, OpenInvalidStopSending) { TEST_F(QuicUnidirectionalStreamTest, OpenInvalidStopSending) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Open();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, StopSendingFrame(id, GenericApplicationErrorCode::UNKNOWN)), stream.send,
StopSendingFrame(id, GenericApplicationErrorCode::UNKNOWN),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -650,9 +745,11 @@ TEST_F(QuicUnidirectionalStreamTest, ClosedInvalidReadStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Invalid();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, ReadStreamFrame(id, 1, false)), invokeHandler<StreamReceiveStateMachine>(
stream.recv, ReadStreamFrame(id, 1, false), stream),
QuicTransportException); QuicTransportException);
} }
@@ -660,11 +757,13 @@ TEST_F(QuicUnidirectionalStreamTest, ClosedInvalidRstStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Invalid();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234)), RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -672,11 +771,13 @@ TEST_F(QuicUnidirectionalStreamTest, ClosedInvalidSendReset) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Closed();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, stream.send,
StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN)), StreamEvents::SendReset(GenericApplicationErrorCode::UNKNOWN),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -684,20 +785,25 @@ TEST_F(QuicUnidirectionalStreamTest, ClosedInvalidAckStreamFrame) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Closed();
StreamEvents::AckStreamFrame ack(WriteStreamFrame(id, 0, 0, false)); StreamEvents::AckStreamFrame ack(WriteStreamFrame(id, 0, 0, false));
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, ack), QuicTransportException); invokeHandler<StreamSendStateMachine>(stream.send, ack, stream),
QuicTransportException);
} }
TEST_F(QuicUnidirectionalStreamTest, ClosedInvalidStopSending) { TEST_F(QuicUnidirectionalStreamTest, ClosedInvalidStopSending) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Closed(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Closed();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamSendStateMachine>(
stream, StopSendingFrame(id, GenericApplicationErrorCode::UNKNOWN)), stream.send,
StopSendingFrame(id, GenericApplicationErrorCode::UNKNOWN),
stream),
QuicTransportException); QuicTransportException);
} }
@@ -705,23 +811,30 @@ TEST_F(QuicUnidirectionalStreamTest, OpenReadStreamFin) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Invalid();
stream.recv.state = StreamReceiveStates::Open();
stream.currentReadOffset = 100; stream.currentReadOffset = 100;
ReadStreamFrame receivedStreamFrame(stream.id, 100, true); ReadStreamFrame receivedStreamFrame(stream.id, 100, true);
receivedStreamFrame.data = folly::IOBuf::create(10); receivedStreamFrame.data = folly::IOBuf::create(10);
receivedStreamFrame.data->append(10); receivedStreamFrame.data->append(10);
invokeHandler<StreamStateMachine>(stream, std::move(receivedStreamFrame)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); stream.recv, std::move(receivedStreamFrame), stream);
EXPECT_TRUE(isState<StreamSendStates::Invalid>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
} }
TEST_F(QuicUnidirectionalStreamTest, OpenRstStream) { TEST_F(QuicUnidirectionalStreamTest, OpenRstStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b110; StreamId id = 0b110;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Invalid();
invokeHandler<StreamStateMachine>( stream.recv.state = StreamReceiveStates::Open();
stream, RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234)); invokeHandler<StreamReceiveStateMachine>(
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234),
stream);
EXPECT_TRUE(isState<StreamSendStates::Invalid>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Closed>(stream.recv));
} }
TEST_F(QuicUnidirectionalStreamTest, OpenFinalAckStreamFrame) { TEST_F(QuicUnidirectionalStreamTest, OpenFinalAckStreamFrame) {
@@ -729,48 +842,44 @@ TEST_F(QuicUnidirectionalStreamTest, OpenFinalAckStreamFrame) {
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
WriteStreamFrame streamFrame(id, 1, 1, false); WriteStreamFrame streamFrame(id, 1, 1, false);
stream.state = StreamStates::Open(); stream.send.state = StreamSendStates::Open();
stream.recv.state = StreamReceiveStates::Invalid();
stream.finalWriteOffset = 1; stream.finalWriteOffset = 1;
stream.currentWriteOffset = 2; stream.currentWriteOffset = 2;
auto buf = folly::IOBuf::create(1); auto buf = folly::IOBuf::create(1);
buf->append(1); buf->append(1);
stream.retransmissionBuffer.emplace_back(std::move(buf), 1, false); stream.retransmissionBuffer.emplace_back(std::move(buf), 1, false);
StreamEvents::AckStreamFrame ack(streamFrame); StreamEvents::AckStreamFrame ack(streamFrame);
invokeHandler<StreamStateMachine>(stream, ack); invokeHandler<StreamSendStateMachine>(stream.send, ack, stream);
EXPECT_TRUE(isState<StreamStates::Closed>(stream)); EXPECT_TRUE(isState<StreamSendStates::Closed>(stream.send));
EXPECT_TRUE(isState<StreamReceiveStates::Invalid>(stream.recv));
} }
TEST_F(QuicUnidirectionalStreamTest, WaitingForRstAckInvalidReadStream) { TEST_F(QuicUnidirectionalStreamTest, ResetSentInvalidReadStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
stream.recv.state = StreamReceiveStates::Invalid();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>(stream, ReadStreamFrame(id, 1, false)), invokeHandler<StreamReceiveStateMachine>(
stream.recv, ReadStreamFrame(id, 1, false), stream),
QuicTransportException); QuicTransportException);
} }
TEST_F(QuicUnidirectionalStreamTest, WaitingForRstAckInvalidRstStream) { TEST_F(QuicUnidirectionalStreamTest, ResetSentInvalidRstStream) {
auto conn = createConn(); auto conn = createConn();
StreamId id = 0b111; StreamId id = 0b111;
QuicStreamState stream(id, *conn); QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck(); stream.send.state = StreamSendStates::ResetSent();
stream.recv.state = StreamReceiveStates::Invalid();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<StreamStateMachine>( invokeHandler<StreamReceiveStateMachine>(
stream, stream.recv,
RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234)), RstStreamFrame(1, GenericApplicationErrorCode::UNKNOWN, 1234),
stream),
QuicTransportException); QuicTransportException);
} }
TEST_F(QuicUnidirectionalStreamTest, WaitingForRstAckInvalidStopSending) {
auto conn = createConn();
StreamId id = 0b110;
QuicStreamState stream(id, *conn);
stream.state = StreamStates::WaitingForRstAck();
EXPECT_THROW(
invokeHandler<StreamStateMachine>(
stream, StopSendingFrame(id, GenericApplicationErrorCode::UNKNOWN)),
QuicTransportException);
}
} // namespace test } // namespace test
} // namespace quic } // namespace quic

View File

@@ -194,7 +194,8 @@ TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameShrinkBuffer) {
TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameOnUnidirectionalStream) { TEST_F(QPRFunctionsTest, RecvMinStreamDataFrameOnUnidirectionalStream) {
auto stream = conn.streamManager->createNextUnidirectionalStream().value(); auto stream = conn.streamManager->createNextUnidirectionalStream().value();
stream->state = StreamStates::Closed{}; stream->send.state = StreamSendStates::Closed{};
stream->recv.state = StreamReceiveStates::Closed{};
PacketNum packetNum(10); PacketNum packetNum(10);
MinStreamDataFrame frame( MinStreamDataFrame frame(
stream->id, stream->flowControlState.peerAdvertisedMaxOffset + 100, 100); stream->id, stream->flowControlState.peerAdvertisedMaxOffset + 100, 100);

View File

@@ -345,10 +345,11 @@ TEST_F(QuicStateFunctionsTest, TestInvokeStreamStateMachineConnectionError) {
RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100); RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100);
stream.finalReadOffset = 1024; stream.finalReadOffset = 1024;
EXPECT_THROW( EXPECT_THROW(
invokeStreamStateMachine(conn, stream, std::move(rst)), invokeStreamReceiveStateMachine(conn, stream, std::move(rst)),
QuicTransportException); QuicTransportException);
bool matches = matchesStates<StreamStateData, StreamStates::WaitingForRstAck>( bool matches =
stream.state); matchesStates<StreamSendStateData, StreamSendStates::ResetSent>(
stream.send.state);
EXPECT_TRUE(matches); EXPECT_TRUE(matches);
} }
@@ -361,9 +362,10 @@ TEST_F(QuicStateFunctionsTest, InvokeResetDoesNotSendFlowControl) {
stream.flowControlState.windowSize = 100; stream.flowControlState.windowSize = 100;
conn.flowControlState.advertisedMaxOffset = 100; conn.flowControlState.advertisedMaxOffset = 100;
conn.flowControlState.windowSize = 100; conn.flowControlState.windowSize = 100;
invokeStreamStateMachine(conn, stream, std::move(rst)); invokeStreamReceiveStateMachine(conn, stream, std::move(rst));
bool matches = matchesStates<StreamStateData, StreamStates::WaitingForRstAck>( bool matches =
stream.state); matchesStates<StreamSendStateData, StreamSendStates::ResetSent>(
stream.send.state);
EXPECT_TRUE(matches); EXPECT_TRUE(matches);
EXPECT_FALSE(conn.streamManager->hasWindowUpdates()); EXPECT_FALSE(conn.streamManager->hasWindowUpdates());
EXPECT_TRUE(conn.pendingEvents.connWindowUpdate); EXPECT_TRUE(conn.pendingEvents.connWindowUpdate);
@@ -376,13 +378,13 @@ TEST_F(QuicStateFunctionsTest, TestInvokeStreamStateMachineStreamError) {
QuicStreamState stream(1, conn); QuicStreamState stream(1, conn);
RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100); RstStreamFrame rst(1, GenericApplicationErrorCode::UNKNOWN, 100);
try { try {
invokeStreamStateMachine(conn, stream, StreamEvents::RstAck(rst)); invokeStreamSendStateMachine(conn, stream, StreamEvents::RstAck(rst));
ADD_FAILURE(); ADD_FAILURE();
} catch (QuicTransportException& ex) { } catch (QuicTransportException& ex) {
EXPECT_EQ(ex.errorCode(), TransportErrorCode::STREAM_STATE_ERROR); EXPECT_EQ(ex.errorCode(), TransportErrorCode::STREAM_STATE_ERROR);
} }
bool matches = bool matches = matchesStates<StreamSendStateData, StreamSendStates::Open>(
matchesStates<StreamStateData, StreamStates::Open>(stream.state); stream.send.state);
EXPECT_TRUE(matches); EXPECT_TRUE(matches);
} }

View File

@@ -1518,7 +1518,8 @@ TEST_F(QuicStreamFunctionsTest, RemovedClosedState) {
conn.streamManager->queueWindowUpdate(streamId); conn.streamManager->queueWindowUpdate(streamId);
conn.streamManager->addStopSending( conn.streamManager->addStopSending(
streamId, GenericApplicationErrorCode::UNKNOWN); streamId, GenericApplicationErrorCode::UNKNOWN);
stream->state = StreamStates::Closed{}; stream->send.state = StreamSendStates::Closed{};
stream->recv.state = StreamReceiveStates::Closed{};
conn.streamManager->removeClosedStream(streamId); conn.streamManager->removeClosedStream(streamId);
EXPECT_FALSE(conn.streamManager->streamExists(streamId)); EXPECT_FALSE(conn.streamManager->streamExists(streamId));
EXPECT_TRUE(conn.streamManager->readableStreams().empty()); EXPECT_TRUE(conn.streamManager->readableStreams().empty());
@@ -1594,8 +1595,8 @@ TEST_F(QuicServerStreamFunctionsTest, ServerGetCloseBothDirections) {
conn.streamManager->createStream(serverBiStream).value(); conn.streamManager->createStream(serverBiStream).value();
EXPECT_EQ(conn.streamManager->getStream(serverBiStream)->id, serverBiStream); EXPECT_EQ(conn.streamManager->getStream(serverBiStream)->id, serverBiStream);
StreamId serverUniStream = 0x0B; StreamId serverUniStream = 0x0B;
conn.streamManager->createStream(serverUniStream).value()->state = auto stream = conn.streamManager->createStream(serverUniStream).value();
StreamStates::Closed{}; stream->send.state = StreamSendStates::Closed{};
conn.streamManager->removeClosedStream(serverUniStream); conn.streamManager->removeClosedStream(serverUniStream);
EXPECT_TRUE( EXPECT_TRUE(
@@ -1679,7 +1680,10 @@ TEST_F(QuicStreamFunctionsTest, StreamExists) {
EXPECT_FALSE(conn.streamManager->streamExists(peerStream)); EXPECT_FALSE(conn.streamManager->streamExists(peerStream));
EXPECT_FALSE(conn.streamManager->streamExists(peerAutoOpened)); EXPECT_FALSE(conn.streamManager->streamExists(peerAutoOpened));
conn.streamManager->getStream(peerStream)->state = StreamStates::Closed{}; conn.streamManager->getStream(peerStream)->send.state =
StreamSendStates::Closed{};
conn.streamManager->getStream(peerStream)->recv.state =
StreamReceiveStates::Closed{};
EXPECT_TRUE(conn.streamManager->streamExists(localStream)); EXPECT_TRUE(conn.streamManager->streamExists(localStream));
EXPECT_TRUE(conn.streamManager->streamExists(localAutoOpened)); EXPECT_TRUE(conn.streamManager->streamExists(localAutoOpened));
EXPECT_FALSE(conn.streamManager->streamExists(notOpenedLocal)); EXPECT_FALSE(conn.streamManager->streamExists(notOpenedLocal));

View File

@@ -41,6 +41,7 @@ void ThrowExceptionHandler(const ConnectionState&) {
struct TestMachine { struct TestMachine {
using StateData = ConnectionState; using StateData = ConnectionState;
using UserData = ConnectionState;
static auto constexpr InvalidEventHandler = &ThrowExceptionHandler; static auto constexpr InvalidEventHandler = &ThrowExceptionHandler;
}; };
@@ -51,14 +52,16 @@ QUIC_DECLARE_STATE_HANDLER(TestMachine, State2, Event2, State1, State3);
// The handlers // The handlers
void Handler<TestMachine, State1, Event1>::handle( void Handler<TestMachine, State1, Event1>::handle(
ConnectionState& connState, ConnectionState& connState,
Event1 /*event*/) { Event1 /*event*/,
TestMachine::UserData&) {
connState.visitedState1Event1 = true; connState.visitedState1Event1 = true;
transit<State2>(connState); transit<State2>(connState);
} }
void Handler<TestMachine, State1, Event2>::handle( void Handler<TestMachine, State1, Event2>::handle(
ConnectionState& connState, ConnectionState& connState,
Event2 /*event*/) { Event2 /*event*/,
TestMachine::UserData&) {
connState.visitedState1Event2 = true; connState.visitedState1Event2 = true;
transit<State3>(connState); transit<State3>(connState);
// transit<State1>(connState); This will fail to compile because it // transit<State1>(connState); This will fail to compile because it
@@ -67,7 +70,8 @@ void Handler<TestMachine, State1, Event2>::handle(
void Handler<TestMachine, State2, Event2>::handle( void Handler<TestMachine, State2, Event2>::handle(
ConnectionState& connState, ConnectionState& connState,
Event2 /*event*/) { Event2 /*event*/,
TestMachine::UserData&) {
connState.visitedState2Event2 = true; connState.visitedState2Event2 = true;
transit<State3>(connState); transit<State3>(connState);
} }
@@ -83,6 +87,7 @@ struct ConnectionStateT {
template <typename T> template <typename T>
struct TestMachineT { struct TestMachineT {
using StateData = ConnectionStateT<T>; using StateData = ConnectionStateT<T>;
using UserData = ConnectionStateT<T>;
static void InvalidEventHandler(ConnectionStateT<T>& /*s*/) { static void InvalidEventHandler(ConnectionStateT<T>& /*s*/) {
throw InvalidHandlerException("invalid state in template machine"); throw InvalidHandlerException("invalid state in template machine");
} }
@@ -95,7 +100,8 @@ QUIC_DECLARE_STATE_HANDLER_T(State2, Event2, State3);
template <typename Machine> template <typename Machine>
void Handler<Machine, State1, Event1>::handle( void Handler<Machine, State1, Event1>::handle(
typename Machine::StateData& s, typename Machine::StateData& s,
Event1) { Event1,
typename Machine::UserData&) {
s.visitedState1Event1 = true; s.visitedState1Event1 = true;
transit<State2>(s); transit<State2>(s);
} }
@@ -103,7 +109,8 @@ void Handler<Machine, State1, Event1>::handle(
template <typename Machine> template <typename Machine>
void Handler<Machine, State1, Event2>::handle( void Handler<Machine, State1, Event2>::handle(
typename Machine::StateData& s, typename Machine::StateData& s,
Event2) { Event2,
typename Machine::UserData&) {
s.visitedState1Event2 = true; s.visitedState1Event2 = true;
transit<State3>(s); transit<State3>(s);
} }
@@ -111,7 +118,8 @@ void Handler<Machine, State1, Event2>::handle(
template <typename Machine> template <typename Machine>
void Handler<Machine, State2, Event2>::handle( void Handler<Machine, State2, Event2>::handle(
typename Machine::StateData& s, typename Machine::StateData& s,
Event2) { Event2,
typename Machine::UserData&) {
s.visitedState2Event2 = true; s.visitedState2Event2 = true;
transit<State3>(s); transit<State3>(s);
} }
@@ -126,7 +134,7 @@ class StateMachineTest : public Test {
TEST_F(StateMachineTest, TestTransitions) { TEST_F(StateMachineTest, TestTransitions) {
state.state = State1(); state.state = State1();
invokeHandler<TestMachine>(state, Event1()); invokeHandler<TestMachine>(state, Event1(), state);
EXPECT_TRUE(state.visitedState1Event1); EXPECT_TRUE(state.visitedState1Event1);
EXPECT_FALSE(state.visitedState1Event2); EXPECT_FALSE(state.visitedState1Event2);
EXPECT_FALSE(state.visitedState2Event2); EXPECT_FALSE(state.visitedState2Event2);
@@ -134,7 +142,7 @@ TEST_F(StateMachineTest, TestTransitions) {
// check that the state is correct. // check that the state is correct.
boost::get<State2>(state.state); boost::get<State2>(state.state);
invokeHandler<TestMachine>(state, Event2()); invokeHandler<TestMachine>(state, Event2(), state);
EXPECT_TRUE(state.visitedState1Event1); EXPECT_TRUE(state.visitedState1Event1);
EXPECT_FALSE(state.visitedState1Event2); EXPECT_FALSE(state.visitedState1Event2);
@@ -146,19 +154,20 @@ TEST_F(StateMachineTest, TestTransitions) {
TEST_F(StateMachineTest, TestInvalid) { TEST_F(StateMachineTest, TestInvalid) {
state.state = State2(); state.state = State2();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<TestMachine>(state, Event1()), InvalidHandlerException); invokeHandler<TestMachine>(state, Event1(), state),
InvalidHandlerException);
} }
TEST_F(StateMachineTest, TestTemplateTransitions) { TEST_F(StateMachineTest, TestTemplateTransitions) {
stateT.state = State1(); stateT.state = State1();
invokeHandler<TestMachineT<int>>(stateT, Event1()); invokeHandler<TestMachineT<int>>(stateT, Event1(), stateT);
EXPECT_TRUE(stateT.visitedState1Event1); EXPECT_TRUE(stateT.visitedState1Event1);
EXPECT_FALSE(stateT.visitedState1Event2); EXPECT_FALSE(stateT.visitedState1Event2);
EXPECT_FALSE(stateT.visitedState2Event2); EXPECT_FALSE(stateT.visitedState2Event2);
boost::get<State2>(stateT.state); boost::get<State2>(stateT.state);
invokeHandler<TestMachineT<int>>(stateT, Event2()); invokeHandler<TestMachineT<int>>(stateT, Event2(), stateT);
EXPECT_TRUE(stateT.visitedState1Event1); EXPECT_TRUE(stateT.visitedState1Event1);
EXPECT_FALSE(stateT.visitedState1Event2); EXPECT_FALSE(stateT.visitedState1Event2);
@@ -170,7 +179,7 @@ TEST_F(StateMachineTest, TestTemplateTransitions) {
TEST_F(StateMachineTest, TestTemplateInvalid) { TEST_F(StateMachineTest, TestTemplateInvalid) {
stateT.state = State2(); stateT.state = State2();
EXPECT_THROW( EXPECT_THROW(
invokeHandler<TestMachineT<int>>(stateT, Event1()), invokeHandler<TestMachineT<int>>(stateT, Event1(), stateT),
InvalidHandlerException); InvalidHandlerException);
} }
} }