1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-02 13:06:47 +03:00
Files
mvfst/quic/api/test/QuicTransportBaseTest.cpp
Matt Joras b74208392c Fix [[maybe_unused]] anti-patterns in QUIC tests
Summary:
Replace `[[maybe_unused]] auto variable = method_call` patterns with proper assertions using `ASSERT_FALSE(method_call.hasError())` for quic::Expected return values. This improves test reliability by actually validating method call success instead of suppressing unused return value warnings.

## Changes Made

### 1. Fixed Anti-Patterns in Tests (84 instances across 5 test files):
- QuicTransportBaseTest.cpp: 74 patterns
- QuicTransportTest.cpp: 5 patterns
- QuicTypedTransportTest.cpp: 3 patterns
- QuicClientTransportLiteTest.cpp: 1 pattern
- QuicClientTransportTest.cpp: 1 pattern

For cleanup scenarios where failure is acceptable (e.g., setting read callback to nullptr), used `(void)method_call` instead of assertions to properly suppress warnings without incorrect success assertions.

### 2. Removed Unhelpful Comments (5 instances):
- QuicStreamAsyncTransport.cpp: Removed comments referencing "original behavior" that provided no actionable context

The logging statements (WARNING/VLOG) already make error handling behavior clear without need for historical commentary.

## Comprehensive Audit Results
Performed comprehensive audit of all `[[maybe_unused]]` usage in fbcode/quic/ (45 total instances):
-  **Self-reference guards**: `[[maybe_unused]] auto self = sharedGuard();` - **LEGITIMATE**
-  **Function parameter suppression**: Intentionally unused parameters - **LEGITIMATE**
-  **Loop variable suppression**: Iteration without using values - **LEGITIMATE**
-  **Static initialization**: Thread-local initialization patterns - **LEGITIMATE**
-  **Third-party code**: Left untouched as required
 ---
