1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

QuicStreamAsyncTransport fixes

Summary:
I was using this for hq-interop testing, and I discovered a couple bugs.

1) readCb_ may not be set initially, so only attempt an initial read if it's non-null

2) When this transport closes, we shouldn't close the underlying QUIC socket.  Instead we should attempt to write a FIN (if we haven't already).  If that doesn't immediately succeed (perhaps queued writes are blocked on flow control), send a reset.

Reviewed By: kvtsoy

Differential Revision: D40741000

fbshipit-source-id: f3f925b884ae30feac0d86cbca13084248566099
This commit is contained in:
Alan Frindell
2022-11-16 18:25:26 -08:00
committed by Facebook GitHub Bot
parent c4fbdf7eeb
commit 7be403c697
2 changed files with 202 additions and 98 deletions

View File

@@ -43,9 +43,10 @@ void QuicStreamAsyncTransport::setStreamId(quic::StreamId id) {
id_ = id; id_ = id;
// TODO: handle timeout for assigning stream id // TODO: handle timeout for assigning stream id
if (readCb_) {
sock_->setReadCallback(*id_, this); sock_->setReadCallback(*id_, this);
handleRead(); handleRead();
}
if (!writeCallbacks_.empty()) { if (!writeCallbacks_.empty()) {
// adjust offsets of buffered writes // adjust offsets of buffered writes
@@ -67,8 +68,7 @@ void QuicStreamAsyncTransport::setStreamId(quic::StreamId id) {
void QuicStreamAsyncTransport::destroy() { void QuicStreamAsyncTransport::destroy() {
if (state_ != CloseState::CLOSED) { if (state_ != CloseState::CLOSED) {
state_ = CloseState::CLOSED; closeNow();
sock_->closeNow(folly::none);
} }
// Then call DelayedDestruction::destroy() to take care of // Then call DelayedDestruction::destroy() to take care of
// whether or not we need immediate or delayed destruction // whether or not we need immediate or delayed destruction
@@ -207,12 +207,15 @@ void QuicStreamAsyncTransport::close() {
readEOF_ = EOFState::QUEUED; readEOF_ = EOFState::QUEUED;
handleRead(); handleRead();
} }
sock_->closeGracefully();
} }
void QuicStreamAsyncTransport::closeNow() { void QuicStreamAsyncTransport::closeNow() {
folly::AsyncSocketException ex( folly::AsyncSocketException ex(
folly::AsyncSocketException::UNKNOWN, "Quic closeNow"); folly::AsyncSocketException::UNKNOWN, "Quic closeNow");
if (id_) {
sock_->stopSending(*id_, quic::GenericApplicationErrorCode::UNKNOWN);
shutdownWriteNow();
}
closeNowImpl(std::move(ex)); closeNowImpl(std::move(ex));
} }
@@ -240,14 +243,12 @@ void QuicStreamAsyncTransport::shutdownWriteNow() {
// writes already shutdown // writes already shutdown
return; return;
} }
if (writeBuf_.empty()) {
shutdownWrite(); shutdownWrite();
} else { send(0);
if (id_) { if (id_ && writeEOF_ != EOFState::DELIVERED) {
sock_->resetStream(*id_, quic::GenericApplicationErrorCode::UNKNOWN); sock_->resetStream(*id_, quic::GenericApplicationErrorCode::UNKNOWN);
VLOG(4) << "Reset stream from shutdownWriteNow"; VLOG(4) << "Reset stream from shutdownWriteNow";
} }
}
} }
bool QuicStreamAsyncTransport::good() const { bool QuicStreamAsyncTransport::good() const {
@@ -453,7 +454,8 @@ void QuicStreamAsyncTransport::send(uint64_t maxToSend) {
} }
uint64_t sentOffset = *streamWriteOffset + toSend; uint64_t sentOffset = *streamWriteOffset + toSend;
bool writeEOF = (writeEOF_ == EOFState::QUEUED); bool writeEOF =
(writeEOF_ == EOFState::QUEUED && writeBuf_.chainLength() == toSend);
auto res = sock_->writeChain( auto res = sock_->writeChain(
*id_, *id_,
writeBuf_.split(toSend), writeBuf_.split(toSend),
@@ -531,7 +533,6 @@ void QuicStreamAsyncTransport::closeNowImpl(folly::AsyncSocketException&& ex) {
sock_->unregisterStreamWriteCallback(*id_); sock_->unregisterStreamWriteCallback(*id_);
id_.reset(); id_.reset();
} }
sock_->closeNow(folly::none);
failWrites(*ex_); failWrites(*ex_);
} }

