1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

remove the unsupported cork param from QUIC writeChain interface

Summary: this param is passed to transport then ignored

Reviewed By: avasylev

Differential Revision: D26133327

fbshipit-source-id: 459dd0132185513215ba034f213d4137d7b56ba1
This commit is contained in:
Yang Chi
2021-01-29 10:48:12 -08:00
committed by Facebook GitHub Bot
parent 8320f7369a
commit 7c23fc75cc
13 changed files with 123 additions and 185 deletions

View File

@@ -993,10 +993,8 @@ class QuicSocket {
/** /**
* Write data/eof to the given stream. * Write data/eof to the given stream.
* *
* cork indicates to the transport that the application expects to write * Passing a delivery callback registers a callback from the transport when
* more data soon. Passing a delivery callback registers a callback from the * the peer has acknowledged the receipt of all the data/eof passed to write.
* transport when the peer has acknowledged the receipt of all the data/eof
* passed to write.
* *
* An error code is present if there was an error with the write. * An error code is present if there was an error with the write.
*/ */
@@ -1005,7 +1003,6 @@ class QuicSocket {
StreamId id, StreamId id,
Buf data, Buf data,
bool eof, bool eof,
bool cork,
DeliveryCallback* cb = nullptr) = 0; DeliveryCallback* cb = nullptr) = 0;
/** /**

View File

@@ -460,7 +460,6 @@ void QuicStreamAsyncTransport::send(uint64_t maxToSend) {
*id_, *id_,
writeBuf_.split(toSend), writeBuf_.split(toSend),
writeEOF, writeEOF,
false,
nullptr); // no delivery callbacks right now nullptr); // no delivery callbacks right now
if (res.hasError()) { if (res.hasError()) {
folly::AsyncSocketException ex( folly::AsyncSocketException ex(

View File

@@ -2084,7 +2084,6 @@ QuicSocket::WriteResult QuicTransportBase::writeChain(
StreamId id, StreamId id,
Buf data, Buf data,
bool eof, bool eof,
bool /*cork*/,
DeliveryCallback* cb) { DeliveryCallback* cb) {
if (isReceivingStream(conn_->nodeType, id)) { if (isReceivingStream(conn_->nodeType, id)) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);

View File

@@ -192,7 +192,6 @@ class QuicTransportBase : public QuicSocket {
StreamId id, StreamId id,
Buf data, Buf data,
bool eof, bool eof,
bool cork,
DeliveryCallback* cb = nullptr) override; DeliveryCallback* cb = nullptr) override;
folly::Expected<folly::Unit, LocalErrorCode> registerDeliveryCallback( folly::Expected<folly::Unit, LocalErrorCode> registerDeliveryCallback(

View File

@@ -188,18 +188,14 @@ class MockQuicSocket : public QuicSocket {
MOCK_CONST_METHOD2( MOCK_CONST_METHOD2(
getNumByteEventCallbacksForStream, getNumByteEventCallbacksForStream,
size_t(const ByteEvent::Type, const StreamId)); size_t(const ByteEvent::Type, const StreamId));
folly::Expected<folly::Unit, LocalErrorCode> writeChain( folly::Expected<folly::Unit, LocalErrorCode>
StreamId id, writeChain(StreamId id, Buf data, bool eof, DeliveryCallback* cb) override {
Buf data,
bool eof,
bool cork,
DeliveryCallback* cb) override {
SharedBuf sharedData(data.release()); SharedBuf sharedData(data.release());
return writeChain(id, sharedData, eof, cork, cb); return writeChain(id, sharedData, eof, cb);
} }
MOCK_METHOD5( MOCK_METHOD4(
writeChain, writeChain,
WriteResult(StreamId, SharedBuf, bool, bool, DeliveryCallback*)); WriteResult(StreamId, SharedBuf, bool, DeliveryCallback*));
MOCK_METHOD3( MOCK_METHOD3(
registerDeliveryCallback, registerDeliveryCallback,
folly::Expected<folly::Unit, LocalErrorCode>( folly::Expected<folly::Unit, LocalErrorCode>(

View File

@@ -1210,7 +1210,7 @@ TEST_F(QuicTransportImplTest, onUniStreamsAvailableCallbackAfterExausted) {
TEST_F(QuicTransportImplTest, ReadDataAlsoChecksLossAlarm) { TEST_F(QuicTransportImplTest, ReadDataAlsoChecksLossAlarm) {
transport->transportConn->oneRttWriteCipher = test::createNoOpAead(); transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value(); auto stream = transport->createBidirectionalStream().value();
transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, false); transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true);
// Artificially stop the write looper so that the read can trigger it. // Artificially stop the write looper so that the read can trigger it.
transport->writeLooper()->stop(); transport->writeLooper()->stop();
transport->addDataToStream( transport->addDataToStream(
@@ -1227,8 +1227,7 @@ TEST_F(QuicTransportImplTest, ConnectionErrorOnWrite) {
auto stream = transport->createBidirectionalStream().value(); auto stream = transport->createBidirectionalStream().value();
EXPECT_CALL(*socketPtr, write(_, _)) EXPECT_CALL(*socketPtr, write(_, _))
.WillOnce(SetErrnoAndReturn(ENETUNREACH, -1)); .WillOnce(SetErrnoAndReturn(ENETUNREACH, -1));
transport->writeChain( transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr);
stream, folly::IOBuf::copyBuffer("Hey"), true, false, nullptr);
transport->addDataToStream( transport->addDataToStream(
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0)); stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
evb->loopOnce(); evb->loopOnce();
@@ -1262,7 +1261,6 @@ TEST_F(QuicTransportImplTest, ReadErrorUnsanitizedErrorMsg) {
stream, stream,
folly::IOBuf::copyBuffer("You are being too loud."), folly::IOBuf::copyBuffer("You are being too loud."),
true, true,
false,
nullptr); nullptr);
evb->loopOnce(); evb->loopOnce();
@@ -1281,8 +1279,7 @@ TEST_F(QuicTransportImplTest, ConnectionErrorUnhandledException) {
throw std::runtime_error("Well there's your problem"); throw std::runtime_error("Well there's your problem");
return 0; return 0;
})); }));
transport->writeChain( transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr);
stream, folly::IOBuf::copyBuffer("Hey"), true, false, nullptr);
transport->addDataToStream( transport->addDataToStream(
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0)); stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
evb->loopOnce(); evb->loopOnce();
@@ -2110,8 +2107,7 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithActiveStream) {
transport->setReadCallback(stream, &rcb); transport->setReadCallback(stream, &rcb);
EXPECT_CALL(*socketPtr, write(_, _)) EXPECT_CALL(*socketPtr, write(_, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1)); .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain( transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
stream, IOBuf::copyBuffer("hello"), true, false, &deliveryCb);
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError()); EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError()); EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
transport->closeGracefully(); transport->closeGracefully();
@@ -2162,8 +2158,7 @@ TEST_F(QuicTransportImplTest, TestGracefulCloseWithNoActiveStream) {
transport->setReadCallback(stream, &rcb); transport->setReadCallback(stream, &rcb);
EXPECT_CALL(*socketPtr, write(_, _)) EXPECT_CALL(*socketPtr, write(_, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1)); .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain( transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
stream, IOBuf::copyBuffer("hello"), true, false, &deliveryCb);
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError()); EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError()); EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
@@ -2217,8 +2212,7 @@ TEST_F(QuicTransportImplTest, TestImmediateClose) {
transport->setReadCallback(stream, &rcb); transport->setReadCallback(stream, &rcb);
EXPECT_CALL(*socketPtr, write(_, _)) EXPECT_CALL(*socketPtr, write(_, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1)); .WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain( transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
stream, IOBuf::copyBuffer("hello"), true, false, &deliveryCb);
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError()); EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError()); EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
transport->close(std::make_pair( transport->close(std::make_pair(
@@ -2309,8 +2303,7 @@ TEST_F(QuicTransportImplTest, AsyncStreamFlowControlWrite) {
TEST_F(QuicTransportImplTest, ExceptionInWriteLooperDoesNotCrash) { TEST_F(QuicTransportImplTest, ExceptionInWriteLooperDoesNotCrash) {
auto stream = transport->createBidirectionalStream().value(); auto stream = transport->createBidirectionalStream().value();
transport->setReadCallback(stream, nullptr); transport->setReadCallback(stream, nullptr);
transport->writeChain( transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, nullptr);
stream, IOBuf::copyBuffer("hello"), true, false, nullptr);
transport->addDataToStream( transport->addDataToStream(
stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false)); stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(SetErrnoAndReturn(EBADF, -1)); EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(SetErrnoAndReturn(EBADF, -1));
@@ -2430,8 +2423,7 @@ TEST_F(QuicTransportImplTest, UnidirectionalInvalidWriteFuncs) {
.thenOrThrow([&](auto) {}), .thenOrThrow([&](auto) {}),
folly::Unexpected<LocalErrorCode>::BadExpectedAccess); folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
EXPECT_THROW( EXPECT_THROW(
transport transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), false)
->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), false, false)
.thenOrThrow([&](auto) {}), .thenOrThrow([&](auto) {}),
folly::Unexpected<LocalErrorCode>::BadExpectedAccess); folly::Unexpected<LocalErrorCode>::BadExpectedAccess);
EXPECT_THROW( EXPECT_THROW(

View File

@@ -253,7 +253,7 @@ TEST_F(QuicTransportTest, WriteDataWithProbing) {
socketWriteCounter++; socketWriteCounter++;
return iobuf->computeChainDataLength(); return iobuf->computeChainDataLength();
})); }));
transport_->writeChain(streamId, buf->clone(), true, false); transport_->writeChain(streamId, buf->clone(), true);
loopForWrites(); loopForWrites();
// Pending numProbePackets is cleared: // Pending numProbePackets is cleared:
EXPECT_EQ(0, conn.pendingEvents.numProbePackets); EXPECT_EQ(0, conn.pendingEvents.numProbePackets);
@@ -274,11 +274,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLoss) {
auto lossStream = transport_->createBidirectionalStream().value(); auto lossStream = transport_->createBidirectionalStream().value();
conn.streamManager->addLoss(lossStream); conn.streamManager->addLoss(lossStream);
transport_->writeChain( transport_->writeChain(
stream, stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
IOBuf::copyBuffer("An elephant sitting still"),
false,
false,
nullptr);
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);
EXPECT_CALL(connCallback_, onAppRateLimited()).Times(0); EXPECT_CALL(connCallback_, onAppRateLimited()).Times(0);
loopForWrites(); loopForWrites();
@@ -302,11 +298,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytes) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain( transport_->writeChain(
stream, stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
IOBuf::copyBuffer("An elephant sitting still"),
false,
false,
nullptr);
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);
EXPECT_CALL(connCallback_, onAppRateLimited()).Times(0); EXPECT_CALL(connCallback_, onAppRateLimited()).Times(0);
loopForWrites(); loopForWrites();
@@ -325,7 +317,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBuffer) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(100 * 2000); auto buf = buildRandomInputData(100 * 2000);
transport_->writeChain(stream, buf->clone(), false, false, nullptr); transport_->writeChain(stream, buf->clone(), false, nullptr);
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(0);
EXPECT_CALL(connCallback_, onAppRateLimited()).Times(0); EXPECT_CALL(connCallback_, onAppRateLimited()).Times(0);
loopForWrites(); loopForWrites();
@@ -344,11 +336,7 @@ TEST_F(QuicTransportTest, AppLimited) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain( transport_->writeChain(
stream, stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
IOBuf::copyBuffer("An elephant sitting still"),
false,
false,
nullptr);
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1);
EXPECT_CALL(connCallback_, onAppRateLimited()).Times(1); EXPECT_CALL(connCallback_, onAppRateLimited()).Times(1);
loopForWrites(); loopForWrites();
@@ -379,11 +367,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithNoWritableBytesWithObservers) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain( transport_->writeChain(
stream, stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
IOBuf::copyBuffer("An elephant sitting still"),
false,
false,
nullptr);
EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0);
loopForWrites(); loopForWrites();
Mock::VerifyAndClearExpectations(cb.get()); Mock::VerifyAndClearExpectations(cb.get());
@@ -413,7 +397,7 @@ TEST_F(QuicTransportTest, NotAppLimitedWithLargeBufferWithObservers) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(100 * 2000); auto buf = buildRandomInputData(100 * 2000);
transport_->writeChain(stream, buf->clone(), false, false, nullptr); transport_->writeChain(stream, buf->clone(), false, nullptr);
EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0); EXPECT_CALL(*cb, appRateLimited(transport_.get())).Times(0);
loopForWrites(); loopForWrites();
Mock::VerifyAndClearExpectations(cb.get()); Mock::VerifyAndClearExpectations(cb.get());
@@ -445,11 +429,7 @@ TEST_F(QuicTransportTest, AppLimitedWithObservers) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
transport_->writeChain( transport_->writeChain(
stream, stream, IOBuf::copyBuffer("An elephant sitting still"), false, nullptr);
IOBuf::copyBuffer("An elephant sitting still"),
false,
false,
nullptr);
EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1); EXPECT_CALL(*rawCongestionController, setAppLimited()).Times(1);
EXPECT_CALL(*cb1, appRateLimited(transport_.get())); EXPECT_CALL(*cb1, appRateLimited(transport_.get()));
EXPECT_CALL(*cb2, appRateLimited(transport_.get())); EXPECT_CALL(*cb2, appRateLimited(transport_.get()));
@@ -472,7 +452,7 @@ TEST_F(QuicTransportTest, WriteSmall) {
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
transport_->setStreamPriority(stream, 0, false); transport_->setStreamPriority(stream, 0, false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -505,7 +485,7 @@ TEST_F(QuicTransportTest, WriteLarge) {
EXPECT_CALL(*socket_, write(_, _)) EXPECT_CALL(*socket_, write(_, _))
.Times(NumFullPackets + 1) .Times(NumFullPackets + 1)
.WillRepeatedly(Invoke(bufLength)); .WillRepeatedly(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
EXPECT_EQ(NumFullPackets + 1, conn.outstandings.packets.size()); EXPECT_EQ(NumFullPackets + 1, conn.outstandings.packets.size());
@@ -534,7 +514,7 @@ TEST_F(QuicTransportTest, WriteMultipleTimes) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
size_t originalWriteOffset = size_t originalWriteOffset =
@@ -545,7 +525,7 @@ TEST_F(QuicTransportTest, WriteMultipleTimes) {
conn.streamManager->findStream(stream)->retransmissionBuffer.clear(); conn.streamManager->findStream(stream)->retransmissionBuffer.clear();
buf = buildRandomInputData(50); buf = buildRandomInputData(50);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
verifyCorrectness(conn, originalWriteOffset, stream, *buf); verifyCorrectness(conn, originalWriteOffset, stream, *buf);
EXPECT_EQ(WriteDataReason::NO_WRITE, shouldWriteData(conn)); EXPECT_EQ(WriteDataReason::NO_WRITE, shouldWriteData(conn));
@@ -557,14 +537,14 @@ TEST_F(QuicTransportTest, WriteMultipleStreams) {
auto s2 = transport_->createBidirectionalStream().value(); auto s2 = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(s1, buf->clone(), false, false); transport_->writeChain(s1, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
verifyCorrectness(conn, 0, s1, *buf); verifyCorrectness(conn, 0, s1, *buf);
auto buf2 = buildRandomInputData(20); auto buf2 = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(s2, buf2->clone(), false, false); transport_->writeChain(s2, buf2->clone(), false);
loopForWrites(); loopForWrites();
verifyCorrectness(conn, 0, s2, *buf2); verifyCorrectness(conn, 0, s2, *buf2);
@@ -603,7 +583,7 @@ TEST_F(QuicTransportTest, WriteFlowControl) {
folly::IOBuf passedIn; folly::IOBuf passedIn;
// Write stream blocked frame // Write stream blocked frame
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(streamId, buf->clone(), false, false); transport_->writeChain(streamId, buf->clone(), false);
loopForWrites(); loopForWrites();
EXPECT_EQ(conn.outstandings.packets.size(), 1); EXPECT_EQ(conn.outstandings.packets.size(), 1);
@@ -710,7 +690,7 @@ TEST_F(QuicTransportTest, WriteErrorEagain) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(SetErrnoAndReturn(EAGAIN, -1)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(SetErrnoAndReturn(EAGAIN, -1));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
} }
@@ -719,7 +699,7 @@ TEST_F(QuicTransportTest, WriteErrorBad) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(SetErrnoAndReturn(EBADF, -1)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(SetErrnoAndReturn(EBADF, -1));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
EXPECT_TRUE(transport_->closed); EXPECT_TRUE(transport_->closed);
} }
@@ -728,7 +708,7 @@ TEST_F(QuicTransportTest, WriteInvalid) {
// Test writing to invalid stream // Test writing to invalid stream
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
auto res = transport_->writeChain(stream + 2, buf->clone(), false, false); auto res = transport_->writeChain(stream + 2, buf->clone(), false);
loopForWrites(); loopForWrites();
EXPECT_EQ(LocalErrorCode::STREAM_NOT_EXISTS, res.error()); EXPECT_EQ(LocalErrorCode::STREAM_NOT_EXISTS, res.error());
} }
@@ -737,7 +717,7 @@ TEST_F(QuicTransportTest, WriteFin) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), true, false); transport_->writeChain(stream, buf->clone(), true);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
verifyCorrectness(conn, 0, stream, *buf, true); verifyCorrectness(conn, 0, stream, *buf, true);
@@ -762,10 +742,10 @@ TEST_F(QuicTransportTest, WriteOnlyFin) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, nullptr, true, false); transport_->writeChain(stream, nullptr, true);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
verifyCorrectness(conn, 0, stream, *buf, true); verifyCorrectness(conn, 0, stream, *buf, true);
@@ -790,7 +770,7 @@ TEST_F(QuicTransportTest, WriteDataWithRetransmission) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
verifyCorrectness(conn, 0, stream, *buf); verifyCorrectness(conn, 0, stream, *buf);
@@ -798,7 +778,7 @@ TEST_F(QuicTransportTest, WriteDataWithRetransmission) {
dropPackets(conn); dropPackets(conn);
auto buf2 = buildRandomInputData(50); auto buf2 = buildRandomInputData(50);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf2->clone(), false, false); transport_->writeChain(stream, buf2->clone(), false);
loopForWrites(); loopForWrites();
// The first packet was lost. We should expect this packet contains both // The first packet was lost. We should expect this packet contains both
// lost data and new data // lost data and new data
@@ -841,7 +821,7 @@ TEST_F(QuicTransportTest, WritePendingAckIfHavingData) {
conn.ackStates.appDataAckState.numNonRxPacketsRecvd = 3; conn.ackStates.appDataAckState.numNonRxPacketsRecvd = 3;
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
// We should write acks if there is data pending // We should write acks if there is data pending
transport_->writeChain(streamId, buf->clone(), true, false); transport_->writeChain(streamId, buf->clone(), true);
loopForWrites(); loopForWrites();
EXPECT_EQ(conn.outstandings.packets.size(), 1); EXPECT_EQ(conn.outstandings.packets.size(), 1);
auto& packet = auto& packet =
@@ -1270,7 +1250,7 @@ TEST_F(QuicTransportTest, SendPathResponse) {
TEST_F(QuicTransportTest, CloneAfterRecvReset) { TEST_F(QuicTransportTest, CloneAfterRecvReset) {
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, IOBuf::create(0), true, false); transport_->writeChain(streamId, IOBuf::create(0), true);
loopForWrites(); loopForWrites();
EXPECT_EQ(1, conn.outstandings.packets.size()); EXPECT_EQ(1, conn.outstandings.packets.size());
auto stream = conn.streamManager->getStream(streamId); auto stream = conn.streamManager->getStream(streamId);
@@ -1437,7 +1417,7 @@ TEST_F(QuicTransportTest, BusyWriteLoopDetection) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(100); auto buf = buildRandomInputData(100);
transport_->writeChain(stream, buf->clone(), true, false); transport_->writeChain(stream, buf->clone(), true);
transport_->updateWriteLooper(true); transport_->updateWriteLooper(true);
EXPECT_TRUE(conn.writeDebugState.needsWriteLoopDetect); EXPECT_TRUE(conn.writeDebugState.needsWriteLoopDetect);
EXPECT_EQ(0, conn.writeDebugState.currentEmptyLoopCount); EXPECT_EQ(0, conn.writeDebugState.currentEmptyLoopCount);
@@ -1583,7 +1563,7 @@ TEST_F(QuicTransportTest, NonWritableStreamAPI) {
auto streamState = conn.streamManager->getStream(streamId); auto streamState = conn.streamManager->getStream(streamId);
// write EOF // write EOF
transport_->writeChain(streamId, buf->clone(), true, false); transport_->writeChain(streamId, buf->clone(), true);
loopForWrites(); loopForWrites();
EXPECT_FALSE(streamState->writable()); EXPECT_FALSE(streamState->writable());
@@ -1611,7 +1591,7 @@ TEST_F(QuicTransportTest, NonWritableStreamAPI) {
TEST_F(QuicTransportTest, RstWrittenStream) { TEST_F(QuicTransportTest, RstWrittenStream) {
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
transport_->writeChain(streamId, buf->clone(), false, false); transport_->writeChain(streamId, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
auto stream = conn.streamManager->findStream(streamId); auto stream = conn.streamManager->findStream(streamId);
@@ -1702,7 +1682,7 @@ TEST_F(QuicTransportTest, RstStreamUDPWriteFailFatal) {
TEST_F(QuicTransportTest, WriteAfterSendRst) { TEST_F(QuicTransportTest, WriteAfterSendRst) {
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
transport_->writeChain(streamId, buf->clone(), false, false); transport_->writeChain(streamId, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
auto stream = conn.streamManager->findStream(streamId); auto stream = conn.streamManager->findStream(streamId);
@@ -1722,7 +1702,7 @@ TEST_F(QuicTransportTest, WriteAfterSendRst) {
// Write again: // Write again:
buf = buildRandomInputData(50); buf = buildRandomInputData(50);
// This shall fail: // This shall fail:
auto res = transport_->writeChain(streamId, buf->clone(), false, false); auto res = transport_->writeChain(streamId, buf->clone(), false);
loopForWrites(); loopForWrites();
EXPECT_EQ(LocalErrorCode::STREAM_CLOSED, res.error()); EXPECT_EQ(LocalErrorCode::STREAM_CLOSED, res.error());
@@ -1768,7 +1748,7 @@ TEST_F(QuicTransportTest, WriteStreamDataSetLossAlarm) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(1); auto buf = buildRandomInputData(1);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
EXPECT_TRUE(transport_->isLossTimeoutScheduled()); EXPECT_TRUE(transport_->isLossTimeoutScheduled());
} }
@@ -1877,7 +1857,7 @@ TEST_F(QuicTransportTest, DeliveryCallbackClosesClosedTransport) {
auto buf1 = buildRandomInputData(20); auto buf1 = buildRandomInputData(20);
TransportClosingDeliveryCallback dc(transport_.get(), 20); TransportClosingDeliveryCallback dc(transport_.get(), 20);
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->writeChain(stream1, buf1->clone(), true, false, &dc); transport_->writeChain(stream1, buf1->clone(), true, &dc);
loopForWrites(); loopForWrites();
transport_->close(folly::none); transport_->close(folly::none);
} }
@@ -1888,7 +1868,7 @@ TEST_F(QuicTransportTest, DeliveryCallbackClosesTransportOnDelivered) {
TransportClosingDeliveryCallback dc(transport_.get(), 0); TransportClosingDeliveryCallback dc(transport_.get(), 0);
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->registerDeliveryCallback(stream1, 0, &dc); transport_->registerDeliveryCallback(stream1, 0, &dc);
transport_->writeChain(stream1, buf1->clone(), true, false); transport_->writeChain(stream1, buf1->clone(), true);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -1907,7 +1887,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksNothingDelivered) {
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->registerDeliveryCallback(stream, 1, &mockedDeliveryCallback); transport_->registerDeliveryCallback(stream, 1, &mockedDeliveryCallback);
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -1922,7 +1902,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksNothingDelivered) {
// Otherwise, transport will be holding on to delivery callback pointers // Otherwise, transport will be holding on to delivery callback pointers
// that are already dead: // that are already dead:
auto buf2 = buildRandomInputData(100); auto buf2 = buildRandomInputData(100);
transport_->writeChain(stream, buf2->clone(), true, false); transport_->writeChain(stream, buf2->clone(), true);
streamState->ackedIntervals.insert(20, 99); streamState->ackedIntervals.insert(20, 99);
loopForWrites(); loopForWrites();
@@ -1941,7 +1921,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksAllDelivered) {
auto buf = buildRandomInputData(20); auto buf = buildRandomInputData(20);
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->registerDeliveryCallback(stream, 1, &mockedDeliveryCallback); transport_->registerDeliveryCallback(stream, 1, &mockedDeliveryCallback);
transport_->writeChain(stream, buf->clone(), true, false); transport_->writeChain(stream, buf->clone(), true);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -1966,7 +1946,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksPartialDelivered) {
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->registerDeliveryCallback(stream, 50, &mockedDeliveryCallback1); transport_->registerDeliveryCallback(stream, 50, &mockedDeliveryCallback1);
transport_->registerDeliveryCallback(stream, 150, &mockedDeliveryCallback2); transport_->registerDeliveryCallback(stream, 150, &mockedDeliveryCallback2);
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -1987,7 +1967,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksPartialDelivered) {
// Otherwise, transport will be holding on to delivery callback pointers // Otherwise, transport will be holding on to delivery callback pointers
// that are already dead: // that are already dead:
auto buf2 = buildRandomInputData(100); auto buf2 = buildRandomInputData(100);
transport_->writeChain(stream, buf2->clone(), true, false); transport_->writeChain(stream, buf2->clone(), true);
loopForWrites(); loopForWrites();
streamState->retransmissionBuffer.clear(); streamState->retransmissionBuffer.clear();
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
@@ -2007,7 +1987,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) {
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->registerDeliveryCallback(stream, 50, &mockedDeliveryCallback1); transport_->registerDeliveryCallback(stream, 50, &mockedDeliveryCallback1);
transport_->registerDeliveryCallback(stream, 150, &mockedDeliveryCallback2); transport_->registerDeliveryCallback(stream, 150, &mockedDeliveryCallback2);
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -2033,7 +2013,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksRetxBuffer) {
// Otherwise, transport will be holding on to delivery callback pointers // Otherwise, transport will be holding on to delivery callback pointers
// that are already dead: // that are already dead:
auto buf2 = buildRandomInputData(100); auto buf2 = buildRandomInputData(100);
transport_->writeChain(stream, buf2->clone(), true, false); transport_->writeChain(stream, buf2->clone(), true);
loopForWrites(); loopForWrites();
streamState->retransmissionBuffer.clear(); streamState->retransmissionBuffer.clear();
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
@@ -2054,7 +2034,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) {
transport_->registerDeliveryCallback(stream, 30, &mockedDeliveryCallback1); transport_->registerDeliveryCallback(stream, 30, &mockedDeliveryCallback1);
transport_->registerDeliveryCallback(stream, 50, &mockedDeliveryCallback2); transport_->registerDeliveryCallback(stream, 50, &mockedDeliveryCallback2);
transport_->registerDeliveryCallback(stream, 150, &mockedDeliveryCallback3); transport_->registerDeliveryCallback(stream, 150, &mockedDeliveryCallback3);
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
auto& conn = transport_->getConnectionState(); auto& conn = transport_->getConnectionState();
@@ -2083,7 +2063,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksLossAndRetxBuffer) {
// Otherwise, transport will be holding on to delivery callback pointers // Otherwise, transport will be holding on to delivery callback pointers
// that are already dead: // that are already dead:
auto buf2 = buildRandomInputData(100); auto buf2 = buildRandomInputData(100);
transport_->writeChain(stream, buf2->clone(), true, false); transport_->writeChain(stream, buf2->clone(), true);
loopForWrites(); loopForWrites();
streamState->retransmissionBuffer.clear(); streamState->retransmissionBuffer.clear();
streamState->lossBuffer.clear(); streamState->lossBuffer.clear();
@@ -2110,11 +2090,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksSingleByte) {
auto buf = buildRandomInputData(1); auto buf = buildRandomInputData(1);
transport_->writeChain( transport_->writeChain(
stream, stream, buf->clone(), false /* eof */, &writeChainDeliveryCb);
buf->clone(),
false /* eof */,
false /* cork */,
&writeChainDeliveryCb);
transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb); transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb);
transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb); transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb);
transport_->registerDeliveryCallback(stream, 1, &unsentByteDeliveryCb); transport_->registerDeliveryCallback(stream, 1, &unsentByteDeliveryCb);
@@ -2166,11 +2142,7 @@ TEST_F(QuicTransportTest, InvokeDeliveryCallbacksSingleByteWithFin) {
auto buf = buildRandomInputData(1); auto buf = buildRandomInputData(1);
transport_->writeChain( transport_->writeChain(
stream, stream, buf->clone(), true /* eof */, &writeChainDeliveryCb);
buf->clone(),
true /* eof */,
false /* cork */,
&writeChainDeliveryCb);
transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb); transport_->registerDeliveryCallback(stream, 0, &firstByteDeliveryCb);
transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb); transport_->registerDeliveryCallback(stream, 0, &lastByteDeliveryCb);
transport_->registerDeliveryCallback(stream, 1, &finDeliveryCb); transport_->registerDeliveryCallback(stream, 1, &finDeliveryCb);
@@ -2220,8 +2192,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksSingleByte) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(1); auto buf = buildRandomInputData(1);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback(stream, 0, &lastByteTxCb); transport_->registerTxCallback(stream, 0, &lastByteTxCb);
transport_->registerTxCallback(stream, 1, &pastlastByteTxCb); transport_->registerTxCallback(stream, 1, &pastlastByteTxCb);
@@ -2263,8 +2234,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksSingleByteWithFin) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(1); auto buf = buildRandomInputData(1);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), true /* eof */);
stream, buf->clone(), true /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback(stream, 0, &lastByteTxCb); transport_->registerTxCallback(stream, 0, &lastByteTxCb);
transport_->registerTxCallback(stream, 1, &finTxCb); transport_->registerTxCallback(stream, 1, &finTxCb);
@@ -2311,8 +2281,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytes) {
auto buf = buildRandomInputData(streamBytes); auto buf = buildRandomInputData(streamBytes);
CHECK_EQ(streamBytes, buf->length()); CHECK_EQ(streamBytes, buf->length());
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback(stream, lastByte, &lastByteTxCb); transport_->registerTxCallback(stream, lastByte, &lastByteTxCb);
transport_->registerTxCallback(stream, lastByte + 1, &pastlastByteTxCb); transport_->registerTxCallback(stream, lastByte + 1, &pastlastByteTxCb);
@@ -2360,8 +2329,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytesWriteRateLimited) {
const uint64_t lastByte = streamBytes - 1; const uint64_t lastByte = streamBytes - 1;
auto buf = buildRandomInputData(streamBytes); auto buf = buildRandomInputData(streamBytes);
CHECK_EQ(streamBytes, buf->length()); CHECK_EQ(streamBytes, buf->length());
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback( transport_->registerTxCallback(
stream, kDefaultUDPSendPacketLen * 2, &secondPacketByteOffsetTxCb); stream, kDefaultUDPSendPacketLen * 2, &secondPacketByteOffsetTxCb);
@@ -2413,8 +2381,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytesMultipleWrites) {
// call writeChain, writing 10 bytes // call writeChain, writing 10 bytes
{ {
auto buf = buildRandomInputData(10); auto buf = buildRandomInputData(10);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
} }
transport_->registerTxCallback(stream, 0, &txCb1); transport_->registerTxCallback(stream, 0, &txCb1);
EXPECT_CALL(txCb1, onByteEvent(getTxMatcher(stream, 0))).Times(1); EXPECT_CALL(txCb1, onByteEvent(getTxMatcher(stream, 0))).Times(1);
@@ -2424,8 +2391,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytesMultipleWrites) {
// call writeChain and write another 10 bytes // call writeChain and write another 10 bytes
{ {
auto buf = buildRandomInputData(10); auto buf = buildRandomInputData(10);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
} }
transport_->registerTxCallback(stream, 10, &txCb2); transport_->registerTxCallback(stream, 10, &txCb2);
EXPECT_CALL(txCb2, onByteEvent(getTxMatcher(stream, 10))).Times(1); EXPECT_CALL(txCb2, onByteEvent(getTxMatcher(stream, 10))).Times(1);
@@ -2435,8 +2401,7 @@ TEST_F(QuicTransportTest, InvokeTxCallbacksMultipleBytesMultipleWrites) {
// write the fin // write the fin
{ {
auto buf = buildRandomInputData(0); auto buf = buildRandomInputData(0);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), true /* eof */);
stream, buf->clone(), true /* eof */, false /* cork */);
} }
transport_->registerTxCallback(stream, 20, &txCb3); transport_->registerTxCallback(stream, 20, &txCb3);
EXPECT_CALL(txCb3, onByteEvent(getTxMatcher(stream, 20))).Times(1); EXPECT_CALL(txCb3, onByteEvent(getTxMatcher(stream, 20))).Times(1);
@@ -2463,8 +2428,7 @@ TEST_F(
// call writeChain, writing 10 bytes // call writeChain, writing 10 bytes
{ {
auto buf = buildRandomInputData(10); auto buf = buildRandomInputData(10);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */, &deliveryCb1);
stream, buf->clone(), false /* eof */, false /* cork */, &deliveryCb1);
} }
transport_->registerTxCallback(stream, 0, &txCb1); transport_->registerTxCallback(stream, 0, &txCb1);
EXPECT_CALL(txCb1, onByteEvent(getTxMatcher(stream, 0))).Times(1); EXPECT_CALL(txCb1, onByteEvent(getTxMatcher(stream, 0))).Times(1);
@@ -2474,8 +2438,7 @@ TEST_F(
// call writeChain and write another 10 bytes // call writeChain and write another 10 bytes
{ {
auto buf = buildRandomInputData(10); auto buf = buildRandomInputData(10);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */, &deliveryCb2);
stream, buf->clone(), false /* eof */, false /* cork */, &deliveryCb2);
} }
transport_->registerTxCallback(stream, 10, &txCb2); transport_->registerTxCallback(stream, 10, &txCb2);
EXPECT_CALL(txCb2, onByteEvent(getTxMatcher(stream, 10))).Times(1); EXPECT_CALL(txCb2, onByteEvent(getTxMatcher(stream, 10))).Times(1);
@@ -2485,8 +2448,7 @@ TEST_F(
// write the fin // write the fin
{ {
auto buf = buildRandomInputData(0); auto buf = buildRandomInputData(0);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), true /* eof */, &deliveryCb3);
stream, buf->clone(), true /* eof */, false /* cork */, &deliveryCb3);
} }
transport_->registerTxCallback(stream, 20, &txCb3); transport_->registerTxCallback(stream, 20, &txCb3);
EXPECT_CALL(txCb3, onByteEvent(getTxMatcher(stream, 20))).Times(1); EXPECT_CALL(txCb3, onByteEvent(getTxMatcher(stream, 20))).Times(1);
@@ -2994,7 +2956,7 @@ TEST_F(QuicTransportTest, CloseTransportCancelsAckTimeout) {
auto buf = buildRandomInputData(kDefaultUDPSendPacketLen + 20); auto buf = buildRandomInputData(kDefaultUDPSendPacketLen + 20);
folly::IOBuf passedIn; folly::IOBuf passedIn;
EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength)); EXPECT_CALL(*socket_, write(_, _)).WillRepeatedly(Invoke(bufLength));
transport_->writeChain(stream, buf->clone(), false, false); transport_->writeChain(stream, buf->clone(), false);
loopForWrites(); loopForWrites();
transport_->scheduleLossTimeout(500ms); transport_->scheduleLossTimeout(500ms);
EXPECT_TRUE(transport_->isLossTimeoutScheduled()); EXPECT_TRUE(transport_->isLossTimeoutScheduled());
@@ -3070,7 +3032,7 @@ TEST_F(QuicTransportTest, PacingWillBurstFirst) {
auto buf = buildRandomInputData(200); auto buf = buildRandomInputData(200);
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, buf->clone(), false, false); transport_->writeChain(streamId, buf->clone(), false);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1)); .WillRepeatedly(Return(1));
@@ -3095,7 +3057,7 @@ TEST_F(QuicTransportTest, AlreadyScheduledPacingNoWrite) {
auto buf = buildRandomInputData(200); auto buf = buildRandomInputData(200);
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, buf->clone(), false, false); transport_->writeChain(streamId, buf->clone(), false);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1)); .WillRepeatedly(Return(1));
@@ -3127,7 +3089,7 @@ TEST_F(QuicTransportTest, NoScheduleIfNoNewData) {
auto buf = buildRandomInputData(200); auto buf = buildRandomInputData(200);
auto streamId = transport_->createBidirectionalStream().value(); auto streamId = transport_->createBidirectionalStream().value();
transport_->writeChain(streamId, buf->clone(), false, false); transport_->writeChain(streamId, buf->clone(), false);
EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0)); EXPECT_CALL(*socket_, write(_, _)).WillOnce(Return(0));
EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_)) EXPECT_CALL(*rawPacer, updateAndGetWriteBatchSize(_))
.WillRepeatedly(Return(1)); .WillRepeatedly(Return(1));
@@ -3158,8 +3120,7 @@ TEST_F(QuicTransportTest, GetStreamPackestTxedSingleByte) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(1); auto buf = buildRandomInputData(1);
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
// when first byte TX callback gets invoked, numPacketsTxWithNewData should be // when first byte TX callback gets invoked, numPacketsTxWithNewData should be
@@ -3184,8 +3145,7 @@ TEST_F(QuicTransportTest, GetStreamPacketsTxedMultipleBytes) {
auto buf = buildRandomInputData(streamBytes); auto buf = buildRandomInputData(streamBytes);
CHECK_EQ(streamBytes, buf->length()); CHECK_EQ(streamBytes, buf->length());
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback(stream, lastByte, &lastByteTxCb); transport_->registerTxCallback(stream, lastByte, &lastByteTxCb);
@@ -3228,8 +3188,7 @@ TEST_F(QuicTransportTest, GetStreamPacketsTxedMultiplePackets) {
auto stream = transport_->createBidirectionalStream().value(); auto stream = transport_->createBidirectionalStream().value();
auto buf = buildRandomInputData(streamBytes); auto buf = buildRandomInputData(streamBytes);
CHECK_EQ(streamBytes, buf->length()); CHECK_EQ(streamBytes, buf->length());
transport_->writeChain( transport_->writeChain(stream, buf->clone(), false /* eof */);
stream, buf->clone(), false /* eof */, false /* cork */);
transport_->registerTxCallback(stream, 0, &firstByteTxCb); transport_->registerTxCallback(stream, 0, &firstByteTxCb);
transport_->registerTxCallback( transport_->registerTxCallback(
stream, firstPacketNearTailByte, &firstPacketNearTailByteTxCb); stream, firstPacketNearTailByte, &firstPacketNearTailByteTxCb);

View File

@@ -414,7 +414,7 @@ QuicClientTransportIntegrationTest::sendRequestAndResponse(
StreamId streamId, StreamId streamId,
MockReadCallback* readCallback) { MockReadCallback* readCallback) {
client->setReadCallback(streamId, readCallback); client->setReadCallback(streamId, readCallback);
client->writeChain(streamId, data->clone(), true, false); client->writeChain(streamId, data->clone(), true);
auto streamData = new StreamData(streamId); auto streamData = new StreamData(streamId);
auto dataCopy = std::shared_ptr<folly::IOBuf>(std::move(data)); auto dataCopy = std::shared_ptr<folly::IOBuf>(std::move(data));
EXPECT_CALL(*readCallback, readAvailable(streamId)) EXPECT_CALL(*readCallback, readAvailable(streamId))
@@ -3545,7 +3545,7 @@ TEST_F(QuicClientTransportAfterStartTest, CloseConnectionWithStreamPending) {
client->getNonConstConn().qLogger = qLogger; client->getNonConstConn().qLogger = qLogger;
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
client->writeChain(streamId, expected->clone(), true, false); client->writeChain(streamId, expected->clone(), true);
loopForWrites(); loopForWrites();
// ack all the packets // ack all the packets
ASSERT_FALSE(client->getConn().outstandings.packets.empty()); ASSERT_FALSE(client->getConn().outstandings.packets.empty());
@@ -3618,7 +3618,7 @@ TEST_F(QuicClientTransportAfterStartTest, CloseConnectionWithNoStreamPending) {
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
client->writeChain(streamId, expected->clone(), true, false); client->writeChain(streamId, expected->clone(), true);
loopForWrites(); loopForWrites();
@@ -3680,7 +3680,7 @@ TEST_P(
client->getNonConstConn().qLogger = qLogger; client->getNonConstConn().qLogger = qLogger;
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
client->writeChain(streamId, expected->clone(), true, false); client->writeChain(streamId, expected->clone(), true);
loopForWrites(); loopForWrites();
socketWrites.clear(); socketWrites.clear();
@@ -3809,7 +3809,7 @@ TEST_F(QuicClientTransportAfterStartTest, RecvOneRttAck) {
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
client->writeChain(streamId, expected->clone(), true, false); client->writeChain(streamId, expected->clone(), true);
loopForWrites(); loopForWrites();
AckBlocks sentPackets; AckBlocks sentPackets;
@@ -3838,7 +3838,7 @@ TEST_P(QuicClientTransportAfterStartTestClose, CloseConnectionWithError) {
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
client->writeChain(streamId, expected->clone(), true, false); client->writeChain(streamId, expected->clone(), true);
loopForWrites(); loopForWrites();
auto packet = packetToBuf(createStreamPacket( auto packet = packetToBuf(createStreamPacket(
*serverChosenConnId /* src */, *serverChosenConnId /* src */,
@@ -4026,7 +4026,7 @@ TEST_F(QuicClientTransportAfterStartTest, IdleTimerNotResetOnWritingOldData) {
auto expected = IOBuf::copyBuffer("hello"); auto expected = IOBuf::copyBuffer("hello");
client->idleTimeout().cancelTimeout(); client->idleTimeout().cancelTimeout();
ASSERT_FALSE(client->idleTimeout().isScheduled()); ASSERT_FALSE(client->idleTimeout().isScheduled());
client->writeChain(streamId, expected->clone(), false, false); client->writeChain(streamId, expected->clone(), false);
loopForWrites(); loopForWrites();
ASSERT_FALSE(client->getConn().receivedNewPacketBeforeWrite); ASSERT_FALSE(client->getConn().receivedNewPacketBeforeWrite);
@@ -4056,7 +4056,7 @@ TEST_F(QuicClientTransportAfterStartTest, IdleTimerResetNoOutstandingPackets) {
client->idleTimeout().cancelTimeout(); client->idleTimeout().cancelTimeout();
auto streamId = client->createBidirectionalStream().value(); auto streamId = client->createBidirectionalStream().value();
auto expected = folly::IOBuf::copyBuffer("hello"); auto expected = folly::IOBuf::copyBuffer("hello");
client->writeChain(streamId, expected->clone(), false, false); client->writeChain(streamId, expected->clone(), false);
loopForWrites(); loopForWrites();
ASSERT_TRUE(client->idleTimeout().isScheduled()); ASSERT_TRUE(client->idleTimeout().isScheduled());
} }
@@ -4292,7 +4292,7 @@ TEST_F(
AckBlocks sentPackets; AckBlocks sentPackets;
auto writeData = IOBuf::copyBuffer("some data"); auto writeData = IOBuf::copyBuffer("some data");
client->writeChain(streamId, writeData->clone(), true, false); client->writeChain(streamId, writeData->clone(), true);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4316,7 +4316,7 @@ TEST_F(QuicClientTransportAfterStartTest, StreamClosedIfReadCallbackNull) {
AckBlocks sentPackets; AckBlocks sentPackets;
auto writeData = IOBuf::copyBuffer("some data"); auto writeData = IOBuf::copyBuffer("some data");
client->writeChain(streamId, writeData->clone(), true, false); client->writeChain(streamId, writeData->clone(), true);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4353,7 +4353,7 @@ TEST_F(QuicClientTransportAfterStartTest, ReceiveAckInvokesDeliveryCallback) {
client->registerDeliveryCallback(streamId, 0, &deliveryCallback); client->registerDeliveryCallback(streamId, 0, &deliveryCallback);
auto data = IOBuf::copyBuffer("some data"); auto data = IOBuf::copyBuffer("some data");
client->writeChain(streamId, data->clone(), true, false); client->writeChain(streamId, data->clone(), true);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4376,7 +4376,7 @@ TEST_F(QuicClientTransportAfterStartTest, InvokesDeliveryCallbackFinOnly) {
client->createBidirectionalStream(false /* replaySafe */).value(); client->createBidirectionalStream(false /* replaySafe */).value();
auto data = IOBuf::copyBuffer("some data"); auto data = IOBuf::copyBuffer("some data");
client->writeChain(streamId, nullptr, true, false, &deliveryCallback); client->writeChain(streamId, nullptr, true, &deliveryCallback);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4403,7 +4403,7 @@ TEST_F(
client->createBidirectionalStream(false /* replaySafe */).value(); client->createBidirectionalStream(false /* replaySafe */).value();
auto data = IOBuf::copyBuffer("some data"); auto data = IOBuf::copyBuffer("some data");
client->writeChain(streamId, data->clone(), true, false); client->writeChain(streamId, data->clone(), true);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4432,7 +4432,7 @@ TEST_F(QuicClientTransportAfterStartTest, DeliveryCallbackFromWriteChain) {
// Write 10 bytes of data, and write EOF on an empty stream. So EOF offset is // Write 10 bytes of data, and write EOF on an empty stream. So EOF offset is
// 10 // 10
auto data = test::buildRandomInputData(10); auto data = test::buildRandomInputData(10);
client->writeChain(streamId, data->clone(), true, false, &deliveryCallback); client->writeChain(streamId, data->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4525,7 +4525,7 @@ TEST_F(QuicClientTransportVersionAndRetryTest, RetryPacket) {
StreamId streamId = *client->createBidirectionalStream(); StreamId streamId = *client->createBidirectionalStream();
auto write = IOBuf::copyBuffer("ice cream"); auto write = IOBuf::copyBuffer("ice cream");
client->writeChain(streamId, write->clone(), true, false, nullptr); client->writeChain(streamId, write->clone(), true, nullptr);
loopForWrites(); loopForWrites();
std::unique_ptr<IOBuf> bytesWrittenToNetwork = nullptr; std::unique_ptr<IOBuf> bytesWrittenToNetwork = nullptr;
@@ -4571,7 +4571,7 @@ TEST_F(
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
client->writeChain(streamId, write->clone(), true, false, &deliveryCallback); client->writeChain(streamId, write->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
auto packet = VersionNegotiationPacketBuilder( auto packet = VersionNegotiationPacketBuilder(
*client->getConn().initialDestinationConnectionId, *client->getConn().initialDestinationConnectionId,
@@ -4598,7 +4598,7 @@ TEST_F(
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
client->writeChain(streamId, write->clone(), true, false, &deliveryCallback); client->writeChain(streamId, write->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
auto packet = VersionNegotiationPacketBuilder( auto packet = VersionNegotiationPacketBuilder(
@@ -4778,7 +4778,7 @@ TEST_F(QuicClientTransportAfterStartTest, ResetClearsPendingLoss) {
SCOPE_EXIT { SCOPE_EXIT {
client->close(folly::none); client->close(folly::none);
}; };
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
ASSERT_FALSE(client->getConn().outstandings.packets.empty()); ASSERT_FALSE(client->getConn().outstandings.packets.empty());
@@ -4798,7 +4798,7 @@ TEST_F(QuicClientTransportAfterStartTest, LossAfterResetStream) {
SCOPE_EXIT { SCOPE_EXIT {
client->close(folly::none); client->close(folly::none);
}; };
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
ASSERT_FALSE(client->getConn().outstandings.packets.empty()); ASSERT_FALSE(client->getConn().outstandings.packets.empty());
@@ -4820,7 +4820,7 @@ TEST_F(QuicClientTransportAfterStartTest, SendResetAfterEom) {
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
client->registerDeliveryCallback(streamId, 100, &deliveryCallback); client->registerDeliveryCallback(streamId, 100, &deliveryCallback);
EXPECT_CALL(deliveryCallback, onCanceled(streamId, 100)); EXPECT_CALL(deliveryCallback, onCanceled(streamId, 100));
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
client->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN); client->resetStream(streamId, GenericApplicationErrorCode::UNKNOWN);
loopForWrites(); loopForWrites();
@@ -4849,7 +4849,7 @@ TEST_F(QuicClientTransportAfterStartTest, HalfClosedLocalToClosed) {
StreamId streamId = client->createBidirectionalStream().value(); StreamId streamId = client->createBidirectionalStream().value();
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
auto data = test::buildRandomInputData(10); auto data = test::buildRandomInputData(10);
client->writeChain(streamId, data->clone(), true, false, &deliveryCallback); client->writeChain(streamId, data->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -4899,8 +4899,8 @@ TEST_F(QuicClientTransportAfterStartTest, SendResetSyncOnAck) {
NiceMock<MockDeliveryCallback> deliveryCallback2; NiceMock<MockDeliveryCallback> deliveryCallback2;
auto data = IOBuf::copyBuffer("hello"); auto data = IOBuf::copyBuffer("hello");
client->writeChain(streamId, data->clone(), true, false, &deliveryCallback); client->writeChain(streamId, data->clone(), true, &deliveryCallback);
client->writeChain(streamId2, data->clone(), true, false, &deliveryCallback2); client->writeChain(streamId2, data->clone(), true, &deliveryCallback2);
EXPECT_CALL(deliveryCallback, onDeliveryAck(streamId, _, _)) EXPECT_CALL(deliveryCallback, onDeliveryAck(streamId, _, _))
.WillOnce(Invoke([&](auto, auto, auto) { .WillOnce(Invoke([&](auto, auto, auto) {
@@ -4976,7 +4976,7 @@ TEST_F(QuicClientTransportAfterStartTest, HalfClosedRemoteToClosed) {
EXPECT_EQ(conn.streamManager->readableStreams().count(streamId), 0); EXPECT_EQ(conn.streamManager->readableStreams().count(streamId), 0);
AckBlocks sentPackets; AckBlocks sentPackets;
client->writeChain(streamId, data->clone(), true, false, &deliveryCallback); client->writeChain(streamId, data->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
verifyShortPackets(sentPackets); verifyShortPackets(sentPackets);
@@ -5097,7 +5097,7 @@ TEST_F(QuicClientTransportAfterStartTest, DestroyWithoutClosing) {
EXPECT_CALL(clientConnCallback, onConnectionEnd()); EXPECT_CALL(clientConnCallback, onConnectionEnd());
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
client->writeChain(streamId, write->clone(), true, false, &deliveryCallback); client->writeChain(streamId, write->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
EXPECT_CALL(deliveryCallback, onCanceled(_, _)); EXPECT_CALL(deliveryCallback, onCanceled(_, _));
@@ -5110,7 +5110,7 @@ TEST_F(QuicClientTransportAfterStartTest, DestroyWhileDraining) {
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
client->writeChain(streamId, write->clone(), true, false, &deliveryCallback); client->writeChain(streamId, write->clone(), true, &deliveryCallback);
loopForWrites(); loopForWrites();
EXPECT_CALL(clientConnCallback, onConnectionError(_)).Times(0); EXPECT_CALL(clientConnCallback, onConnectionError(_)).Times(0);
@@ -5163,7 +5163,7 @@ TEST_F(QuicClientTransportAfterStartTest, DestroyEvbWhileLossTimeoutActive) {
client->setReadCallback(streamId, &readCb); client->setReadCallback(streamId, &readCb);
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
client->writeChain(streamId, write->clone(), true, false); client->writeChain(streamId, write->clone(), true);
loopForWrites(); loopForWrites();
EXPECT_TRUE(client->lossTimeout().isScheduled()); EXPECT_TRUE(client->lossTimeout().isScheduled());
eventbase_.reset(); eventbase_.reset();
@@ -5440,7 +5440,7 @@ TEST_F(QuicZeroRttClientTest, TestReplaySafeCallback) {
socketWrites.clear(); socketWrites.clear();
auto streamId = client->createBidirectionalStream().value(); auto streamId = client->createBidirectionalStream().value();
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
EXPECT_TRUE(zeroRttPacketsOutstanding()); EXPECT_TRUE(zeroRttPacketsOutstanding());
assertWritten(false, LongHeader::Types::ZeroRtt); assertWritten(false, LongHeader::Types::ZeroRtt);
@@ -5511,7 +5511,7 @@ TEST_F(QuicZeroRttClientTest, TestZeroRttRejection) {
socketWrites.clear(); socketWrites.clear();
auto streamId = client->createBidirectionalStream().value(); auto streamId = client->createBidirectionalStream().value();
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
EXPECT_TRUE(zeroRttPacketsOutstanding()); EXPECT_TRUE(zeroRttPacketsOutstanding());
EXPECT_CALL(clientConnCallback, onReplaySafe()); EXPECT_CALL(clientConnCallback, onReplaySafe());
@@ -5563,7 +5563,7 @@ TEST_F(QuicZeroRttClientTest, TestZeroRttRejectionWithSmallerFlowControl) {
mockClientHandshake->maxInitialStreamData = 10; mockClientHandshake->maxInitialStreamData = 10;
socketWrites.clear(); socketWrites.clear();
auto streamId = client->createBidirectionalStream().value(); auto streamId = client->createBidirectionalStream().value();
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
EXPECT_TRUE(zeroRttPacketsOutstanding()); EXPECT_TRUE(zeroRttPacketsOutstanding());
mockClientHandshake->setZeroRttRejected(true); mockClientHandshake->setZeroRttRejected(true);
@@ -5651,7 +5651,7 @@ TEST_F(
client->happyEyeballsConnAttemptDelayTimeout().cancelTimeout(); client->happyEyeballsConnAttemptDelayTimeout().cancelTimeout();
auto streamId = client->createBidirectionalStream().value(); auto streamId = client->createBidirectionalStream().value();
client->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); client->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
EXPECT_TRUE(zeroRttPacketsOutstanding()); EXPECT_TRUE(zeroRttPacketsOutstanding());
assertWritten(false, LongHeader::Types::ZeroRtt); assertWritten(false, LongHeader::Types::ZeroRtt);

View File

@@ -169,7 +169,7 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
private: private:
void sendMessage(quic::StreamId id, BufQueue& data) { void sendMessage(quic::StreamId id, BufQueue& data) {
auto message = data.move(); auto message = data.move();
auto res = quicClient_->writeChain(id, message->clone(), true, false); auto res = quicClient_->writeChain(id, message->clone(), true);
if (res.hasError()) { if (res.hasError()) {
LOG(ERROR) << "EchoClient writeChain error=" << uint32_t(res.error()); LOG(ERROR) << "EchoClient writeChain error=" << uint32_t(res.error());
} else { } else {

View File

@@ -101,8 +101,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
} }
auto echoedData = folly::IOBuf::copyBuffer("echo "); auto echoedData = folly::IOBuf::copyBuffer("echo ");
echoedData->prependChain(data.first.move()); echoedData->prependChain(data.first.move());
auto res = auto res = sock->writeChain(id, std::move(echoedData), true, nullptr);
sock->writeChain(id, std::move(echoedData), true, false, nullptr);
if (res.hasError()) { if (res.hasError()) {
LOG(ERROR) << "write error=" << toString(res.error()); LOG(ERROR) << "write error=" << toString(res.error());
} else { } else {
@@ -141,7 +140,7 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
originalData.splitAtMost(toSplit); originalData.splitAtMost(toSplit);
auto res = sock->writeChain(id, originalData.move(), true, false, nullptr); auto res = sock->writeChain(id, originalData.move(), true, nullptr);
if (res.hasError()) { if (res.hasError()) {
LOG(ERROR) << "write error=" << toString(res.error()); LOG(ERROR) << "write error=" << toString(res.error());
} else { } else {

View File

@@ -859,7 +859,6 @@ TEST_F(QuicServerTransportTest, IdleTimerNotResetWhenDataOutstanding) {
server->writeChain( server->writeChain(
streamId, streamId,
IOBuf::copyBuffer("And if the darkness is to keep us apart"), IOBuf::copyBuffer("And if the darkness is to keep us apart"),
false,
false); false);
loopForWrites(); loopForWrites();
// It was the first packet // It was the first packet
@@ -871,7 +870,6 @@ TEST_F(QuicServerTransportTest, IdleTimerNotResetWhenDataOutstanding) {
server->writeChain( server->writeChain(
streamId, streamId,
IOBuf::copyBuffer("And if the daylight feels like it's a long way off"), IOBuf::copyBuffer("And if the daylight feels like it's a long way off"),
false,
false); false);
loopForWrites(); loopForWrites();
EXPECT_FALSE(server->idleTimeout().isScheduled()); EXPECT_FALSE(server->idleTimeout().isScheduled());
@@ -1005,7 +1003,7 @@ TEST_F(QuicServerTransportTest, TestClientAddressChanges) {
TEST_F(QuicServerTransportTest, TestCloseConnectionWithNoErrorPendingStreams) { TEST_F(QuicServerTransportTest, TestCloseConnectionWithNoErrorPendingStreams) {
auto streamId = server->createBidirectionalStream().value(); auto streamId = server->createBidirectionalStream().value();
server->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); server->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
AckBlocks acks; AckBlocks acks;
@@ -1214,10 +1212,10 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) {
server->getNonConstConn().outstandings.packets.clear(); server->getNonConstConn().outstandings.packets.clear();
server->getNonConstConn().outstandings.initialPacketsCount = 0; server->getNonConstConn().outstandings.initialPacketsCount = 0;
server->getNonConstConn().outstandings.handshakePacketsCount = 0; server->getNonConstConn().outstandings.handshakePacketsCount = 0;
server->writeChain(streamId, data->clone(), false, false); server->writeChain(streamId, data->clone(), false);
loopForWrites(); loopForWrites();
server->writeChain(streamId, data->clone(), false, false); server->writeChain(streamId, data->clone(), false);
server->writeChain(streamId, data->clone(), false, false); server->writeChain(streamId, data->clone(), false);
loopForWrites(); loopForWrites();
auto stream = server->getNonConstConn().streamManager->getStream(streamId); auto stream = server->getNonConstConn().streamManager->getStream(streamId);
@@ -1300,7 +1298,7 @@ TEST_F(QuicServerTransportTest, TestOpenAckStreamFrame) {
EXPECT_EQ(stream->recvState, StreamRecvState::Open); EXPECT_EQ(stream->recvState, StreamRecvState::Open);
auto empty = IOBuf::create(0); auto empty = IOBuf::create(0);
server->writeChain(streamId, std::move(empty), true, false); server->writeChain(streamId, std::move(empty), true);
loopForWrites(); loopForWrites();
ASSERT_FALSE(server->getConn().outstandings.packets.empty()); ASSERT_FALSE(server->getConn().outstandings.packets.empty());
@@ -2070,7 +2068,7 @@ TEST_F(QuicServerTransportTest, DestroyWithoutClosing) {
EXPECT_CALL(connCallback, onConnectionEnd()).Times(0); EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
MockDeliveryCallback deliveryCallback; MockDeliveryCallback deliveryCallback;
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
server->writeChain(streamId, write->clone(), true, false, &deliveryCallback); server->writeChain(streamId, write->clone(), true, &deliveryCallback);
EXPECT_CALL(deliveryCallback, onCanceled(_, _)); EXPECT_CALL(deliveryCallback, onCanceled(_, _));
EXPECT_CALL(readCb, readError(_, _)); EXPECT_CALL(readCb, readError(_, _));
@@ -2087,7 +2085,7 @@ TEST_F(QuicServerTransportTest, DestroyWithoutClosingCancelByteEvents) {
EXPECT_CALL(connCallback, onConnectionError(_)).Times(0); EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
EXPECT_CALL(connCallback, onConnectionEnd()).Times(0); EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
auto write = IOBuf::copyBuffer("no"); auto write = IOBuf::copyBuffer("no");
server->writeChain(streamId, write->clone(), true, false); server->writeChain(streamId, write->clone(), true);
MockByteEventCallback txCallback; MockByteEventCallback txCallback;
MockByteEventCallback deliveryCallback; MockByteEventCallback deliveryCallback;
@@ -3093,7 +3091,7 @@ TEST_F(QuicServerTransportTest, ClientPortChangeNATRebinding) {
StreamId streamId = server->createBidirectionalStream().value(); StreamId streamId = server->createBidirectionalStream().value();
auto data1 = IOBuf::copyBuffer("Aloha"); auto data1 = IOBuf::copyBuffer("Aloha");
server->writeChain(streamId, data1->clone(), false, false); server->writeChain(streamId, data1->clone(), false);
loopForWrites(); loopForWrites();
PacketNum packetNum1 = PacketNum packetNum1 =
getFirstOutstandingPacket( getFirstOutstandingPacket(
@@ -3140,7 +3138,7 @@ TEST_F(QuicServerTransportTest, ClientAddressChangeNATRebinding) {
server->getNonConstConn().transportSettings.disableMigration = false; server->getNonConstConn().transportSettings.disableMigration = false;
StreamId streamId = server->createBidirectionalStream().value(); StreamId streamId = server->createBidirectionalStream().value();
auto data1 = IOBuf::copyBuffer("Aloha"); auto data1 = IOBuf::copyBuffer("Aloha");
server->writeChain(streamId, data1->clone(), false, false); server->writeChain(streamId, data1->clone(), false);
loopForWrites(); loopForWrites();
PacketNum packetNum1 = PacketNum packetNum1 =
getFirstOutstandingPacket( getFirstOutstandingPacket(
@@ -3663,7 +3661,7 @@ TEST_F(QuicUnencryptedServerTransportTest, TestWriteHandshakeAndZeroRtt) {
recvClientHello(); recvClientHello();
auto streamId = server->createBidirectionalStream().value(); auto streamId = server->createBidirectionalStream().value();
server->writeChain(streamId, IOBuf::copyBuffer("hello"), true, false); server->writeChain(streamId, IOBuf::copyBuffer("hello"), true);
loopForWrites(); loopForWrites();
auto clientCodec = makeClientEncryptedCodec(true); auto clientCodec = makeClientEncryptedCodec(true);

View File

@@ -48,7 +48,7 @@ TEST_F(QuicSocketTest, simple) {
EXPECT_CALL(*socket_, readNaked(3, _)) EXPECT_CALL(*socket_, readNaked(3, _))
.WillOnce(Return(readResult("hello world", true))); .WillOnce(Return(readResult("hello world", true)));
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr)) EXPECT_CALL(*socket_, writeChain(3, _, true, nullptr))
.WillOnce(Return(folly::unit)); .WillOnce(Return(folly::unit));
handler_.readAvailable(3); handler_.readAvailable(3);
} }
@@ -63,7 +63,7 @@ TEST_F(QuicSocketTest, multiple_reads) {
EXPECT_CALL(*socket_, readNaked(3, _)) EXPECT_CALL(*socket_, readNaked(3, _))
.WillOnce(Return(readResult("world", true))); .WillOnce(Return(readResult("world", true)));
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr)) EXPECT_CALL(*socket_, writeChain(3, _, true, nullptr))
.WillOnce(Return(folly::unit)); .WillOnce(Return(folly::unit));
handler_.readAvailable(3); handler_.readAvailable(3);
} }

View File

@@ -330,7 +330,7 @@ class ServerStreamHandler : public quic::QuicSocket::ConnectionCallback,
curBuf->append(curBuf->capacity()); curBuf->append(curBuf->capacity());
curBuf = curBuf->next(); curBuf = curBuf->next();
} while (curBuf != buf.get()); } while (curBuf != buf.get());
auto res = sock_->writeChain(id, std::move(buf), eof, true, nullptr); auto res = sock_->writeChain(id, std::move(buf), eof, nullptr);
if (res.hasError()) { if (res.hasError()) {
LOG(FATAL) << "Got error on write: " << quic::toString(res.error()); LOG(FATAL) << "Got error on write: " << quic::toString(res.error());
} }