1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

add onByteEventRegistered callback

Summary:
Add an onByteEventRegistered callback that will be invoked when the
registration for a ByteEvent callback is successful.
This makes it easy for recipients of the callback to track the successful
registration AND callback recipt in a single place (the recipient class). For
example, if all successfully registered ByteEvents have been received, the
recipient can destroy itself safely, knowing that no more ByteEvents will be
sent to it.

Reviewed By: bschlinker

Differential Revision: D27100624

fbshipit-source-id: dbfeff1f4cf2367587fdb73cbd334165b3b159de
This commit is contained in:
Sridhar Srinivasan
2021-03-18 18:12:10 -07:00
committed by Facebook GitHub Bot
parent ef613ec410
commit 0ddb6e77b5
5 changed files with 183 additions and 4 deletions

View File

@@ -1363,14 +1363,20 @@ TEST_F(QuicTransportImplTest, RegisterTxDeliveryCallbackLowerThanExpected) {
NiceMock<MockDeliveryCallback> dcb2;
NiceMock<MockDeliveryCallback> dcb3;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 20)));
transport->registerTxCallback(stream, 10, &txcb1);
transport->registerTxCallback(stream, 20, &txcb2);
transport->registerDeliveryCallback(stream, 10, &dcb1);
transport->registerDeliveryCallback(stream, 20, &dcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
EXPECT_CALL(txcb3, onByteEventRegistered(getTxMatcher(stream, 2)));
EXPECT_CALL(txcb3, onByteEvent(getTxMatcher(stream, 2)));
EXPECT_CALL(dcb3, onDeliveryAck(stream, 2, _));
transport->registerTxCallback(stream, 2, &txcb3);
@@ -1401,6 +1407,7 @@ TEST_F(
auto streamState = transport->transportConn->streamManager->getStream(stream);
streamState->currentWriteOffset = 7;
EXPECT_CALL(txcb, onByteEventRegistered(getTxMatcher(stream, 2)));
EXPECT_CALL(txcb, onByteEventCanceled(getTxMatcher(stream, 2)));
EXPECT_CALL(dcb, onCanceled(_, _));
transport->registerTxCallback(stream, 2, &txcb);
@@ -1417,6 +1424,8 @@ TEST_F(QuicTransportImplTest, CancelAllByteEventCallbacks) {
NiceMock<MockByteEventCallback> txcb1;
NiceMock<MockByteEventCallback> txcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream2, 20, &txcb2);
@@ -1494,6 +1503,8 @@ TEST_F(QuicTransportImplTest, CancelByteEventCallbacksForStream) {
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream2, 20, &txcb2);
transport->registerDeliveryCallback(stream1, 10, &dcb1);
@@ -1587,6 +1598,12 @@ TEST_F(QuicTransportImplTest, CancelByteEventCallbacksForStreamWithOffset) {
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 20)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream1, 15, &txcb1);
transport->registerTxCallback(stream1, 20, &txcb1);
@@ -1740,6 +1757,10 @@ TEST_F(QuicTransportImplTest, CancelByteEventCallbacksTx) {
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream1, 15, &txcb1);
transport->registerTxCallback(stream2, 10, &txcb2);
@@ -1818,6 +1839,10 @@ TEST_F(QuicTransportImplTest, CancelByteEventCallbacksDelivery) {
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
transport->registerTxCallback(stream1, 10, &txcb1);
transport->registerTxCallback(stream1, 15, &txcb1);
transport->registerTxCallback(stream2, 10, &txcb2);
@@ -1990,6 +2015,8 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) {
EXPECT_CALL(*socketPtr, write(_, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
transport->closeGracefully();
@@ -2000,6 +2027,7 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) {
EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
EXPECT_TRUE(
transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
@@ -2041,6 +2069,8 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) {
EXPECT_CALL(*socketPtr, write(_, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
@@ -2059,6 +2089,7 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) {
EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
EXPECT_TRUE(
transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
@@ -2095,6 +2126,8 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) {
EXPECT_CALL(*socketPtr, write(_, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
transport->close(std::make_pair(
@@ -2107,6 +2140,7 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) {
EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
EXPECT_TRUE(
transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());