diff --git a/quic/api/CMakeLists.txt b/quic/api/CMakeLists.txt index 6c84dd98a..321e9b6f8 100644 --- a/quic/api/CMakeLists.txt +++ b/quic/api/CMakeLists.txt @@ -8,6 +8,7 @@ add_library( IoBufQuicBatch.cpp QuicBatchWriter.cpp QuicPacketScheduler.cpp + QuicStreamAsyncTransport.cpp QuicTransportBase.cpp QuicTransportFunctions.cpp ) diff --git a/quic/api/QuicStreamAsyncTransport.cpp b/quic/api/QuicStreamAsyncTransport.cpp new file mode 100644 index 000000000..00480fff1 --- /dev/null +++ b/quic/api/QuicStreamAsyncTransport.cpp @@ -0,0 +1,433 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + */ + +#include + +#include + +namespace quic { + +QuicStreamAsyncTransport::UniquePtr +QuicStreamAsyncTransport::createWithNewStream( + std::shared_ptr sock) { + auto streamId = sock->createBidirectionalStream(); + if (!streamId) { + return nullptr; + } + UniquePtr ptr( + new QuicStreamAsyncTransport(std::move(sock), streamId.value())); + return ptr; +} + +QuicStreamAsyncTransport::UniquePtr +QuicStreamAsyncTransport::createWithExistingStream( + std::shared_ptr sock, + quic::StreamId streamId) { + UniquePtr ptr(new QuicStreamAsyncTransport(std::move(sock), streamId)); + return ptr; +} + +QuicStreamAsyncTransport::QuicStreamAsyncTransport( + std::shared_ptr sock, + quic::StreamId id) + : sock_(std::move(sock)), id_(id) {} + +QuicStreamAsyncTransport::~QuicStreamAsyncTransport() { + sock_->setReadCallback(id_, nullptr); + closeWithReset(); +} + +void QuicStreamAsyncTransport::setReadCB( + AsyncTransport::ReadCallback* callback) { + readCb_ = callback; + // It should be ok to do this immediately, rather than in the loop + handleRead(); +} + +folly::AsyncTransport::ReadCallback* QuicStreamAsyncTransport::getReadCallback() + const { + return readCb_; +} + +void QuicStreamAsyncTransport::addWriteCallback( + AsyncTransport::WriteCallback* callback, + size_t offset, + size_t size) { + writeCallbacks_.emplace_back(offset + size, callback); + sock_->notifyPendingWriteOnStream(id_, this); +} + +void QuicStreamAsyncTransport::handleOffsetError( + AsyncTransport::WriteCallback* callback, + LocalErrorCode error) { + folly::AsyncSocketException ex( + folly::AsyncSocketException::UNKNOWN, + folly::to("Quic write error: ", toString(error))); + callback->writeErr(0, ex); +} + +void QuicStreamAsyncTransport::write( + AsyncTransport::WriteCallback* callback, + const void* buf, + size_t bytes, + folly::WriteFlags /*flags*/) { + auto streamWriteOffset = sock_->getStreamWriteOffset(id_); + if (streamWriteOffset.hasError()) { + handleOffsetError(callback, streamWriteOffset.error()); + return; + } + writeBuf_.append(folly::IOBuf::wrapBuffer(buf, bytes)); + addWriteCallback(callback, *streamWriteOffset, bytes); +} + +void QuicStreamAsyncTransport::writev( + AsyncTransport::WriteCallback* callback, + const iovec* vec, + size_t count, + folly::WriteFlags /*flags*/) { + auto streamWriteOffset = sock_->getStreamWriteOffset(id_); + if (streamWriteOffset.hasError()) { + handleOffsetError(callback, streamWriteOffset.error()); + return; + } + size_t totalBytes = 0; + for (size_t i = 0; i < count; i++) { + writeBuf_.append(folly::IOBuf::wrapBuffer(vec[i].iov_base, vec[i].iov_len)); + totalBytes += vec[i].iov_len; + } + addWriteCallback(callback, *streamWriteOffset, totalBytes); +} + +void QuicStreamAsyncTransport::writeChain( + AsyncTransport::WriteCallback* callback, + std::unique_ptr&& buf, + folly::WriteFlags /*flags*/) { + auto streamWriteOffset = sock_->getStreamWriteOffset(id_); + if (streamWriteOffset.hasError()) { + handleOffsetError(callback, streamWriteOffset.error()); + return; + } + size_t len = buf->computeChainDataLength(); + writeBuf_.append(std::move(buf)); + addWriteCallback(callback, *streamWriteOffset, len); +} + +void QuicStreamAsyncTransport::close() { + sock_->stopSending(id_, quic::GenericApplicationErrorCode::UNKNOWN); + shutdownWrite(); + if (readCb_ && readEOF_ != EOFState::DELIVERED) { + // This is such a bizarre operation. I almost think if we haven't seen + // a fin then we should readErr instead of readEOF, this mirrors + // AsyncSocket though + readEOF_ = EOFState::QUEUED; + handleRead(); + } +} + +void QuicStreamAsyncTransport::closeNow() { + if (writeBuf_.empty()) { + close(); + } else { + sock_->stopSending(id_, quic::GenericApplicationErrorCode::UNKNOWN); + sock_->resetStream(id_, quic::GenericApplicationErrorCode::UNKNOWN); + VLOG(4) << "Reset stream from closeNow"; + } +} + +void QuicStreamAsyncTransport::closeWithReset() { + sock_->stopSending(id_, quic::GenericApplicationErrorCode::UNKNOWN); + sock_->resetStream(id_, quic::GenericApplicationErrorCode::UNKNOWN); + VLOG(4) << "Reset stream from closeWithReset"; +} + +void QuicStreamAsyncTransport::shutdownWrite() { + if (writeEOF_ == EOFState::NOT_SEEN) { + writeEOF_ = EOFState::QUEUED; + sock_->notifyPendingWriteOnStream(id_, this); + } +} + +void QuicStreamAsyncTransport::shutdownWriteNow() { + if (readEOF_ == EOFState::DELIVERED) { + // writes already shutdown + return; + } + if (writeBuf_.empty()) { + shutdownWrite(); + } else { + sock_->resetStream(id_, quic::GenericApplicationErrorCode::UNKNOWN); + VLOG(4) << "Reset stream from shutdownWriteNow"; + } +} + +bool QuicStreamAsyncTransport::good() const { + return ( + !ex_ && + (readEOF_ == EOFState::NOT_SEEN || writeEOF_ == EOFState::NOT_SEEN)); +} + +bool QuicStreamAsyncTransport::readable() const { + return !ex_ && readEOF_ == EOFState::NOT_SEEN; +} + +bool QuicStreamAsyncTransport::writable() const { + return !ex_ && writeEOF_ == EOFState::NOT_SEEN; +} + +bool QuicStreamAsyncTransport::isPending() const { + return false; +} + +bool QuicStreamAsyncTransport::connecting() const { + return false; +} + +bool QuicStreamAsyncTransport::error() const { + return bool(ex_); +} + +folly::EventBase* QuicStreamAsyncTransport::getEventBase() const { + return sock_->getEventBase(); +} + +void QuicStreamAsyncTransport::attachEventBase( + folly::EventBase* /*eventBase*/) { + LOG(FATAL) << "Does QUICSocket support this?"; +} + +void QuicStreamAsyncTransport::detachEventBase() { + LOG(FATAL) << "Does QUICSocket support this?"; +} + +bool QuicStreamAsyncTransport::isDetachable() const { + return false; // ? +} + +void QuicStreamAsyncTransport::setSendTimeout(uint32_t /*milliseconds*/) { + // QuicSocket needs this +} + +uint32_t QuicStreamAsyncTransport::getSendTimeout() const { + // TODO: follow up on getSendTimeout() use, 0 indicates that no timeout is + // set. + return 0; +} + +void QuicStreamAsyncTransport::getLocalAddress( + folly::SocketAddress* address) const { + *address = sock_->getLocalAddress(); +} + +void QuicStreamAsyncTransport::getPeerAddress( + folly::SocketAddress* address) const { + *address = sock_->getPeerAddress(); +} + +bool QuicStreamAsyncTransport::isEorTrackingEnabled() const { + return false; +} + +void QuicStreamAsyncTransport::setEorTracking(bool /*track*/) {} + +size_t QuicStreamAsyncTransport::getAppBytesWritten() const { + auto res = sock_->getStreamWriteOffset(id_); + // TODO: track written bytes to have it available after QUIC stream closure + return res.hasError() ? 0 : res.value(); +} + +size_t QuicStreamAsyncTransport::getRawBytesWritten() const { + auto res = sock_->getStreamWriteOffset(id_); + // TODO: track written bytes to have it available after QUIC stream closure + return res.hasError() ? 0 : res.value(); +} + +size_t QuicStreamAsyncTransport::getAppBytesReceived() const { + auto res = sock_->getStreamReadOffset(id_); + // TODO: track read bytes to have it available after QUIC stream closure + return res.hasError() ? 0 : res.value(); +} + +size_t QuicStreamAsyncTransport::getRawBytesReceived() const { + auto res = sock_->getStreamReadOffset(id_); + // TODO: track read bytes to have it available after QUIC stream closure + return res.hasError() ? 0 : res.value(); +} + +std::string QuicStreamAsyncTransport::getApplicationProtocol() const noexcept { + return sock_->getAppProtocol().value_or(""); +} + +std::string QuicStreamAsyncTransport::getSecurityProtocol() const { + return "quic/tls1.3"; +} + +void QuicStreamAsyncTransport::readAvailable( + quic::StreamId /*streamId*/) noexcept { + CHECK(readCb_); + // defer the actual read until the loop callback. This prevents possible + // tail recursion with readAvailable -> setReadCallback -> readAvailable + sock_->getEventBase()->runInLoop(this, true); +} + +void QuicStreamAsyncTransport::readError( + quic::StreamId /*streamId*/, + std::pair> + error) noexcept { + ex_ = folly::AsyncSocketException( + folly::AsyncSocketException::UNKNOWN, + folly::to("Quic read error: ", toString(error))); + sock_->getEventBase()->runInLoop(this, true); + // TODO: RST here? +} + +void QuicStreamAsyncTransport::runLoopCallback() noexcept { + handleRead(); +} + +void QuicStreamAsyncTransport::handleRead() { + folly::DelayedDestruction::DestructorGuard dg(this); + bool emptyRead = false; + size_t numReads = 0; + while (readCb_ && !ex_ && readEOF_ == EOFState::NOT_SEEN && !emptyRead && + ++numReads < 16 /* max reads per event */) { + void* buf = nullptr; + size_t len = 0; + if (readCb_->isBufferMovable()) { + len = readCb_->maxBufferSize(); + } else { + readCb_->getReadBuffer(&buf, &len); + if (buf == nullptr || len == 0) { + ex_ = folly::AsyncSocketException( + folly::AsyncSocketException::BAD_ARGS, + "ReadCallback::getReadBuffer() returned empty buffer"); + break; + } + } + auto readData = sock_->read(id_, len); + if (readData.hasError()) { + ex_ = folly::AsyncSocketException( + folly::AsyncSocketException::UNKNOWN, + folly::to("Quic read error: ", readData.error())); + } else { + if (!readData->first) { + emptyRead = true; + } else { + if (readCb_->isBufferMovable()) { + readCb_->readBufferAvailable(std::move(readData->first)); + } else { + size_t readLen = readData->first->computeChainDataLength(); + folly::io::Cursor c(readData->first.get()); + CHECK_NOTNULL(buf); + c.pull(buf, readLen); + readCb_->readDataAvailable(readLen); + } + } + if (readData->second && readEOF_ == EOFState::NOT_SEEN) { + readEOF_ = EOFState::QUEUED; + } + } + } + if (readCb_) { + if (ex_) { + auto cb = readCb_; + readCb_ = nullptr; + cb->readErr(*ex_); + } else if (readEOF_ == EOFState::QUEUED) { + auto cb = readCb_; + readCb_ = nullptr; + cb->readEOF(); + readEOF_ = EOFState::DELIVERED; + } + } + if (readCb_ && readEOF_ == EOFState::NOT_SEEN && !ex_) { + sock_->setReadCallback(id_, this); + } else { + sock_->setReadCallback(id_, nullptr); + } +} + +void QuicStreamAsyncTransport::send(uint64_t maxToSend) { + // overkill until there are delivery cbs + folly::DelayedDestruction::DestructorGuard dg(this); + uint64_t toSend = + std::min(maxToSend, folly::to(writeBuf_.chainLength())); + auto streamWriteOffset = sock_->getStreamWriteOffset(id_); + if (streamWriteOffset.hasError()) { + // handle error + folly::AsyncSocketException ex( + folly::AsyncSocketException::UNKNOWN, + folly::to( + "Quic write error: ", toString(streamWriteOffset.error()))); + failWrites(ex); + return; + } + + uint64_t sentOffset = *streamWriteOffset + toSend; + bool writeEOF = (writeEOF_ == EOFState::QUEUED); + auto res = sock_->writeChain( + id_, + writeBuf_.split(toSend), + writeEOF, + false, + nullptr); // no delivery callbacks right now + if (res.hasError()) { + folly::AsyncSocketException ex( + folly::AsyncSocketException::UNKNOWN, + folly::to("Quic write error: ", toString(res.error()))); + failWrites(ex); + } else { + if (writeEOF) { + writeEOF_ = EOFState::DELIVERED; + VLOG(4) << "Closed stream id_=" << id_; + } + // not actually sent. Mirrors AsyncSocket and invokes when data is in + // transport buffers + invokeWriteCallbacks(sentOffset); + } +} + +void QuicStreamAsyncTransport::invokeWriteCallbacks(size_t sentOffset) { + while (!writeCallbacks_.empty() && + writeCallbacks_.front().first <= sentOffset) { + auto wcb = writeCallbacks_.front().second; + writeCallbacks_.pop_front(); + wcb->writeSuccess(); + } +} + +void QuicStreamAsyncTransport::failWrites(folly::AsyncSocketException& ex) { + while (!writeCallbacks_.empty()) { + auto& front = writeCallbacks_.front(); + auto wcb = front.second; + writeCallbacks_.pop_front(); + // TODO: track bytesWritten, when buffer was split it may not be 0 + wcb->writeErr(0, ex); + } +} + +void QuicStreamAsyncTransport::onStreamWriteReady( + quic::StreamId /*id*/, + uint64_t maxToSend) noexcept { + if (writeEOF_ == EOFState::DELIVERED && writeBuf_.empty()) { + // nothing left to write + return; + } + send(maxToSend); +} + +void QuicStreamAsyncTransport::onStreamWriteError( + StreamId /*id*/, + std::pair> + error) noexcept { + folly::AsyncSocketException ex( + folly::AsyncSocketException::UNKNOWN, + folly::to("Quic write error: ", toString(error))); + failWrites(ex); +} + +} // namespace quic diff --git a/quic/api/QuicStreamAsyncTransport.h b/quic/api/QuicStreamAsyncTransport.h new file mode 100644 index 000000000..4dd4377b8 --- /dev/null +++ b/quic/api/QuicStreamAsyncTransport.h @@ -0,0 +1,162 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + */ + +#pragma once + +// #include +#include +#include + +namespace quic { + +/** + * Adaptor for multiplexing over quic an existing use-case that + * expects an AsyncTransport + */ +class QuicStreamAsyncTransport : public folly::AsyncTransport, + public QuicSocket::ReadCallback, + public QuicSocket::WriteCallback, + public folly::EventBase::LoopCallback { + public: + using UniquePtr = std::unique_ptr< + QuicStreamAsyncTransport, + folly::DelayedDestruction::Destructor>; + + static UniquePtr createWithNewStream(std::shared_ptr sock); + + static UniquePtr createWithExistingStream( + std::shared_ptr sock, + quic::StreamId streamId); + + protected: + QuicStreamAsyncTransport( + std::shared_ptr sock, + quic::StreamId id); + + public: + ~QuicStreamAsyncTransport() override; + + void setReadCB(AsyncTransport::ReadCallback* callback) override; + + AsyncTransport::ReadCallback* getReadCallback() const override; + + void addWriteCallback( + AsyncTransport::WriteCallback* callback, + size_t offset, + size_t size); + + void handleOffsetError( + AsyncTransport::WriteCallback* callback, + LocalErrorCode error); + + void write( + AsyncTransport::WriteCallback* callback, + const void* buf, + size_t bytes, + folly::WriteFlags flags = folly::WriteFlags::NONE) override; + + void writev( + AsyncTransport::WriteCallback* callback, + const iovec* vec, + size_t count, + folly::WriteFlags flags = folly::WriteFlags::NONE) override; + void writeChain( + AsyncTransport::WriteCallback* callback, + std::unique_ptr&& buf, + folly::WriteFlags flags = folly::WriteFlags::NONE) override; + + void close() override; + void closeNow() override; + + void closeWithReset() override; + + void shutdownWrite() override; + + void shutdownWriteNow() override; + + bool good() const override; + + bool readable() const override; + + bool writable() const override; + + bool isPending() const override; + + bool connecting() const override; + + bool error() const override; + + folly::EventBase* getEventBase() const override; + + void attachEventBase(folly::EventBase* /*eventBase*/) override; + + void detachEventBase() override; + + bool isDetachable() const override; + + void setSendTimeout(uint32_t /*milliseconds*/) override; + + uint32_t getSendTimeout() const override; + + void getLocalAddress(folly::SocketAddress* address) const override; + + void getPeerAddress(folly::SocketAddress* address) const override; + + bool isEorTrackingEnabled() const override; + + void setEorTracking(bool track) override; + + size_t getAppBytesWritten() const override; + + size_t getRawBytesWritten() const override; + + size_t getAppBytesReceived() const override; + + size_t getRawBytesReceived() const override; + + std::string getApplicationProtocol() const noexcept override; + + std::string getSecurityProtocol() const override; + + private: + void readAvailable(quic::StreamId /*streamId*/) noexcept override; + + void readError( + quic::StreamId /*streamId*/, + std::pair> + error) noexcept override; + + void runLoopCallback() noexcept override; + + void handleRead(); + void send(uint64_t maxToSend); + + void invokeWriteCallbacks(size_t sentOffset); + + void failWrites(folly::AsyncSocketException& ex); + + void onStreamWriteReady( + quic::StreamId /*id*/, + uint64_t maxToSend) noexcept override; + + void onStreamWriteError( + StreamId /*id*/, + std::pair> + error) noexcept override; + + std::shared_ptr sock_; + quic::StreamId id_; + enum class EOFState { NOT_SEEN, QUEUED, DELIVERED }; + EOFState readEOF_{EOFState::NOT_SEEN}; + EOFState writeEOF_{EOFState::NOT_SEEN}; + AsyncTransport::ReadCallback* readCb_{nullptr}; + folly::IOBufQueue writeBuf_{folly::IOBufQueue::cacheChainLength()}; + std::deque> writeCallbacks_; + folly::Optional ex_; +}; +} // namespace quic diff --git a/quic/api/test/CMakeLists.txt b/quic/api/test/CMakeLists.txt index cdd6843d2..11c68151f 100644 --- a/quic/api/test/CMakeLists.txt +++ b/quic/api/test/CMakeLists.txt @@ -76,3 +76,14 @@ quic_add_test(TARGET QuicBatchWriterTest mvfst_server mvfst_transport ) + +quic_add_test(TARGET QuicStreamAsyncTransportTest + SOURCES + QuicStreamAsyncTransportTest.cpp + DEPENDS + Folly::folly + mvfst_buf_accessor + mvfst_client + mvfst_server + mvfst_transport +) diff --git a/quic/api/test/QuicStreamAsyncTransportTest.cpp b/quic/api/test/QuicStreamAsyncTransportTest.cpp new file mode 100644 index 000000000..186de7a67 --- /dev/null +++ b/quic/api/test/QuicStreamAsyncTransportTest.cpp @@ -0,0 +1,184 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace testing; + +namespace quic::test { + +class QuicStreamAsyncTransportTest : public Test { + public: + void SetUp() override { + folly::ssl::init(); + createServer(); + createClient(); + } + + void createServer() { + auto serverTransportFactory = + std::make_unique(); + EXPECT_CALL(*serverTransportFactory, _make(_, _, _, _)) + .WillOnce(Invoke( + [&](folly::EventBase* evb, + std::unique_ptr& socket, + const folly::SocketAddress& /*addr*/, + std::shared_ptr ctx) { + auto transport = quic::QuicServerTransport::make( + evb, std::move(socket), serverConnectionCB_, std::move(ctx)); + CHECK(serverSocket_.get() == nullptr); + serverSocket_ = transport; + return transport; + })); + + EXPECT_CALL(serverConnectionCB_, onNewBidirectionalStream(_)) + .WillOnce(Invoke([&](StreamId id) { + serverAsyncWrapper_ = + QuicStreamAsyncTransport::createWithExistingStream( + serverSocket_, id); + serverAsyncWrapper_->setReadCB(&serverReadCB_); + })); + + EXPECT_CALL(serverReadCB_, isBufferMovable_()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(serverReadCB_, getReadBuffer(_, _)) + .WillRepeatedly(Invoke([&](void** buf, size_t* len) { + *buf = serverBuf_.data(); + *len = serverBuf_.size(); + })); + EXPECT_CALL(serverReadCB_, readDataAvailable_(_)) + .WillOnce(Invoke([&](auto len) { + auto echoData = folly::IOBuf::copyBuffer("echo "); + echoData->appendChain( + folly::IOBuf::wrapBuffer(serverBuf_.data(), len)); + serverAsyncWrapper_->writeChain(&serverWriteCB_, std::move(echoData)); + serverAsyncWrapper_->shutdownWrite(); + })); + EXPECT_CALL(serverReadCB_, readEOF_()).WillOnce(Return()); + + EXPECT_CALL(serverWriteCB_, writeSuccess_()).WillOnce(Return()); + + server_ = QuicServer::createQuicServer(); + auto serverCtx = test::createServerCtx(); + server_->setFizzContext(serverCtx); + + server_->setQuicServerTransportFactory(std::move(serverTransportFactory)); + + folly::SocketAddress addr("::1", 0); + server_->start(addr, 1); + server_->waitUntilInitialized(); + serverAddr_ = server_->getAddress(); + } + + void createClient() { + clientEvbThread_ = std::thread([&]() { clientEvb_.loopForever(); }); + + EXPECT_CALL(clientConnectionCB_, onTransportReady()).WillOnce(Invoke([&]() { + clientAsyncWrapper_ = + QuicStreamAsyncTransport::createWithNewStream(client_); + ASSERT_TRUE(clientAsyncWrapper_); + clientAsyncWrapper_->setReadCB(&clientReadCB_); + startPromise_.setValue(); + })); + + EXPECT_CALL(clientReadCB_, isBufferMovable_()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(clientReadCB_, getReadBuffer(_, _)) + .WillRepeatedly(Invoke([&](void** buf, size_t* len) { + *buf = clientBuf_.data(); + *len = clientBuf_.size(); + })); + EXPECT_CALL(clientReadCB_, readDataAvailable_(_)) + .WillOnce(Invoke([&](auto len) { + clientReadPromise_.setValue( + std::string(reinterpret_cast(clientBuf_.data()), len)); + })); + EXPECT_CALL(clientReadCB_, readEOF_()).WillOnce(Return()); + + EXPECT_CALL(clientWriteCB_, writeSuccess_()).WillOnce(Return()); + + auto [promise, future] = folly::makePromiseContract(); + startPromise_ = std::move(promise); + + clientEvb_.runInEventBaseThreadAndWait([&]() { + auto sock = std::make_unique(&clientEvb_); + auto fizzClientContext = + FizzClientQuicHandshakeContext::Builder() + .setCertificateVerifier(test::createTestCertificateVerifier()) + .build(); + client_ = std::make_shared( + &clientEvb_, std::move(sock), std::move(fizzClientContext)); + client_->setHostname("echo.com"); + client_->addNewPeerAddress(serverAddr_); + client_->start(&clientConnectionCB_); + }); + + std::move(future).get(1s); + } + + void TearDown() override { + server_->shutdown(); + server_ = nullptr; + clientEvb_.runInEventBaseThreadAndWait([&] { + clientAsyncWrapper_ = nullptr; + client_ = nullptr; + }); + clientEvb_.terminateLoopSoon(); + clientEvbThread_.join(); + } + + protected: + std::shared_ptr server_; + folly::SocketAddress serverAddr_; + NiceMock serverConnectionCB_; + std::shared_ptr serverSocket_; + QuicStreamAsyncTransport::UniquePtr serverAsyncWrapper_; + folly::test::MockWriteCallback serverWriteCB_; + folly::test::MockReadCallback serverReadCB_; + std::array serverBuf_; + + std::shared_ptr client_; + folly::EventBase clientEvb_; + std::thread clientEvbThread_; + NiceMock clientConnectionCB_; + QuicStreamAsyncTransport::UniquePtr clientAsyncWrapper_; + folly::Promise startPromise_; + folly::test::MockWriteCallback clientWriteCB_; + folly::test::MockReadCallback clientReadCB_; + std::array clientBuf_; + folly::Promise clientReadPromise_; +}; + +TEST_F(QuicStreamAsyncTransportTest, ReadWrite) { + auto [promise, future] = folly::makePromiseContract(); + clientReadPromise_ = std::move(promise); + + std::string msg = "yo yo!"; + clientEvb_.runInEventBaseThreadAndWait([&] { + clientAsyncWrapper_->write(&clientWriteCB_, msg.data(), msg.size()); + clientAsyncWrapper_->shutdownWrite(); + }); + + std::string clientReadString = std::move(future).get(1s); + EXPECT_EQ(clientReadString, "echo yo yo!"); +} + +} // namespace quic::test