> Generated by [Confucius Code Assist (CCA)](https://www.internalfb.com/wiki/Confucius/Analect/Shared_Analects/Confucius_Code_Assist_(CCA)/)
[Session](https://www.internalfb.com/confucius?session_id=7be75dc0-61d5-11f0-8f26-27b21c240401&tab=Chat), [Trace](https://www.internalfb.com/confucius?session_id=7be75dc0-61d5-11f0-8f26-27b21c240401&tab=Trace)

Reviewed By: knekritz

Differential Revision: D78385516

fbshipit-source-id: 98c8989a147ed639be4582be3460b146aaa1075f
2025-07-16 14:23:39 -07:00

5275 lines
198 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/api/test/Mocks.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <quic/api/QuicSocket.h>
#include <quic/api/QuicTransportBase.h>
#include <quic/codec/DefaultConnectionIdAlgo.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <quic/common/test/TestUtils.h>
#include <quic/fizz/server/handshake/FizzServerQuicHandshakeContext.h>
#include <quic/server/state/ServerStateMachine.h>
#include <quic/state/DatagramHandlers.h>
#include <quic/state/QuicStreamFunctions.h>
#include <quic/state/QuicStreamUtilities.h>
#include <quic/state/stream/StreamReceiveHandlers.h>
#include <quic/state/stream/StreamSendHandlers.h>
#include <quic/state/test/MockQuicStats.h>
#include <quic/state/test/Mocks.h>
#include <quic/common/testutil/MockAsyncUDPSocket.h>
#include <memory>
using namespace testing;
using namespace folly;
namespace quic::test {
constexpr uint8_t kStreamIncrement = 0x04;
enum class TestFrameType : uint8_t {
STREAM,
CRYPTO,
EXPIRED_DATA,
REJECTED_DATA,
MAX_STREAMS,
DATAGRAM,
STREAM_GROUP
};
// A made up encoding decoding of a stream.
BufPtr encodeStreamBuffer(
StreamId id,
StreamBuffer data,
OptionalIntegral<StreamGroupId> groupId = std::nullopt) {
auto buf = IOBuf::create(10);
folly::io::Appender appender(buf.get(), 10);
if (!groupId) {
appender.writeBE(static_cast<uint8_t>(TestFrameType::STREAM));
} else {
appender.writeBE(static_cast<uint8_t>(TestFrameType::STREAM_GROUP));
}
appender.writeBE(id);
if (groupId) {
appender.writeBE(*groupId);
}
auto dataBuf = data.data.move();
dataBuf->coalesce();
appender.writeBE<uint32_t>(dataBuf->length());
appender.push(dataBuf->coalesce());
appender.writeBE<uint64_t>(data.offset);
appender.writeBE<uint8_t>(data.eof);
return buf;
}
BufPtr encodeCryptoBuffer(StreamBuffer data) {
auto buf = IOBuf::create(10);
folly::io::Appender appender(buf.get(), 10);
appender.writeBE(static_cast<uint8_t>(TestFrameType::CRYPTO));
auto dataBuf = data.data.move();
dataBuf->coalesce();
appender.writeBE<uint32_t>(dataBuf->length());
appender.push(dataBuf->coalesce());
appender.writeBE<uint64_t>(data.offset);
return buf;
}
// A made up encoding of a MaxStreamsFrame.
BufPtr encodeMaxStreamsFrame(const MaxStreamsFrame& frame) {
auto buf = IOBuf::create(25);
folly::io::Appender appender(buf.get(), 25);
appender.writeBE(static_cast<uint8_t>(TestFrameType::MAX_STREAMS));
appender.writeBE<uint8_t>(frame.isForBidirectionalStream() ? 1 : 0);
appender.writeBE<uint64_t>(frame.maxStreams);
return buf;
}
// Build a datagram frame
BufPtr encodeDatagramFrame(BufQueue data) {
auto buf = IOBuf::create(10);
folly::io::Appender appender(buf.get(), 10);
appender.writeBE(static_cast<uint8_t>(TestFrameType::DATAGRAM));
auto dataBuf = data.move();
dataBuf->coalesce();
appender.writeBE<uint32_t>(dataBuf->length());
appender.push(dataBuf->coalesce());
return buf;
}
std::pair<BufPtr, uint32_t> decodeDatagramFrame(Cursor& cursor) {
BufPtr outData;
auto len = cursor.readBE<uint32_t>();
cursor.clone(outData, len);
return std::make_pair(std::move(outData), len);
}
std::pair<BufPtr, uint64_t> decodeDataBuffer(Cursor& cursor) {
BufPtr outData;
auto len = cursor.readBE<uint32_t>();
cursor.clone(outData, len);
uint64_t offset = cursor.readBE<uint64_t>();
return std::make_pair(std::move(outData), offset);
}
std::pair<StreamId, StreamBuffer> decodeStreamBuffer(Cursor& cursor) {
auto streamId = cursor.readBE<StreamId>();
auto dataBuffer = decodeDataBuffer(cursor);
bool eof = (bool)cursor.readBE<uint8_t>();
return std::make_pair(
streamId,
StreamBuffer(std::move(dataBuffer.first), dataBuffer.second, eof));
}
struct StreamGroupIdBuf {
StreamId id;
StreamGroupId groupId;
StreamBuffer buf;
};
StreamGroupIdBuf decodeStreamGroupBuffer(Cursor& cursor) {
auto streamId = cursor.readBE<StreamId>();
auto groupId = cursor.readBE<StreamGroupId>();
auto dataBuffer = decodeDataBuffer(cursor);
bool eof = (bool)cursor.readBE<uint8_t>();
return StreamGroupIdBuf{
streamId,
groupId,
StreamBuffer(std::move(dataBuffer.first), dataBuffer.second, eof)};
}
StreamBuffer decodeCryptoBuffer(Cursor& cursor) {
auto dataBuffer = decodeDataBuffer(cursor);
return StreamBuffer(std::move(dataBuffer.first), dataBuffer.second, false);
}
MaxStreamsFrame decodeMaxStreamsFrame(Cursor& cursor) {
bool isBidi = cursor.readBE<uint8_t>();
auto maxStreams = cursor.readBE<uint64_t>();
return MaxStreamsFrame(maxStreams, isBidi);
}
class TestPingCallback : public QuicSocket::PingCallback {
public:
void pingAcknowledged() noexcept override {}
void pingTimeout() noexcept override {}
void onPing() noexcept override {}
};
class TestByteEventCallback : public ByteEventCallback {
public:
using HashFn = std::function<size_t(const ByteEvent&)>;
using ComparatorFn = std::function<bool(const ByteEvent&, const ByteEvent&)>;
enum class Status { REGISTERED = 1, RECEIVED = 2, CANCELLED = 3 };
void onByteEventRegistered(ByteEvent event) override {
EXPECT_TRUE(byteEventTracker_.find(event) == byteEventTracker_.end());
byteEventTracker_[event] = Status::REGISTERED;
}
void onByteEvent(ByteEvent event) override {
EXPECT_TRUE(byteEventTracker_.find(event) != byteEventTracker_.end());
byteEventTracker_[event] = Status::RECEIVED;
}
void onByteEventCanceled(ByteEventCancellation cancellation) override {
const ByteEvent& event = cancellation;
EXPECT_TRUE(byteEventTracker_.find(event) != byteEventTracker_.end());
byteEventTracker_[event] = Status::CANCELLED;
}
std::unordered_map<ByteEvent, Status, HashFn, ComparatorFn>
getByteEventTracker() const {
return byteEventTracker_;
}
private:
// Custom hash and comparator functions that use only id, offset and types
// (not the srtt)
HashFn hash = [](const ByteEvent& e) {
return folly::hash::hash_combine(e.id, e.offset, e.type);
};
ComparatorFn comparator = [](const ByteEvent& lhs, const ByteEvent& rhs) {
return (
(lhs.id == rhs.id) && (lhs.offset == rhs.offset) &&
(lhs.type == rhs.type));
};
std::unordered_map<ByteEvent, Status, HashFn, ComparatorFn> byteEventTracker_{
/* bucket count */ 4,
hash,
comparator};
};
static auto
getByteEventMatcher(ByteEvent::Type type, StreamId id, uint64_t offset) {
return AllOf(
testing::Field(&ByteEvent::type, testing::Eq(type)),
testing::Field(&ByteEvent::id, testing::Eq(id)),
testing::Field(&ByteEvent::offset, testing::Eq(offset)));
}
static auto getByteEventTrackerMatcher(
ByteEvent event,
TestByteEventCallback::Status status) {
return Pair(getByteEventMatcher(event.type, event.id, event.offset), status);
}
class TestQuicTransport
: public QuicTransportBase,
public std::enable_shared_from_this<TestQuicTransport> {
public:
TestQuicTransport(
std::shared_ptr<QuicEventBase> evb,
std::unique_ptr<QuicAsyncUDPSocket> socket,
ConnectionSetupCallback* connSetupCb,
ConnectionCallback* connCb)
: QuicTransportBaseLite(evb, std::move(socket)), QuicTransportBase(evb, nullptr /* Initialized through the QuicTransportBaseLite constructor */), observerContainer_(std::make_shared<SocketObserverContainer>(this)) {
auto conn = std::make_unique<QuicServerConnectionState>(
FizzServerQuicHandshakeContext::Builder().build());
conn->clientConnectionId = ConnectionId::createAndMaybeCrash({10, 9, 8, 7});
conn->version = QuicVersion::MVFST;
conn->observerContainer = observerContainer_;
transportConn = conn.get();
conn_.reset(conn.release());
aead = test::createNoOpAead();
headerCipher = test::createNoOpHeaderCipher().value();
connIdAlgo_ = std::make_unique<DefaultConnectionIdAlgo>();
setConnectionSetupCallback(connSetupCb);
setConnectionCallbackFromCtor(connCb);
}
~TestQuicTransport() override {
resetConnectionCallbacks();
// we need to call close in the derived class.
resetConnectionCallbacks();
closeImpl(
QuicError(
QuicErrorCode(LocalErrorCode::SHUTTING_DOWN),
std::string("shutdown")),
false /* drainConnection */);
// closeImpl may have been called earlier with drain = true, so force close.
closeUdpSocket();
}
WriteResult writeBufMeta(
StreamId /* id */,
const BufferMeta& /* data */,
bool /* eof */,
ByteEventCallback* /* cb */) override {
return quic::make_unexpected(LocalErrorCode::INVALID_OPERATION);
}
WriteResult setDSRPacketizationRequestSender(
StreamId /* id */,
std::unique_ptr<DSRPacketizationRequestSender> /* sender */) override {
return quic::make_unexpected(LocalErrorCode::INVALID_OPERATION);
}
Optional<std::vector<TransportParameter>> getPeerTransportParams()
const override {
return std::nullopt;
}
std::chrono::milliseconds getLossTimeoutRemainingTime() {
return lossTimeout_.getTimerCallbackTimeRemaining();
}
quic::Expected<void, QuicError> onReadData(
const folly::SocketAddress&,
ReceivedUdpPacket&& udpPacket) override {
if (udpPacket.buf.empty()) {
return {};
}
Cursor cursor(udpPacket.buf.front());
while (!cursor.isAtEnd()) {
// create server chosen connId with processId = 0 and workerId = 0
ServerConnectionIdParams params(0, 0, 0);
conn_->serverConnectionId = *connIdAlgo_->encodeConnectionId(params);
auto type = static_cast<TestFrameType>(cursor.readBE<uint8_t>());
if (type == TestFrameType::CRYPTO) {
auto cryptoBuffer = decodeCryptoBuffer(cursor);
auto cryptoResult = appendDataToReadBuffer(
conn_->cryptoState->initialStream, std::move(cryptoBuffer));
if (cryptoResult.hasError()) {
return quic::make_unexpected(cryptoResult.error());
}
} else if (type == TestFrameType::MAX_STREAMS) {
auto maxStreamsFrame = decodeMaxStreamsFrame(cursor);
if (maxStreamsFrame.isForBidirectionalStream()) {
auto bidirResult =
conn_->streamManager->setMaxLocalBidirectionalStreams(
maxStreamsFrame.maxStreams);
if (bidirResult.hasError()) {
return quic::make_unexpected(bidirResult.error());
}
} else {
auto unidirResult =
conn_->streamManager->setMaxLocalUnidirectionalStreams(
maxStreamsFrame.maxStreams);
if (unidirResult.hasError()) {
return quic::make_unexpected(unidirResult.error());
}
}
} else if (type == TestFrameType::DATAGRAM) {
auto buffer = decodeDatagramFrame(cursor);
auto frame = DatagramFrame(buffer.second, std::move(buffer.first));
handleDatagram(*conn_, frame, udpPacket.timings.receiveTimePoint);
} else if (type == TestFrameType::STREAM_GROUP) {
auto res = decodeStreamGroupBuffer(cursor);
auto streamResult =
conn_->streamManager->getStream(res.id, res.groupId);
if (streamResult.hasError()) {
return quic::make_unexpected(streamResult.error());
}
QuicStreamState* stream = streamResult.value();
if (!stream) {
continue;
}
auto streamGroupResult =
appendDataToReadBuffer(*stream, std::move(res.buf));
if (streamGroupResult.hasError()) {
return quic::make_unexpected(streamGroupResult.error());
}
conn_->streamManager->updateReadableStreams(*stream);
conn_->streamManager->updatePeekableStreams(*stream);
} else {
auto buffer = decodeStreamBuffer(cursor);
auto streamResult = conn_->streamManager->getStream(buffer.first);
if (streamResult.hasError()) {
return quic::make_unexpected(streamResult.error());
}
QuicStreamState* stream = streamResult.value();
if (!stream) {
continue;
}
auto result = appendDataToReadBuffer(*stream, std::move(buffer.second));
if (result.hasError()) {
return quic::make_unexpected(result.error());
}
conn_->streamManager->updateReadableStreams(*stream);
conn_->streamManager->updatePeekableStreams(*stream);
}
}
return {};
}
[[nodiscard]] quic::Expected<void, QuicError> writeData() override {
auto result = writeQuicDataToSocket(
*socket_,
*conn_,
conn_->serverConnectionId.value_or(ConnectionId::createZeroLength()),
conn_->clientConnectionId.value_or(ConnectionId::createZeroLength()),
*aead,
*headerCipher,
*conn_->version,
conn_->transportSettings.writeConnectionDataPacketsLimit);
if (result.hasError()) {
return quic::make_unexpected(result.error());
}
return {};
}
// This is to expose the protected pacedWriteDataToSocket() function
void pacedWriteDataToSocketThroughTransportBase() {
pacedWriteDataToSocket();
}
bool hasWriteCipher() const {
return conn_->oneRttWriteCipher != nullptr;
}
std::shared_ptr<QuicTransportBaseLite> sharedGuard() override {
return shared_from_this();
}
QuicConnectionStateBase& getConnectionState() {
return *conn_;
}
void closeTransport() {
transportClosed = true;
}
void AckTimeout() {
ackTimeoutExpired();
}
void setIdleTimeout() {
setIdleTimer();
}
void invokeIdleTimeout() {
idleTimeout_.timeoutExpired();
}
void invokeAckTimeout() {
ackTimeout_.timeoutExpired();
}
void invokeSendPing(std::chrono::milliseconds interval) {
sendPing(interval);
}
void invokeCancelPingTimeout() {
pingTimeout_.cancelTimerCallback();
}
void invokeHandlePingCallbacks() {
handlePingCallbacks();
}
void invokeHandleKnobCallbacks() {
handleKnobCallbacks();
}
bool isPingTimeoutScheduled() {
return pingTimeout_.isTimerCallbackScheduled();
}
auto& writeLooper() {
return writeLooper_;
}
auto& readLooper() {
return readLooper_;
}
void unbindConnection() {}
void onReadError(const folly::AsyncSocketException&) noexcept {}
void addDataToStream(
StreamId id,
StreamBuffer data,
OptionalIntegral<StreamGroupId> groupId = std::nullopt) {
auto buf = encodeStreamBuffer(id, std::move(data), std::move(groupId));
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), Clock::now(), 0));
}
void addCryptoData(StreamBuffer data) {
auto buf = encodeCryptoBuffer(std::move(data));
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), Clock::now(), 0));
}
void addMaxStreamsFrame(MaxStreamsFrame frame) {
auto buf = encodeMaxStreamsFrame(frame);
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), Clock::now(), 0));
}
void addStreamReadError(StreamId id, QuicErrorCode ex) {
auto streamResult = conn_->streamManager->getStream(id);
ASSERT_FALSE(streamResult.hasError());
QuicStreamState* stream = streamResult.value();
stream->streamReadError = ex;
conn_->streamManager->updateReadableStreams(*stream);
conn_->streamManager->updatePeekableStreams(*stream);
// peekableStreams is updated to contain streams with streamReadError
updatePeekLooper();
updateReadLooper();
}
void addDatagram(BufPtr data, TimePoint recvTime = Clock::now()) {
auto buf = encodeDatagramFrame(std::move(data));
SocketAddress addr("127.0.0.1", 1000);
onNetworkData(addr, NetworkData(std::move(buf), recvTime, 0));
}
void closeStream(StreamId id) {
auto streamResult = conn_->streamManager->getStream(id);
ASSERT_FALSE(streamResult.hasError());
QuicStreamState* stream = streamResult.value();
stream->sendState = StreamSendState::Closed;
stream->recvState = StreamRecvState::Closed;
conn_->streamManager->addClosed(id);
auto deliveryCb = deliveryCallbacks_.find(id);
if (deliveryCb != deliveryCallbacks_.end()) {
for (auto& cbs : deliveryCb->second) {
ByteEvent event = {};
event.id = id;
event.offset = cbs.offset;
event.type = ByteEvent::Type::ACK;
event.srtt = stream->conn.lossState.srtt;
cbs.callback->onByteEvent(event);
if (closeState_ != CloseState::OPEN) {
break;
}
}
deliveryCallbacks_.erase(deliveryCb);
}
auto txCallbacksForStream = txCallbacks_.find(id);
if (txCallbacksForStream != txCallbacks_.end()) {
for (auto& cbs : txCallbacksForStream->second) {
ByteEvent event = {};
event.id = id;
event.offset = cbs.offset;
event.type = ByteEvent::Type::TX;
cbs.callback->onByteEvent(event);
if (closeState_ != CloseState::OPEN) {
break;
}
}
txCallbacks_.erase(txCallbacksForStream);
}
SocketAddress addr("127.0.0.1", 1000);
// some fake data to trigger close behavior.
auto buf = encodeStreamBuffer(
id,
StreamBuffer(IOBuf::create(0), stream->maxOffsetObserved + 1, true));
auto networkData = NetworkData(std::move(buf), Clock::now(), 0);
onNetworkData(addr, std::move(networkData));
}
QuicStreamState* getStream(StreamId id) {
return conn_->streamManager->getStream(id).value_or(nullptr);
}
void setServerConnectionId() {
// create server chosen connId with processId = 0 and workerId = 0
ServerConnectionIdParams params(0, 0, 0);
conn_->serverConnectionId = *connIdAlgo_->encodeConnectionId(params);
}
void driveReadCallbacks() {
getEventBase()->loopOnce();
}
QuicErrorCode getConnectionError() {
return conn_->localConnectionError->code;
}
bool isClosed() const noexcept {
return closeState_ == CloseState::CLOSED;
}
void closeWithoutWrite() {
closeImpl(std::nullopt, false, false);
}
void invokeWriteSocketData() {
CHECK(!writeSocketData().hasError());
}
[[nodiscard]] auto invokeWriteSocketDataReturn() {
return writeSocketData();
}
void invokeProcessCallbacksAfterNetworkData() {
processCallbacksAfterNetworkData();
}
// Simulates the delivery of a Byte Event callback, similar to the way it
// happens in QuicTransportBase::processCallbacksAfterNetworkData() or
// in the runOnEvbAsync lambda in
// QuicTransportBase::registerByteEventCallback()
bool deleteRegisteredByteEvent(
StreamId id,
uint64_t offset,
ByteEventCallback* cb,
ByteEvent::Type type) {
auto& byteEventMap = getByteEventMap(type);
auto streamByteEventCbIt = byteEventMap.find(id);
if (streamByteEventCbIt == byteEventMap.end()) {
return false;
}
auto pos = std::find_if(
streamByteEventCbIt->second.begin(),
streamByteEventCbIt->second.end(),
[offset, cb](const ByteEventDetail& p) {
return ((p.offset == offset) && (p.callback == cb));
});
if (pos == streamByteEventCbIt->second.end()) {
return false;
}
streamByteEventCbIt->second.erase(pos);
return true;
}
void enableDatagram() {
// Note: the RFC says to use 65535 to enable the datagram extension.
// We are using +1 in tests to make sure that we avoid representing this
// value with an uint16
conn_->datagramState.maxReadFrameSize = 65536;
conn_->datagramState.maxReadBufferSize = 10;
}
SocketObserverContainer* getSocketObserverContainer() const override {
return observerContainer_.get();
}
Optional<std::vector<uint8_t>> getExportedKeyingMaterial(
const std::string&,
const Optional<ByteRange>&,
uint16_t) const override {
return std::nullopt;
}
void updateWriteLooper(bool thisIteration, bool /* runInline */ = false) {
QuicTransportBase::updateWriteLooper(thisIteration);
}
void maybeStopWriteLooperAndArmSocketWritableEvent() {
QuicTransportBase::maybeStopWriteLooperAndArmSocketWritableEvent();
}
void closeImpl(
Optional<QuicError> error,
bool drainConnection = true,
bool sendCloseImmediately = true) {
QuicTransportBase::closeImpl(
std::move(error), drainConnection, sendCloseImmediately);
}
void onSocketWritable() noexcept override {
QuicTransportBase::onSocketWritable();
}
QuicServerConnectionState* transportConn;
std::unique_ptr<Aead> aead;
std::unique_ptr<PacketNumberCipher> headerCipher;
std::unique_ptr<ConnectionIdAlgo> connIdAlgo_;
bool transportClosed{false};
PacketNum packetNum_{0};
// Container of observers for the socket / transport.
//
// This member MUST be last in the list of members to ensure it is destroyed
// first, before any other members are destroyed. This ensures that observers
// can inspect any socket / transport state available through public methods
// when destruction of the transport begins.
const std::shared_ptr<SocketObserverContainer> observerContainer_;
};
class QuicTransportImplTest : public Test {
public:
void SetUp() override {
fEvb = std::make_unique<folly::EventBase>();
qEvb = std::make_shared<FollyQuicEventBase>(fEvb.get());
auto socket =
std::make_unique<NiceMock<quic::test::MockAsyncUDPSocket>>(qEvb);
ON_CALL(*socket, setAdditionalCmsgsFunc(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, close())
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, resumeWrite(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, bind(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, connect(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, setReuseAddr(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, setReusePort(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, setRecvTos(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, getRecvTos()).WillByDefault(Return(false));
ON_CALL(*socket, getGSO()).WillByDefault(Return(0));
ON_CALL(*socket, setCmsgs(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
ON_CALL(*socket, appendCmsgs(_))
.WillByDefault(Return(quic::Expected<void, QuicError>{}));
socketPtr = socket.get();
transport = std::make_shared<TestQuicTransport>(
qEvb, std::move(socket), &connSetupCallback, &connCallback);
auto& conn = *transport->transportConn;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiLocal =
kDefaultStreamFlowControlWindow;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetBidiRemote =
kDefaultStreamFlowControlWindow;
conn.flowControlState.peerAdvertisedInitialMaxStreamOffsetUni =
kDefaultStreamFlowControlWindow;
conn.flowControlState.peerAdvertisedMaxOffset =
kDefaultConnectionFlowControlWindow;
CHECK(
!conn.streamManager
->setMaxLocalBidirectionalStreams(kDefaultMaxStreamsBidirectional)
.hasError());
CHECK(!conn.streamManager
->setMaxLocalUnidirectionalStreams(
kDefaultMaxStreamsUnidirectional)
.hasError());
maybeSetNotifyOnNewStreamsExplicitly();
}
virtual void maybeSetNotifyOnNewStreamsExplicitly() {}
auto getTxMatcher(StreamId id, uint64_t offset) {
return MockByteEventCallback::getTxMatcher(id, offset);
}
auto getAckMatcher(StreamId id, uint64_t offset) {
return MockByteEventCallback::getAckMatcher(id, offset);
}
protected:
std::unique_ptr<folly::EventBase> fEvb;
std::shared_ptr<FollyQuicEventBase> qEvb;
NiceMock<MockConnectionSetupCallback> connSetupCallback;
NiceMock<MockConnectionCallback> connCallback;
TestByteEventCallback byteEventCallback;
std::shared_ptr<TestQuicTransport> transport;
quic::test::MockAsyncUDPSocket* socketPtr;
};
class QuicTransportImplTestClose : public QuicTransportImplTest,
public testing::WithParamInterface<bool> {};
INSTANTIATE_TEST_SUITE_P(
QuicTransportImplTest,
QuicTransportImplTestClose,
Values(true, false));
struct DelayedStreamNotifsTestParam {
bool notifyOnNewStreamsExplicitly;
};
class QuicTransportImplTestBase
: public QuicTransportImplTest,
public WithParamInterface<DelayedStreamNotifsTestParam> {
void maybeSetNotifyOnNewStreamsExplicitly() override {
auto transportSettings = transport->getTransportSettings();
transportSettings.notifyOnNewStreamsExplicitly =
GetParam().notifyOnNewStreamsExplicitly;
transport->setTransportSettings(transportSettings);
}
};
INSTANTIATE_TEST_SUITE_P(
QuicTransportImplTestBase,
QuicTransportImplTestBase,
::testing::Values(
DelayedStreamNotifsTestParam{false},
DelayedStreamNotifsTestParam{true}));
TEST_P(QuicTransportImplTestBase, AckTimeoutExpiredWillResetTimeoutFlag) {
transport->invokeAckTimeout();
EXPECT_FALSE(transport->transportConn->pendingEvents.scheduleAckTimeout);
}
TEST_P(QuicTransportImplTestBase, IdleTimeoutExpiredDestroysTransport) {
EXPECT_CALL(connSetupCallback, onConnectionSetupError(_))
.WillOnce(Invoke([&](auto) { transport = nullptr; }));
transport->invokeIdleTimeout();
}
TEST_P(QuicTransportImplTestBase, DelayConnCallback) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalBidirectionalStreams(0, /*force=*/true)
.hasError());
transport->setConnectionCallback(nullptr);
transport->addMaxStreamsFrame(
MaxStreamsFrame(10, /*isBidirectionalIn=*/true));
transport->setConnectionCallback(&connCallback);
EXPECT_CALL(connCallback, onBidirectionalStreamsAvailable(_))
.WillOnce(Invoke([](uint64_t numAvailableStreams) {
EXPECT_EQ(numAvailableStreams, 10);
}));
transport->getEventBase()->loopOnce();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, IdleTimeoutStreamMaessage) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
auto stream3 = transport->createUnidirectionalStream().value();
transport->setControlStream(stream3);
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
ASSERT_TRUE(transport->setReadCallback(stream1, &readCb1).has_value());
ASSERT_TRUE(transport->setReadCallback(stream2, &readCb2).has_value());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
EXPECT_CALL(readCb1, readError(stream1, _))
.Times(1)
.WillOnce(Invoke([](auto, auto error) {
EXPECT_EQ("Idle timeout, num non control streams: 2", error.message);
}));
transport->invokeIdleTimeout();
}
TEST_P(QuicTransportImplTestBase, StopSendingClosesIngress) {
// update transport settings
auto transportSettings = transport->getTransportSettings();
transportSettings.dropIngressOnStopSending = true;
transport->setTransportSettings(transportSettings);
auto& streamManager = *transport->transportConn->streamManager;
auto unknownErrorCode = GenericApplicationErrorCode::UNKNOWN;
std::string ingressData = "some ingress stream data";
auto ingressDataLen = ingressData.size();
StreamId streamID;
QuicStreamState* stream;
// create bidirectional stream
streamID = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
ASSERT_TRUE(transport->setReadCallback(streamID, &readCb1).has_value());
// add ingress & egress data to stream
transport->addDataToStream(
streamID, StreamBuffer(folly::IOBuf::copyBuffer(ingressData), 0));
ASSERT_TRUE(transport
->writeChain(
streamID,
folly::IOBuf::copyBuffer("some egress stream data"),
false)
.has_value());
transport->driveReadCallbacks();
stream = CHECK_NOTNULL(transport->getStream(streamID));
// check stream has readable data and SM is open
EXPECT_TRUE(stream->hasReadableData());
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
// send stop sending to peer this and later invoking reset stream should not
// invoke ReadCallback::readError()
EXPECT_CALL(readCb1, readError(streamID, _)).Times(0);
ASSERT_TRUE(
transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR)
.has_value());
// check that we've discarded any ingress data and ingress SM is closed
EXPECT_FALSE(stream->hasReadableData());
EXPECT_FALSE(streamManager.readableStreams().contains(streamID));
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Closed);
// suppose we tx a rst stream (and rx its corresponding ack), expect
// terminal state and queued in closed streams
ASSERT_TRUE(
transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR)
.has_value());
ASSERT_FALSE(sendRstAckSMHandler(*stream, std::nullopt).hasError());
EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
transport->driveReadCallbacks();
// now if we rx a rst_stream we should deliver ReadCallback::readError()
EXPECT_TRUE(streamManager.streamExists(streamID));
EXPECT_CALL(readCb1, readError(streamID, QuicError(unknownErrorCode)))
.Times(1);
ASSERT_FALSE(
receiveRstStreamSMHandler(
*stream, RstStreamFrame(streamID, unknownErrorCode, ingressDataLen))
.hasError());
transport->readLooper()->runLoopCallback();
// same test as above, but we tx a rst stream first followed by send stop
// sending second to validate that .stopSending() queues stream to be closed
NiceMock<MockReadCallback> readCb2;
streamID = transport->createBidirectionalStream().value();
ASSERT_FALSE(transport->setReadCallback(streamID, &readCb2).hasError());
// add ingress & egress data to new stream
transport->addDataToStream(
streamID, StreamBuffer(folly::IOBuf::copyBuffer(ingressData), 0));
ASSERT_FALSE(transport
->writeChain(
streamID,
folly::IOBuf::copyBuffer("some egress stream data"),
false)
.hasError());
transport->driveReadCallbacks();
stream = CHECK_NOTNULL(transport->getStream(streamID));
// check stream has readable data and SM is open
EXPECT_TRUE(stream->hasReadableData());
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
// suppose we tx a rst stream (and rx its corresponding ack)
ASSERT_FALSE(
transport->resetStream(streamID, GenericApplicationErrorCode::NO_ERROR)
.hasError());
ASSERT_FALSE(sendRstAckSMHandler(*stream, std::nullopt).hasError());
EXPECT_EQ(stream->sendState, StreamSendState::Closed);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
transport->driveReadCallbacks();
// send stop sending to peer does not deliver an error to the read callback
// even tho the stream is in terminal state and queued for closing
EXPECT_CALL(readCb2, readError(streamID, _)).Times(0);
ASSERT_FALSE(
transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR)
.hasError());
// check that we've discarded any ingress data and ingress SM is closed,
// expect terminal state and queued in closed streams
EXPECT_FALSE(stream->hasReadableData());
EXPECT_FALSE(streamManager.readableStreams().contains(streamID));
EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
// we need to rx a rst stream before queue stream to be closed to allow
// delivering callback to application
EXPECT_CALL(readCb2, readError(streamID, QuicError(unknownErrorCode)))
.Times(1);
ASSERT_FALSE(
receiveRstStreamSMHandler(
*stream, RstStreamFrame(streamID, unknownErrorCode, ingressDataLen))
.hasError());
EXPECT_TRUE(stream->inTerminalStates());
EXPECT_TRUE(streamManager.closedStreams().contains(streamID));
transport->readLooper()->runLoopCallback();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, NoopStopSendingIngressClosed) {
// create bidi stream
auto streamID = transport->createBidirectionalStream().value();
auto* stream = CHECK_NOTNULL(transport->getStream(streamID));
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
// suppose we rx a reset from peer which closes our ingress SM
ASSERT_FALSE(
receiveRstStreamSMHandler(
*stream,
RstStreamFrame(stream->id, GenericApplicationErrorCode::NO_ERROR, 0))
.hasError());
EXPECT_EQ(stream->sendState, StreamSendState::Open);
EXPECT_EQ(stream->recvState, StreamRecvState::Closed);
// send stop sending to peer should no-op
ASSERT_FALSE(
transport->stopSending(streamID, GenericApplicationErrorCode::NO_ERROR)
.hasError());
EXPECT_EQ(transport->transportConn->pendingEvents.frames.size(), 0);
// now test ingress uni-directional stream
auto& streamManager = *transport->transportConn->streamManager;
auto nextPeerUniStream =
streamManager.nextAcceptablePeerUnidirectionalStreamId();
EXPECT_TRUE(nextPeerUniStream.has_value());
auto streamResult = streamManager.getStream(*nextPeerUniStream);
ASSERT_FALSE(streamResult.hasError());
stream = streamResult.value();
EXPECT_EQ(stream->sendState, StreamSendState::Invalid);
EXPECT_EQ(stream->recvState, StreamRecvState::Open);
// suppose we rx a reset from peer which closes our ingress SM
ASSERT_FALSE(
receiveRstStreamSMHandler(
*stream,
RstStreamFrame(stream->id, GenericApplicationErrorCode::NO_ERROR, 0))
.hasError());
EXPECT_EQ(stream->sendState, StreamSendState::Invalid);
EXPECT_EQ(stream->recvState, StreamRecvState::Closed);
EXPECT_TRUE(stream->inTerminalStates());
// send stop sending to peer should no-op
ASSERT_FALSE(
transport->stopSending(stream->id, GenericApplicationErrorCode::NO_ERROR)
.hasError());
EXPECT_EQ(transport->transportConn->pendingEvents.frames.size(), 0);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, WriteAckPacketUnsetsLooper) {
// start looper in running state first
transport->writeLooper()->run(true);
// Write data which will be acked immediately.
PacketNum packetSeen = 10;
bool pktHasRetransmittableData = true;
bool pktHasCryptoData = true;
updateAckState(
*transport->transportConn,
PacketNumberSpace::Initial,
packetSeen,
pktHasRetransmittableData,
pktHasCryptoData,
Clock::now());
ASSERT_TRUE(transport->transportConn->ackStates.initialAckState
->needsToSendAckImmediately);
// Trigger the loop callback. This will trigger writes and we assume this will
// write the acks since we have nothing else to write.
transport->writeLooper()->runLoopCallback();
EXPECT_FALSE(transport->transportConn->pendingEvents.scheduleAckTimeout);
EXPECT_FALSE(transport->writeLooper()->isLoopCallbackScheduled());
}
TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailable) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StreamId stream3 = 0x6;
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
NiceMock<MockReadCallback> readCb3;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
transport->addDataToStream(
stream3, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
ASSERT_FALSE(transport->setReadCallback(stream3, &readCb3).hasError());
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
ASSERT_FALSE(transport->setReadCallback(stream1, nullptr).hasError());
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReliableResetReadCallback) {
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb;
ASSERT_FALSE(transport->setReadCallback(stream, &readCb).hasError());
transport->addDataToStream(
stream,
StreamBuffer(
folly::IOBuf::copyBuffer("this string has 29 characters"), 0));
EXPECT_CALL(readCb, readAvailable(stream));
transport->driveReadCallbacks();
// Simulate receiving a reliable reset with a reliableSize of 29
ASSERT_FALSE(
receiveRstStreamSMHandler(
*transport->getStream(stream),
RstStreamFrame(stream, GenericApplicationErrorCode::UNKNOWN, 100, 29))
.hasError());
// The application hasn't yet read all of the reliable data, so we
// shouldn't fire the readError callback yet.
EXPECT_CALL(readCb, readAvailable(stream));
transport->driveReadCallbacks();
ASSERT_FALSE(transport->read(stream, 29).hasError());
// The application has yet read all of the reliable data, so we should fire
// the readError callback.
EXPECT_CALL(
readCb, readError(stream, IsError(GenericApplicationErrorCode::UNKNOWN)));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
ReadCallbackDataAvailableWithUnidirPrioritized) {
InSequence seq;
auto transportSettings = transport->getTransportSettings();
transportSettings.unidirectionalStreamsReadCallbacksFirst = true;
transport->setTransportSettings(transportSettings);
auto& streamManager = *transport->transportConn->streamManager;
auto nextPeerUniStream =
streamManager.nextAcceptablePeerUnidirectionalStreamId();
EXPECT_TRUE(nextPeerUniStream.has_value());
auto qpackStreamResult = streamManager.getStream(*nextPeerUniStream);
ASSERT_FALSE(qpackStreamResult.hasError());
StreamId qpackStream = qpackStreamResult.value()->id;
auto requestStream = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> requestStreamCb;
NiceMock<MockReadCallback> qpackStreamCb;
ASSERT_FALSE(
transport->setReadCallback(requestStream, &requestStreamCb).hasError());
ASSERT_FALSE(
transport->setReadCallback(qpackStream, &qpackStreamCb).hasError());
transport->addDataToStream(
qpackStream,
StreamBuffer(
folly::IOBuf::copyBuffer(
"and i'm qpack data i will block you no tomorrow"),
0));
transport->addDataToStream(
requestStream, StreamBuffer(folly::IOBuf::copyBuffer("i'm headers"), 0));
bool qpackCbCalled = false;
EXPECT_CALL(qpackStreamCb, readAvailable(qpackStream))
.WillOnce(Invoke([&](StreamId) {
EXPECT_FALSE(qpackCbCalled);
qpackCbCalled = true;
}));
EXPECT_CALL(requestStreamCb, readAvailable(requestStream))
.WillOnce(Invoke([&](StreamId) { EXPECT_TRUE(qpackCbCalled); }));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailableNoReap) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StreamId stream3 = 0x6;
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
NiceMock<MockReadCallback> readCb3;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
transport->addDataToStream(
stream3, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
ASSERT_FALSE(transport->setReadCallback(stream3, &readCb3).hasError());
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
ASSERT_FALSE(transport->setReadCallback(stream1, nullptr).hasError());
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackDataAvailableOrdered) {
auto transportSettings = transport->getTransportSettings();
transportSettings.orderedReadCallbacks = true;
transport->setTransportSettings(transportSettings);
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StreamId stream3 = 0x6;
InSequence s;
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
NiceMock<MockReadCallback> readCb3;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
transport->addDataToStream(
stream3, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
ASSERT_FALSE(transport->setReadCallback(stream3, &readCb3).hasError());
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
transport->driveReadCallbacks();
EXPECT_CALL(readCb2, readAvailable(stream2));
EXPECT_CALL(readCb3, readAvailable(stream3));
ASSERT_FALSE(transport->setReadCallback(stream1, nullptr).hasError());
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackChangeReadCallback) {
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
EXPECT_TRUE(transport->setReadCallback(stream1, nullptr).hasError());
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb2).hasError());
EXPECT_CALL(readCb2, readAvailable(stream1));
transport->driveReadCallbacks();
auto& conn = transport->getConnectionState();
EXPECT_EQ(conn.pendingEvents.frames.size(), 0);
ASSERT_FALSE(transport->setReadCallback(stream1, nullptr).hasError());
EXPECT_EQ(conn.pendingEvents.frames.size(), 1);
EXPECT_CALL(readCb2, readAvailable(_)).Times(0);
transport->driveReadCallbacks();
EXPECT_TRUE(transport->setReadCallback(stream1, &readCb2).hasError());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackUnsetAll) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
// Set the read callbacks, and then add data to the stream, and see that the
// callbacks are, in fact, called.
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2));
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->driveReadCallbacks();
// Unset all of the read callbacks, then add data to the stream, and see that
// the read callbacks are not called.
transport->unsetAllReadCallbacks();
EXPECT_CALL(readCb1, readAvailable(stream1)).Times(0);
EXPECT_CALL(readCb2, readAvailable(stream2)).Times(0);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackPauseResume) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
auto res = transport->pauseRead(stream1);
EXPECT_TRUE(res);
EXPECT_CALL(readCb1, readAvailable(stream1)).Times(0);
EXPECT_CALL(readCb2, readAvailable(stream2));
transport->driveReadCallbacks();
res = transport->resumeRead(stream1);
EXPECT_TRUE(res);
res = transport->pauseRead(stream2);
EXPECT_CALL(readCb1, readAvailable(stream1));
EXPECT_CALL(readCb2, readAvailable(stream2)).Times(0);
transport->driveReadCallbacks();
auto stream3 = transport->createBidirectionalStream().value();
res = transport->pauseRead(stream3);
EXPECT_FALSE(res);
EXPECT_EQ(LocalErrorCode::APP_ERROR, res.error());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackNoCallbackSet) {
auto stream1 = transport->createBidirectionalStream().value();
transport->addDataToStream(
stream1,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
transport->driveReadCallbacks();
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackInvalidStream) {
NiceMock<MockReadCallback> readCb1;
StreamId invalidStream = 10;
EXPECT_TRUE(transport->setReadCallback(invalidStream, &readCb1).hasError());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadData) {
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0));
transport->driveReadCallbacks();
{
auto readResult = transport->read(stream1, 10);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
IOBufEqualTo eq;
auto expected = readData->clone();
expected->trimEnd(expected->length() - 10);
EXPECT_TRUE(eq(*data.first, *expected));
}
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
{
auto readResult = transport->read(stream1, 100);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
IOBufEqualTo eq;
auto expected = readData->clone();
expected->trimStart(10);
EXPECT_TRUE(eq(*data.first, *expected));
}
transport->driveReadCallbacks();
transport.reset();
}
// TODO The finest copypasta around. We need a better story for parameterizing
// unidirectional vs. bidirectional.
TEST_P(QuicTransportImplTestBase, UnidirectionalReadData) {
auto stream1 = 0x6;
NiceMock<MockReadCallback> readCb1;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0));
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
{
auto readResult = transport->read(stream1, 10);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
IOBufEqualTo eq;
auto expected = readData->clone();
expected->trimEnd(expected->length() - 10);
EXPECT_TRUE(eq(*data.first, *expected));
}
EXPECT_CALL(readCb1, readAvailable(stream1));
transport->driveReadCallbacks();
{
auto readResult2 = transport->read(stream1, 100);
ASSERT_TRUE(readResult2.has_value());
auto data = std::move(readResult2).value();
IOBufEqualTo eq;
auto expected = readData->clone();
expected->trimStart(10);
EXPECT_TRUE(eq(*data.first, *expected));
}
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadDataUnsetReadCallbackInCallback) {
auto stream1 = transport->createBidirectionalStream().value();
auto readData = folly::IOBuf::copyBuffer("actual stream data");
NiceMock<MockReadCallback> readCb1;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0, true));
EXPECT_CALL(readCb1, readAvailable(stream1))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->setReadCallback(id, nullptr).hasError());
}));
transport->driveReadCallbacks();
transport->driveReadCallbacks();
transport->getEventBase()->loop();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadDataNoCallback) {
auto stream1 = transport->createBidirectionalStream().value();
auto readData = folly::IOBuf::copyBuffer("actual stream data");
transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0, true));
transport->driveReadCallbacks();
{
auto readResult = transport->read(stream1, 100);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
IOBufEqualTo eq;
EXPECT_TRUE(eq(*data.first, *readData));
EXPECT_TRUE(data.second);
}
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackForClientOutOfOrderStream) {
auto const notifyOnNewStreamsExplicitly =
transport->getTransportSettings().notifyOnNewStreamsExplicitly;
InSequence dummy;
StreamId clientOutOfOrderStream = 96;
StreamId clientOutOfOrderStream2 = 76;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
NiceMock<MockReadCallback> streamRead;
if (notifyOnNewStreamsExplicitly) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(clientOutOfOrderStream))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->setReadCallback(id, &streamRead).hasError());
}));
} else {
for (StreamId start = 0x00; start <= clientOutOfOrderStream;
start += kStreamIncrement) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(start))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(
transport->setReadCallback(id, &streamRead).hasError());
}));
}
}
EXPECT_CALL(streamRead, readAvailable(clientOutOfOrderStream))
.WillOnce(Invoke([&](StreamId id) {
auto readResult = transport->read(id, 100);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
IOBufEqualTo eq;
EXPECT_TRUE(eq(*data.first, *readData));
EXPECT_TRUE(data.second);
}));
transport->addDataToStream(
clientOutOfOrderStream, StreamBuffer(readData->clone(), 0, true));
transport->driveReadCallbacks();
if (notifyOnNewStreamsExplicitly) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(clientOutOfOrderStream2))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->setReadCallback(id, &streamRead).hasError());
}));
}
transport->addDataToStream(
clientOutOfOrderStream2, StreamBuffer(readData->clone(), 0, true));
EXPECT_CALL(streamRead, readAvailable(clientOutOfOrderStream2))
.WillOnce(Invoke([&](StreamId id) {
auto readResult = transport->read(id, 100);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
IOBufEqualTo eq;
EXPECT_TRUE(eq(*data.first, *readData));
EXPECT_TRUE(data.second);
}));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadDataInvalidStream) {
StreamId invalidStream = 10;
auto readResult = transport->read(invalidStream, 100);
EXPECT_FALSE(readResult.has_value());
EXPECT_EQ(LocalErrorCode::STREAM_NOT_EXISTS, readResult.error());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadError) {
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
EXPECT_CALL(
readCb1, readError(stream1, IsError(LocalErrorCode::STREAM_CLOSED)));
transport->addStreamReadError(stream1, LocalErrorCode::STREAM_CLOSED);
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ReadCallbackDeleteTransport) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addStreamReadError(stream1, LocalErrorCode::NO_ERROR);
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(readCb1, readError(stream1, _)).WillOnce(Invoke([&](auto, auto) {
transport.reset();
}));
EXPECT_CALL(readCb2, readAvailable(stream2));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, onNewBidirectionalStreamCallback) {
auto const notifyOnNewStreamsExplicitly =
transport->getTransportSettings().notifyOnNewStreamsExplicitly;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
StreamId stream2 = 0x00;
EXPECT_CALL(connCallback, onNewBidirectionalStream(stream2));
transport->addDataToStream(stream2, StreamBuffer(readData->clone(), 0, true));
StreamId stream3 = 0x04;
EXPECT_CALL(connCallback, onNewBidirectionalStream(stream3));
transport->addDataToStream(stream3, StreamBuffer(readData->clone(), 0, true));
StreamId uniStream3 = 0xa;
if (!notifyOnNewStreamsExplicitly) {
EXPECT_CALL(
connCallback,
onNewUnidirectionalStream(uniStream3 - 2 * kStreamIncrement));
EXPECT_CALL(
connCallback, onNewUnidirectionalStream(uniStream3 - kStreamIncrement));
}
EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream3));
transport->addDataToStream(
uniStream3, StreamBuffer(readData->clone(), 0, true));
transport.reset();
}
TEST_P(QuicTransportImplTestBase, onNewStreamCallbackDoesNotRemove) {
auto readData = folly::IOBuf::copyBuffer("actual stream data");
StreamId uniStream1 = 2;
StreamId uniStream2 = uniStream1 + kStreamIncrement;
EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream1))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->read(id, 100).hasError());
}));
EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream2))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->read(id, 100).hasError());
}));
transport->addDataToStream(
uniStream1, StreamBuffer(readData->clone(), 0, true));
transport->addDataToStream(
uniStream2, StreamBuffer(readData->clone(), 0, true));
transport.reset();
}
TEST_P(QuicTransportImplTestBase, onNewBidirectionalStreamStreamOutOfOrder) {
InSequence dummy;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
StreamId biStream1 = 28;
StreamId uniStream1 = 30;
auto const notifyOnNewStreamsExplicitly =
transport->getTransportSettings().notifyOnNewStreamsExplicitly;
if (notifyOnNewStreamsExplicitly) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(biStream1));
EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream1));
} else {
for (StreamId id = 0x00; id <= biStream1; id += kStreamIncrement) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(id));
}
for (StreamId id = 0x02; id <= uniStream1; id += kStreamIncrement) {
EXPECT_CALL(connCallback, onNewUnidirectionalStream(id));
}
}
transport->addDataToStream(
biStream1, StreamBuffer(readData->clone(), 0, true));
transport->addDataToStream(
uniStream1, StreamBuffer(readData->clone(), 0, true));
StreamId biStream2 = 56;
StreamId uniStream2 = 38;
if (notifyOnNewStreamsExplicitly) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(biStream2));
EXPECT_CALL(connCallback, onNewUnidirectionalStream(uniStream2));
} else {
for (StreamId id = biStream1 + kStreamIncrement; id <= biStream2;
id += kStreamIncrement) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(id));
}
for (StreamId id = uniStream1 + kStreamIncrement; id <= uniStream2;
id += kStreamIncrement) {
EXPECT_CALL(connCallback, onNewUnidirectionalStream(id));
}
}
transport->addDataToStream(
biStream2, StreamBuffer(readData->clone(), 0, true));
transport->addDataToStream(
uniStream2, StreamBuffer(readData->clone(), 0, true));
transport.reset();
}
TEST_P(QuicTransportImplTestBase, onNewBidirectionalStreamSetReadCallback) {
auto const notifyOnNewStreamsExplicitly =
transport->getTransportSettings().notifyOnNewStreamsExplicitly;
InSequence dummy;
auto readData = folly::IOBuf::copyBuffer("actual stream data");
transport->addCryptoData(StreamBuffer(readData->clone(), 0, true));
NiceMock<MockReadCallback> stream2Read;
StreamId stream2 = 0x00;
EXPECT_CALL(connCallback, onNewBidirectionalStream(stream2))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->setReadCallback(id, &stream2Read).hasError());
}));
transport->addDataToStream(stream2, StreamBuffer(readData->clone(), 0, true));
StreamId stream3 = 0x10;
NiceMock<MockReadCallback> streamRead;
if (notifyOnNewStreamsExplicitly) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(stream3))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(transport->setReadCallback(id, &streamRead).hasError());
}));
} else {
for (StreamId start = stream2 + kStreamIncrement; start <= stream3;
start += kStreamIncrement) {
EXPECT_CALL(connCallback, onNewBidirectionalStream(start))
.WillOnce(Invoke([&](StreamId id) {
ASSERT_FALSE(
transport->setReadCallback(id, &streamRead).hasError());
}));
}
}
transport->addDataToStream(stream3, StreamBuffer(readData->clone(), 0, true));
qEvb->loopOnce();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, OnInvalidServerStream) {
EXPECT_CALL(
connSetupCallback,
onConnectionSetupError(IsError(TransportErrorCode::STREAM_STATE_ERROR)));
auto readData = folly::IOBuf::copyBuffer("actual stream data");
StreamId stream1 = 29;
transport->addDataToStream(stream1, StreamBuffer(readData->clone(), 0, true));
EXPECT_TRUE(transport->isClosed());
EXPECT_EQ(
transport->getConnectionError(),
QuicErrorCode(TransportErrorCode::STREAM_STATE_ERROR));
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateStream) {
auto streamId = transport->createBidirectionalStream().value();
auto streamId2 = transport->createBidirectionalStream().value();
auto streamId3 = transport->createBidirectionalStream().value();
auto streamId4 = transport->createBidirectionalStream().value();
EXPECT_EQ(streamId2, streamId + kStreamIncrement);
EXPECT_EQ(streamId3, streamId2 + kStreamIncrement);
EXPECT_EQ(streamId4, streamId3 + kStreamIncrement);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateUnidirectionalStream) {
auto streamId = transport->createUnidirectionalStream().value();
auto streamId2 = transport->createUnidirectionalStream().value();
auto streamId3 = transport->createUnidirectionalStream().value();
auto streamId4 = transport->createUnidirectionalStream().value();
EXPECT_EQ(streamId2, streamId + kStreamIncrement);
EXPECT_EQ(streamId3, streamId2 + kStreamIncrement);
EXPECT_EQ(streamId4, streamId3 + kStreamIncrement);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateBothStream) {
auto uniStreamId = transport->createUnidirectionalStream().value();
auto biStreamId = transport->createBidirectionalStream().value();
auto uniStreamId2 = transport->createUnidirectionalStream().value();
auto biStreamId2 = transport->createBidirectionalStream().value();
auto uniStreamId3 = transport->createUnidirectionalStream().value();
auto biStreamId3 = transport->createBidirectionalStream().value();
auto uniStreamId4 = transport->createUnidirectionalStream().value();
auto biStreamId4 = transport->createBidirectionalStream().value();
EXPECT_EQ(uniStreamId2, uniStreamId + kStreamIncrement);
EXPECT_EQ(uniStreamId3, uniStreamId2 + kStreamIncrement);
EXPECT_EQ(uniStreamId4, uniStreamId3 + kStreamIncrement);
EXPECT_EQ(biStreamId2, biStreamId + kStreamIncrement);
EXPECT_EQ(biStreamId3, biStreamId2 + kStreamIncrement);
EXPECT_EQ(biStreamId4, biStreamId3 + kStreamIncrement);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateStreamLimitsBidirectionalZero) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalBidirectionalStreams(0, true)
.hasError());
EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 0);
auto result = transport->createBidirectionalStream();
ASSERT_FALSE(result);
EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
result = transport->createUnidirectionalStream();
EXPECT_TRUE(result);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateStreamLimitsUnidirectionalZero) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalUnidirectionalStreams(0, true)
.hasError());
EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 0);
auto result = transport->createUnidirectionalStream();
ASSERT_FALSE(result);
EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
result = transport->createBidirectionalStream();
EXPECT_TRUE(result);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateStreamLimitsBidirectionalFew) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalBidirectionalStreams(10, true)
.hasError());
EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 10);
for (int i = 0; i < 10; i++) {
EXPECT_TRUE(transport->createBidirectionalStream());
EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 10 - (i + 1));
}
auto result = transport->createBidirectionalStream();
ASSERT_FALSE(result);
EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
EXPECT_TRUE(transport->createUnidirectionalStream());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CreateStreamLimitsUnidirectionalFew) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalUnidirectionalStreams(10, true)
.hasError());
EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 10);
for (int i = 0; i < 10; i++) {
EXPECT_TRUE(transport->createUnidirectionalStream());
EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 10 - (i + 1));
}
auto result = transport->createUnidirectionalStream();
ASSERT_FALSE(result);
EXPECT_EQ(result.error(), LocalErrorCode::STREAM_LIMIT_EXCEEDED);
EXPECT_TRUE(transport->createBidirectionalStream());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, onBidiStreamsAvailableCallback) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalBidirectionalStreams(0, /*force=*/true)
.hasError());
EXPECT_CALL(connCallback, onBidirectionalStreamsAvailable(_))
.WillOnce(Invoke([](uint64_t numAvailableStreams) {
EXPECT_EQ(numAvailableStreams, 10);
}));
transport->addMaxStreamsFrame(
MaxStreamsFrame(10, /*isBidirectionalIn=*/true));
EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 10);
// same value max streams frame doesn't trigger callback
transport->addMaxStreamsFrame(
MaxStreamsFrame(10, /*isBidirectionalIn=*/true));
}
TEST_P(QuicTransportImplTestBase, onBidiStreamsAvailableCallbackAfterExausted) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalBidirectionalStreams(0, /*force=*/true)
.hasError());
EXPECT_CALL(connCallback, onBidirectionalStreamsAvailable(_)).Times(2);
transport->addMaxStreamsFrame(MaxStreamsFrame(
1,
/*isBidirectionalIn=*/true));
EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 1);
auto result = transport->createBidirectionalStream();
EXPECT_TRUE(result);
EXPECT_EQ(transport->getNumOpenableBidirectionalStreams(), 0);
transport->addMaxStreamsFrame(MaxStreamsFrame(
2,
/*isBidirectionalIn=*/true));
}
TEST_P(QuicTransportImplTestBase, oneUniStreamsAvailableCallback) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalUnidirectionalStreams(0, /*force=*/true)
.hasError());
EXPECT_CALL(connCallback, onUnidirectionalStreamsAvailable(_))
.WillOnce(Invoke([](uint64_t numAvailableStreams) {
EXPECT_EQ(numAvailableStreams, 1);
}));
transport->addMaxStreamsFrame(
MaxStreamsFrame(1, /*isBidirectionalIn=*/false));
EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 1);
// same value max streams frame doesn't trigger callback
transport->addMaxStreamsFrame(
MaxStreamsFrame(1, /*isBidirectionalIn=*/false));
}
TEST_P(QuicTransportImplTestBase, onUniStreamsAvailableCallbackAfterExausted) {
ASSERT_FALSE(transport->transportConn->streamManager
->setMaxLocalUnidirectionalStreams(0, /*force=*/true)
.hasError());
EXPECT_CALL(connCallback, onUnidirectionalStreamsAvailable(_)).Times(2);
transport->addMaxStreamsFrame(
MaxStreamsFrame(1, /*isBidirectionalIn=*/false));
EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 1);
auto result = transport->createUnidirectionalStream();
EXPECT_TRUE(result);
EXPECT_EQ(transport->getNumOpenableUnidirectionalStreams(), 0);
transport->addMaxStreamsFrame(
MaxStreamsFrame(2, /*isBidirectionalIn=*/false));
}
TEST_P(QuicTransportImplTestBase, ReadDataAlsoChecksLossAlarm) {
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value();
ASSERT_FALSE(
transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true)
.hasError());
// Artificially stop the write looper so that the read can trigger it.
transport->writeLooper()->stop();
transport->addDataToStream(
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
EXPECT_TRUE(transport->writeLooper()->isRunning());
// Drive the event loop once to allow for the write looper to continue.
qEvb->loopOnce();
EXPECT_TRUE(transport->isLossTimeoutScheduled());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ConnectionErrorOnWrite) {
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value();
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(SetErrnoAndReturn(ENETUNREACH, -1));
ASSERT_FALSE(
transport
->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr)
.hasError());
transport->addDataToStream(
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
qEvb->loopOnce();
EXPECT_TRUE(transport->isClosed());
EXPECT_EQ(
transport->getConnectionError(),
QuicErrorCode(LocalErrorCode::CONNECTION_ABANDONED));
}
TEST_P(QuicTransportImplTestBase, ReadErrorUnsanitizedErrorMsg) {
transport->setServerConnectionId();
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value();
MockReadCallback rcb;
ASSERT_FALSE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_CALL(rcb, readError(stream, _))
.Times(1)
.WillOnce(Invoke([](StreamId, QuicError error) {
EXPECT_EQ("You need to calm down.", error.message);
}));
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(
Invoke([](const folly::SocketAddress&, const struct iovec*, size_t) {
throw std::runtime_error("You need to calm down.");
return 0;
}));
auto writeChain_tmp = transport->writeChain(
stream,
folly::IOBuf::copyBuffer("You are being too loud."),
true,
nullptr);
qEvb->loopOnce();
EXPECT_TRUE(transport->isClosed());
}
TEST_P(QuicTransportImplTestBase, ConnectionErrorUnhandledException) {
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value();
EXPECT_CALL(
connSetupCallback,
onConnectionSetupError(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("Well there's your problem"))));
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(
Invoke([](const folly::SocketAddress&, const struct iovec*, size_t) {
throw std::runtime_error("Well there's your problem");
return 0;
}));
ASSERT_FALSE(
transport
->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr)
.hasError());
transport->addDataToStream(
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
qEvb->loopOnce();
EXPECT_TRUE(transport->isClosed());
EXPECT_EQ(
transport->getConnectionError(),
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR));
}
TEST_P(QuicTransportImplTestBase, LossTimeoutNoLessThanTickInterval) {
auto tickInterval = qEvb->getTimerTickInterval();
transport->scheduleLossTimeout(tickInterval - 1ms);
EXPECT_NEAR(
tickInterval.count(),
transport->getLossTimeoutRemainingTime().count(),
2);
}
TEST_P(QuicTransportImplTestBase, CloseStreamAfterReadError) {
auto qLogger = std::make_shared<FileQLogger>(VantagePoint::Client);
transport->transportConn->qLogger = qLogger;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb1;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
transport->addStreamReadError(stream1, LocalErrorCode::NO_ERROR);
transport->closeStream(stream1);
EXPECT_CALL(readCb1, readError(stream1, IsError(LocalErrorCode::NO_ERROR)));
EXPECT_CALL(connCallback, onStreamPreReaped(stream1));
transport->driveReadCallbacks();
EXPECT_FALSE(transport->transportConn->streamManager->streamExists(stream1));
transport.reset();
std::vector<int> indices =
getQLogEventIndices(QLogEventType::TransportStateUpdate, qLogger);
EXPECT_EQ(indices.size(), 1);
auto tmp = std::move(qLogger->logs[indices[0]]);
auto event = dynamic_cast<QLogTransportStateUpdateEvent*>(tmp.get());
EXPECT_EQ(event->update, getClosingStream("1"));
}
TEST_P(QuicTransportImplTestBase, CloseStreamAfterReadFin) {
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockReadCallback> readCb2;
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0, true));
EXPECT_CALL(readCb2, readAvailable(stream2)).WillOnce(Invoke([&](StreamId) {
auto data = transport->read(stream2, 100);
EXPECT_TRUE(data->second);
transport->closeStream(stream2);
}));
transport->driveReadCallbacks();
EXPECT_FALSE(transport->transportConn->streamManager->streamExists(stream2));
transport.reset();
}
TEST_P(QuicTransportImplTestBase, CloseTransportCleansupOutstandingCounters) {
transport->transportConn->outstandings
.packetCount[PacketNumberSpace::Handshake] = 200;
transport->closeNow(std::nullopt);
EXPECT_EQ(
0,
transport->transportConn->outstandings
.packetCount[PacketNumberSpace::Handshake]);
}
TEST_P(QuicTransportImplTestBase, DeliveryCallbackUnsetAll) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
auto registerDelivery1 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery2 =
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _));
transport->unsetAllDeliveryCallbacks();
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
transport->close(std::nullopt);
}
TEST_P(QuicTransportImplTestBase, DeliveryCallbackUnsetOne) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
auto registerDelivery3 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery4 =
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
transport->cancelDeliveryCallbacksForStream(stream1);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _));
transport->close(std::nullopt);
}
TEST_P(QuicTransportImplTestBase, ByteEventCallbacksManagementSingleStream) {
auto stream = transport->createBidirectionalStream().value();
uint64_t offset1 = 10, offset2 = 20;
ByteEvent txEvent1 = ByteEvent{stream, offset1, ByteEvent::Type::TX};
ByteEvent txEvent2 = ByteEvent{stream, offset2, ByteEvent::Type::TX};
ByteEvent ackEvent1 = ByteEvent{stream, offset1, ByteEvent::Type::ACK};
ByteEvent ackEvent2 = ByteEvent{stream, offset2, ByteEvent::Type::ACK};
// Register 2 TX and 2 ACK events for the same stream at 2 different offsets
ASSERT_FALSE(
transport
->registerTxCallback(txEvent1.id, txEvent1.offset, &byteEventCallback)
.hasError());
ASSERT_FALSE(
transport
->registerTxCallback(txEvent2.id, txEvent2.offset, &byteEventCallback)
.hasError());
auto registerByteEvent1 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
auto registerByteEvent2 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::REGISTERED)));
// Registering the same events a second time will result in an error.
// as double registrations are not allowed.
quic::Expected<void, LocalErrorCode> ret;
ret = transport->registerTxCallback(
txEvent1.id, txEvent1.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerTxCallback(
txEvent2.id, txEvent2.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::REGISTERED)));
// On the ACK events, the transport usually sets the srtt value. This value
// should have NO EFFECT on the ByteEvent's hash and we still should be able
// to identify the previously registered byte event correctly.
ackEvent1.srtt = std::chrono::microseconds(1000);
ackEvent2.srtt = std::chrono::microseconds(2000);
// Deliver 1 TX and 1 ACK event. Cancel the other TX anc ACK event
byteEventCallback.onByteEvent(txEvent1);
byteEventCallback.onByteEvent(ackEvent2);
byteEventCallback.onByteEventCanceled(txEvent2);
byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent1);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::CANCELLED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::CANCELLED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::RECEIVED)));
}
TEST_P(
QuicTransportImplTestBase,
ByteEventCallbacksManagementDifferentStreams) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
ByteEvent txEvent1 = ByteEvent{stream1, 10, ByteEvent::Type::TX};
ByteEvent txEvent2 = ByteEvent{stream2, 20, ByteEvent::Type::TX};
ByteEvent ackEvent1 = ByteEvent{stream1, 10, ByteEvent::Type::ACK};
ByteEvent ackEvent2 = ByteEvent{stream2, 20, ByteEvent::Type::ACK};
EXPECT_THAT(byteEventCallback.getByteEventTracker(), IsEmpty());
// Register 2 TX and 2 ACK events for 2 separate streams.
auto registerTx1 = transport->registerTxCallback(
txEvent1.id, txEvent1.offset, &byteEventCallback);
auto registerTx2 = transport->registerTxCallback(
txEvent2.id, txEvent2.offset, &byteEventCallback);
auto registerByteEvent3 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent1.id, ackEvent1.offset, &byteEventCallback);
auto registerByteEvent4 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, ackEvent2.id, ackEvent2.offset, &byteEventCallback);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::REGISTERED)));
// On the ACK events, the transport usually sets the srtt value. This value
// should have NO EFFECT on the ByteEvent's hash and we should still be able
// to identify the previously registered byte event correctly.
ackEvent1.srtt = std::chrono::microseconds(1000);
ackEvent2.srtt = std::chrono::microseconds(2000);
// Deliver the TX event for stream 1 and cancel the ACK event for stream 2
byteEventCallback.onByteEvent(txEvent1);
byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent2);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::REGISTERED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::CANCELLED)));
// Deliver the TX event for stream 2 and cancel the ACK event for stream 1
byteEventCallback.onByteEvent(txEvent2);
byteEventCallback.onByteEventCanceled((ByteEventCancellation)ackEvent1);
EXPECT_THAT(
byteEventCallback.getByteEventTracker(),
UnorderedElementsAre(
getByteEventTrackerMatcher(
txEvent1, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
txEvent2, TestByteEventCallback::Status::RECEIVED),
getByteEventTrackerMatcher(
ackEvent1, TestByteEventCallback::Status::CANCELLED),
getByteEventTrackerMatcher(
ackEvent2, TestByteEventCallback::Status::CANCELLED)));
}
TEST_P(QuicTransportImplTestBase, RegisterTxDeliveryCallbackLowerThanExpected) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
StrictMock<MockByteEventCallback> txcb3;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
NiceMock<MockDeliveryCallback> dcb3;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 20)));
auto registerTx3 = transport->registerTxCallback(stream, 10, &txcb1);
auto registerTx4 = transport->registerTxCallback(stream, 20, &txcb2);
auto registerDelivery5 =
transport->registerDeliveryCallback(stream, 10, &dcb1);
auto registerDelivery6 =
transport->registerDeliveryCallback(stream, 20, &dcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
EXPECT_CALL(txcb3, onByteEventRegistered(getTxMatcher(stream, 2)));
EXPECT_CALL(txcb3, onByteEvent(getTxMatcher(stream, 2)));
EXPECT_CALL(dcb3, onDeliveryAck(stream, 2, _));
auto registerTx5 = transport->registerTxCallback(stream, 2, &txcb3);
auto registerDelivery7 =
transport->registerDeliveryCallback(stream, 2, &dcb3);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb3);
Mock::VerifyAndClearExpectations(&dcb3);
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 20)));
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&txcb3);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
Mock::VerifyAndClearExpectations(&dcb3);
}
TEST_F(
QuicTransportImplTest,
RegisterTxDeliveryCallbackLowerThanExpectedClose) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb;
NiceMock<MockDeliveryCallback> dcb;
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
EXPECT_CALL(txcb, onByteEventRegistered(getTxMatcher(stream, 2)));
EXPECT_CALL(txcb, onByteEventCanceled(getTxMatcher(stream, 2)));
EXPECT_CALL(dcb, onCanceled(_, _));
auto registerTx6 = transport->registerTxCallback(stream, 2, &txcb);
auto registerDelivery8 = transport->registerDeliveryCallback(stream, 2, &dcb);
transport->close(std::nullopt);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb);
Mock::VerifyAndClearExpectations(&dcb);
}
TEST_P(
QuicTransportImplTestBase,
RegisterDeliveryCallbackMultipleRegistrationsTx) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset that is before the current write offset, they will both be
// scheduled for immediate delivery.
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3)));
auto registerTx7 = transport->registerTxCallback(stream, 3, &txcb1);
auto registerTx8 = transport->registerTxCallback(stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, re-register the same callbacks, it should not go through.
auto ret = transport->registerTxCallback(stream, 3, &txcb1);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerTxCallback(stream, 3, &txcb2);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
// Deliver the first set of registrations.
EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(1);
EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_F(
QuicTransportImplTest,
RegisterDeliveryCallbackMultipleRegistrationsAck) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset that is before the current write offset, they will both be
// scheduled for immediate delivery.
EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3)));
auto registerByteEvent5 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb1);
auto registerByteEvent6 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, re-register the same callbacks, it should not go through.
auto ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb1);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
ret = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb2);
EXPECT_EQ(LocalErrorCode::INVALID_OPERATION, ret.error());
// Deliver the first set of registrations.
EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(1);
EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_P(
QuicTransportImplTestBase,
RegisterDeliveryCallbackMultipleRecipientsTx) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset.
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 3)));
auto registerTx9 = transport->registerTxCallback(stream, 3, &txcb1);
auto registerTx10 = transport->registerTxCallback(stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::TX);
CHECK_EQ(true, deleted);
// Only the callback for txcb2 should be outstanding now. Run the loop to
// confirm.
EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 3))).Times(1);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_P(
QuicTransportImplTestBase,
RegisterDeliveryCallbackMultipleRecipientsAck) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Have 2 different recipients register for a callback on the same stream ID
// and offset.
EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 3)));
auto registerByteEvent7 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb1);
auto registerByteEvent8 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::ACK);
CHECK_EQ(true, deleted);
// Only the callback for txcb2 should be outstanding now. Run the loop to
// confirm.
EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 3))).Times(1);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_P(QuicTransportImplTestBase, RegisterDeliveryCallbackAsyncDeliveryTx) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Register tx callbacks for the same stream at offsets 3 (before current
// write offset) and 10 (after current write offset).
// txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync)
// for immediate delivery. txcb2 (offset = 10) will be queued for delivery
// when the actual TX for this offset occurs in the future.
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream, 10)));
auto registerTx11 = transport->registerTxCallback(stream, 3, &txcb1);
auto registerTx12 = transport->registerTxCallback(stream, 10, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::TX);
CHECK_EQ(true, deleted);
// Only txcb2 (offset = 10) should be outstanding now. Run the loop.
// txcb1 (offset = 3) should not be delivered now because it is already
// delivered. txcb2 (offset = 10) should not be delivered because the
// current write offset (7) is still less than the offset requested (10)
EXPECT_CALL(txcb1, onByteEvent(getTxMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getTxMatcher(stream, 10))).Times(0);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream, 10)));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_P(QuicTransportImplTestBase, RegisterDeliveryCallbackAsyncDeliveryAck) {
auto stream = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
// Set the current write offset to 7.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
streamState->currentWriteOffset = 7;
streamState->ackedIntervals.insert(0, 6);
// Register tx callbacks for the same stream at offsets 3 (before current
// write offset) and 10 (after current write offset).
// txcb1 (offset = 3) will be scheduled in the lambda (runOnEvbAsync)
// for immediate delivery. txcb2 (offset = 10) will be queued for delivery
// when the actual TX for this offset occurs in the future.
EXPECT_CALL(txcb1, onByteEventRegistered(getAckMatcher(stream, 3)));
EXPECT_CALL(txcb2, onByteEventRegistered(getAckMatcher(stream, 10)));
auto registerByteEvent9 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 3, &txcb1);
auto registerByteEvent10 = transport->registerByteEventCallback(
ByteEvent::Type::ACK, stream, 10, &txcb2);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
// Now, *before* the runOnEvbAsync gets a chance to run, simulate the
// delivery of the callback for txcb1 (offset = 3) by deleting it from the
// outstanding callback queue for this stream ID. This is similar to what
// happens in processCallbacksAfterNetworkData.
bool deleted = transport->deleteRegisteredByteEvent(
stream, 3, &txcb1, ByteEvent::Type::ACK);
CHECK_EQ(true, deleted);
// Only txcb2 (offset = 10) should be outstanding now. Run the loop.
// txcb1 (offset = 3) should not be delivered now because it is already
// delivered. txcb2 (offset = 10) should not be delivered because the
// current write offset (7) is still less than the offset requested (10)
EXPECT_CALL(txcb1, onByteEvent(getAckMatcher(stream, 3))).Times(0);
EXPECT_CALL(txcb2, onByteEvent(getAckMatcher(stream, 10))).Times(0);
qEvb->loopOnce();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
EXPECT_CALL(txcb2, onByteEventCanceled(getAckMatcher(stream, 10)));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb2);
}
TEST_P(QuicTransportImplTestBase, CancelAllByteEventCallbacks) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockByteEventCallback> txcb1;
NiceMock<MockByteEventCallback> txcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
auto registerTx13 = transport->registerTxCallback(stream1, 10, &txcb1);
auto registerTx14 = transport->registerTxCallback(stream2, 20, &txcb2);
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
auto registerDelivery9 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery10 =
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
EXPECT_CALL(dcb1, onCanceled(_, _));
EXPECT_CALL(dcb2, onCanceled(_, _));
transport->cancelAllByteEventCallbacks();
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(_, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
}
TEST_P(QuicTransportImplTestBase, CancelByteEventCallbacksForStream) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
auto registerTx15 = transport->registerTxCallback(stream1, 10, &txcb1);
auto registerTx16 = transport->registerTxCallback(stream2, 20, &txcb2);
auto registerDelivery11 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery12 =
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(dcb1, onCanceled(stream1, 10));
EXPECT_CALL(dcb2, onCanceled(_, _)).Times(0);
transport->cancelByteEventCallbacksForStream(stream1);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
1,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(_)).Times(0);
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
EXPECT_CALL(dcb1, onCanceled(stream1, _)).Times(0);
EXPECT_CALL(dcb2, onCanceled(_, 20));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
}
TEST_P(QuicTransportImplTestBase, CancelByteEventCallbacksForStreamWithOffset) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 20)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 20)));
auto registerTx17 = transport->registerTxCallback(stream1, 10, &txcb1);
auto registerTx18 = transport->registerTxCallback(stream1, 15, &txcb1);
auto registerTx19 = transport->registerTxCallback(stream1, 20, &txcb1);
auto registerTx20 = transport->registerTxCallback(stream2, 10, &txcb2);
auto registerTx21 = transport->registerTxCallback(stream2, 15, &txcb2);
auto registerTx22 = transport->registerTxCallback(stream2, 20, &txcb2);
EXPECT_EQ(3, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(3, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
auto registerDelivery13 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery14 =
transport->registerDeliveryCallback(stream1, 15, &dcb1);
auto registerDelivery15 =
transport->registerDeliveryCallback(stream1, 20, &dcb1);
auto registerDelivery16 =
transport->registerDeliveryCallback(stream2, 10, &dcb2);
auto registerDelivery17 =
transport->registerDeliveryCallback(stream2, 15, &dcb2);
auto registerDelivery18 =
transport->registerDeliveryCallback(stream2, 20, &dcb2);
EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(dcb1, onCanceled(stream1, 10));
// cancels if offset is < (not <=) offset provided
transport->cancelByteEventCallbacksForStream(stream1, 15);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 20)));
EXPECT_CALL(dcb1, onCanceled(stream1, 15));
EXPECT_CALL(dcb1, onCanceled(stream1, 20));
// cancels if offset is < (not <=) offset provided
transport->cancelByteEventCallbacksForStream(stream1, 21);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(6, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
3,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 20)));
EXPECT_CALL(dcb2, onCanceled(stream2, 10));
EXPECT_CALL(dcb2, onCanceled(stream2, 15));
EXPECT_CALL(dcb2, onCanceled(stream2, 20));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(0, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
}
TEST_P(QuicTransportImplTestBase, CancelByteEventCallbacksTx) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
auto registerTx23 = transport->registerTxCallback(stream1, 10, &txcb1);
auto registerTx24 = transport->registerTxCallback(stream1, 15, &txcb1);
auto registerTx25 = transport->registerTxCallback(stream2, 10, &txcb2);
auto registerTx26 = transport->registerTxCallback(stream2, 15, &txcb2);
auto registerDelivery19 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery20 =
transport->registerDeliveryCallback(stream1, 15, &dcb1);
auto registerDelivery21 =
transport->registerDeliveryCallback(stream2, 10, &dcb2);
auto registerDelivery22 =
transport->registerDeliveryCallback(stream2, 15, &dcb2);
EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15)));
transport->cancelByteEventCallbacks(ByteEvent::Type::TX);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(dcb1, onCanceled(stream1, 10));
EXPECT_CALL(dcb1, onCanceled(stream1, 15));
EXPECT_CALL(dcb2, onCanceled(stream2, 10));
EXPECT_CALL(dcb2, onCanceled(stream2, 15));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
}
TEST_P(QuicTransportImplTestBase, CancelByteEventCallbacksDelivery) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
StrictMock<MockByteEventCallback> txcb1;
StrictMock<MockByteEventCallback> txcb2;
NiceMock<MockDeliveryCallback> dcb1;
NiceMock<MockDeliveryCallback> dcb2;
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventRegistered(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventRegistered(getTxMatcher(stream2, 15)));
auto registerTx27 = transport->registerTxCallback(stream1, 10, &txcb1);
auto registerTx28 = transport->registerTxCallback(stream1, 15, &txcb1);
auto registerTx29 = transport->registerTxCallback(stream2, 10, &txcb2);
auto registerTx30 = transport->registerTxCallback(stream2, 15, &txcb2);
auto registerDelivery23 =
transport->registerDeliveryCallback(stream1, 10, &dcb1);
auto registerDelivery24 =
transport->registerDeliveryCallback(stream1, 15, &dcb1);
auto registerDelivery25 =
transport->registerDeliveryCallback(stream2, 10, &dcb2);
auto registerDelivery26 =
transport->registerDeliveryCallback(stream2, 15, &dcb2);
EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(4, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(dcb1, onCanceled(stream1, 10));
EXPECT_CALL(dcb1, onCanceled(stream1, 15));
EXPECT_CALL(dcb2, onCanceled(stream2, 10));
EXPECT_CALL(dcb2, onCanceled(stream2, 15));
transport->cancelByteEventCallbacks(ByteEvent::Type::ACK);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream1));
EXPECT_EQ(2, transport->getNumByteEventCallbacksForStream(stream2));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream1));
EXPECT_EQ(
2,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::TX, stream2));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream1));
EXPECT_EQ(
0,
transport->getNumByteEventCallbacksForStream(
ByteEvent::Type::ACK, stream2));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 10)));
EXPECT_CALL(txcb1, onByteEventCanceled(getTxMatcher(stream1, 15)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 10)));
EXPECT_CALL(txcb2, onByteEventCanceled(getTxMatcher(stream2, 15)));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(&txcb1);
Mock::VerifyAndClearExpectations(&txcb2);
Mock::VerifyAndClearExpectations(&dcb1);
Mock::VerifyAndClearExpectations(&dcb2);
}
TEST_P(
QuicTransportImplTestBase,
TestNotifyPendingConnWriteOnCloseWithoutError) {
NiceMock<MockWriteCallback> wcb;
EXPECT_CALL(
wcb,
onConnectionWriteError(IsError(GenericApplicationErrorCode::NO_ERROR)));
auto notifyWrite1 = transport->notifyPendingWriteOnConnection(&wcb);
transport->close(std::nullopt);
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestClose, TestNotifyPendingConnWriteOnCloseWithError) {
NiceMock<MockWriteCallback> wcb;
auto notifyWrite2 = transport->notifyPendingWriteOnConnection(&wcb);
if (GetParam()) {
EXPECT_CALL(
wcb,
onConnectionWriteError(
IsAppError(GenericApplicationErrorCode::UNKNOWN)));
transport->close(QuicError(
QuicErrorCode(GenericApplicationErrorCode::UNKNOWN),
std::string("Bye")));
} else {
transport->close(std::nullopt);
}
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestBase, TestNotifyPendingWriteWithActiveCallback) {
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
EXPECT_CALL(wcb, onStreamWriteReady(stream, _));
auto ok1 = transport->notifyPendingWriteOnStream(stream, &wcb);
EXPECT_TRUE(ok1.has_value());
auto ok2 = transport->notifyPendingWriteOnStream(stream, &wcb);
EXPECT_EQ(ok2.error(), quic::LocalErrorCode::CALLBACK_ALREADY_INSTALLED);
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestBase, TestNotifyPendingWriteOnCloseWithoutError) {
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
EXPECT_CALL(
wcb,
onStreamWriteError(
stream, IsError(GenericApplicationErrorCode::NO_ERROR)));
auto notifyWrite3 = transport->notifyPendingWriteOnStream(stream, &wcb);
transport->close(std::nullopt);
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestClose, TestNotifyPendingWriteOnCloseWithError) {
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
auto notifyWrite4 = transport->notifyPendingWriteOnStream(stream, &wcb);
if (GetParam()) {
EXPECT_CALL(
wcb,
onStreamWriteError(
stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
transport->close(QuicError(
QuicErrorCode(GenericApplicationErrorCode::UNKNOWN),
std::string("Bye")));
} else {
transport->close(std::nullopt);
}
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestBase, TestTransportCloseWithMaxPacketNumber) {
transport->setServerConnectionId();
transport->transportConn->pendingEvents.closeTransport = false;
ASSERT_FALSE(transport->invokeWriteSocketDataReturn().hasError());
transport->transportConn->pendingEvents.closeTransport = true;
auto result = transport->invokeWriteSocketDataReturn();
ASSERT_TRUE(result.hasError());
ASSERT_NE(result.error().code.asTransportErrorCode(), nullptr);
EXPECT_EQ(
*result.error().code.asTransportErrorCode(),
TransportErrorCode::PROTOCOL_VIOLATION);
}
TEST_P(QuicTransportImplTestBase, TestGracefulCloseWithActiveStream) {
EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
NiceMock<MockWriteCallback> wcbConn;
NiceMock<MockReadCallback> rcb;
StrictMock<MockByteEventCallback> txCb;
StrictMock<MockDeliveryCallback> deliveryCb;
EXPECT_CALL(
wcb, onStreamWriteError(stream, IsError(LocalErrorCode::NO_ERROR)));
EXPECT_CALL(
wcbConn, onConnectionWriteError(IsError(LocalErrorCode::NO_ERROR)));
EXPECT_CALL(rcb, readError(stream, IsError(LocalErrorCode::NO_ERROR)));
EXPECT_CALL(deliveryCb, onCanceled(stream, _));
EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 4)));
auto notifyWrite5 = transport->notifyPendingWriteOnConnection(&wcbConn);
auto notifyWrite6 = transport->notifyPendingWriteOnStream(stream, &wcb);
ASSERT_FALSE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
ASSERT_FALSE(
transport
->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb)
.hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
transport->closeGracefully();
ASSERT_FALSE(transport->transportClosed);
EXPECT_FALSE(transport->createBidirectionalStream());
EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
EXPECT_TRUE(
transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
EXPECT_TRUE(
transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
.hasError());
transport->addDataToStream(
stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
auto streamResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamResult.hasError());
EXPECT_FALSE(streamResult.value()->readBuffer.empty());
// Close the last stream.
// TODO: replace this when we call conn callbacks.
// EXPECT_CALL(connCallback, onConnectionEnd());
transport->closeStream(stream);
ASSERT_TRUE(transport->transportClosed);
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestBase, TestGracefulCloseWithNoActiveStream) {
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
NiceMock<MockWriteCallback> wcbConn;
NiceMock<MockReadCallback> rcb;
NiceMock<MockDeliveryCallback> deliveryCb;
NiceMock<MockByteEventCallback> txCb;
EXPECT_CALL(
rcb, readError(stream, IsError(GenericApplicationErrorCode::NO_ERROR)));
EXPECT_CALL(deliveryCb, onDeliveryAck(stream, _, _));
EXPECT_CALL(txCb, onByteEvent(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEvent(getTxMatcher(stream, 4)));
EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
ASSERT_FALSE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
ASSERT_FALSE(
transport
->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb)
.hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
// Close the last stream.
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
// Fake that the data was TXed and delivered to keep all the state
// consistent.
streamState->currentWriteOffset = 7;
transport->transportConn->streamManager->addTx(stream);
transport->transportConn->streamManager->addDeliverable(stream);
transport->closeStream(stream);
transport->close(std::nullopt);
ASSERT_TRUE(transport->transportClosed);
EXPECT_FALSE(transport->createBidirectionalStream());
EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
EXPECT_TRUE(
transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
EXPECT_TRUE(
transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
.hasError());
}
TEST_P(QuicTransportImplTestBase, TestResetRemovesDeliveryCb) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockDeliveryCallback> deliveryCb1;
NiceMock<MockDeliveryCallback> deliveryCb2;
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
ASSERT_FALSE(
transport->writeChain(stream1, IOBuf::copyBuffer("hello"), true, nullptr)
.hasError());
ASSERT_FALSE(
transport->writeChain(stream2, IOBuf::copyBuffer("hello"), true, nullptr)
.hasError());
EXPECT_FALSE(
transport->registerDeliveryCallback(stream1, 2, &deliveryCb1).hasError());
EXPECT_FALSE(
transport->registerDeliveryCallback(stream2, 2, &deliveryCb2).hasError());
EXPECT_EQ(transport->getNumByteEventCallbacksForStream(stream1), 1);
EXPECT_EQ(transport->getNumByteEventCallbacksForStream(stream2), 1);
EXPECT_FALSE(
transport->resetStream(stream1, GenericApplicationErrorCode::UNKNOWN)
.hasError());
EXPECT_EQ(transport->getNumByteEventCallbacksForStream(stream1), 0);
EXPECT_EQ(transport->getNumByteEventCallbacksForStream(stream2), 1);
transport->close(std::nullopt);
}
TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
auto stream = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
NiceMock<MockWriteCallback> wcbConn;
NiceMock<MockReadCallback> rcb;
NiceMock<MockReadCallback> rcb2;
NiceMock<MockPeekCallback> pcb;
NiceMock<MockDeliveryCallback> deliveryCb;
NiceMock<MockByteEventCallback> txCb;
uint8_t resetCount = 0;
EXPECT_CALL(
wcb,
onStreamWriteError(
stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
EXPECT_CALL(
wcbConn,
onConnectionWriteError(IsAppError(GenericApplicationErrorCode::UNKNOWN)));
// The first stream to get a reset will clear the other read callback, so only
// one will receive a reset.
ON_CALL(
rcb, readError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)))
.WillByDefault(InvokeWithoutArgs([this, stream2, &resetCount] {
(void)transport->setReadCallback(stream2, nullptr);
resetCount++;
}));
ON_CALL(
rcb2,
readError(stream2, IsAppError(GenericApplicationErrorCode::UNKNOWN)))
.WillByDefault(InvokeWithoutArgs([this, stream, &resetCount] {
(void)transport->setReadCallback(stream, nullptr);
resetCount++;
}));
EXPECT_CALL(
pcb, peekError(stream, IsAppError(GenericApplicationErrorCode::UNKNOWN)));
EXPECT_CALL(deliveryCb, onCanceled(stream, _));
EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventCanceled(getTxMatcher(stream, 4)));
EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
auto notifyWrite7 = transport->notifyPendingWriteOnConnection(&wcbConn);
auto notifyWrite8 = transport->notifyPendingWriteOnStream(stream, &wcb);
ASSERT_FALSE(transport->setReadCallback(stream, &rcb).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &rcb2).hasError());
auto setPeek1 = transport->setPeekCallback(stream, &pcb);
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
ASSERT_FALSE(
transport
->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb)
.hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 4)));
EXPECT_FALSE(transport->registerTxCallback(stream, 0, &txCb).hasError());
EXPECT_FALSE(transport->registerTxCallback(stream, 4, &txCb).hasError());
transport->close(QuicError(
QuicErrorCode(GenericApplicationErrorCode::UNKNOWN),
std::string("Error")));
ASSERT_TRUE(transport->transportClosed);
EXPECT_FALSE(transport->createBidirectionalStream());
EXPECT_TRUE(transport->setReadCallback(stream, &rcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnStream(stream, &wcb).hasError());
EXPECT_TRUE(transport->notifyPendingWriteOnConnection(&wcbConn).hasError());
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 2))).Times(0);
EXPECT_TRUE(transport->registerTxCallback(stream, 2, &txCb).hasError());
EXPECT_TRUE(
transport->registerDeliveryCallback(stream, 2, &deliveryCb).hasError());
EXPECT_TRUE(
transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
.hasError());
transport->addDataToStream(
stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
auto streamResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamResult.hasError());
EXPECT_EQ(streamResult.value(), nullptr);
qEvb->loopOnce();
EXPECT_EQ(resetCount, 1);
}
TEST_P(QuicTransportImplTestBase, ResetStreamUnsetWriteCallback) {
auto stream = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb;
EXPECT_CALL(wcb, onStreamWriteError(stream, _)).Times(0);
auto notifyWrite9 = transport->notifyPendingWriteOnStream(stream, &wcb);
EXPECT_FALSE(
transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
.hasError());
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestBase, ResetAllNonControlStreams) {
auto stream1 = transport->createBidirectionalStream().value();
ASSERT_FALSE(transport->setControlStream(stream1));
NiceMock<MockWriteCallback> wcb1;
NiceMock<MockReadCallback> rcb1;
EXPECT_CALL(wcb1, onStreamWriteError(stream1, _)).Times(0);
EXPECT_CALL(rcb1, readError(stream1, _)).Times(0);
auto notifyWrite10 = transport->notifyPendingWriteOnStream(stream1, &wcb1);
ASSERT_FALSE(transport->setReadCallback(stream1, &rcb1).hasError());
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb2;
NiceMock<MockReadCallback> rcb2;
EXPECT_CALL(wcb2, onStreamWriteError(stream2, _)).Times(1);
EXPECT_CALL(rcb2, readError(stream2, _)).Times(1);
auto notifyWrite11 = transport->notifyPendingWriteOnStream(stream2, &wcb2);
ASSERT_FALSE(transport->setReadCallback(stream2, &rcb2).hasError());
auto stream3 = transport->createUnidirectionalStream().value();
NiceMock<MockWriteCallback> wcb3;
auto notifyWrite12 = transport->notifyPendingWriteOnStream(stream3, &wcb3);
EXPECT_CALL(wcb3, onStreamWriteError(stream3, _)).Times(1);
auto stream4 = transport->createBidirectionalStream().value();
NiceMock<MockWriteCallback> wcb4;
NiceMock<MockReadCallback> rcb4;
EXPECT_CALL(wcb4, onStreamWriteError(stream4, _))
.WillOnce(Invoke([&](auto, auto) {
ASSERT_FALSE(transport->setReadCallback(stream4, nullptr).hasError());
}));
EXPECT_CALL(rcb4, readError(_, _)).Times(0);
auto notifyWrite13 = transport->notifyPendingWriteOnStream(stream4, &wcb4);
ASSERT_FALSE(transport->setReadCallback(stream4, &rcb4).hasError());
transport->resetNonControlStreams(
GenericApplicationErrorCode::UNKNOWN, "bye bye");
qEvb->loopOnce();
// Have to manually unset the read callbacks so they aren't use-after-freed.
transport->unsetAllReadCallbacks();
}
TEST_P(QuicTransportImplTestBase, DestroyWithoutClosing) {
EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);
EXPECT_CALL(connCallback, onConnectionEnd()).Times(0);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, UncleanShutdownEventBase) {
// if abruptly shutting down the eventbase we should avoid scheduling
// any new timer.
transport->setIdleTimeout();
qEvb.reset();
}
TEST_P(QuicTransportImplTestBase, GetLocalAddressBoundSocket) {
SocketAddress addr("127.0.0.1", 443);
EXPECT_CALL(*socketPtr, isBound()).WillOnce(Return(true));
EXPECT_CALL(*socketPtr, addressRef()).WillRepeatedly(ReturnRef(addr));
SocketAddress localAddr = transport->getLocalAddress();
EXPECT_TRUE(localAddr == addr);
}
TEST_P(QuicTransportImplTestBase, GetLocalAddressUnboundSocket) {
EXPECT_CALL(*socketPtr, isBound()).WillOnce(Return(false));
SocketAddress localAddr = transport->getLocalAddress();
EXPECT_FALSE(localAddr.isInitialized());
}
TEST_P(QuicTransportImplTestBase, GetLocalAddressBadSocket) {
auto badTransport = std::make_shared<TestQuicTransport>(
qEvb, nullptr, &connSetupCallback, &connCallback);
badTransport->closeWithoutWrite();
SocketAddress localAddr = badTransport->getLocalAddress();
EXPECT_FALSE(localAddr.isInitialized());
}
TEST_P(QuicTransportImplTestBase, AsyncStreamFlowControlWrite) {
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value();
auto streamStateResult =
transport->transportConn->streamManager->getStream(stream);
ASSERT_FALSE(streamStateResult.hasError());
auto streamState = streamStateResult.value();
transport->setServerConnectionId();
transport->writeLooper()->stop();
streamState->flowControlState.advertisedMaxOffset = 0; // Easier to calculate
auto setFlow1 = transport->setStreamFlowControlWindow(stream, 4000);
EXPECT_EQ(0, streamState->flowControlState.advertisedMaxOffset);
// Loop it:
EXPECT_TRUE(transport->writeLooper()->isRunning());
transport->writeLooper()->runLoopCallback();
EXPECT_EQ(4000, streamState->flowControlState.advertisedMaxOffset);
}
TEST_P(QuicTransportImplTestBase, ExceptionInWriteLooperDoesNotCrash) {
auto stream = transport->createBidirectionalStream().value();
(void)transport->setReadCallback(stream, nullptr);
ASSERT_FALSE(
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, nullptr)
.hasError());
transport->addDataToStream(
stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(SetErrnoAndReturn(EBADF, -1));
EXPECT_CALL(connSetupCallback, onConnectionSetupError(_))
.WillOnce(Invoke([&](auto) { transport.reset(); }));
transport->writeLooper()->runLoopCallback();
}
class QuicTransportImplTestUniBidi : public QuicTransportImplTest,
public testing::WithParamInterface<bool> {
};
quic::StreamId createStream(
std::shared_ptr<TestQuicTransport> transport,
bool unidirectional) {
if (unidirectional) {
return transport->createUnidirectionalStream().value();
} else {
return transport->createBidirectionalStream().value();
}
}
INSTANTIATE_TEST_SUITE_P(
QuicTransportImplTest,
QuicTransportImplTestUniBidi,
Values(true, false));
TEST_P(QuicTransportImplTestUniBidi, AppIdleTest) {
auto& conn = transport->getConnectionState();
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn.congestionController = std::move(mockCongestionController);
EXPECT_CALL(*rawCongestionController, setAppIdle(false, _)).Times(0);
auto stream = createStream(transport, GetParam());
EXPECT_CALL(*rawCongestionController, setAppIdle(true, _));
transport->closeStream(stream);
}
TEST_P(QuicTransportImplTestUniBidi, AppIdleTestControlStreams) {
auto& conn = transport->getConnectionState();
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn.congestionController = std::move(mockCongestionController);
EXPECT_CALL(*rawCongestionController, setAppIdle(false, _)).Times(0);
auto stream = createStream(transport, GetParam());
ASSERT_TRUE(stream);
auto ctrlStream1 = createStream(transport, GetParam());
ASSERT_TRUE(ctrlStream1);
transport->setControlStream(ctrlStream1);
auto ctrlStream2 = createStream(transport, GetParam());
ASSERT_TRUE(ctrlStream2);
transport->setControlStream(ctrlStream2);
EXPECT_CALL(*rawCongestionController, setAppIdle(true, _));
transport->closeStream(stream);
}
TEST_P(QuicTransportImplTestUniBidi, AppIdleTestOnlyControlStreams) {
auto& conn = transport->getConnectionState();
auto mockCongestionController =
std::make_unique<NiceMock<MockCongestionController>>();
auto rawCongestionController = mockCongestionController.get();
conn.congestionController = std::move(mockCongestionController);
auto ctrlStream1 = createStream(transport, GetParam());
EXPECT_CALL(*rawCongestionController, setAppIdle(true, _)).Times(1);
transport->setControlStream(ctrlStream1);
EXPECT_CALL(*rawCongestionController, setAppIdle(false, _)).Times(1);
auto ctrlStream2 = createStream(transport, GetParam());
EXPECT_CALL(*rawCongestionController, setAppIdle(true, _)).Times(1);
transport->setControlStream(ctrlStream2);
EXPECT_CALL(*rawCongestionController, setAppIdle(_, _)).Times(0);
transport->closeStream(ctrlStream1);
transport->closeStream(ctrlStream2);
}
TEST_P(QuicTransportImplTestBase, UnidirectionalInvalidReadFuncs) {
auto stream = transport->createUnidirectionalStream().value();
EXPECT_FALSE(transport->read(stream, 100).has_value());
EXPECT_FALSE(transport->setReadCallback(stream, nullptr).has_value());
EXPECT_FALSE(transport->pauseRead(stream).has_value());
EXPECT_FALSE(transport->resumeRead(stream).has_value());
EXPECT_FALSE(
transport->stopSending(stream, GenericApplicationErrorCode::UNKNOWN)
.has_value());
}
TEST_P(QuicTransportImplTestBase, UnidirectionalInvalidWriteFuncs) {
auto readData = folly::IOBuf::copyBuffer("actual stream data");
StreamId stream = 0x6;
transport->addDataToStream(stream, StreamBuffer(readData->clone(), 0, true));
EXPECT_FALSE(transport->getStreamWriteOffset(stream).has_value());
EXPECT_FALSE(transport->getStreamWriteBufferedBytes(stream).has_value());
EXPECT_FALSE(
transport->notifyPendingWriteOnStream(stream, nullptr).has_value());
EXPECT_FALSE(
transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), false)
.has_value());
EXPECT_FALSE(
transport->registerDeliveryCallback(stream, 0, nullptr).has_value());
EXPECT_FALSE(transport->registerTxCallback(stream, 0, nullptr).has_value());
EXPECT_FALSE(
transport
->registerByteEventCallback(ByteEvent::Type::ACK, stream, 0, nullptr)
.has_value());
EXPECT_FALSE(
transport
->registerByteEventCallback(ByteEvent::Type::TX, stream, 0, nullptr)
.has_value());
EXPECT_FALSE(
transport->resetStream(stream, GenericApplicationErrorCode::UNKNOWN)
.has_value());
}
TEST_P(QuicTransportImplTestUniBidi, IsServerStream) {
auto stream = createStream(transport, GetParam());
EXPECT_TRUE(transport->isServerStream(stream));
}
TEST_P(QuicTransportImplTestUniBidi, IsClientStream) {
auto stream = createStream(transport, GetParam());
EXPECT_FALSE(transport->isClientStream(stream));
}
TEST_P(QuicTransportImplTestBase, IsUnidirectionalStream) {
auto stream = transport->createUnidirectionalStream().value();
EXPECT_TRUE(transport->isUnidirectionalStream(stream));
}
TEST_P(QuicTransportImplTestBase, IsBidirectionalStream) {
auto stream = transport->createBidirectionalStream().value();
EXPECT_TRUE(transport->isBidirectionalStream(stream));
}
TEST_P(QuicTransportImplTestBase, GetStreamDirectionalityUnidirectional) {
auto stream = transport->createUnidirectionalStream().value();
EXPECT_EQ(
StreamDirectionality::Unidirectional,
transport->getStreamDirectionality(stream));
}
TEST_P(QuicTransportImplTestBase, GetStreamDirectionalityBidirectional) {
auto stream = transport->createBidirectionalStream().value();
EXPECT_EQ(
StreamDirectionality::Bidirectional,
transport->getStreamDirectionality(stream));
}
TEST_P(QuicTransportImplTestBase, PeekCallbackDataAvailable) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockPeekCallback> peekCb1;
NiceMock<MockPeekCallback> peekCb2;
auto setPeek2 = transport->setPeekCallback(stream1, &peekCb1);
auto setPeek3 = transport->setPeekCallback(stream2, &peekCb2);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb2, onDataAvailable(stream2, _));
transport->driveReadCallbacks();
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
EXPECT_CALL(peekCb2, onDataAvailable(stream2, _));
transport->driveReadCallbacks();
auto setPeek4 = transport->setPeekCallback(stream1, nullptr);
auto setPeek5 = transport->setPeekCallback(stream2, nullptr);
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekError) {
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockPeekCallback> peekCb1;
auto setPeek6 = transport->setPeekCallback(stream1, &peekCb1);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addStreamReadError(stream1, LocalErrorCode::STREAM_CLOSED);
EXPECT_CALL(
peekCb1, peekError(stream1, IsError(LocalErrorCode::STREAM_CLOSED)));
transport->driveReadCallbacks();
EXPECT_CALL(peekCb1, peekError(stream1, _));
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekCallbackUnsetAll) {
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
NiceMock<MockPeekCallback> peekCb1;
NiceMock<MockPeekCallback> peekCb2;
// Set the peek callbacks and add data to the streams, and see that the
// callbacks do indeed fire
auto setPeek7 = transport->setPeekCallback(stream1, &peekCb1);
auto setPeek8 = transport->setPeekCallback(stream2, &peekCb2);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
EXPECT_CALL(peekCb2, onDataAvailable(stream2, _));
transport->driveReadCallbacks();
// unset all of the peek callbacks and see that the callbacks don't fire
// after data is added to the streams
transport->unsetAllPeekCallbacks();
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->addDataToStream(
stream2, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _)).Times(0);
EXPECT_CALL(peekCb2, onDataAvailable(stream2, _)).Times(0);
transport->driveReadCallbacks();
}
TEST_P(QuicTransportImplTestBase, PeekCallbackChangePeekCallback) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockPeekCallback> peekCb1;
NiceMock<MockPeekCallback> peekCb2;
auto setPeek9 = transport->setPeekCallback(stream1, &peekCb1);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
auto setPeek10 = transport->setPeekCallback(stream1, &peekCb2);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
EXPECT_CALL(peekCb2, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekCallbackPauseResume) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockPeekCallback> peekCb1;
auto setPeek11 = transport->setPeekCallback(stream1, &peekCb1);
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
auto res = transport->pausePeek(stream1);
EXPECT_TRUE(res);
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _)).Times(0);
transport->driveReadCallbacks();
res = transport->resumePeek(stream1);
EXPECT_TRUE(res);
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
auto stream2 = transport->createBidirectionalStream().value();
res = transport->pausePeek(stream2);
EXPECT_FALSE(res);
EXPECT_EQ(LocalErrorCode::APP_ERROR, res.error());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekCallbackNoCallbackSet) {
auto stream1 = transport->createBidirectionalStream().value();
transport->addDataToStream(
stream1,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10));
transport->driveReadCallbacks();
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekCallbackInvalidStream) {
NiceMock<MockPeekCallback> peekCb1;
StreamId invalidStream = 10;
EXPECT_TRUE(transport->setPeekCallback(invalidStream, &peekCb1).hasError());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekData) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
NiceMock<MockPeekCallback> peekCb1;
auto peekData = folly::IOBuf::copyBuffer("actual stream data");
auto setPeek12 = transport->setPeekCallback(stream1, &peekCb1);
EXPECT_CALL(peekCb1, onDataAvailable(stream1, _));
transport->addDataToStream(stream1, StreamBuffer(peekData->clone(), 0));
transport->driveReadCallbacks();
bool cbCalled = false;
auto peekCallback = [&](StreamId id,
const folly::Range<PeekIterator>& range) {
cbCalled = true;
EXPECT_EQ(id, stream1);
EXPECT_EQ(range.size(), 1);
auto bufClone = range[0].data.front()->clone();
EXPECT_EQ("actual stream data", bufClone->to<std::string>());
};
auto peekResult = transport->peek(stream1, peekCallback);
EXPECT_TRUE(cbCalled);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekDataWithError) {
InSequence enforceOrder;
auto streamId = transport->createBidirectionalStream().value();
auto peekData = folly::IOBuf::copyBuffer("actual stream data");
transport->addDataToStream(streamId, StreamBuffer(peekData->clone(), 0));
bool cbCalled = false;
auto peekCallback = [&](StreamId, const folly::Range<PeekIterator>&) {
cbCalled = true;
};
// Same local error code should be returned.
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
auto result = transport->peek(streamId, peekCallback);
EXPECT_FALSE(cbCalled);
EXPECT_TRUE(result.hasError());
EXPECT_EQ(LocalErrorCode::NO_ERROR, result.error());
// LocalErrorCode::INTERNAL_ERROR should be returned.
transport->addStreamReadError(
streamId, TransportErrorCode::FLOW_CONTROL_ERROR);
result = transport->peek(streamId, peekCallback);
EXPECT_FALSE(cbCalled);
EXPECT_TRUE(result.hasError());
EXPECT_EQ(LocalErrorCode::INTERNAL_ERROR, result.error());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, ConsumeDataWithError) {
InSequence enforceOrder;
auto streamId = transport->createBidirectionalStream().value();
auto peekData = folly::IOBuf::copyBuffer("actual stream data");
transport->addDataToStream(streamId, StreamBuffer(peekData->clone(), 0));
// Same local error code should be returned.
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
auto result = transport->consume(streamId, 1);
EXPECT_TRUE(result.hasError());
EXPECT_EQ(LocalErrorCode::NO_ERROR, result.error());
// LocalErrorCode::INTERNAL_ERROR should be returned.
transport->addStreamReadError(
streamId, TransportErrorCode::FLOW_CONTROL_ERROR);
result = transport->consume(streamId, 1);
EXPECT_TRUE(result.hasError());
EXPECT_EQ(LocalErrorCode::INTERNAL_ERROR, result.error());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, PeekConsumeReadTest) {
InSequence enforceOrder;
auto stream1 = transport->createBidirectionalStream().value();
auto readData = folly::IOBuf::copyBuffer("actual stream data");
NiceMock<MockPeekCallback> peekCb;
NiceMock<MockReadCallback> readCb;
auto setPeek13 = transport->setPeekCallback(stream1, &peekCb);
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb).hasError());
transport->addDataToStream(
stream1, StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
// Both peek and read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
// Only read should be called
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Consume 5 bytes.
auto transportConsumeResult1 = transport->consume(stream1, 5);
// Both peek and read should be called.
// Read - because it is called every time
// Peek - because the peekable range has changed
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
// Read 10 bytes.
{
auto readResult = transport->read(stream1, 10);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
EXPECT_EQ("l stream d", data.first->to<std::string>());
}
// Both peek and read should be called.
// Read - because it is called every time
// Peek - because the peekable range has changed
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
// Only read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
transport->driveReadCallbacks();
// Consume the rest of the data.
// Only 3 bytes left, try consuming 42.
auto transportConsumeResult2 = transport->consume(stream1, 42);
// Neither read nor peek should be called.
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
transport->driveReadCallbacks();
// Add more data, this time with a gap.
auto buf1 = IOBuf::copyBuffer("I just met you and this is crazy.");
auto buf2 = IOBuf::copyBuffer(" Here is my number, so call");
auto buf3 = IOBuf::copyBuffer(" me maybe.");
transport->addDataToStream(stream1, StreamBuffer(buf1->clone(), 0));
transport->addDataToStream(
stream1,
StreamBuffer(
buf3->clone(),
buf1->computeChainDataLength() + buf2->computeChainDataLength()));
// Both peek and read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
// Consume left part.
auto transportConsumeResult3 =
transport->consume(stream1, buf1->computeChainDataLength());
// Only peek should be called.
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
// Fill in the gap.
transport->addDataToStream(
stream1, StreamBuffer(buf2->clone(), buf1->computeChainDataLength()));
// Both peek and read should be called.
EXPECT_CALL(readCb, readAvailable(stream1));
EXPECT_CALL(peekCb, onDataAvailable(stream1, _));
transport->driveReadCallbacks();
// Read the rest of the buffer.
{
auto readResult = transport->read(stream1, 0);
ASSERT_TRUE(readResult.has_value());
auto data = std::move(readResult).value();
EXPECT_EQ(
" Here is my number, so call me maybe.", data.first->to<std::string>());
}
// Neither read nor peek should be called.
EXPECT_CALL(readCb, readAvailable(stream1)).Times(0);
EXPECT_CALL(peekCb, onDataAvailable(stream1, _)).Times(0);
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestBase, UpdatePeekableListNoDataTest) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
// Insert streamId into the list.
conn->streamManager->peekableStreams().insert(streamId);
// After the call the streamId should be removed
// from the list since there is no peekable data in the stream.
conn->streamManager->updatePeekableStreams(*stream);
EXPECT_FALSE(conn->streamManager->peekableStreams().contains(streamId));
}
TEST_P(QuicTransportImplTestBase, UpdatePeekableListWithDataTest) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
// Add some data to the stream.
transport->addDataToStream(
streamId,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
// streamId is in the list after the above call.
EXPECT_TRUE(conn->streamManager->peekableStreams().contains(streamId));
// After the call the streamId shall remain
// in the list since there is data in the stream.
conn->streamManager->updatePeekableStreams(*stream);
EXPECT_TRUE(conn->streamManager->peekableStreams().contains(streamId));
}
TEST_P(QuicTransportImplTestBase, UpdatePeekableListEmptyListTest) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
// Add some data to the stream.
transport->addDataToStream(
streamId,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
// Erase streamId from the list.
conn->streamManager->peekableStreams().erase(streamId);
EXPECT_FALSE(conn->streamManager->peekableStreams().contains(streamId));
// After the call the streamId should be added to the list
// because there is data in the stream and the streamId is
// not in the list.
conn->streamManager->updatePeekableStreams(*stream);
EXPECT_TRUE(conn->streamManager->peekableStreams().contains(streamId));
}
TEST_P(QuicTransportImplTestBase, UpdatePeekableListWithStreamErrorTest) {
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
// Add some data to the stream.
transport->addDataToStream(
streamId,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0));
// streamId is in the list.
EXPECT_TRUE(conn->streamManager->peekableStreams().contains(streamId));
transport->addStreamReadError(streamId, LocalErrorCode::NO_ERROR);
// peekableStreams is updated to allow stream with streamReadError.
// So the streamId shall be in the list
EXPECT_TRUE(conn->streamManager->peekableStreams().contains(streamId));
}
TEST_P(QuicTransportImplTestBase, SuccessfulPing) {
auto conn = transport->transportConn;
std::chrono::milliseconds interval(10);
TestPingCallback pingCallback;
auto transportSetPingCallback1 = transport->setPingCallback(&pingCallback);
transport->invokeSendPing(interval);
EXPECT_EQ(transport->isPingTimeoutScheduled(), true);
EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
conn->pendingEvents.cancelPingTimeout = true;
transport->invokeHandlePingCallbacks();
qEvb->loopOnce();
EXPECT_EQ(transport->isPingTimeoutScheduled(), false);
EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
}
TEST_P(QuicTransportImplTestBase, FailedPing) {
auto conn = transport->transportConn;
std::chrono::milliseconds interval(10);
TestPingCallback pingCallback;
auto transportSetPingCallback2 = transport->setPingCallback(&pingCallback);
transport->invokeSendPing(interval);
EXPECT_EQ(transport->isPingTimeoutScheduled(), true);
EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
conn->pendingEvents.cancelPingTimeout = true;
transport->invokeCancelPingTimeout();
transport->invokeHandlePingCallbacks();
EXPECT_EQ(conn->pendingEvents.cancelPingTimeout, false);
}
TEST_P(QuicTransportImplTestBase, HandleKnobCallbacks) {
auto conn = transport->transportConn;
LegacyObserver::EventSet eventSet;
eventSet.enable(SocketObserverInterface::Events::knobFrameEvents);
auto obs1 = std::make_unique<NiceMock<MockLegacyObserver>>();
auto obs2 = std::make_unique<NiceMock<MockLegacyObserver>>(eventSet);
auto obs3 = std::make_unique<NiceMock<MockLegacyObserver>>(eventSet);
transport->addObserver(obs1.get());
transport->addObserver(obs2.get());
transport->addObserver(obs3.get());
// set test knob frame
uint64_t knobSpace = 0xfaceb00c;
uint64_t knobId = 42;
folly::StringPiece data = "test knob data";
BufPtr buf(folly::IOBuf::create(data.size()));
memcpy(buf->writableData(), data.data(), data.size());
buf->append(data.size());
conn->pendingEvents.knobs.emplace_back(
KnobFrame(knobSpace, knobId, std::move(buf)));
EXPECT_CALL(connCallback, onKnobMock(knobSpace, knobId, _))
.WillOnce(Invoke([](Unused, Unused, Unused) { /* do nothing */ }));
EXPECT_CALL(*obs1, knobFrameReceived(transport.get(), _)).Times(0);
EXPECT_CALL(*obs2, knobFrameReceived(transport.get(), _)).Times(1);
EXPECT_CALL(*obs3, knobFrameReceived(transport.get(), _)).Times(1);
transport->invokeHandleKnobCallbacks();
qEvb->loopOnce();
EXPECT_EQ(conn->pendingEvents.knobs.size(), 0);
// detach the observer from the socket
EXPECT_TRUE(transport->removeObserver(obs1.get()));
EXPECT_TRUE(transport->removeObserver(obs2.get()));
EXPECT_TRUE(transport->removeObserver(obs3.get()));
}
TEST_P(QuicTransportImplTestBase, StreamWriteCallbackUnregister) {
auto stream = transport->createBidirectionalStream().value();
// Unset before set
EXPECT_FALSE(transport->unregisterStreamWriteCallback(stream));
// Set
auto wcb = std::make_unique<MockWriteCallback>();
EXPECT_CALL(*wcb, onStreamWriteReady(stream, _)).Times(1);
auto result = transport->notifyPendingWriteOnStream(stream, wcb.get());
EXPECT_TRUE(result);
qEvb->loopOnce();
// Set then unset
EXPECT_CALL(*wcb, onStreamWriteReady(stream, _)).Times(0);
result = transport->notifyPendingWriteOnStream(stream, wcb.get());
EXPECT_TRUE(result);
EXPECT_TRUE(transport->unregisterStreamWriteCallback(stream));
qEvb->loopOnce();
// Set, close, unset
result = transport->notifyPendingWriteOnStream(stream, wcb.get());
EXPECT_TRUE(result);
MockReadCallback rcb;
ASSERT_FALSE(transport->setReadCallback(stream, &rcb).hasError());
// ReadCallback kills WriteCallback
EXPECT_CALL(rcb, readError(stream, _))
.WillOnce(Invoke([&](StreamId stream, auto) {
EXPECT_TRUE(transport->unregisterStreamWriteCallback(stream));
wcb.reset();
}));
transport->close(std::nullopt);
qEvb->loopOnce();
}
TEST_P(QuicTransportImplTestBase, ObserverRemove) {
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverDestroy) {
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
InSequence s;
EXPECT_CALL(*cb, closeStarted(transport.get(), _));
EXPECT_CALL(*cb, closing(transport.get(), _));
EXPECT_CALL(*cb, destroy(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_P(QuicTransportImplTestBase, ObserverRemoveMissing) {
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_FALSE(transport->removeObserver(cb.get()));
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverSharedPtrRemove) {
auto cb = std::make_shared<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb);
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverSharedPtrDestroy) {
auto cb = std::make_shared<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb);
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
InSequence s;
EXPECT_CALL(*cb, closeStarted(transport.get(), _));
EXPECT_CALL(*cb, closing(transport.get(), _));
EXPECT_CALL(*cb, destroy(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_P(QuicTransportImplTestBase, ObserverSharedPtrReleasedDestroy) {
auto cb = std::make_shared<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb);
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
// now that observer is attached, we release shared_ptr but keep raw ptr
// since the container holds shared_ptr too, observer should not be destroyed
MockLegacyObserver::Safety dc(*cb.get());
auto cbRaw = cb.get();
cb = nullptr;
EXPECT_FALSE(dc.destroyed()); // should still exist
InSequence s;
EXPECT_CALL(*cbRaw, closeStarted(transport.get(), _));
EXPECT_CALL(*cbRaw, closing(transport.get(), _));
EXPECT_CALL(*cbRaw, destroy(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_P(QuicTransportImplTestBase, ObserverSharedPtrRemoveMissing) {
auto cb = std::make_shared<StrictMock<MockLegacyObserver>>();
EXPECT_FALSE(transport->removeObserver(cb.get()));
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverDetachImmediately) {
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverDetachAfterClose) {
// disable draining to ensure closing() event occurs immediately after close()
{
auto transportSettings = transport->getTransportSettings();
transportSettings.shouldDrain = false;
transport->setTransportSettings(transportSettings);
}
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
EXPECT_CALL(*cb, closeStarted(transport.get(), _));
EXPECT_CALL(*cb, closing(transport.get(), _));
transport->close(std::nullopt);
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_CALL(*cb, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb.get()));
Mock::VerifyAndClearExpectations(cb.get());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_F(QuicTransportImplTest, ObserverDetachOnCloseStartedDuringDestroy) {
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
InSequence s;
EXPECT_CALL(*cb, closeStarted(transport.get(), _))
.WillOnce(Invoke([&cb](auto callbackTransport, auto /* errorOpt */) {
EXPECT_TRUE(callbackTransport->removeObserver(cb.get()));
}));
EXPECT_CALL(*cb, observerDetach(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_F(QuicTransportImplTest, ObserverDetachOnClosingDuringDestroy) {
auto cb = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb, observerAttach(transport.get()));
transport->addObserver(cb.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb.get()));
InSequence s;
EXPECT_CALL(*cb, closeStarted(transport.get(), _));
EXPECT_CALL(*cb, closing(transport.get(), _))
.WillOnce(Invoke([&cb](auto callbackTransport, auto /* errorOpt */) {
EXPECT_TRUE(callbackTransport->removeObserver(cb.get()));
}));
EXPECT_CALL(*cb, observerDetach(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb.get());
}
TEST_P(QuicTransportImplTestBase, ObserverMultipleAttachRemove) {
auto cb1 = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport.get()));
transport->addObserver(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb2, observerAttach(transport.get()));
transport->addObserver(cb2.get());
EXPECT_THAT(
transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
EXPECT_CALL(*cb2, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb2.get()));
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_CALL(*cb1, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb1.get()));
EXPECT_THAT(transport->getObservers(), IsEmpty());
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
transport = nullptr;
}
TEST_P(QuicTransportImplTestBase, ObserverSharedPtrMultipleAttachRemove) {
auto cb1 = std::make_shared<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport.get()));
transport->addObserver(cb1);
Mock::VerifyAndClearExpectations(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_shared<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb2, observerAttach(transport.get()));
transport->addObserver(cb2);
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_THAT(
transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
EXPECT_CALL(*cb1, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb1));
Mock::VerifyAndClearExpectations(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb2.get()));
EXPECT_CALL(*cb2, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb2));
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverMultipleAttachRemoveReverse) {
auto cb1 = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport.get()));
transport->addObserver(cb1.get());
Mock::VerifyAndClearExpectations(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb2, observerAttach(transport.get()));
transport->addObserver(cb2.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_THAT(
transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
EXPECT_CALL(*cb2, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb2.get()));
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
EXPECT_CALL(*cb1, observerDetach(transport.get()));
EXPECT_TRUE(transport->removeObserver(cb1.get()));
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
EXPECT_THAT(transport->getObservers(), IsEmpty());
}
TEST_P(QuicTransportImplTestBase, ObserverMultipleAttachDestroy) {
auto cb1 = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb1, observerAttach(transport.get()));
transport->addObserver(cb1.get());
EXPECT_THAT(transport->getObservers(), UnorderedElementsAre(cb1.get()));
auto cb2 = std::make_unique<StrictMock<MockLegacyObserver>>();
EXPECT_CALL(*cb2, observerAttach(transport.get()));
transport->addObserver(cb2.get());
EXPECT_THAT(
transport->getObservers(), UnorderedElementsAre(cb1.get(), cb2.get()));
InSequence s;
EXPECT_CALL(*cb1, closeStarted(transport.get(), _));
EXPECT_CALL(*cb2, closeStarted(transport.get(), _));
EXPECT_CALL(*cb1, closing(transport.get(), _));
EXPECT_CALL(*cb2, closing(transport.get(), _));
EXPECT_CALL(*cb1, destroy(transport.get()));
EXPECT_CALL(*cb2, destroy(transport.get()));
transport = nullptr;
Mock::VerifyAndClearExpectations(cb1.get());
Mock::VerifyAndClearExpectations(cb2.get());
}
TEST_P(QuicTransportImplTestBase, ObserverDetachAndAttachEvb) {
LegacyObserver::EventSet eventSet;
eventSet.enable(SocketObserverInterface::Events::evbEvents);
auto obs1 = std::make_unique<NiceMock<MockLegacyObserver>>();
auto obs2 = std::make_unique<NiceMock<MockLegacyObserver>>(eventSet);
auto obs3 = std::make_unique<NiceMock<MockLegacyObserver>>(eventSet);
transport->addObserver(obs1.get());
transport->addObserver(obs2.get());
transport->addObserver(obs3.get());
// check the current event base and create a new one
EXPECT_EQ(qEvb, transport->getEventBase());
folly::EventBase fEvb2;
std::shared_ptr<QuicEventBase> qEvb2 =
std::make_shared<FollyQuicEventBase>(&fEvb2);
// Detach the event base evb
EXPECT_CALL(*obs1, evbDetach(transport.get(), qEvb.get())).Times(0);
EXPECT_CALL(*obs2, evbDetach(transport.get(), qEvb.get())).Times(1);
EXPECT_CALL(*obs3, evbDetach(transport.get(), qEvb.get())).Times(1);
transport->detachEventBase();
EXPECT_EQ(nullptr, transport->getEventBase());
// Attach a new event base evb2
EXPECT_CALL(*obs1, evbAttach(transport.get(), qEvb2.get())).Times(0);
EXPECT_CALL(*obs2, evbAttach(transport.get(), qEvb2.get())).Times(1);
EXPECT_CALL(*obs3, evbAttach(transport.get(), qEvb2.get())).Times(1);
transport->attachEventBase(qEvb2);
EXPECT_EQ(qEvb2, transport->getEventBase());
// Detach the event base evb2
EXPECT_CALL(*obs1, evbDetach(transport.get(), qEvb2.get())).Times(0);
EXPECT_CALL(*obs2, evbDetach(transport.get(), qEvb2.get())).Times(1);
EXPECT_CALL(*obs3, evbDetach(transport.get(), qEvb2.get())).Times(1);
transport->detachEventBase();
EXPECT_EQ(nullptr, transport->getEventBase());
// Attach the original event base evb
EXPECT_CALL(*obs1, evbAttach(transport.get(), qEvb.get())).Times(0);
EXPECT_CALL(*obs2, evbAttach(transport.get(), qEvb.get())).Times(1);
EXPECT_CALL(*obs3, evbAttach(transport.get(), qEvb.get())).Times(1);
transport->attachEventBase(qEvb);
EXPECT_EQ(qEvb, transport->getEventBase());
EXPECT_TRUE(transport->removeObserver(obs1.get()));
EXPECT_TRUE(transport->removeObserver(obs2.get()));
EXPECT_TRUE(transport->removeObserver(obs3.get()));
}
TEST_P(QuicTransportImplTestBase, GetConnectionStatsSmoke) {
auto stats = transport->getConnectionsStats();
EXPECT_EQ(stats.congestionController, CongestionControlType::Cubic);
EXPECT_EQ(stats.clientConnectionId, "0a090807");
}
TEST_P(QuicTransportImplTestBase, DatagramCallbackDatagramAvailable) {
NiceMock<MockDatagramCallback> datagramCb;
transport->enableDatagram();
auto transportSetDatagramCallback1 =
transport->setDatagramCallback(&datagramCb);
transport->addDatagram(folly::IOBuf::copyBuffer("datagram payload"));
EXPECT_CALL(datagramCb, onDatagramsAvailable());
transport->driveReadCallbacks();
}
TEST_P(QuicTransportImplTestBase, ZeroLengthDatagram) {
NiceMock<MockDatagramCallback> datagramCb;
transport->enableDatagram();
auto transportSetDatagramCallback2 =
transport->setDatagramCallback(&datagramCb);
transport->addDatagram(folly::IOBuf::copyBuffer(""));
EXPECT_CALL(datagramCb, onDatagramsAvailable());
transport->driveReadCallbacks();
auto datagrams = transport->readDatagramBufs();
EXPECT_FALSE(datagrams.hasError());
EXPECT_EQ(datagrams->size(), 1);
EXPECT_TRUE(datagrams->front() != nullptr);
EXPECT_EQ(datagrams->front()->computeChainDataLength(), 0);
}
TEST_P(QuicTransportImplTestBase, ZeroLengthDatagramBufs) {
NiceMock<MockDatagramCallback> datagramCb;
transport->enableDatagram();
auto transportSetDatagramCallback3 =
transport->setDatagramCallback(&datagramCb);
auto recvTime = Clock::now() + 5000ns;
transport->addDatagram(folly::IOBuf::copyBuffer(""), recvTime);
EXPECT_CALL(datagramCb, onDatagramsAvailable());
transport->driveReadCallbacks();
auto datagrams = transport->readDatagrams();
EXPECT_FALSE(datagrams.hasError());
EXPECT_EQ(datagrams->size(), 1);
EXPECT_TRUE(datagrams->front().bufQueue().front() != nullptr);
EXPECT_EQ(datagrams->front().receiveTimePoint(), recvTime);
EXPECT_EQ(datagrams->front().bufQueue().front()->computeChainDataLength(), 0);
}
TEST_P(QuicTransportImplTestBase, Cmsgs) {
transport->setServerConnectionId();
folly::SocketCmsgMap cmsgs;
cmsgs[{IPPROTO_IP, IP_TOS}] = 123;
EXPECT_CALL(*socketPtr, setCmsgs(_)).Times(1);
transport->setCmsgs(cmsgs);
EXPECT_CALL(*socketPtr, appendCmsgs(_)).Times(1);
transport->appendCmsgs(cmsgs);
}
class QuicTransportImplTestCounters : public QuicTransportImplTest {};
TEST_F(QuicTransportImplTestCounters, TransportResetClosesStreams) {
MockQuicStats quicStats;
auto transportSettings = transport->getTransportSettings();
auto& conn = transport->getConnectionState();
conn.statsCallback = &quicStats;
EXPECT_CALL(quicStats, onNewQuicStream()).Times(2);
EXPECT_CALL(quicStats, onQuicStreamClosed()).Times(2);
auto stream1 = transport->createBidirectionalStream().value();
auto stream2 = transport->createBidirectionalStream().value();
EXPECT_EQ(stream1, 1);
EXPECT_EQ(stream2, 5);
EXPECT_EQ(conn.streamManager->streamCount(), 2);
transport.reset();
}
class QuicTransportImplTestWithGroups : public QuicTransportImplTestBase {};
INSTANTIATE_TEST_SUITE_P(
QuicTransportImplTestWithGroups,
QuicTransportImplTestWithGroups,
::testing::Values(DelayedStreamNotifsTestParam{true}));
TEST_P(QuicTransportImplTestWithGroups, ReadCallbackWithGroupsDataAvailable) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
auto groupId = transport->createBidirectionalStreamGroup();
EXPECT_TRUE(groupId.has_value());
auto stream1 = transport->createBidirectionalStreamInGroup(*groupId).value();
auto stream2 = transport->createBidirectionalStreamInGroup(*groupId).value();
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream1,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0),
*groupId);
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10),
*groupId);
EXPECT_CALL(readCb1, readAvailableWithGroup(stream1, *groupId));
transport->driveReadCallbacks();
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0),
*groupId);
EXPECT_CALL(readCb1, readAvailableWithGroup(stream1, *groupId));
EXPECT_CALL(readCb2, readAvailableWithGroup(stream2, *groupId));
transport->driveReadCallbacks();
EXPECT_CALL(readCb1, readAvailableWithGroup(stream1, *groupId));
EXPECT_CALL(readCb2, readAvailableWithGroup(stream2, *groupId));
transport->driveReadCallbacks();
EXPECT_CALL(readCb2, readAvailableWithGroup(stream2, *groupId));
ASSERT_FALSE(transport->setReadCallback(stream1, nullptr).hasError());
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(QuicTransportImplTestWithGroups, ReadErrorCallbackWithGroups) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
auto groupId = transport->createBidirectionalStreamGroup();
EXPECT_TRUE(groupId.has_value());
auto stream1 = transport->createBidirectionalStreamInGroup(*groupId).value();
NiceMock<MockReadCallback> readCb1;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
transport->addStreamReadError(stream1, LocalErrorCode::NO_ERROR);
transport->addDataToStream(
stream1,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0),
*groupId);
EXPECT_CALL(readCb1, readErrorWithGroup(stream1, *groupId, _));
transport->driveReadCallbacks();
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
ReadCallbackWithGroupsCancellCallbacks) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
auto groupId = transport->createBidirectionalStreamGroup();
EXPECT_TRUE(groupId.has_value());
auto stream1 = transport->createBidirectionalStreamInGroup(*groupId).value();
auto stream2 = transport->createBidirectionalStreamInGroup(*groupId).value();
NiceMock<MockReadCallback> readCb1;
NiceMock<MockReadCallback> readCb2;
ASSERT_FALSE(transport->setReadCallback(stream1, &readCb1).hasError());
ASSERT_FALSE(transport->setReadCallback(stream2, &readCb2).hasError());
transport->addDataToStream(
stream1,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 0),
*groupId);
transport->addDataToStream(
stream2,
StreamBuffer(folly::IOBuf::copyBuffer("actual stream data"), 10),
*groupId);
EXPECT_CALL(readCb1, readErrorWithGroup(stream1, *groupId, _));
EXPECT_CALL(readCb2, readErrorWithGroup(stream2, *groupId, _));
QuicError error =
QuicError(TransportErrorCode::PROTOCOL_VIOLATION, "test error");
transport->cancelAllAppCallbacks(error);
transport.reset();
}
TEST_P(QuicTransportImplTestWithGroups, onNewStreamsAndGroupsCallbacks) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
auto readData = folly::IOBuf::copyBuffer("actual stream data");
StreamGroupId groupId = 0x00;
StreamId stream1 = 0x00;
EXPECT_CALL(connCallback, onNewBidirectionalStreamGroup(groupId));
EXPECT_CALL(connCallback, onNewBidirectionalStreamInGroup(stream1, groupId));
transport->addDataToStream(
stream1, StreamBuffer(readData->clone(), 0, true), groupId);
StreamId stream2 = 0x04;
EXPECT_CALL(connCallback, onNewBidirectionalStreamInGroup(stream2, groupId));
transport->addDataToStream(
stream2, StreamBuffer(readData->clone(), 0, true), groupId);
StreamGroupId groupIdUni = 0x02;
StreamId uniStream3 = 0xa;
EXPECT_CALL(connCallback, onNewUnidirectionalStreamGroup(groupIdUni));
EXPECT_CALL(
connCallback, onNewUnidirectionalStreamInGroup(uniStream3, groupIdUni));
transport->addDataToStream(
uniStream3, StreamBuffer(readData->clone(), 0, true), groupIdUni);
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestSetStreamGroupRetransmissionPolicyAllowed) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
const StreamGroupId groupId = 0x00;
const QuicStreamGroupRetransmissionPolicy policy;
// Test policy set allowed
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.has_value());
// Test policy set not allowed.
transportSettings.advertisedMaxStreamGroups = 0;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.hasError());
EXPECT_EQ(res.error(), LocalErrorCode::INVALID_OPERATION);
EXPECT_EQ(1, transport->getStreamGroupRetransmissionPolicies().size());
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestStreamGroupRetransmissionPolicyReset) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
const StreamGroupId groupId = 0x00;
QuicStreamGroupRetransmissionPolicy policy;
// Add the policy.
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Reset allowed.
res = transport->setStreamGroupRetransmissionPolicy(groupId, std::nullopt);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 0);
// Add the policy back.
res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Reset allowed even if custom policies are disabled.
transportSettings.advertisedMaxStreamGroups = 0;
res = transport->setStreamGroupRetransmissionPolicy(groupId, std::nullopt);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 0);
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestStreamGroupRetransmissionPolicyAddRemove) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
// Add a policy.
const StreamGroupId groupId = 0x00;
const QuicStreamGroupRetransmissionPolicy policy;
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Add another one.
const StreamGroupId groupId2 = 0x04;
const QuicStreamGroupRetransmissionPolicy policy2;
res = transport->setStreamGroupRetransmissionPolicy(groupId2, policy2);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 2);
// Remove second policy.
res = transport->setStreamGroupRetransmissionPolicy(groupId2, std::nullopt);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Remove first policy.
res = transport->setStreamGroupRetransmissionPolicy(groupId, std::nullopt);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 0);
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestStreamGroupRetransmissionPolicyMaxLimit) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 1;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
// Add a policy.
const StreamGroupId groupId = 0x00;
const QuicStreamGroupRetransmissionPolicy policy;
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Try adding another one; should be over the limit.
const StreamGroupId groupId2 = 0x04;
res = transport->setStreamGroupRetransmissionPolicy(groupId2, policy);
EXPECT_TRUE(res.hasError());
EXPECT_EQ(res.error(), LocalErrorCode::RTX_POLICIES_LIMIT_EXCEEDED);
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
transport.reset();
}
TEST_P(QuicTransportImplTestBase, TestUpdateWriteLooperWithWritableCallback) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillOnce(Return(true));
transport->updateWriteLooper(true /* thisIteration */);
// Disable useSockWritableEvents.
transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = false;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).Times(0);
transport->updateWriteLooper(true /* thisIteration */);
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestMaybeStopWriteLooperAndArmSocketWritableEvent) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Create a stream with outgoing data.
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
auto dataBuf = IOBuf::copyBuffer("hello");
stream->pendingWrites.append(dataBuf);
stream->writeBuffer.append(std::move(dataBuf));
// Insert streamId into the list.
conn->streamManager->updateWritableStreams(*stream);
// Write looper is running.
transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
// Write event is not armed.
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillOnce(Return(false));
EXPECT_CALL(*socketPtr, resumeWrite(_))
.WillOnce(Return(quic::Expected<void, QuicError>{}));
transport->maybeStopWriteLooperAndArmSocketWritableEvent();
// Write looper is stopped.
EXPECT_FALSE(transport->writeLooper()->isRunning());
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestMaybeStopWriteLooperAndArmSocketWritableEventNoData) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Write looper is running.
transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
// Write event is not armed.
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillOnce(Return(false));
EXPECT_CALL(*socketPtr, resumeWrite(_)).Times(0);
transport->maybeStopWriteLooperAndArmSocketWritableEvent();
// Write looper is still running.
EXPECT_TRUE(transport->writeLooper()->isRunning());
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestMaybeStopWriteLooperAndArmSocketWritableEventAlreadyArmed) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Create a stream with outgoing data.
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
auto dataBuf = IOBuf::copyBuffer("hello");
stream->pendingWrites.append(dataBuf);
stream->writeBuffer.append(std::move(dataBuf));
// Insert streamId into the list.
conn->streamManager->updateWritableStreams(*stream);
// Write looper is stopped.
EXPECT_FALSE(transport->writeLooper()->isRunning());
// Write event is already armed.
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillOnce(Return(true));
EXPECT_CALL(*socketPtr, resumeWrite(_)).Times(0);
transport->maybeStopWriteLooperAndArmSocketWritableEvent();
// Write looper is still stopped.
EXPECT_FALSE(transport->writeLooper()->isRunning());
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestMaybeStopWriteLooperAndArmSocketWritableEventNoCongestionControlAvailable) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Create a stream with outgoing data.
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
auto dataBuf = IOBuf::copyBuffer("hello");
stream->pendingWrites.append(dataBuf);
stream->writeBuffer.append(std::move(dataBuf));
// Insert streamId into the list.
conn->streamManager->updateWritableStreams(*stream);
// Write looper is running.
transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
// Fake in-flight bytes for CC to return no window available.
conn->lossState.inflightBytes = 123234534;
// Write event is not armed.
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillOnce(Return(false));
EXPECT_CALL(*socketPtr, resumeWrite(_)).Times(0);
transport->maybeStopWriteLooperAndArmSocketWritableEvent();
// Write looper is still running.
EXPECT_TRUE(transport->writeLooper()->isRunning());
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestMaybeStopWriteLooperAndArmSocketWritableEventNoFlowControlAvailable) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Create a stream with outgoing data.
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
auto dataBuf = IOBuf::copyBuffer("hello");
stream->pendingWrites.append(dataBuf);
stream->writeBuffer.append(std::move(dataBuf));
// Insert streamId into the list.
conn->streamManager->updateWritableStreams(*stream);
// Write looper is running.
transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
// Fake no flow control.
conn->flowControlState.peerAdvertisedMaxOffset =
conn->flowControlState.sumCurWriteOffset = 1024;
// Write event is not armed.
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillOnce(Return(false));
EXPECT_CALL(*socketPtr, resumeWrite(_)).Times(0);
transport->maybeStopWriteLooperAndArmSocketWritableEvent();
// Write looper is still running.
EXPECT_TRUE(transport->writeLooper()->isRunning());
transport.reset();
}
TEST_P(QuicTransportImplTestBase, TestOnSocketWritable) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
// Write looper is not running.
EXPECT_FALSE(transport->writeLooper()->isRunning());
EXPECT_CALL(*socketPtr, pauseWrite()).Times(1);
transport->onSocketWritable();
// Write looper is running.
EXPECT_TRUE(transport->writeLooper()->isRunning());
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestBackpressureWriterArmsSocketWritableEvent) {
transport->setServerConnectionId();
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transportSettings.batchingMode = QuicBatchingMode::BATCHING_MODE_NONE;
transportSettings.maxBatchSize = 1;
transportSettings.dataPathType = DataPathType::ChainedMemory;
transportSettings.enableWriterBackpressure = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Create a stream with outgoing data.
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
std::string testString = "hello";
auto dataBuf = IOBuf::copyBuffer(testString);
stream->pendingWrites.append(dataBuf);
stream->writeBuffer.append(std::move(dataBuf));
conn->flowControlState.sumCurStreamBufferLen = testString.length();
// Insert streamId into the list.
conn->streamManager->updateWritableStreams(*stream);
// Mock arming the write callback
bool writeCallbackArmed = false;
EXPECT_CALL(*socketPtr, isWritableCallbackSet()).WillRepeatedly(Invoke([&]() {
return writeCallbackArmed;
}));
EXPECT_CALL(*socketPtr, resumeWrite(_))
.WillOnce(Invoke(
[&](QuicAsyncUDPSocket::WriteCallback*)
-> quic::Expected<void, QuicError> {
writeCallbackArmed = true;
return {};
}));
// Fail the first write loop.
EXPECT_CALL(*socketPtr, write(_, _, _))
.Times(2) // We attempt to flush the batch twice inside the write loop.
// Fail both.
.WillRepeatedly(
Invoke([&](const folly::SocketAddress&, const struct iovec*, size_t) {
errno = EAGAIN;
return 0;
}));
transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
// A write attempt will cache the failed write, stop the write looper, and arm
// the write callback.
transport->pacedWriteDataToSocketThroughTransportBase();
// The transport has cached the failed write buffer.
EXPECT_TRUE(conn->pendingWriteBatch_.buf);
// Write looper stopped.
EXPECT_FALSE(transport->writeLooper()->isRunning());
// Write callback armed.
EXPECT_TRUE(writeCallbackArmed);
// Reset will make one write attempt. We don't care what happens to it
EXPECT_CALL(*socketPtr, write(_, _, _))
.Times(1)
.WillRepeatedly(Invoke([&](const folly::SocketAddress&,
const struct iovec* vec,
size_t iovec_len) {
errno = 0;
return getTotalIovecLen(vec, iovec_len);
}));
transport.reset();
}
TEST_P(
QuicTransportImplTestBase,
TestMaybeStopWriteLooperAndArmSocketWritableEventOnClosedSocket) {
auto transportSettings = transport->getTransportSettings();
transportSettings.useSockWritableEvents = true;
transport->setTransportSettings(transportSettings);
ASSERT_FALSE(transport->getConnectionState()
.streamManager->refreshTransportSettings(transportSettings)
.hasError());
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
// Create a stream with outgoing data.
auto streamId = transport->createBidirectionalStream().value();
const auto& conn = transport->transportConn;
auto stream = transport->getStream(streamId);
auto dataBuf = IOBuf::copyBuffer("hello");
stream->pendingWrites.append(dataBuf);
stream->writeBuffer.append(std::move(dataBuf));
// Insert streamId into the list.
conn->streamManager->updateWritableStreams(*stream);
// Write looper is running.
transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
// Close the socket.
transport->closeImpl((QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("writeSocketDataAndCatch() error"))));
transport->maybeStopWriteLooperAndArmSocketWritableEvent();
transport.reset();
}
} // namespace quic::test