View File

@@ -5,6 +5,8 @@
* LICENSE file in the root directory of this source tree. * LICENSE file in the root directory of this source tree.
*/ */
#include <folly/MoveWrapper.h>
#include <folly/container/F14Map.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/io/async/test/MockAsyncTransport.h> #include <folly/io/async/test/MockAsyncTransport.h>
#include <folly/portability/GMock.h> #include <folly/portability/GMock.h>
@@ -26,11 +28,25 @@ using namespace testing;
namespace quic::test { namespace quic::test {
class QuicStreamAsyncTransportTest : public Test { class QuicStreamAsyncTransportTest : public Test {
protected:
struct Stream {
Stream() = default;
Stream(const Stream&) = delete;
Stream& operator=(const Stream&) = delete;
Stream(Stream&&) = delete;
Stream& operator=(Stream&&) = delete;
folly::test::MockWriteCallback writeCb;
folly::test::MockReadCallback readCb;
QuicStreamAsyncTransport::UniquePtr transport;
std::array<uint8_t, 1024> buf;
uint8_t serverDone{2}; // need to finish reads & writes
};
public: public:
void SetUp() override { void SetUp() override {
folly::ssl::init(); folly::ssl::init();
createServer(); createServer();
createClient(); connect();
} }
void createServer() { void createServer() {
@@ -53,33 +69,6 @@ class QuicStreamAsyncTransportTest : public Test {
return transport; return transport;
})); }));
EXPECT_CALL(serverConnectionCB_, onNewBidirectionalStream(_))
.WillOnce(Invoke([&](StreamId id) {
serverAsyncWrapper_ =
QuicStreamAsyncTransport::createWithExistingStream(
serverSocket_, id);
serverAsyncWrapper_->setReadCB(&serverReadCB_);
}));
EXPECT_CALL(serverReadCB_, isBufferMovable_())
.WillRepeatedly(Return(false));
EXPECT_CALL(serverReadCB_, getReadBuffer(_, _))
.WillRepeatedly(Invoke([&](void** buf, size_t* len) {
*buf = serverBuf_.data();
*len = serverBuf_.size();
}));
EXPECT_CALL(serverReadCB_, readDataAvailable_(_))
.WillOnce(Invoke([&](auto len) {
auto echoData = folly::IOBuf::copyBuffer("echo ");
echoData->appendChain(
folly::IOBuf::wrapBuffer(serverBuf_.data(), len));
serverAsyncWrapper_->writeChain(&serverWriteCB_, std::move(echoData));
serverAsyncWrapper_->shutdownWrite();
}));
EXPECT_CALL(serverReadCB_, readEOF_()).WillOnce(Return());
EXPECT_CALL(serverWriteCB_, writeSuccess_()).WillOnce(Return());
server_ = QuicServer::createQuicServer(); server_ = QuicServer::createQuicServer();
auto serverCtx = test::createServerCtx(); auto serverCtx = test::createServerCtx();
server_->setFizzContext(serverCtx); server_->setFizzContext(serverCtx);
@@ -92,38 +81,83 @@ class QuicStreamAsyncTransportTest : public Test {
serverAddr_ = server_->getAddress(); serverAddr_ = server_->getAddress();
} }
void createClient() { void expectNewServerStream() {
clientEvbThread_ = std::thread([&]() { clientEvb_.loopForever(); }); EXPECT_CALL(serverConnectionCB_, onNewBidirectionalStream(_))
.WillOnce(Invoke([&](StreamId id) {
EXPECT_CALL(clientConnectionSetupCB_, onTransportReady()) auto res = streams_.emplace(
.WillOnce(Invoke([&]() { std::piecewise_construct,
clientAsyncWrapper_ = std::forward_as_tuple(id),
QuicStreamAsyncTransport::createWithNewStream(client_); std::forward_as_tuple(std::make_unique<Stream>()));
ASSERT_TRUE(clientAsyncWrapper_); auto& newStream = *res.first->second;
clientAsyncWrapper_->setReadCB(&clientReadCB_); newStream.transport =
startPromise_.setValue(); QuicStreamAsyncTransport::createWithExistingStream(
serverSocket_, id);
EXPECT_CALL(newStream.readCb, readEOF_()).WillOnce(Invoke([this, id] {
auto& stream = *streams_[id];
if (--stream.serverDone == 0) {
stream.transport->close();
}
})); }));
EXPECT_CALL(newStream.readCb, isBufferMovable_())
EXPECT_CALL(clientReadCB_, isBufferMovable_())
.WillRepeatedly(Return(false)); .WillRepeatedly(Return(false));
EXPECT_CALL(clientReadCB_, getReadBuffer(_, _)) EXPECT_CALL(newStream.readCb, getReadBuffer(_, _))
.WillRepeatedly(Invoke([&](void** buf, size_t* len) { .WillRepeatedly(Invoke([this, id](void** buf, size_t* len) {
*buf = clientBuf_.data(); auto& stream = *streams_[id];
*len = clientBuf_.size(); *buf = stream.buf.data();
*len = stream.buf.size();
})); }));
EXPECT_CALL(clientReadCB_, readDataAvailable_(_)) EXPECT_CALL(newStream.readCb, readDataAvailable_(_))
.WillOnce(Invoke([&](auto len) { .WillRepeatedly(Invoke([this, id](auto len) {
clientReadPromise_.setValue( auto& stream = *streams_[id];
std::string(reinterpret_cast<char*>(clientBuf_.data()), len)); auto echoData = folly::IOBuf::copyBuffer("echo ");
echoData->appendChain(
folly::IOBuf::wrapBuffer(stream.buf.data(), len));
EXPECT_CALL(stream.writeCb, writeSuccess_())
.WillOnce(Return())
.RetiresOnSaturation();
if (stream.transport->good()) {
// Echo the first readDataAvailable_ only
stream.transport->writeChain(
&stream.writeCb, std::move(echoData));
stream.transport->shutdownWrite();
if (--stream.serverDone == 0) {
stream.transport->close();
}
}
})); }));
EXPECT_CALL(clientReadCB_, readEOF_()).WillOnce(Return()); newStream.transport->setReadCB(&newStream.readCb);
}))
.RetiresOnSaturation();
}
EXPECT_CALL(clientWriteCB_, writeSuccess_()).WillOnce(Return()); std::unique_ptr<Stream> createClient(bool setReadCB = true) {
auto clientStream = std::make_unique<Stream>();
clientStream->transport =
QuicStreamAsyncTransport::createWithNewStream(client_);
CHECK(clientStream->transport);
auto [promise, future] = folly::makePromiseContract<folly::Unit>(); EXPECT_CALL(clientStream->readCb, isBufferMovable_())
startPromise_ = std::move(promise); .WillRepeatedly(Return(false));
EXPECT_CALL(clientStream->readCb, getReadBuffer(_, _))
.WillRepeatedly(Invoke(
[clientStream = clientStream.get()](void** buf, size_t* len) {
*buf = clientStream->buf.data();
*len = clientStream->buf.size();
}));
clientEvb_.runInEventBaseThreadAndWait([&]() { if (setReadCB) {
clientStream->transport->setReadCB(&clientStream->readCb);
}
return clientStream;
}
void connect() {
auto [promiseX, future] = folly::makePromiseContract<folly::Unit>();
auto promise = std::move(promiseX);
EXPECT_CALL(clientConnectionSetupCB_, onTransportReady())
.WillOnce(Invoke([&promise]() mutable { promise.setValue(); }));
clientEvb_.runInLoop([&]() {
auto sock = std::make_unique<folly::AsyncUDPSocket>(&clientEvb_); auto sock = std::make_unique<folly::AsyncUDPSocket>(&clientEvb_);
auto fizzClientContext = auto fizzClientContext =
FizzClientQuicHandshakeContext::Builder() FizzClientQuicHandshakeContext::Builder()
@@ -136,22 +170,17 @@ class QuicStreamAsyncTransportTest : public Test {
client_->start(&clientConnectionSetupCB_, &clientConnectionCB_); client_->start(&clientConnectionSetupCB_, &clientConnectionCB_);
}); });
std::move(future).get(1s); std::move(future).via(&clientEvb_).waitVia(&clientEvb_);
} }
void TearDown() override { void TearDown() override {
if (serverAsyncWrapper_) { if (client_) {
serverAsyncWrapper_->getEventBase()->runInEventBaseThreadAndWait( client_->close(folly::none);
[&]() { serverAsyncWrapper_.reset(); });
} }
clientEvb_.loop();
server_->shutdown(); server_->shutdown();
server_ = nullptr; server_ = nullptr;
clientEvb_.runInEventBaseThreadAndWait([&] {
clientAsyncWrapper_ = nullptr;
client_ = nullptr; client_ = nullptr;
});
clientEvb_.terminateLoopSoon();
clientEvbThread_.join();
} }
protected: protected:
@@ -160,36 +189,110 @@ class QuicStreamAsyncTransportTest : public Test {
NiceMock<MockConnectionSetupCallback> serverConnectionSetupCB_; NiceMock<MockConnectionSetupCallback> serverConnectionSetupCB_;
NiceMock<MockConnectionCallback> serverConnectionCB_; NiceMock<MockConnectionCallback> serverConnectionCB_;
std::shared_ptr<quic::QuicSocket> serverSocket_; std::shared_ptr<quic::QuicSocket> serverSocket_;
QuicStreamAsyncTransport::UniquePtr serverAsyncWrapper_; folly::F14FastMap<quic::StreamId, std::unique_ptr<Stream>> streams_;
folly::test::MockWriteCallback serverWriteCB_;
folly::test::MockReadCallback serverReadCB_;
std::array<uint8_t, 1024> serverBuf_;
std::shared_ptr<QuicClientTransport> client_; std::shared_ptr<QuicClientTransport> client_;
folly::EventBase clientEvb_; folly::EventBase clientEvb_;
std::thread clientEvbThread_;
NiceMock<MockConnectionSetupCallback> clientConnectionSetupCB_; NiceMock<MockConnectionSetupCallback> clientConnectionSetupCB_;
NiceMock<MockConnectionCallback> clientConnectionCB_; NiceMock<MockConnectionCallback> clientConnectionCB_;
QuicStreamAsyncTransport::UniquePtr clientAsyncWrapper_;
folly::Promise<folly::Unit> startPromise_;
folly::test::MockWriteCallback clientWriteCB_;
folly::test::MockReadCallback clientReadCB_;
std::array<uint8_t, 1024> clientBuf_;
folly::Promise<std::string> clientReadPromise_;
}; };
TEST_F(QuicStreamAsyncTransportTest, ReadWrite) { TEST_F(QuicStreamAsyncTransportTest, ReadWrite) {
auto [promise, future] = folly::makePromiseContract<std::string>(); expectNewServerStream();
clientReadPromise_ = std::move(promise); auto clientStream = createClient();
EXPECT_CALL(clientStream->readCb, readEOF_()).WillOnce(Return());
auto [promiseX, future] = folly::makePromiseContract<std::string>();
auto promise = std::move(promiseX);
EXPECT_CALL(clientStream->readCb, readDataAvailable_(_))
.WillOnce(Invoke([&clientStream, &promise](auto len) mutable {
promise.setValue(std::string(
reinterpret_cast<char*>(clientStream->buf.data()), len));
}));
std::string msg = "yo yo!"; std::string msg = "yo yo!";
clientEvb_.runInEventBaseThreadAndWait([&] { EXPECT_CALL(clientStream->writeCb, writeSuccess_()).WillOnce(Return());
clientAsyncWrapper_->write(&clientWriteCB_, msg.data(), msg.size()); clientStream->transport->write(
clientAsyncWrapper_->shutdownWrite(); &clientStream->writeCb, msg.data(), msg.size());
}); clientStream->transport->shutdownWrite();
std::string clientReadString = std::move(future).get(1s); EXPECT_EQ(
EXPECT_EQ(clientReadString, "echo yo yo!"); std::move(future).via(&clientEvb_).getVia(&clientEvb_), "echo yo yo!");
}
TEST_F(QuicStreamAsyncTransportTest, TwoClients) {
std::list<std::unique_ptr<Stream>> clientStreams;
std::list<folly::SemiFuture<std::string>> futures;
std::string msg = "yo yo!";
for (auto i = 0; i < 2; i++) {
expectNewServerStream();
clientStreams.emplace_back(createClient());
auto& clientStream = clientStreams.back();
EXPECT_CALL(clientStream->readCb, readEOF_()).WillOnce(Return());
auto [promiseX, future] = folly::makePromiseContract<std::string>();
auto promise = std::move(promiseX);
futures.emplace_back(std::move(future));
EXPECT_CALL(clientStream->readCb, readDataAvailable_(_))
.WillOnce(Invoke(
[clientStream = clientStream.get(),
p = folly::MoveWrapper(std::move(promise))](auto len) mutable {
p->setValue(std::string(
reinterpret_cast<char*>(clientStream->buf.data()), len));
}));
EXPECT_CALL(clientStream->writeCb, writeSuccess_()).WillOnce(Return());
clientStream->transport->write(
&clientStream->writeCb, msg.data(), msg.size());
clientStream->transport->shutdownWrite();
}
for (auto& future : futures) {
EXPECT_EQ(
std::move(future).via(&clientEvb_).getVia(&clientEvb_), "echo yo yo!");
}
}
TEST_F(QuicStreamAsyncTransportTest, DelayedSetReadCB) {
expectNewServerStream();
auto clientStream = createClient(/*setReadCB=*/false);
auto [promiseX, future] = folly::makePromiseContract<std::string>();
auto promise = std::move(promiseX);
EXPECT_CALL(clientStream->readCb, readDataAvailable_(_))
.WillOnce(Invoke([&clientStream, &promise](auto len) mutable {
promise.setValue(std::string(
reinterpret_cast<char*>(clientStream->buf.data()), len));
}));
std::string msg = "yo yo!";
EXPECT_CALL(clientStream->writeCb, writeSuccess_()).WillOnce(Return());
clientStream->transport->write(
&clientStream->writeCb, msg.data(), msg.size());
clientEvb_.runAfterDelay(
[&clientStream] {
EXPECT_CALL(clientStream->readCb, readEOF_()).WillOnce(Return());
clientStream->transport->setReadCB(&clientStream->readCb);
clientStream->transport->shutdownWrite();
},
750);
EXPECT_EQ(
std::move(future).via(&clientEvb_).getVia(&clientEvb_), "echo yo yo!");
}
TEST_F(QuicStreamAsyncTransportTest, close) {
auto clientStream = createClient(/*setReadCB=*/false);
EXPECT_TRUE(client_->good());
clientStream->transport->close();
clientStream->transport.reset();
EXPECT_TRUE(client_->good());
clientEvb_.loopOnce();
}
TEST_F(QuicStreamAsyncTransportTest, closeNow) {
auto clientStream = createClient(/*setReadCB=*/false);
EXPECT_TRUE(client_->good());
clientStream->transport->closeNow();
clientStream->transport.reset();
// The quic socket is still good
EXPECT_TRUE(client_->good());
clientEvb_.loopOnce();
} }
} // namespace quic::test } // namespace quic::test