mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-08 09:42:06 +03:00
Move core functionality to QuicTransportBaseLite [7/n]
Summary: See title. Reviewed By: mjoras Differential Revision: D64065113 fbshipit-source-id: 31f9a29a88bf763156d42c49df9db0f6f8e1a9d0
This commit is contained in:
committed by
Facebook GitHub Bot
parent
a253b7d782
commit
c62dac180e
@@ -292,40 +292,6 @@ class QuicSocket : virtual public QuicSocketLite {
|
||||
virtual folly::Expected<Priority, LocalErrorCode> getStreamPriority(
|
||||
StreamId id) = 0;
|
||||
|
||||
/**
|
||||
* ===== Read API ====
|
||||
*/
|
||||
|
||||
/**
|
||||
* Callback class for receiving data on a stream
|
||||
*/
|
||||
class ReadCallback {
|
||||
public:
|
||||
virtual ~ReadCallback() = default;
|
||||
|
||||
/**
|
||||
* Called from the transport layer when there is data, EOF or an error
|
||||
* available to read on the given stream ID
|
||||
*/
|
||||
virtual void readAvailable(StreamId id) noexcept = 0;
|
||||
|
||||
/*
|
||||
* Same as above, but called on streams within a group.
|
||||
*/
|
||||
virtual void readAvailableWithGroup(StreamId, StreamGroupId) noexcept {}
|
||||
|
||||
/**
|
||||
* Called from the transport layer when there is an error on the stream.
|
||||
*/
|
||||
virtual void readError(StreamId id, QuicError error) noexcept = 0;
|
||||
|
||||
/**
|
||||
* Same as above, but called on streams within a group.
|
||||
*/
|
||||
virtual void
|
||||
readErrorWithGroup(StreamId, StreamGroupId, QuicError) noexcept {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Set the read callback for the given stream. Note that read callback is
|
||||
* expected to be set all the time. Removing read callback indicates that
|
||||
@@ -433,28 +399,6 @@ class QuicSocket : virtual public QuicSocketLite {
|
||||
* };
|
||||
*/
|
||||
|
||||
using PeekIterator = CircularDeque<StreamBuffer>::const_iterator;
|
||||
class PeekCallback {
|
||||
public:
|
||||
virtual ~PeekCallback() = default;
|
||||
|
||||
/**
|
||||
* Called from the transport layer when there is new data available to
|
||||
* peek on a given stream.
|
||||
* Callback can be called multiple times and it is up to application to
|
||||
* de-dupe already peeked ranges.
|
||||
*/
|
||||
virtual void onDataAvailable(
|
||||
StreamId id,
|
||||
const folly::Range<PeekIterator>& peekData) noexcept = 0;
|
||||
|
||||
/**
|
||||
* Called from the transport layer during peek time when there is an error
|
||||
* on the stream.
|
||||
*/
|
||||
virtual void peekError(StreamId id, QuicError error) noexcept = 0;
|
||||
};
|
||||
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode> setPeekCallback(
|
||||
StreamId id,
|
||||
PeekCallback* cb) = 0;
|
||||
@@ -600,122 +544,6 @@ class QuicSocket : virtual public QuicSocketLite {
|
||||
virtual StreamDirectionality getStreamDirectionality(
|
||||
StreamId stream) noexcept = 0;
|
||||
|
||||
/**
|
||||
* Callback class for receiving write readiness notifications
|
||||
*/
|
||||
class WriteCallback {
|
||||
public:
|
||||
virtual ~WriteCallback() = default;
|
||||
|
||||
/**
|
||||
* Invoked when stream is ready to write after notifyPendingWriteOnStream
|
||||
* has previously been called.
|
||||
*
|
||||
* maxToSend represents the amount of data that the transport layer expects
|
||||
* to write to the network during this event loop, eg:
|
||||
* min(remaining flow control, remaining send buffer space)
|
||||
*/
|
||||
virtual void onStreamWriteReady(
|
||||
StreamId /* id */,
|
||||
uint64_t /* maxToSend */) noexcept {}
|
||||
|
||||
/**
|
||||
* Invoked when connection is ready to write after
|
||||
* notifyPendingWriteOnConnection has previously been called.
|
||||
*
|
||||
* maxToSend represents the amount of data that the transport layer expects
|
||||
* to write to the network during this event loop, eg:
|
||||
* min(remaining flow control, remaining send buffer space)
|
||||
*/
|
||||
virtual void onConnectionWriteReady(uint64_t /* maxToSend */) noexcept {}
|
||||
|
||||
/**
|
||||
* Invoked when a connection is being torn down after
|
||||
* notifyPendingWriteOnStream has been called
|
||||
*/
|
||||
virtual void onStreamWriteError(
|
||||
StreamId /* id */,
|
||||
QuicError /* error */) noexcept {}
|
||||
|
||||
/**
|
||||
* Invoked when a connection is being torn down after
|
||||
* notifyPendingWriteOnConnection has been called
|
||||
*/
|
||||
virtual void onConnectionWriteError(QuicError
|
||||
/* error */) noexcept {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Inform the transport that there is data to write on this connection
|
||||
* An app shouldn't mix connection and stream calls to this API
|
||||
* Use this if the app wants to do prioritization.
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
notifyPendingWriteOnConnection(WriteCallback* wcb) = 0;
|
||||
|
||||
/**
|
||||
* Inform the transport that there is data to write on a given stream.
|
||||
* An app shouldn't mix connection and stream calls to this API
|
||||
* Use the Connection call if the app wants to do prioritization.
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
notifyPendingWriteOnStream(StreamId id, WriteCallback* wcb) = 0;
|
||||
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
unregisterStreamWriteCallback(StreamId) = 0;
|
||||
|
||||
/**
|
||||
* Structure used to communicate TX and ACK/Delivery notifications.
|
||||
*/
|
||||
struct ByteEvent {
|
||||
enum class Type { ACK = 1, TX = 2 };
|
||||
static constexpr std::array<Type, 2> kByteEventTypes = {
|
||||
{Type::ACK, Type::TX}};
|
||||
|
||||
StreamId id{0};
|
||||
uint64_t offset{0};
|
||||
Type type;
|
||||
|
||||
// sRTT at time of event
|
||||
// TODO(bschlinker): Deprecate, caller can fetch transport state if
|
||||
// desired.
|
||||
std::chrono::microseconds srtt{0us};
|
||||
};
|
||||
|
||||
/**
|
||||
* Structure used to communicate cancellation of a ByteEvent.
|
||||
*
|
||||
* According to Dictionary.com, cancellation is more frequent in American
|
||||
* English than cancellation. Yet in American English, the preferred style is
|
||||
* typically not to double the final L, so cancel generally becomes canceled.
|
||||
*/
|
||||
using ByteEventCancellation = ByteEvent;
|
||||
|
||||
/**
|
||||
* Callback class for receiving byte event (TX/ACK) notifications.
|
||||
*/
|
||||
class ByteEventCallback {
|
||||
public:
|
||||
virtual ~ByteEventCallback() = default;
|
||||
|
||||
/**
|
||||
* Invoked when a byte event has been successfully registered.
|
||||
* Since this is a convenience notification and not a mandatory callback,
|
||||
* not marking this as pure virtual.
|
||||
*/
|
||||
virtual void onByteEventRegistered(ByteEvent /* byteEvent */) {}
|
||||
|
||||
/**
|
||||
* Invoked when the byte event has occurred.
|
||||
*/
|
||||
virtual void onByteEvent(ByteEvent byteEvent) = 0;
|
||||
|
||||
/**
|
||||
* Invoked if byte event is canceled due to reset, shutdown, or other error.
|
||||
*/
|
||||
virtual void onByteEventCanceled(ByteEventCancellation cancellation) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Callback class for receiving ack notifications
|
||||
*/
|
||||
@@ -772,21 +600,6 @@ class QuicSocket : virtual public QuicSocketLite {
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) = 0;
|
||||
|
||||
/**
|
||||
* Register a byte event to be triggered when specified event type occurs for
|
||||
* the specified stream and offset.
|
||||
*
|
||||
* If the registration fails, the callback (ByteEventCallback* cb) will NEVER
|
||||
* be invoked for anything. If the registration succeeds, the callback is
|
||||
* guaranteed to receive an onByteEventRegistered() notification.
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
registerByteEventCallback(
|
||||
const ByteEvent::Type type,
|
||||
const StreamId id,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) = 0;
|
||||
|
||||
/**
|
||||
* Cancel byte event callbacks for given stream.
|
||||
*
|
||||
@@ -1099,16 +912,6 @@ class QuicSocket : virtual public QuicSocketLite {
|
||||
* or loss notification support for Datagram.
|
||||
*/
|
||||
|
||||
class DatagramCallback {
|
||||
public:
|
||||
virtual ~DatagramCallback() = default;
|
||||
|
||||
/**
|
||||
* Notifies the DatagramCallback that datagrams are available for read.
|
||||
*/
|
||||
virtual void onDatagramsAvailable() noexcept = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Set the read callback for Datagrams
|
||||
*/
|
||||
|
@@ -230,6 +230,233 @@ class QuicSocketLite {
|
||||
receiveWindowMaxOffset(receiveWindowMaxOffsetIn) {}
|
||||
};
|
||||
|
||||
/**
|
||||
* ===== Read API ====
|
||||
*/
|
||||
|
||||
/**
|
||||
* Callback class for receiving data on a stream
|
||||
*/
|
||||
class ReadCallback {
|
||||
public:
|
||||
virtual ~ReadCallback() = default;
|
||||
|
||||
/**
|
||||
* Called from the transport layer when there is data, EOF or an error
|
||||
* available to read on the given stream ID
|
||||
*/
|
||||
virtual void readAvailable(StreamId id) noexcept = 0;
|
||||
|
||||
/*
|
||||
* Same as above, but called on streams within a group.
|
||||
*/
|
||||
virtual void readAvailableWithGroup(StreamId, StreamGroupId) noexcept {}
|
||||
|
||||
/**
|
||||
* Called from the transport layer when there is an error on the stream.
|
||||
*/
|
||||
virtual void readError(StreamId id, QuicError error) noexcept = 0;
|
||||
|
||||
/**
|
||||
* Same as above, but called on streams within a group.
|
||||
*/
|
||||
virtual void
|
||||
readErrorWithGroup(StreamId, StreamGroupId, QuicError) noexcept {}
|
||||
};
|
||||
|
||||
/**
|
||||
* ===== Peek/Consume API =====
|
||||
*/
|
||||
|
||||
/**
|
||||
* Usage:
|
||||
* class Application {
|
||||
* void onNewBidirectionalStream(StreamId id) {
|
||||
* socket_->setPeekCallback(id, this);
|
||||
* }
|
||||
*
|
||||
* virtual void onDataAvailable(
|
||||
* StreamId id,
|
||||
* const folly::Range<PeekIterator>& peekData) noexcept override
|
||||
* {
|
||||
* auto amount = tryInterpret(peekData);
|
||||
* if (amount) {
|
||||
* socket_->consume(id, amount);
|
||||
* }
|
||||
* }
|
||||
* };
|
||||
*/
|
||||
|
||||
using PeekIterator = CircularDeque<StreamBuffer>::const_iterator;
|
||||
class PeekCallback {
|
||||
public:
|
||||
virtual ~PeekCallback() = default;
|
||||
|
||||
/**
|
||||
* Called from the transport layer when there is new data available to
|
||||
* peek on a given stream.
|
||||
* Callback can be called multiple times and it is up to application to
|
||||
* de-dupe already peeked ranges.
|
||||
*/
|
||||
virtual void onDataAvailable(
|
||||
StreamId id,
|
||||
const folly::Range<PeekIterator>& peekData) noexcept = 0;
|
||||
|
||||
/**
|
||||
* Called from the transport layer during peek time when there is an error
|
||||
* on the stream.
|
||||
*/
|
||||
virtual void peekError(StreamId id, QuicError error) noexcept = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Structure used to communicate TX and ACK/Delivery notifications.
|
||||
*/
|
||||
struct ByteEvent {
|
||||
enum class Type { ACK = 1, TX = 2 };
|
||||
static constexpr std::array<Type, 2> kByteEventTypes = {
|
||||
{Type::ACK, Type::TX}};
|
||||
|
||||
StreamId id{0};
|
||||
uint64_t offset{0};
|
||||
Type type;
|
||||
|
||||
// sRTT at time of event
|
||||
// TODO(bschlinker): Deprecate, caller can fetch transport state if
|
||||
// desired.
|
||||
std::chrono::microseconds srtt{0us};
|
||||
};
|
||||
|
||||
/**
|
||||
* Structure used to communicate cancellation of a ByteEvent.
|
||||
*
|
||||
* According to Dictionary.com, cancellation is more frequent in American
|
||||
* English than cancellation. Yet in American English, the preferred style is
|
||||
* typically not to double the final L, so cancel generally becomes canceled.
|
||||
*/
|
||||
using ByteEventCancellation = ByteEvent;
|
||||
|
||||
/**
|
||||
* Callback class for receiving byte event (TX/ACK) notifications.
|
||||
*/
|
||||
class ByteEventCallback {
|
||||
public:
|
||||
virtual ~ByteEventCallback() = default;
|
||||
|
||||
/**
|
||||
* Invoked when a byte event has been successfully registered.
|
||||
* Since this is a convenience notification and not a mandatory callback,
|
||||
* not marking this as pure virtual.
|
||||
*/
|
||||
virtual void onByteEventRegistered(ByteEvent /* byteEvent */) {}
|
||||
|
||||
/**
|
||||
* Invoked when the byte event has occurred.
|
||||
*/
|
||||
virtual void onByteEvent(ByteEvent byteEvent) = 0;
|
||||
|
||||
/**
|
||||
* Invoked if byte event is canceled due to reset, shutdown, or other error.
|
||||
*/
|
||||
virtual void onByteEventCanceled(ByteEventCancellation cancellation) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Register a byte event to be triggered when specified event type occurs for
|
||||
* the specified stream and offset.
|
||||
*
|
||||
* If the registration fails, the callback (ByteEventCallback* cb) will NEVER
|
||||
* be invoked for anything. If the registration succeeds, the callback is
|
||||
* guaranteed to receive an onByteEventRegistered() notification.
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
registerByteEventCallback(
|
||||
const ByteEvent::Type type,
|
||||
const StreamId id,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) = 0;
|
||||
|
||||
/**
|
||||
* ===== Datagram API =====
|
||||
*
|
||||
* Datagram support is experimental. Currently there isn't delivery callback
|
||||
* or loss notification support for Datagram.
|
||||
*/
|
||||
|
||||
class DatagramCallback {
|
||||
public:
|
||||
virtual ~DatagramCallback() = default;
|
||||
|
||||
/**
|
||||
* Notifies the DatagramCallback that datagrams are available for read.
|
||||
*/
|
||||
virtual void onDatagramsAvailable() noexcept = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Callback class for receiving write readiness notifications
|
||||
*/
|
||||
class WriteCallback {
|
||||
public:
|
||||
virtual ~WriteCallback() = default;
|
||||
|
||||
/**
|
||||
* Invoked when stream is ready to write after notifyPendingWriteOnStream
|
||||
* has previously been called.
|
||||
*
|
||||
* maxToSend represents the amount of data that the transport layer expects
|
||||
* to write to the network during this event loop, eg:
|
||||
* min(remaining flow control, remaining send buffer space)
|
||||
*/
|
||||
virtual void onStreamWriteReady(
|
||||
StreamId /* id */,
|
||||
uint64_t /* maxToSend */) noexcept {}
|
||||
|
||||
/**
|
||||
* Invoked when connection is ready to write after
|
||||
* notifyPendingWriteOnConnection has previously been called.
|
||||
*
|
||||
* maxToSend represents the amount of data that the transport layer expects
|
||||
* to write to the network during this event loop, eg:
|
||||
* min(remaining flow control, remaining send buffer space)
|
||||
*/
|
||||
virtual void onConnectionWriteReady(uint64_t /* maxToSend */) noexcept {}
|
||||
|
||||
/**
|
||||
* Invoked when a connection is being torn down after
|
||||
* notifyPendingWriteOnStream has been called
|
||||
*/
|
||||
virtual void onStreamWriteError(
|
||||
StreamId /* id */,
|
||||
QuicError /* error */) noexcept {}
|
||||
|
||||
/**
|
||||
* Invoked when a connection is being torn down after
|
||||
* notifyPendingWriteOnConnection has been called
|
||||
*/
|
||||
virtual void onConnectionWriteError(QuicError
|
||||
/* error */) noexcept {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Inform the transport that there is data to write on this connection
|
||||
* An app shouldn't mix connection and stream calls to this API
|
||||
* Use this if the app wants to do prioritization.
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
notifyPendingWriteOnConnection(WriteCallback* wcb) = 0;
|
||||
|
||||
/**
|
||||
* Inform the transport that there is data to write on a given stream.
|
||||
* An app shouldn't mix connection and stream calls to this API
|
||||
* Use the Connection call if the app wants to do prioritization.
|
||||
*/
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
notifyPendingWriteOnStream(StreamId id, WriteCallback* wcb) = 0;
|
||||
|
||||
virtual folly::Expected<folly::Unit, LocalErrorCode>
|
||||
unregisterStreamWriteCallback(StreamId) = 0;
|
||||
|
||||
/**
|
||||
* Application can invoke this function to signal the transport to
|
||||
* initiate migration.
|
||||
|
@@ -51,14 +51,11 @@ QuicTransportBase::QuicTransportBase(
|
||||
std::move(evb),
|
||||
std::move(socket),
|
||||
useConnectionEndWithErrorCallback),
|
||||
lossTimeout_(this),
|
||||
ackTimeout_(this),
|
||||
pathValidationTimeout_(this),
|
||||
idleTimeout_(this),
|
||||
keepaliveTimeout_(this),
|
||||
drainTimeout_(this),
|
||||
pingTimeout_(this),
|
||||
excessWriteTimeout_(this),
|
||||
readLooper_(new FunctionLooper(
|
||||
evb_,
|
||||
[this]() { invokeReadDataAndCallbacks(); },
|
||||
@@ -66,11 +63,7 @@ QuicTransportBase::QuicTransportBase(
|
||||
peekLooper_(new FunctionLooper(
|
||||
evb_,
|
||||
[this]() { invokePeekDataAndCallbacks(); },
|
||||
LooperType::PeekLooper)),
|
||||
writeLooper_(new FunctionLooper(
|
||||
evb_,
|
||||
[this]() { pacedWriteDataToSocket(); },
|
||||
LooperType::WriteLooper)) {
|
||||
LooperType::PeekLooper)) {
|
||||
writeLooper_->setPacingFunction([this]() -> auto {
|
||||
if (isConnectionPaced(*conn_)) {
|
||||
return conn_->pacer->getTimeUntilNextWrite();
|
||||
@@ -85,22 +78,6 @@ QuicTransportBase::QuicTransportBase(
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::scheduleTimeout(
|
||||
QuicTimerCallback* callback,
|
||||
std::chrono::milliseconds timeout) {
|
||||
if (evb_) {
|
||||
evb_->scheduleTimeout(callback, timeout);
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::cancelTimeout(QuicTimerCallback* callback) {
|
||||
callback->cancelTimerCallback();
|
||||
}
|
||||
|
||||
bool QuicTransportBase::isTimeoutScheduled(QuicTimerCallback* callback) const {
|
||||
return callback->isTimerCallbackScheduled();
|
||||
}
|
||||
|
||||
void QuicTransportBase::setPacingTimer(
|
||||
QuicTimer::SharedPtr pacingTimer) noexcept {
|
||||
if (pacingTimer) {
|
||||
@@ -757,63 +734,6 @@ QuicTransportBase::pauseOrResumeRead(StreamId id, bool resume) {
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
void QuicTransportBase::invokeReadDataAndCallbacks() {
|
||||
auto self = sharedGuard();
|
||||
SCOPE_EXIT {
|
||||
self->checkForClosedStream();
|
||||
self->updateReadLooper();
|
||||
self->updateWriteLooper(true);
|
||||
};
|
||||
// Need a copy since the set can change during callbacks.
|
||||
std::vector<StreamId> readableStreamsCopy;
|
||||
const auto& readableStreams = self->conn_->streamManager->readableStreams();
|
||||
readableStreamsCopy.reserve(readableStreams.size());
|
||||
std::copy(
|
||||
readableStreams.begin(),
|
||||
readableStreams.end(),
|
||||
std::back_inserter(readableStreamsCopy));
|
||||
if (self->conn_->transportSettings.orderedReadCallbacks) {
|
||||
std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end());
|
||||
}
|
||||
for (StreamId streamId : readableStreamsCopy) {
|
||||
auto callback = self->readCallbacks_.find(streamId);
|
||||
if (callback == self->readCallbacks_.end()) {
|
||||
// Stream doesn't have a read callback set, skip it.
|
||||
continue;
|
||||
}
|
||||
auto readCb = callback->second.readCb;
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
|
||||
if (readCb && stream->streamReadError) {
|
||||
self->conn_->streamManager->readableStreams().erase(streamId);
|
||||
readCallbacks_.erase(callback);
|
||||
// if there is an error on the stream - it's not readable anymore, so
|
||||
// we cannot peek into it as well.
|
||||
self->conn_->streamManager->peekableStreams().erase(streamId);
|
||||
peekCallbacks_.erase(streamId);
|
||||
VLOG(10) << "invoking read error callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
if (!stream->groupId) {
|
||||
readCb->readError(streamId, QuicError(*stream->streamReadError));
|
||||
} else {
|
||||
readCb->readErrorWithGroup(
|
||||
streamId, *stream->groupId, QuicError(*stream->streamReadError));
|
||||
}
|
||||
} else if (
|
||||
readCb && callback->second.resumed && stream->hasReadableData()) {
|
||||
VLOG(10) << "invoking read callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
if (!stream->groupId) {
|
||||
readCb->readAvailable(streamId);
|
||||
} else {
|
||||
readCb->readAvailableWithGroup(streamId, *stream->groupId);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (self->datagramCallback_ && !conn_->datagramState.readBuffer.empty()) {
|
||||
self->datagramCallback_->onDatagramsAvailable();
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::updateReadLooper() {
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
VLOG(10) << "Stopping read looper " << *this;
|
||||
@@ -909,61 +829,6 @@ QuicTransportBase::pauseOrResumePeek(StreamId id, bool resume) {
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
void QuicTransportBase::invokePeekDataAndCallbacks() {
|
||||
auto self = sharedGuard();
|
||||
SCOPE_EXIT {
|
||||
self->checkForClosedStream();
|
||||
self->updatePeekLooper();
|
||||
self->updateWriteLooper(true);
|
||||
};
|
||||
// TODO: add protection from calling "consume" in the middle of the peek -
|
||||
// one way is to have a peek counter that is incremented when peek calblack
|
||||
// is called and decremented when peek is done. once counter transitions
|
||||
// to 0 we can execute "consume" calls that were done during "peek", for that,
|
||||
// we would need to keep stack of them.
|
||||
std::vector<StreamId> peekableStreamsCopy;
|
||||
const auto& peekableStreams = self->conn_->streamManager->peekableStreams();
|
||||
peekableStreamsCopy.reserve(peekableStreams.size());
|
||||
std::copy(
|
||||
peekableStreams.begin(),
|
||||
peekableStreams.end(),
|
||||
std::back_inserter(peekableStreamsCopy));
|
||||
VLOG(10) << __func__
|
||||
<< " peekableListCopy.size()=" << peekableStreamsCopy.size();
|
||||
for (StreamId streamId : peekableStreamsCopy) {
|
||||
auto callback = self->peekCallbacks_.find(streamId);
|
||||
// This is a likely bug. Need to think more on whether events can
|
||||
// be dropped
|
||||
// remove streamId from list of peekable - as opposed to "read", "peek" is
|
||||
// only called once per streamId and not on every EVB loop until application
|
||||
// reads the data.
|
||||
self->conn_->streamManager->peekableStreams().erase(streamId);
|
||||
if (callback == self->peekCallbacks_.end()) {
|
||||
VLOG(10) << " No peek callback for stream=" << streamId;
|
||||
continue;
|
||||
}
|
||||
auto peekCb = callback->second.peekCb;
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
|
||||
if (peekCb && stream->streamReadError) {
|
||||
VLOG(10) << "invoking peek error callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
peekCb->peekError(streamId, QuicError(*stream->streamReadError));
|
||||
} else if (
|
||||
peekCb && !stream->streamReadError && stream->hasPeekableData()) {
|
||||
VLOG(10) << "invoking peek callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
|
||||
peekDataFromQuicStream(
|
||||
*stream,
|
||||
[&](StreamId id, const folly::Range<PeekIterator>& peekRange) {
|
||||
peekCb->onDataAvailable(id, peekRange);
|
||||
});
|
||||
} else {
|
||||
VLOG(10) << "Not invoking peek callbacks on stream=" << streamId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::invokeStreamsAvailableCallbacks() {
|
||||
if (conn_->streamManager->consumeMaxLocalBidirectionalStreamIdIncreased()) {
|
||||
// check in case new streams were created in preceding callbacks
|
||||
@@ -1905,34 +1770,6 @@ void QuicTransportBase::onNetworkData(
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::checkIdleTimer(TimePoint now) {
|
||||
if (closeState_ == CloseState::CLOSED) {
|
||||
return;
|
||||
}
|
||||
if (!idleTimeout_.isTimerCallbackScheduled()) {
|
||||
return;
|
||||
}
|
||||
if (!idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_.has_value()) {
|
||||
return;
|
||||
}
|
||||
if (idleTimeoutCheck_.forcedIdleTimeoutScheduled_) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ((now - *idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_) >=
|
||||
idleTimeoutCheck_.idleTimeoutMs) {
|
||||
// Call timer expiration async.
|
||||
idleTimeoutCheck_.forcedIdleTimeoutScheduled_ = true;
|
||||
runOnEvbAsync([](auto self) {
|
||||
if (!self->good() || self->closeState_ == CloseState::CLOSED) {
|
||||
// The connection was probably closed.
|
||||
return;
|
||||
}
|
||||
self->idleTimeout_.timeoutExpired();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::setIdleTimer() {
|
||||
if (closeState_ == CloseState::CLOSED) {
|
||||
return;
|
||||
@@ -2070,114 +1907,6 @@ StreamDirectionality QuicTransportBase::getStreamDirectionality(
|
||||
return quic::getStreamDirectionality(stream);
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBase::notifyPendingWriteOnConnection(
|
||||
QuicSocket::WriteCallback* wcb) {
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
if (connWriteCallback_ != nullptr) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
|
||||
}
|
||||
// Assign the write callback before going into the loop so that if we close
|
||||
// the connection while we are still scheduled, the write callback will get
|
||||
// an error synchronously.
|
||||
connWriteCallback_ = wcb;
|
||||
runOnEvbAsync([](auto self) {
|
||||
if (!self->connWriteCallback_) {
|
||||
// The connection was probably closed.
|
||||
return;
|
||||
}
|
||||
auto connWritableBytes = self->maxWritableOnConn();
|
||||
if (connWritableBytes != 0) {
|
||||
auto connWriteCallback = self->connWriteCallback_;
|
||||
self->connWriteCallback_ = nullptr;
|
||||
connWriteCallback->onConnectionWriteReady(connWritableBytes);
|
||||
}
|
||||
});
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBase::unregisterStreamWriteCallback(StreamId id) {
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
|
||||
}
|
||||
if (pendingWriteCallbacks_.find(id) == pendingWriteCallbacks_.end()) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
pendingWriteCallbacks_.erase(id);
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBase::notifyPendingWriteOnStream(
|
||||
StreamId id,
|
||||
QuicSocket::WriteCallback* wcb) {
|
||||
if (isReceivingStream(conn_->nodeType, id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
|
||||
if (!stream->writable()) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
|
||||
}
|
||||
|
||||
if (wcb == nullptr) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
|
||||
}
|
||||
// Add the callback to the pending write callbacks so that if we are closed
|
||||
// while we are scheduled in the loop, the close will error out the
|
||||
// callbacks.
|
||||
auto wcbEmplaceResult = pendingWriteCallbacks_.emplace(id, wcb);
|
||||
if (!wcbEmplaceResult.second) {
|
||||
if ((wcbEmplaceResult.first)->second != wcb) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
|
||||
} else {
|
||||
return folly::makeUnexpected(LocalErrorCode::CALLBACK_ALREADY_INSTALLED);
|
||||
}
|
||||
}
|
||||
runOnEvbAsync([id](auto self) {
|
||||
auto wcbIt = self->pendingWriteCallbacks_.find(id);
|
||||
if (wcbIt == self->pendingWriteCallbacks_.end()) {
|
||||
// the connection was probably closed.
|
||||
return;
|
||||
}
|
||||
auto writeCallback = wcbIt->second;
|
||||
if (!self->conn_->streamManager->streamExists(id)) {
|
||||
self->pendingWriteCallbacks_.erase(wcbIt);
|
||||
writeCallback->onStreamWriteError(
|
||||
id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS));
|
||||
return;
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(self->conn_->streamManager->getStream(id));
|
||||
if (!stream->writable()) {
|
||||
self->pendingWriteCallbacks_.erase(wcbIt);
|
||||
writeCallback->onStreamWriteError(
|
||||
id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS));
|
||||
return;
|
||||
}
|
||||
auto maxCanWrite = self->maxWritableOnStream(*stream);
|
||||
if (maxCanWrite != 0) {
|
||||
self->pendingWriteCallbacks_.erase(wcbIt);
|
||||
writeCallback->onStreamWriteReady(id, maxCanWrite);
|
||||
}
|
||||
});
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
uint64_t QuicTransportBase::maxWritableOnStream(
|
||||
const QuicStreamState& stream) const {
|
||||
auto connWritableBytes = maxWritableOnConn();
|
||||
auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream);
|
||||
return std::min(streamFlowControlBytes, connWritableBytes);
|
||||
}
|
||||
|
||||
QuicSocket::WriteResult QuicTransportBase::writeChain(
|
||||
StreamId id,
|
||||
Buf data,
|
||||
@@ -2263,110 +1992,6 @@ QuicTransportBase::registerTxCallback(
|
||||
return registerByteEventCallback(ByteEvent::Type::TX, id, offset, cb);
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBase::registerByteEventCallback(
|
||||
const ByteEvent::Type type,
|
||||
const StreamId id,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) {
|
||||
if (isReceivingStream(conn_->nodeType, id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
|
||||
}
|
||||
if (!cb) {
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
ByteEventMap& byteEventMap = getByteEventMap(type);
|
||||
auto byteEventMapIt = byteEventMap.find(id);
|
||||
if (byteEventMapIt == byteEventMap.end()) {
|
||||
byteEventMap.emplace(
|
||||
id,
|
||||
std::initializer_list<std::remove_reference<
|
||||
decltype(byteEventMap)>::type::mapped_type::value_type>(
|
||||
{{offset, cb}}));
|
||||
} else {
|
||||
// Keep ByteEvents for the same stream sorted by offsets:
|
||||
auto pos = std::upper_bound(
|
||||
byteEventMapIt->second.begin(),
|
||||
byteEventMapIt->second.end(),
|
||||
offset,
|
||||
[&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; });
|
||||
if (pos != byteEventMapIt->second.begin()) {
|
||||
auto matchingEvent = std::find_if(
|
||||
byteEventMapIt->second.begin(),
|
||||
pos,
|
||||
[offset, cb](const ByteEventDetail& p) {
|
||||
return ((p.offset == offset) && (p.callback == cb));
|
||||
});
|
||||
if (matchingEvent != pos) {
|
||||
// ByteEvent has been already registered for the same type, id,
|
||||
// offset and for the same recipient, return an INVALID_OPERATION
|
||||
// error to prevent duplicate registrations.
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
}
|
||||
byteEventMapIt->second.emplace(pos, offset, cb);
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
|
||||
|
||||
// Notify recipients that the registration was successful.
|
||||
cb->onByteEventRegistered(ByteEvent{id, offset, type});
|
||||
|
||||
// if the callback is already ready, we still insert, but schedule to
|
||||
// process
|
||||
Optional<uint64_t> maxOffsetReady;
|
||||
switch (type) {
|
||||
case ByteEvent::Type::ACK:
|
||||
maxOffsetReady = getLargestDeliverableOffset(*stream);
|
||||
break;
|
||||
case ByteEvent::Type::TX:
|
||||
maxOffsetReady = getLargestWriteOffsetTxed(*stream);
|
||||
break;
|
||||
}
|
||||
if (maxOffsetReady.has_value() && (offset <= *maxOffsetReady)) {
|
||||
runOnEvbAsync([id, cb, offset, type](auto selfObj) {
|
||||
if (selfObj->closeState_ != CloseState::OPEN) {
|
||||
// Close will error out all byte event callbacks.
|
||||
return;
|
||||
}
|
||||
|
||||
auto& byteEventMapL = selfObj->getByteEventMap(type);
|
||||
auto streamByteEventCbIt = byteEventMapL.find(id);
|
||||
if (streamByteEventCbIt == byteEventMapL.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// This is scheduled to run in the future (during the next iteration of
|
||||
// the event loop). It is possible that the ByteEventDetail list gets
|
||||
// mutated between the time it was scheduled to now when we are ready to
|
||||
// run it. Look at the current outstanding ByteEvents for this stream ID
|
||||
// and confirm that our ByteEvent's offset and recipient callback are
|
||||
// still present.
|
||||
auto pos = std::find_if(
|
||||
streamByteEventCbIt->second.begin(),
|
||||
streamByteEventCbIt->second.end(),
|
||||
[offset, cb](const ByteEventDetail& p) {
|
||||
return ((p.offset == offset) && (p.callback == cb));
|
||||
});
|
||||
// if our byteEvent is not present, it must have been delivered already.
|
||||
if (pos == streamByteEventCbIt->second.end()) {
|
||||
return;
|
||||
}
|
||||
streamByteEventCbIt->second.erase(pos);
|
||||
|
||||
cb->onByteEvent(ByteEvent{id, offset, type});
|
||||
});
|
||||
}
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
Optional<LocalErrorCode> QuicTransportBase::shutdownWrite(StreamId id) {
|
||||
if (isReceivingStream(conn_->nodeType, id)) {
|
||||
return LocalErrorCode::INVALID_OPERATION;
|
||||
@@ -2541,37 +2166,6 @@ void QuicTransportBase::sendPing(std::chrono::milliseconds pingTimeout) {
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::lossTimeoutExpired() noexcept {
|
||||
CHECK_NE(closeState_, CloseState::CLOSED);
|
||||
// onLossDetectionAlarm will set packetToSend in pending events
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
try {
|
||||
onLossDetectionAlarm(*conn_, markPacketLoss);
|
||||
if (conn_->qLogger) {
|
||||
conn_->qLogger->addTransportStateUpdate(kLossTimeoutExpired);
|
||||
}
|
||||
pacedWriteDataToSocket();
|
||||
} catch (const QuicTransportException& ex) {
|
||||
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("lossTimeoutExpired() error")));
|
||||
} catch (const QuicInternalException& ex) {
|
||||
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("lossTimeoutExpired() error")));
|
||||
} catch (const std::exception& ex) {
|
||||
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
|
||||
std::string("lossTimeoutExpired() error")));
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::ackTimeoutExpired() noexcept {
|
||||
CHECK_NE(closeState_, CloseState::CLOSED);
|
||||
VLOG(10) << __func__ << " " << *this;
|
||||
@@ -2587,13 +2181,6 @@ void QuicTransportBase::pingTimeoutExpired() noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::excessWriteTimeoutExpired() noexcept {
|
||||
auto writeDataReason = shouldWriteData(*conn_);
|
||||
if (writeDataReason != WriteDataReason::NO_WRITE) {
|
||||
pacedWriteDataToSocket();
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::pathValidationTimeoutExpired() noexcept {
|
||||
CHECK(conn_->outstandingPathValidation);
|
||||
|
||||
@@ -2611,40 +2198,12 @@ void QuicTransportBase::pathValidationTimeoutExpired() noexcept {
|
||||
std::string("Path validation timed out")));
|
||||
}
|
||||
|
||||
void QuicTransportBase::idleTimeoutExpired(bool drain) noexcept {
|
||||
VLOG(4) << __func__ << " " << *this;
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
// idle timeout is expired, just close the connection and drain or
|
||||
// send connection close immediately depending on 'drain'
|
||||
DCHECK_NE(closeState_, CloseState::CLOSED);
|
||||
uint64_t numOpenStreans = conn_->streamManager->streamCount();
|
||||
auto localError =
|
||||
drain ? LocalErrorCode::IDLE_TIMEOUT : LocalErrorCode::SHUTTING_DOWN;
|
||||
closeImpl(
|
||||
quic::QuicError(
|
||||
QuicErrorCode(localError),
|
||||
folly::to<std::string>(
|
||||
toString(localError),
|
||||
", num non control streams: ",
|
||||
numOpenStreans - conn_->streamManager->numControlStreams())),
|
||||
drain /* drainConnection */,
|
||||
!drain /* sendCloseImmediately */);
|
||||
}
|
||||
|
||||
void QuicTransportBase::keepaliveTimeoutExpired() noexcept {
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
conn_->pendingEvents.sendPing = true;
|
||||
updateWriteLooper(true, conn_->transportSettings.inlineWriteAfterRead);
|
||||
}
|
||||
|
||||
void QuicTransportBase::scheduleLossTimeout(std::chrono::milliseconds timeout) {
|
||||
if (closeState_ == CloseState::CLOSED) {
|
||||
return;
|
||||
}
|
||||
timeout = timeMax(timeout, evb_->getTimerTickInterval());
|
||||
scheduleTimeout(&lossTimeout_, timeout);
|
||||
}
|
||||
|
||||
void QuicTransportBase::scheduleAckTimeout() {
|
||||
if (closeState_ == CloseState::CLOSED) {
|
||||
return;
|
||||
@@ -2713,10 +2272,6 @@ void QuicTransportBase::schedulePathValidationTimeout() {
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::cancelLossTimeout() {
|
||||
cancelTimeout(&lossTimeout_);
|
||||
}
|
||||
|
||||
bool QuicTransportBase::isLossTimeoutScheduled() {
|
||||
return isTimeoutScheduled(&lossTimeout_);
|
||||
}
|
||||
@@ -2970,32 +2525,6 @@ QuicTransportBase::readDatagramBufs(size_t atMost) {
|
||||
return retDatagrams;
|
||||
}
|
||||
|
||||
void QuicTransportBase::writeSocketDataAndCatch() {
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
try {
|
||||
writeSocketData();
|
||||
processCallbacksAfterWriteData();
|
||||
} catch (const QuicTransportException& ex) {
|
||||
VLOG(4) << __func__ << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("writeSocketDataAndCatch() error")));
|
||||
} catch (const QuicInternalException& ex) {
|
||||
VLOG(4) << __func__ << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("writeSocketDataAndCatch() error")));
|
||||
} catch (const std::exception& ex) {
|
||||
VLOG(4) << __func__ << " error=" << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
|
||||
std::string("writeSocketDataAndCatch() error")));
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::setTransportSettings(
|
||||
TransportSettings transportSettings) {
|
||||
if (conn_->nodeType == QuicNodeType::Client) {
|
||||
@@ -3324,21 +2853,6 @@ Optional<LocalErrorCode> QuicTransportBase::setControlStream(StreamId id) {
|
||||
return none;
|
||||
}
|
||||
|
||||
void QuicTransportBase::runOnEvbAsync(
|
||||
folly::Function<void(std::shared_ptr<QuicTransportBase>)> func) {
|
||||
auto evb = getEventBase();
|
||||
evb->runInLoop(
|
||||
[self = sharedGuard(), func = std::move(func), evb]() mutable {
|
||||
if (self->getEventBase() != evb) {
|
||||
// The eventbase changed between scheduling the loop and invoking
|
||||
// the callback, ignore this
|
||||
return;
|
||||
}
|
||||
func(std::move(self));
|
||||
},
|
||||
true);
|
||||
}
|
||||
|
||||
void QuicTransportBase::onSocketWritable() noexcept {
|
||||
// Remove the writable callback.
|
||||
socket_->pauseWrite();
|
||||
@@ -3349,83 +2863,6 @@ void QuicTransportBase::onSocketWritable() noexcept {
|
||||
writeLooper_->run(true /* thisIteration */);
|
||||
}
|
||||
|
||||
void QuicTransportBase::maybeStopWriteLooperAndArmSocketWritableEvent() {
|
||||
if (!socket_ || (closeState_ == CloseState::CLOSED)) {
|
||||
return;
|
||||
}
|
||||
if (conn_->transportSettings.useSockWritableEvents &&
|
||||
!socket_->isWritableCallbackSet()) {
|
||||
// Check if all data has been written and we're not limited by flow
|
||||
// control/congestion control.
|
||||
auto writeReason = shouldWriteData(*conn_);
|
||||
bool haveBufferToRetry = writeReason == WriteDataReason::BUFFERED_WRITE;
|
||||
bool haveNewDataToWrite =
|
||||
(writeReason != WriteDataReason::NO_WRITE) && !haveBufferToRetry;
|
||||
bool haveCongestionControlWindow = true;
|
||||
if (conn_->congestionController) {
|
||||
haveCongestionControlWindow =
|
||||
conn_->congestionController->getWritableBytes() > 0;
|
||||
}
|
||||
bool haveFlowControlWindow = getSendConnFlowControlBytesAPI(*conn_) > 0;
|
||||
bool connHasWriteWindow =
|
||||
haveCongestionControlWindow && haveFlowControlWindow;
|
||||
if (haveBufferToRetry || (haveNewDataToWrite && connHasWriteWindow)) {
|
||||
// Re-arm the write event and stop the write
|
||||
// looper.
|
||||
socket_->resumeWrite(this);
|
||||
writeLooper_->stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBase::pacedWriteDataToSocket() {
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
SCOPE_EXIT {
|
||||
self->maybeStopWriteLooperAndArmSocketWritableEvent();
|
||||
};
|
||||
|
||||
if (!isConnectionPaced(*conn_)) {
|
||||
// Not paced and connection is still open, normal write. Even if pacing is
|
||||
// previously enabled and then gets disabled, and we are here due to a
|
||||
// timeout, we should do a normal write to flush out the residue from
|
||||
// pacing write.
|
||||
writeSocketDataAndCatch();
|
||||
|
||||
if (conn_->transportSettings.scheduleTimerForExcessWrites) {
|
||||
// If we still have data to write, yield the event loop now but schedule a
|
||||
// timeout to come around and write again as soon as possible.
|
||||
auto writeDataReason = shouldWriteData(*conn_);
|
||||
if (writeDataReason != WriteDataReason::NO_WRITE &&
|
||||
!excessWriteTimeout_.isTimerCallbackScheduled()) {
|
||||
scheduleTimeout(&excessWriteTimeout_, 0ms);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// We are in the middle of a pacing interval. Leave it be.
|
||||
if (writeLooper_->isPacingScheduled()) {
|
||||
// The next burst is already scheduled. Since the burst size doesn't
|
||||
// depend on much data we currently have in buffer at all, no need to
|
||||
// change anything.
|
||||
return;
|
||||
}
|
||||
|
||||
// Do a burst write before waiting for an interval. This will also call
|
||||
// updateWriteLooper, but inside FunctionLooper we will ignore that.
|
||||
writeSocketDataAndCatch();
|
||||
}
|
||||
|
||||
void QuicTransportBase::describe(std::ostream& os) const {
|
||||
CHECK(conn_);
|
||||
os << *conn_;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt) {
|
||||
qt.describe(os);
|
||||
return os;
|
||||
}
|
||||
|
||||
inline std::ostream& operator<<(
|
||||
std::ostream& os,
|
||||
const CloseState& closeState) {
|
||||
@@ -3453,30 +2890,6 @@ QuicTransportBase::maybeResetStreamFromReadError(
|
||||
return folly::Expected<folly::Unit, LocalErrorCode>(folly::unit);
|
||||
}
|
||||
|
||||
QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMap(
|
||||
const ByteEvent::Type type) {
|
||||
switch (type) {
|
||||
case ByteEvent::Type::ACK:
|
||||
return deliveryCallbacks_;
|
||||
case ByteEvent::Type::TX:
|
||||
return txCallbacks_;
|
||||
}
|
||||
LOG(FATAL) << "Unhandled case in getByteEventMap";
|
||||
folly::assume_unreachable();
|
||||
}
|
||||
|
||||
const QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMapConst(
|
||||
const ByteEvent::Type type) const {
|
||||
switch (type) {
|
||||
case ByteEvent::Type::ACK:
|
||||
return deliveryCallbacks_;
|
||||
case ByteEvent::Type::TX:
|
||||
return txCallbacks_;
|
||||
}
|
||||
LOG(FATAL) << "Unhandled case in getByteEventMapConst";
|
||||
folly::assume_unreachable();
|
||||
}
|
||||
|
||||
void QuicTransportBase::onTransportKnobs(Buf knobBlob) {
|
||||
// Not yet implemented,
|
||||
VLOG(4) << "Received transport knobs: "
|
||||
|
@@ -12,7 +12,6 @@
|
||||
#include <quic/api/QuicSocket.h>
|
||||
#include <quic/api/QuicTransportBaseLite.h>
|
||||
#include <quic/api/QuicTransportFunctions.h>
|
||||
#include <quic/common/FunctionLooper.h>
|
||||
#include <quic/common/NetworkData.h>
|
||||
#include <quic/common/events/QuicEventBase.h>
|
||||
#include <quic/common/events/QuicTimer.h>
|
||||
@@ -37,8 +36,7 @@ namespace quic {
|
||||
*/
|
||||
class QuicTransportBase : public QuicSocket,
|
||||
public QuicTransportBaseLite,
|
||||
QuicStreamPrioritiesObserver,
|
||||
QuicAsyncUDPSocket::WriteCallback {
|
||||
QuicStreamPrioritiesObserver {
|
||||
public:
|
||||
QuicTransportBase(
|
||||
std::shared_ptr<QuicEventBase> evb,
|
||||
@@ -47,10 +45,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
|
||||
~QuicTransportBase() override;
|
||||
|
||||
void scheduleTimeout(
|
||||
QuicTimerCallback* callback,
|
||||
std::chrono::milliseconds timeout);
|
||||
|
||||
void setPacingTimer(QuicTimer::SharedPtr pacingTimer) noexcept;
|
||||
|
||||
Optional<ConnectionId> getClientConnectionId() const override;
|
||||
@@ -155,16 +149,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
StreamDirectionality getStreamDirectionality(
|
||||
StreamId stream) noexcept override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> notifyPendingWriteOnStream(
|
||||
StreamId id,
|
||||
QuicSocket::WriteCallback* wcb) override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> notifyPendingWriteOnConnection(
|
||||
QuicSocket::WriteCallback* wcb) override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> unregisterStreamWriteCallback(
|
||||
StreamId id) override;
|
||||
|
||||
WriteResult writeChain(
|
||||
StreamId id,
|
||||
Buf data,
|
||||
@@ -282,12 +266,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
*/
|
||||
virtual void unbindConnection() = 0;
|
||||
|
||||
/**
|
||||
* Returns a shared_ptr which can be used as a guard to keep this
|
||||
* object alive.
|
||||
*/
|
||||
virtual std::shared_ptr<QuicTransportBase> sharedGuard() = 0;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
|
||||
StreamId id,
|
||||
Priority priority) override;
|
||||
@@ -319,16 +297,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) override;
|
||||
|
||||
/**
|
||||
* Register a byte event to be triggered when specified event type occurs for
|
||||
* the specified stream and offset.
|
||||
*/
|
||||
folly::Expected<folly::Unit, LocalErrorCode> registerByteEventCallback(
|
||||
const ByteEvent::Type type,
|
||||
const StreamId id,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) override;
|
||||
|
||||
/**
|
||||
* Cancel byte event callbacks for given stream.
|
||||
*
|
||||
@@ -404,28 +372,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
*/
|
||||
virtual void createBufAccessor(size_t /* capacity */) {}
|
||||
|
||||
// Timeout functions
|
||||
class LossTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~LossTimeout() override = default;
|
||||
|
||||
explicit LossTimeout(QuicTransportBase* transport)
|
||||
: transport_(transport) {}
|
||||
|
||||
void timeoutExpired() noexcept override {
|
||||
transport_->lossTimeoutExpired();
|
||||
}
|
||||
|
||||
virtual void callbackCanceled() noexcept override {
|
||||
// ignore. this usually means that the eventbase is dying, so we will be
|
||||
// canceled anyway
|
||||
return;
|
||||
}
|
||||
|
||||
private:
|
||||
QuicTransportBase* transport_;
|
||||
};
|
||||
|
||||
class AckTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~AckTimeout() override = default;
|
||||
@@ -466,26 +412,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
QuicTransportBase* transport_;
|
||||
};
|
||||
|
||||
class ExcessWriteTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~ExcessWriteTimeout() override = default;
|
||||
|
||||
explicit ExcessWriteTimeout(QuicTransportBase* transport)
|
||||
: transport_(transport) {}
|
||||
|
||||
void timeoutExpired() noexcept override {
|
||||
transport_->excessWriteTimeoutExpired();
|
||||
}
|
||||
|
||||
void callbackCanceled() noexcept override {
|
||||
// Do nothing.
|
||||
return;
|
||||
}
|
||||
|
||||
private:
|
||||
QuicTransportBase* transport_;
|
||||
};
|
||||
|
||||
class PathValidationTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~PathValidationTimeout() override = default;
|
||||
@@ -507,27 +433,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
QuicTransportBase* transport_;
|
||||
};
|
||||
|
||||
class IdleTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~IdleTimeout() override = default;
|
||||
|
||||
explicit IdleTimeout(QuicTransportBase* transport)
|
||||
: transport_(transport) {}
|
||||
|
||||
void timeoutExpired() noexcept override {
|
||||
transport_->idleTimeoutExpired(true /* drain */);
|
||||
}
|
||||
|
||||
void callbackCanceled() noexcept override {
|
||||
// skip drain when canceling the timeout, to avoid scheduling a new
|
||||
// drain timeout
|
||||
transport_->idleTimeoutExpired(false /* drain */);
|
||||
}
|
||||
|
||||
private:
|
||||
QuicTransportBase* transport_;
|
||||
};
|
||||
|
||||
class KeepaliveTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~KeepaliveTimeout() override = default;
|
||||
@@ -564,8 +469,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
QuicTransportBase* transport_;
|
||||
};
|
||||
|
||||
void scheduleLossTimeout(std::chrono::milliseconds timeout) override;
|
||||
void cancelLossTimeout() override;
|
||||
bool isLossTimeoutScheduled() override; // TODO: make this const again
|
||||
|
||||
// If you don't set it, the default is Cubic
|
||||
@@ -576,8 +479,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
void setThrottlingSignalProvider(
|
||||
std::shared_ptr<ThrottlingSignalProvider>) override;
|
||||
|
||||
void describe(std::ostream& os) const;
|
||||
|
||||
virtual void setQLogger(std::shared_ptr<QLogger> qLogger);
|
||||
|
||||
void setLoopDetectorCallback(std::shared_ptr<LoopDetectorCallback> callback) {
|
||||
@@ -651,13 +552,11 @@ class QuicTransportBase : public QuicSocket,
|
||||
void updateCongestionControlSettings(
|
||||
const TransportSettings& transportSettings);
|
||||
void updateSocketTosSettings(uint8_t dscpValue);
|
||||
void processCallbacksAfterWriteData();
|
||||
void processCallbacksAfterWriteData() override;
|
||||
void processCallbacksAfterNetworkData();
|
||||
void invokeReadDataAndCallbacks();
|
||||
void invokePeekDataAndCallbacks();
|
||||
void invokeStreamsAvailableCallbacks();
|
||||
void updateReadLooper();
|
||||
void updatePeekLooper();
|
||||
void updateReadLooper() override;
|
||||
void updatePeekLooper() override;
|
||||
void updateWriteLooper(bool thisIteration, bool runInline = false) override;
|
||||
void handlePingCallbacks();
|
||||
void handleKnobCallbacks();
|
||||
@@ -680,13 +579,10 @@ class QuicTransportBase : public QuicSocket,
|
||||
|
||||
void cleanupAckEventState();
|
||||
|
||||
void runOnEvbAsync(
|
||||
folly::Function<void(std::shared_ptr<QuicTransportBase>)> func);
|
||||
|
||||
void closeImpl(
|
||||
Optional<QuicError> error,
|
||||
bool drainConnection = true,
|
||||
bool sendCloseImmediately = true);
|
||||
bool sendCloseImmediately = true) override;
|
||||
void closeUdpSocket();
|
||||
folly::Expected<folly::Unit, LocalErrorCode> pauseOrResumeRead(
|
||||
StreamId id,
|
||||
@@ -694,7 +590,7 @@ class QuicTransportBase : public QuicSocket,
|
||||
folly::Expected<folly::Unit, LocalErrorCode> pauseOrResumePeek(
|
||||
StreamId id,
|
||||
bool resume);
|
||||
void checkForClosedStream();
|
||||
void checkForClosedStream() override;
|
||||
folly::Expected<folly::Unit, LocalErrorCode> setReadCallbackInternal(
|
||||
StreamId id,
|
||||
ReadCallback* cb,
|
||||
@@ -706,35 +602,11 @@ class QuicTransportBase : public QuicSocket,
|
||||
bool bidirectional,
|
||||
const OptionalIntegral<StreamGroupId>& streamGroupId = std::nullopt);
|
||||
|
||||
/**
|
||||
* A wrapper around writeSocketData
|
||||
*
|
||||
* writeSocketDataAndCatch protects writeSocketData in a try-catch. It also
|
||||
* dispatch the next write loop.
|
||||
*/
|
||||
void writeSocketDataAndCatch();
|
||||
|
||||
/**
|
||||
* Paced write data to socket when connection is paced.
|
||||
*
|
||||
* Whether connection is paced will be decided by TransportSettings and
|
||||
* congection controller. When the connection is paced, this function writes
|
||||
* out a burst size of packets and let the writeLooper schedule a callback to
|
||||
* write another burst after a pacing interval if there are more data to
|
||||
* write. When the connection isn't paced, this function does a normal write.
|
||||
*/
|
||||
void pacedWriteDataToSocket();
|
||||
|
||||
uint64_t maxWritableOnStream(const QuicStreamState&) const;
|
||||
|
||||
void lossTimeoutExpired() noexcept;
|
||||
void ackTimeoutExpired() noexcept;
|
||||
void pathValidationTimeoutExpired() noexcept;
|
||||
void idleTimeoutExpired(bool drain) noexcept;
|
||||
void keepaliveTimeoutExpired() noexcept;
|
||||
void drainTimeoutExpired() noexcept;
|
||||
void pingTimeoutExpired() noexcept;
|
||||
void excessWriteTimeoutExpired() noexcept;
|
||||
|
||||
void setIdleTimer() override;
|
||||
void scheduleAckTimeout() override;
|
||||
@@ -759,18 +631,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
*/
|
||||
virtual void onTransportKnobs(Buf knobBlob);
|
||||
|
||||
struct ByteEventDetail {
|
||||
ByteEventDetail(uint64_t offsetIn, ByteEventCallback* callbackIn)
|
||||
: offset(offsetIn), callback(callbackIn) {}
|
||||
uint64_t offset;
|
||||
ByteEventCallback* callback;
|
||||
};
|
||||
|
||||
using ByteEventMap = folly::F14FastMap<StreamId, std::deque<ByteEventDetail>>;
|
||||
ByteEventMap& getByteEventMap(const ByteEvent::Type type);
|
||||
FOLLY_NODISCARD const ByteEventMap& getByteEventMapConst(
|
||||
const ByteEvent::Type type) const;
|
||||
|
||||
/**
|
||||
* Helper function that calls passed function for each ByteEvent type.
|
||||
*
|
||||
@@ -796,52 +656,22 @@ class QuicTransportBase : public QuicSocket,
|
||||
*/
|
||||
Optional<folly::SocketCmsgMap> getAdditionalCmsgsForAsyncUDPSocket();
|
||||
|
||||
struct ReadCallbackData {
|
||||
ReadCallback* readCb;
|
||||
bool resumed{true};
|
||||
bool deliveredEOM{false};
|
||||
|
||||
ReadCallbackData(ReadCallback* readCallback) : readCb(readCallback) {}
|
||||
};
|
||||
|
||||
struct PeekCallbackData {
|
||||
PeekCallback* peekCb;
|
||||
bool resumed{true};
|
||||
|
||||
PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {}
|
||||
};
|
||||
|
||||
folly::F14FastMap<StreamId, ReadCallbackData> readCallbacks_;
|
||||
folly::F14FastMap<StreamId, PeekCallbackData> peekCallbacks_;
|
||||
|
||||
ByteEventMap deliveryCallbacks_;
|
||||
ByteEventMap txCallbacks_;
|
||||
|
||||
DatagramCallback* datagramCallback_{nullptr};
|
||||
PingCallback* pingCallback_{nullptr};
|
||||
|
||||
QuicSocket::WriteCallback* connWriteCallback_{nullptr};
|
||||
std::map<StreamId, QuicSocket::WriteCallback*> pendingWriteCallbacks_;
|
||||
bool handshakeDoneNotified_{false};
|
||||
|
||||
LossTimeout lossTimeout_;
|
||||
AckTimeout ackTimeout_;
|
||||
PathValidationTimeout pathValidationTimeout_;
|
||||
IdleTimeout idleTimeout_;
|
||||
KeepaliveTimeout keepaliveTimeout_;
|
||||
DrainTimeout drainTimeout_;
|
||||
PingTimeout pingTimeout_;
|
||||
ExcessWriteTimeout excessWriteTimeout_;
|
||||
FunctionLooper::Ptr readLooper_;
|
||||
FunctionLooper::Ptr peekLooper_;
|
||||
FunctionLooper::Ptr writeLooper_;
|
||||
|
||||
// TODO: This is silly. We need a better solution.
|
||||
// Uninitialied local address as a fallback answer when socket isn't bound.
|
||||
folly::SocketAddress localFallbackAddress;
|
||||
|
||||
Optional<std::string> exceptionCloseWhat_;
|
||||
|
||||
uint64_t qlogRefcnt_{0};
|
||||
|
||||
// Priority level threshold for background streams
|
||||
@@ -908,10 +738,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
};
|
||||
|
||||
protected:
|
||||
void cancelTimeout(QuicTimerCallback* callback);
|
||||
|
||||
bool isTimeoutScheduled(QuicTimerCallback* callback) const;
|
||||
|
||||
/**
|
||||
* Helper function to validate that the number of ECN packet marks match the
|
||||
* expected value, depending on the ECN state of the connection.
|
||||
@@ -934,19 +760,6 @@ class QuicTransportBase : public QuicSocket,
|
||||
uint64_t packetLimit);
|
||||
|
||||
void onSocketWritable() noexcept override;
|
||||
void maybeStopWriteLooperAndArmSocketWritableEvent();
|
||||
|
||||
/**
|
||||
* Checks the idle timer on write events, and if it's past the idle timeout,
|
||||
* calls the timer finctions.
|
||||
*/
|
||||
void checkIdleTimer(TimePoint now);
|
||||
struct IdleTimeoutCheck {
|
||||
std::chrono::milliseconds idleTimeoutMs{0};
|
||||
Optional<TimePoint> lastTimeIdleTimeoutScheduled_;
|
||||
bool forcedIdleTimeoutScheduled_{false};
|
||||
};
|
||||
IdleTimeoutCheck idleTimeoutCheck_;
|
||||
|
||||
private:
|
||||
/**
|
||||
@@ -972,6 +785,4 @@ class QuicTransportBase : public QuicSocket,
|
||||
[[nodiscard]] bool checkCustomRetransmissionProfilesEnabled() const;
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt);
|
||||
|
||||
} // namespace quic
|
||||
|
@@ -18,6 +18,228 @@ constexpr auto APP_NO_ERROR = quic::GenericApplicationErrorCode::NO_ERROR;
|
||||
|
||||
namespace quic {
|
||||
|
||||
inline std::ostream& operator<<(
|
||||
std::ostream& os,
|
||||
const CloseState& closeState) {
|
||||
switch (closeState) {
|
||||
case CloseState::OPEN:
|
||||
os << "OPEN";
|
||||
break;
|
||||
case CloseState::GRACEFUL_CLOSING:
|
||||
os << "GRACEFUL_CLOSING";
|
||||
break;
|
||||
case CloseState::CLOSED:
|
||||
os << "CLOSED";
|
||||
break;
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBaseLite::notifyPendingWriteOnConnection(
|
||||
QuicSocket::WriteCallback* wcb) {
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
if (connWriteCallback_ != nullptr) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
|
||||
}
|
||||
// Assign the write callback before going into the loop so that if we close
|
||||
// the connection while we are still scheduled, the write callback will get
|
||||
// an error synchronously.
|
||||
connWriteCallback_ = wcb;
|
||||
runOnEvbAsync([](auto self) {
|
||||
if (!self->connWriteCallback_) {
|
||||
// The connection was probably closed.
|
||||
return;
|
||||
}
|
||||
auto connWritableBytes = self->maxWritableOnConn();
|
||||
if (connWritableBytes != 0) {
|
||||
auto connWriteCallback = self->connWriteCallback_;
|
||||
self->connWriteCallback_ = nullptr;
|
||||
connWriteCallback->onConnectionWriteReady(connWritableBytes);
|
||||
}
|
||||
});
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBaseLite::unregisterStreamWriteCallback(StreamId id) {
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
|
||||
}
|
||||
if (pendingWriteCallbacks_.find(id) == pendingWriteCallbacks_.end()) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
pendingWriteCallbacks_.erase(id);
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBaseLite::notifyPendingWriteOnStream(
|
||||
StreamId id,
|
||||
QuicSocket::WriteCallback* wcb) {
|
||||
if (isReceivingStream(conn_->nodeType, id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
|
||||
if (!stream->writable()) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED);
|
||||
}
|
||||
|
||||
if (wcb == nullptr) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
|
||||
}
|
||||
// Add the callback to the pending write callbacks so that if we are closed
|
||||
// while we are scheduled in the loop, the close will error out the
|
||||
// callbacks.
|
||||
auto wcbEmplaceResult = pendingWriteCallbacks_.emplace(id, wcb);
|
||||
if (!wcbEmplaceResult.second) {
|
||||
if ((wcbEmplaceResult.first)->second != wcb) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK);
|
||||
} else {
|
||||
return folly::makeUnexpected(LocalErrorCode::CALLBACK_ALREADY_INSTALLED);
|
||||
}
|
||||
}
|
||||
runOnEvbAsync([id](auto self) {
|
||||
auto wcbIt = self->pendingWriteCallbacks_.find(id);
|
||||
if (wcbIt == self->pendingWriteCallbacks_.end()) {
|
||||
// the connection was probably closed.
|
||||
return;
|
||||
}
|
||||
auto writeCallback = wcbIt->second;
|
||||
if (!self->conn_->streamManager->streamExists(id)) {
|
||||
self->pendingWriteCallbacks_.erase(wcbIt);
|
||||
writeCallback->onStreamWriteError(
|
||||
id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS));
|
||||
return;
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(self->conn_->streamManager->getStream(id));
|
||||
if (!stream->writable()) {
|
||||
self->pendingWriteCallbacks_.erase(wcbIt);
|
||||
writeCallback->onStreamWriteError(
|
||||
id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS));
|
||||
return;
|
||||
}
|
||||
auto maxCanWrite = self->maxWritableOnStream(*stream);
|
||||
if (maxCanWrite != 0) {
|
||||
self->pendingWriteCallbacks_.erase(wcbIt);
|
||||
writeCallback->onStreamWriteReady(id, maxCanWrite);
|
||||
}
|
||||
});
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode>
|
||||
QuicTransportBaseLite::registerByteEventCallback(
|
||||
const ByteEvent::Type type,
|
||||
const StreamId id,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) {
|
||||
if (isReceivingStream(conn_->nodeType, id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
if (closeState_ != CloseState::OPEN) {
|
||||
return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED);
|
||||
}
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS);
|
||||
}
|
||||
if (!cb) {
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
ByteEventMap& byteEventMap = getByteEventMap(type);
|
||||
auto byteEventMapIt = byteEventMap.find(id);
|
||||
if (byteEventMapIt == byteEventMap.end()) {
|
||||
byteEventMap.emplace(
|
||||
id,
|
||||
std::initializer_list<std::remove_reference<
|
||||
decltype(byteEventMap)>::type::mapped_type::value_type>(
|
||||
{{offset, cb}}));
|
||||
} else {
|
||||
// Keep ByteEvents for the same stream sorted by offsets:
|
||||
auto pos = std::upper_bound(
|
||||
byteEventMapIt->second.begin(),
|
||||
byteEventMapIt->second.end(),
|
||||
offset,
|
||||
[&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; });
|
||||
if (pos != byteEventMapIt->second.begin()) {
|
||||
auto matchingEvent = std::find_if(
|
||||
byteEventMapIt->second.begin(),
|
||||
pos,
|
||||
[offset, cb](const ByteEventDetail& p) {
|
||||
return ((p.offset == offset) && (p.callback == cb));
|
||||
});
|
||||
if (matchingEvent != pos) {
|
||||
// ByteEvent has been already registered for the same type, id,
|
||||
// offset and for the same recipient, return an INVALID_OPERATION
|
||||
// error to prevent duplicate registrations.
|
||||
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
|
||||
}
|
||||
}
|
||||
byteEventMapIt->second.emplace(pos, offset, cb);
|
||||
}
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id));
|
||||
|
||||
// Notify recipients that the registration was successful.
|
||||
cb->onByteEventRegistered(ByteEvent{id, offset, type});
|
||||
|
||||
// if the callback is already ready, we still insert, but schedule to
|
||||
// process
|
||||
Optional<uint64_t> maxOffsetReady;
|
||||
switch (type) {
|
||||
case ByteEvent::Type::ACK:
|
||||
maxOffsetReady = getLargestDeliverableOffset(*stream);
|
||||
break;
|
||||
case ByteEvent::Type::TX:
|
||||
maxOffsetReady = getLargestWriteOffsetTxed(*stream);
|
||||
break;
|
||||
}
|
||||
if (maxOffsetReady.has_value() && (offset <= *maxOffsetReady)) {
|
||||
runOnEvbAsync([id, cb, offset, type](auto selfObj) {
|
||||
if (selfObj->closeState_ != CloseState::OPEN) {
|
||||
// Close will error out all byte event callbacks.
|
||||
return;
|
||||
}
|
||||
|
||||
auto& byteEventMapL = selfObj->getByteEventMap(type);
|
||||
auto streamByteEventCbIt = byteEventMapL.find(id);
|
||||
if (streamByteEventCbIt == byteEventMapL.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// This is scheduled to run in the future (during the next iteration of
|
||||
// the event loop). It is possible that the ByteEventDetail list gets
|
||||
// mutated between the time it was scheduled to now when we are ready to
|
||||
// run it. Look at the current outstanding ByteEvents for this stream ID
|
||||
// and confirm that our ByteEvent's offset and recipient callback are
|
||||
// still present.
|
||||
auto pos = std::find_if(
|
||||
streamByteEventCbIt->second.begin(),
|
||||
streamByteEventCbIt->second.end(),
|
||||
[offset, cb](const ByteEventDetail& p) {
|
||||
return ((p.offset == offset) && (p.callback == cb));
|
||||
});
|
||||
// if our byteEvent is not present, it must have been delivered already.
|
||||
if (pos == streamByteEventCbIt->second.end()) {
|
||||
return;
|
||||
}
|
||||
streamByteEventCbIt->second.erase(pos);
|
||||
|
||||
cb->onByteEvent(ByteEvent{id, offset, type});
|
||||
});
|
||||
}
|
||||
return folly::unit;
|
||||
}
|
||||
|
||||
bool QuicTransportBaseLite::good() const {
|
||||
return closeState_ == CloseState::OPEN && hasWriteCipher() && !error();
|
||||
}
|
||||
@@ -45,6 +267,13 @@ void QuicTransportBaseLite::setConnectionCallback(
|
||||
connCallback_ = callback;
|
||||
}
|
||||
|
||||
uint64_t QuicTransportBaseLite::maxWritableOnStream(
|
||||
const QuicStreamState& stream) const {
|
||||
auto connWritableBytes = maxWritableOnConn();
|
||||
auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream);
|
||||
return std::min(streamFlowControlBytes, connWritableBytes);
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::processConnectionSetupCallbacks(
|
||||
QuicError&& cancelCode) {
|
||||
// connSetupCallback_ could be null if start() was never
|
||||
@@ -73,6 +302,58 @@ void QuicTransportBaseLite::processConnectionCallbacks(QuicError&& cancelCode) {
|
||||
}
|
||||
}
|
||||
|
||||
QuicTransportBaseLite::ByteEventMap& QuicTransportBaseLite::getByteEventMap(
|
||||
const ByteEvent::Type type) {
|
||||
switch (type) {
|
||||
case ByteEvent::Type::ACK:
|
||||
return deliveryCallbacks_;
|
||||
case ByteEvent::Type::TX:
|
||||
return txCallbacks_;
|
||||
}
|
||||
LOG(FATAL) << "Unhandled case in getByteEventMap";
|
||||
folly::assume_unreachable();
|
||||
}
|
||||
|
||||
const QuicTransportBaseLite::ByteEventMap&
|
||||
QuicTransportBaseLite::getByteEventMapConst(const ByteEvent::Type type) const {
|
||||
switch (type) {
|
||||
case ByteEvent::Type::ACK:
|
||||
return deliveryCallbacks_;
|
||||
case ByteEvent::Type::TX:
|
||||
return txCallbacks_;
|
||||
}
|
||||
LOG(FATAL) << "Unhandled case in getByteEventMapConst";
|
||||
folly::assume_unreachable();
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::checkIdleTimer(TimePoint now) {
|
||||
if (closeState_ == CloseState::CLOSED) {
|
||||
return;
|
||||
}
|
||||
if (!idleTimeout_.isTimerCallbackScheduled()) {
|
||||
return;
|
||||
}
|
||||
if (!idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_.has_value()) {
|
||||
return;
|
||||
}
|
||||
if (idleTimeoutCheck_.forcedIdleTimeoutScheduled_) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ((now - *idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_) >=
|
||||
idleTimeoutCheck_.idleTimeoutMs) {
|
||||
// Call timer expiration async.
|
||||
idleTimeoutCheck_.forcedIdleTimeoutScheduled_ = true;
|
||||
runOnEvbAsync([](auto self) {
|
||||
if (!self->good() || self->closeState_ == CloseState::CLOSED) {
|
||||
// The connection was probably closed.
|
||||
return;
|
||||
}
|
||||
self->idleTimeout_.timeoutExpired();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
folly::Expected<QuicSocketLite::StreamTransportInfo, LocalErrorCode>
|
||||
QuicTransportBaseLite::getStreamTransportInfo(StreamId id) const {
|
||||
if (!conn_->streamManager->streamExists(id)) {
|
||||
@@ -121,6 +402,114 @@ QuicTransportBaseLite::getStreamFlowControl(StreamId id) const {
|
||||
stream->flowControlState.advertisedMaxOffset);
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::runOnEvbAsync(
|
||||
folly::Function<void(std::shared_ptr<QuicTransportBaseLite>)> func) {
|
||||
auto evb = getEventBase();
|
||||
evb->runInLoop(
|
||||
[self = sharedGuard(), func = std::move(func), evb]() mutable {
|
||||
if (self->getEventBase() != evb) {
|
||||
// The eventbase changed between scheduling the loop and invoking
|
||||
// the callback, ignore this
|
||||
return;
|
||||
}
|
||||
func(std::move(self));
|
||||
},
|
||||
true);
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::maybeStopWriteLooperAndArmSocketWritableEvent() {
|
||||
if (!socket_ || (closeState_ == CloseState::CLOSED)) {
|
||||
return;
|
||||
}
|
||||
if (conn_->transportSettings.useSockWritableEvents &&
|
||||
!socket_->isWritableCallbackSet()) {
|
||||
// Check if all data has been written and we're not limited by flow
|
||||
// control/congestion control.
|
||||
auto writeReason = shouldWriteData(*conn_);
|
||||
bool haveBufferToRetry = writeReason == WriteDataReason::BUFFERED_WRITE;
|
||||
bool haveNewDataToWrite =
|
||||
(writeReason != WriteDataReason::NO_WRITE) && !haveBufferToRetry;
|
||||
bool haveCongestionControlWindow = true;
|
||||
if (conn_->congestionController) {
|
||||
haveCongestionControlWindow =
|
||||
conn_->congestionController->getWritableBytes() > 0;
|
||||
}
|
||||
bool haveFlowControlWindow = getSendConnFlowControlBytesAPI(*conn_) > 0;
|
||||
bool connHasWriteWindow =
|
||||
haveCongestionControlWindow && haveFlowControlWindow;
|
||||
if (haveBufferToRetry || (haveNewDataToWrite && connHasWriteWindow)) {
|
||||
// Re-arm the write event and stop the write
|
||||
// looper.
|
||||
socket_->resumeWrite(this);
|
||||
writeLooper_->stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::writeSocketDataAndCatch() {
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
try {
|
||||
writeSocketData();
|
||||
processCallbacksAfterWriteData();
|
||||
} catch (const QuicTransportException& ex) {
|
||||
VLOG(4) << __func__ << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("writeSocketDataAndCatch() error")));
|
||||
} catch (const QuicInternalException& ex) {
|
||||
VLOG(4) << __func__ << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("writeSocketDataAndCatch() error")));
|
||||
} catch (const std::exception& ex) {
|
||||
VLOG(4) << __func__ << " error=" << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
|
||||
std::string("writeSocketDataAndCatch() error")));
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::pacedWriteDataToSocket() {
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
SCOPE_EXIT {
|
||||
self->maybeStopWriteLooperAndArmSocketWritableEvent();
|
||||
};
|
||||
|
||||
if (!isConnectionPaced(*conn_)) {
|
||||
// Not paced and connection is still open, normal write. Even if pacing is
|
||||
// previously enabled and then gets disabled, and we are here due to a
|
||||
// timeout, we should do a normal write to flush out the residue from
|
||||
// pacing write.
|
||||
writeSocketDataAndCatch();
|
||||
|
||||
if (conn_->transportSettings.scheduleTimerForExcessWrites) {
|
||||
// If we still have data to write, yield the event loop now but schedule a
|
||||
// timeout to come around and write again as soon as possible.
|
||||
auto writeDataReason = shouldWriteData(*conn_);
|
||||
if (writeDataReason != WriteDataReason::NO_WRITE &&
|
||||
!excessWriteTimeout_.isTimerCallbackScheduled()) {
|
||||
scheduleTimeout(&excessWriteTimeout_, 0ms);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// We are in the middle of a pacing interval. Leave it be.
|
||||
if (writeLooper_->isPacingScheduled()) {
|
||||
// The next burst is already scheduled. Since the burst size doesn't
|
||||
// depend on much data we currently have in buffer at all, no need to
|
||||
// change anything.
|
||||
return;
|
||||
}
|
||||
|
||||
// Do a burst write before waiting for an interval. This will also call
|
||||
// updateWriteLooper, but inside FunctionLooper we will ignore that.
|
||||
writeSocketDataAndCatch();
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::writeSocketData() {
|
||||
if (socket_) {
|
||||
++(conn_->writeCount); // incremented on each write (or write attempt)
|
||||
@@ -231,6 +620,68 @@ void QuicTransportBaseLite::writeSocketData() {
|
||||
updateWriteLooper(false);
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::cancelTimeout(QuicTimerCallback* callback) {
|
||||
callback->cancelTimerCallback();
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::excessWriteTimeoutExpired() noexcept {
|
||||
auto writeDataReason = shouldWriteData(*conn_);
|
||||
if (writeDataReason != WriteDataReason::NO_WRITE) {
|
||||
pacedWriteDataToSocket();
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::lossTimeoutExpired() noexcept {
|
||||
CHECK_NE(closeState_, CloseState::CLOSED);
|
||||
// onLossDetectionAlarm will set packetToSend in pending events
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
try {
|
||||
onLossDetectionAlarm(*conn_, markPacketLoss);
|
||||
if (conn_->qLogger) {
|
||||
conn_->qLogger->addTransportStateUpdate(kLossTimeoutExpired);
|
||||
}
|
||||
pacedWriteDataToSocket();
|
||||
} catch (const QuicTransportException& ex) {
|
||||
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("lossTimeoutExpired() error")));
|
||||
} catch (const QuicInternalException& ex) {
|
||||
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(ex.errorCode()),
|
||||
std::string("lossTimeoutExpired() error")));
|
||||
} catch (const std::exception& ex) {
|
||||
VLOG(4) << __func__ << " " << ex.what() << " " << *this;
|
||||
exceptionCloseWhat_ = ex.what();
|
||||
closeImpl(QuicError(
|
||||
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
|
||||
std::string("lossTimeoutExpired() error")));
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::idleTimeoutExpired(bool drain) noexcept {
|
||||
VLOG(4) << __func__ << " " << *this;
|
||||
[[maybe_unused]] auto self = sharedGuard();
|
||||
// idle timeout is expired, just close the connection and drain or
|
||||
// send connection close immediately depending on 'drain'
|
||||
DCHECK_NE(closeState_, CloseState::CLOSED);
|
||||
uint64_t numOpenStreans = conn_->streamManager->streamCount();
|
||||
auto localError =
|
||||
drain ? LocalErrorCode::IDLE_TIMEOUT : LocalErrorCode::SHUTTING_DOWN;
|
||||
closeImpl(
|
||||
quic::QuicError(
|
||||
QuicErrorCode(localError),
|
||||
folly::to<std::string>(
|
||||
toString(localError),
|
||||
", num non control streams: ",
|
||||
numOpenStreans - conn_->streamManager->numControlStreams())),
|
||||
drain /* drainConnection */,
|
||||
!drain /* sendCloseImmediately */);
|
||||
}
|
||||
|
||||
bool QuicTransportBaseLite::processCancelCode(const QuicError& cancelCode) {
|
||||
bool noError = false;
|
||||
switch (cancelCode.code.type()) {
|
||||
@@ -268,6 +719,144 @@ uint64_t QuicTransportBaseLite::maxWritableOnConn() const {
|
||||
return ret;
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::scheduleTimeout(
|
||||
QuicTimerCallback* callback,
|
||||
std::chrono::milliseconds timeout) {
|
||||
if (evb_) {
|
||||
evb_->scheduleTimeout(callback, timeout);
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::scheduleLossTimeout(
|
||||
std::chrono::milliseconds timeout) {
|
||||
if (closeState_ == CloseState::CLOSED) {
|
||||
return;
|
||||
}
|
||||
timeout = timeMax(timeout, evb_->getTimerTickInterval());
|
||||
scheduleTimeout(&lossTimeout_, timeout);
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::cancelLossTimeout() {
|
||||
cancelTimeout(&lossTimeout_);
|
||||
}
|
||||
|
||||
bool QuicTransportBaseLite::isTimeoutScheduled(
|
||||
QuicTimerCallback* callback) const {
|
||||
return callback->isTimerCallbackScheduled();
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::invokeReadDataAndCallbacks() {
|
||||
auto self = sharedGuard();
|
||||
SCOPE_EXIT {
|
||||
self->checkForClosedStream();
|
||||
self->updateReadLooper();
|
||||
self->updateWriteLooper(true);
|
||||
};
|
||||
// Need a copy since the set can change during callbacks.
|
||||
std::vector<StreamId> readableStreamsCopy;
|
||||
const auto& readableStreams = self->conn_->streamManager->readableStreams();
|
||||
readableStreamsCopy.reserve(readableStreams.size());
|
||||
std::copy(
|
||||
readableStreams.begin(),
|
||||
readableStreams.end(),
|
||||
std::back_inserter(readableStreamsCopy));
|
||||
if (self->conn_->transportSettings.orderedReadCallbacks) {
|
||||
std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end());
|
||||
}
|
||||
for (StreamId streamId : readableStreamsCopy) {
|
||||
auto callback = self->readCallbacks_.find(streamId);
|
||||
if (callback == self->readCallbacks_.end()) {
|
||||
// Stream doesn't have a read callback set, skip it.
|
||||
continue;
|
||||
}
|
||||
auto readCb = callback->second.readCb;
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
|
||||
if (readCb && stream->streamReadError) {
|
||||
self->conn_->streamManager->readableStreams().erase(streamId);
|
||||
readCallbacks_.erase(callback);
|
||||
// if there is an error on the stream - it's not readable anymore, so
|
||||
// we cannot peek into it as well.
|
||||
self->conn_->streamManager->peekableStreams().erase(streamId);
|
||||
peekCallbacks_.erase(streamId);
|
||||
VLOG(10) << "invoking read error callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
if (!stream->groupId) {
|
||||
readCb->readError(streamId, QuicError(*stream->streamReadError));
|
||||
} else {
|
||||
readCb->readErrorWithGroup(
|
||||
streamId, *stream->groupId, QuicError(*stream->streamReadError));
|
||||
}
|
||||
} else if (
|
||||
readCb && callback->second.resumed && stream->hasReadableData()) {
|
||||
VLOG(10) << "invoking read callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
if (!stream->groupId) {
|
||||
readCb->readAvailable(streamId);
|
||||
} else {
|
||||
readCb->readAvailableWithGroup(streamId, *stream->groupId);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (self->datagramCallback_ && !conn_->datagramState.readBuffer.empty()) {
|
||||
self->datagramCallback_->onDatagramsAvailable();
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::invokePeekDataAndCallbacks() {
|
||||
auto self = sharedGuard();
|
||||
SCOPE_EXIT {
|
||||
self->checkForClosedStream();
|
||||
self->updatePeekLooper();
|
||||
self->updateWriteLooper(true);
|
||||
};
|
||||
// TODO: add protection from calling "consume" in the middle of the peek -
|
||||
// one way is to have a peek counter that is incremented when peek calblack
|
||||
// is called and decremented when peek is done. once counter transitions
|
||||
// to 0 we can execute "consume" calls that were done during "peek", for that,
|
||||
// we would need to keep stack of them.
|
||||
std::vector<StreamId> peekableStreamsCopy;
|
||||
const auto& peekableStreams = self->conn_->streamManager->peekableStreams();
|
||||
peekableStreamsCopy.reserve(peekableStreams.size());
|
||||
std::copy(
|
||||
peekableStreams.begin(),
|
||||
peekableStreams.end(),
|
||||
std::back_inserter(peekableStreamsCopy));
|
||||
VLOG(10) << __func__
|
||||
<< " peekableListCopy.size()=" << peekableStreamsCopy.size();
|
||||
for (StreamId streamId : peekableStreamsCopy) {
|
||||
auto callback = self->peekCallbacks_.find(streamId);
|
||||
// This is a likely bug. Need to think more on whether events can
|
||||
// be dropped
|
||||
// remove streamId from list of peekable - as opposed to "read", "peek" is
|
||||
// only called once per streamId and not on every EVB loop until application
|
||||
// reads the data.
|
||||
self->conn_->streamManager->peekableStreams().erase(streamId);
|
||||
if (callback == self->peekCallbacks_.end()) {
|
||||
VLOG(10) << " No peek callback for stream=" << streamId;
|
||||
continue;
|
||||
}
|
||||
auto peekCb = callback->second.peekCb;
|
||||
auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId));
|
||||
if (peekCb && stream->streamReadError) {
|
||||
VLOG(10) << "invoking peek error callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
peekCb->peekError(streamId, QuicError(*stream->streamReadError));
|
||||
} else if (
|
||||
peekCb && !stream->streamReadError && stream->hasPeekableData()) {
|
||||
VLOG(10) << "invoking peek callbacks on stream=" << streamId << " "
|
||||
<< *this;
|
||||
|
||||
peekDataFromQuicStream(
|
||||
*stream,
|
||||
[&](StreamId id, const folly::Range<PeekIterator>& peekRange) {
|
||||
peekCb->onDataAvailable(id, peekRange);
|
||||
});
|
||||
} else {
|
||||
VLOG(10) << "Not invoking peek callbacks on stream=" << streamId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::updatePacketProcessorsPrewriteRequests() {
|
||||
folly::SocketCmsgMap cmsgs;
|
||||
for (const auto& pp : conn_->packetProcessors) {
|
||||
@@ -286,4 +875,14 @@ void QuicTransportBaseLite::updatePacketProcessorsPrewriteRequests() {
|
||||
conn_->socketCmsgsState.targetWriteCount = conn_->writeCount;
|
||||
}
|
||||
|
||||
void QuicTransportBaseLite::describe(std::ostream& os) const {
|
||||
CHECK(conn_);
|
||||
os << *conn_;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const QuicTransportBaseLite& qt) {
|
||||
qt.describe(os);
|
||||
return os;
|
||||
}
|
||||
|
||||
} // namespace quic
|
||||
|
@@ -8,12 +8,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <quic/api/QuicSocketLite.h>
|
||||
#include <quic/common/FunctionLooper.h>
|
||||
|
||||
namespace quic {
|
||||
|
||||
enum class CloseState { OPEN, GRACEFUL_CLOSING, CLOSED };
|
||||
|
||||
class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
class QuicTransportBaseLite : virtual public QuicSocketLite,
|
||||
QuicAsyncUDPSocket::WriteCallback {
|
||||
public:
|
||||
QuicTransportBaseLite(
|
||||
std::shared_ptr<QuicEventBase> evb,
|
||||
@@ -21,7 +23,14 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
bool useConnectionEndWithErrorCallback)
|
||||
: evb_(evb),
|
||||
socket_(std::move(socket)),
|
||||
useConnectionEndWithErrorCallback_(useConnectionEndWithErrorCallback) {}
|
||||
useConnectionEndWithErrorCallback_(useConnectionEndWithErrorCallback),
|
||||
lossTimeout_(this),
|
||||
excessWriteTimeout_(this),
|
||||
idleTimeout_(this),
|
||||
writeLooper_(new FunctionLooper(
|
||||
evb_,
|
||||
[this]() { pacedWriteDataToSocket(); },
|
||||
LooperType::WriteLooper)) {}
|
||||
|
||||
/**
|
||||
* Invoked when we have to write some data to the wire.
|
||||
@@ -31,6 +40,26 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
*/
|
||||
virtual void writeData() = 0;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> notifyPendingWriteOnStream(
|
||||
StreamId id,
|
||||
QuicSocketLite::WriteCallback* wcb) override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> notifyPendingWriteOnConnection(
|
||||
QuicSocketLite::WriteCallback* wcb) override;
|
||||
|
||||
folly::Expected<folly::Unit, LocalErrorCode> unregisterStreamWriteCallback(
|
||||
StreamId id) override;
|
||||
|
||||
/**
|
||||
* Register a byte event to be triggered when specified event type occurs for
|
||||
* the specified stream and offset.
|
||||
*/
|
||||
folly::Expected<folly::Unit, LocalErrorCode> registerByteEventCallback(
|
||||
const ByteEvent::Type type,
|
||||
const StreamId id,
|
||||
const uint64_t offset,
|
||||
ByteEventCallback* cb) override;
|
||||
|
||||
bool good() const override;
|
||||
|
||||
bool error() const override;
|
||||
@@ -54,6 +83,8 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
void setSendBuffer(StreamId, size_t /*maxUnacked*/, size_t /*maxUnsent*/)
|
||||
override {}
|
||||
|
||||
uint64_t maxWritableOnStream(const QuicStreamState&) const;
|
||||
|
||||
[[nodiscard]] std::shared_ptr<QuicEventBase> getEventBase() const override;
|
||||
|
||||
folly::Expected<StreamTransportInfo, LocalErrorCode> getStreamTransportInfo(
|
||||
@@ -74,15 +105,76 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
|
||||
[[nodiscard]] uint64_t maxWritableOnConn() const override;
|
||||
|
||||
virtual void scheduleLossTimeout(std::chrono::milliseconds /* timeout */) {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
void scheduleTimeout(
|
||||
QuicTimerCallback* callback,
|
||||
std::chrono::milliseconds timeout);
|
||||
|
||||
virtual void cancelLossTimeout() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
class ExcessWriteTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~ExcessWriteTimeout() override = default;
|
||||
|
||||
explicit ExcessWriteTimeout(QuicTransportBaseLite* transport)
|
||||
: transport_(transport) {}
|
||||
|
||||
void timeoutExpired() noexcept override {
|
||||
transport_->excessWriteTimeoutExpired();
|
||||
}
|
||||
|
||||
void callbackCanceled() noexcept override {
|
||||
// Do nothing.
|
||||
return;
|
||||
}
|
||||
|
||||
private:
|
||||
QuicTransportBaseLite* transport_;
|
||||
};
|
||||
|
||||
// Timeout functions
|
||||
class LossTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~LossTimeout() override = default;
|
||||
|
||||
explicit LossTimeout(QuicTransportBaseLite* transport)
|
||||
: transport_(transport) {}
|
||||
|
||||
void timeoutExpired() noexcept override {
|
||||
transport_->lossTimeoutExpired();
|
||||
}
|
||||
|
||||
virtual void callbackCanceled() noexcept override {
|
||||
// ignore. this usually means that the eventbase is dying, so we will be
|
||||
// canceled anyway
|
||||
return;
|
||||
}
|
||||
|
||||
private:
|
||||
QuicTransportBaseLite* transport_;
|
||||
};
|
||||
|
||||
class IdleTimeout : public QuicTimerCallback {
|
||||
public:
|
||||
~IdleTimeout() override = default;
|
||||
|
||||
explicit IdleTimeout(QuicTransportBaseLite* transport)
|
||||
: transport_(transport) {}
|
||||
|
||||
void timeoutExpired() noexcept override {
|
||||
transport_->idleTimeoutExpired(true /* drain */);
|
||||
}
|
||||
|
||||
void callbackCanceled() noexcept override {
|
||||
// skip drain when canceling the timeout, to avoid scheduling a new
|
||||
// drain timeout
|
||||
transport_->idleTimeoutExpired(false /* drain */);
|
||||
}
|
||||
|
||||
private:
|
||||
QuicTransportBaseLite* transport_;
|
||||
};
|
||||
|
||||
void scheduleLossTimeout(std::chrono::milliseconds timeout);
|
||||
|
||||
void cancelLossTimeout();
|
||||
|
||||
virtual bool isLossTimeoutScheduled() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
@@ -90,7 +182,34 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a shared_ptr which can be used as a guard to keep this
|
||||
* object alive.
|
||||
*/
|
||||
virtual std::shared_ptr<QuicTransportBaseLite> sharedGuard() = 0;
|
||||
|
||||
void describe(std::ostream& os) const;
|
||||
|
||||
protected:
|
||||
/**
|
||||
* A wrapper around writeSocketData
|
||||
*
|
||||
* writeSocketDataAndCatch protects writeSocketData in a try-catch. It also
|
||||
* dispatch the next write loop.
|
||||
*/
|
||||
void writeSocketDataAndCatch();
|
||||
|
||||
/**
|
||||
* Paced write data to socket when connection is paced.
|
||||
*
|
||||
* Whether connection is paced will be decided by TransportSettings and
|
||||
* congection controller. When the connection is paced, this function writes
|
||||
* out a burst size of packets and let the writeLooper schedule a callback to
|
||||
* write another burst after a pacing interval if there are more data to
|
||||
* write. When the connection isn't paced, this function does a normal write.
|
||||
*/
|
||||
void pacedWriteDataToSocket();
|
||||
|
||||
/**
|
||||
* write data to socket
|
||||
*
|
||||
@@ -101,6 +220,17 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
*/
|
||||
void writeSocketData();
|
||||
|
||||
virtual void closeImpl(
|
||||
Optional<QuicError> /* error */,
|
||||
bool /* drainConnection */ = true,
|
||||
bool /* sendCloseImmediately */ = true) {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
|
||||
void runOnEvbAsync(
|
||||
folly::Function<void(std::shared_ptr<QuicTransportBaseLite>)> func);
|
||||
|
||||
virtual void updateWriteLooper(
|
||||
bool /* thisIteration */,
|
||||
bool /* runInline */ = false) {
|
||||
@@ -108,6 +238,34 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
// qualifier
|
||||
}
|
||||
|
||||
virtual void updateReadLooper() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
|
||||
virtual void updatePeekLooper() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
|
||||
void maybeStopWriteLooperAndArmSocketWritableEvent();
|
||||
|
||||
virtual void checkForClosedStream() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
|
||||
void cancelTimeout(QuicTimerCallback* callback);
|
||||
|
||||
void excessWriteTimeoutExpired() noexcept;
|
||||
void lossTimeoutExpired() noexcept;
|
||||
void idleTimeoutExpired(bool drain) noexcept;
|
||||
|
||||
bool isTimeoutScheduled(QuicTimerCallback* callback) const;
|
||||
|
||||
void invokeReadDataAndCallbacks();
|
||||
void invokePeekDataAndCallbacks();
|
||||
|
||||
// Helpers to notify all registered observers about specific events during
|
||||
// socket write (if enabled in the observer's config).
|
||||
virtual void notifyStartWritingFromAppRateLimited() {
|
||||
@@ -126,6 +284,11 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
// qualifier
|
||||
}
|
||||
|
||||
virtual void processCallbacksAfterWriteData() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
}
|
||||
|
||||
virtual void setIdleTimer() {
|
||||
// TODO: Fill this in from QuicTransportBase and remove the "virtual"
|
||||
// qualifier
|
||||
@@ -161,6 +324,64 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
|
||||
bool transportReadyNotified_{false};
|
||||
|
||||
struct ReadCallbackData {
|
||||
ReadCallback* readCb;
|
||||
bool resumed{true};
|
||||
bool deliveredEOM{false};
|
||||
|
||||
ReadCallbackData(ReadCallback* readCallback) : readCb(readCallback) {}
|
||||
};
|
||||
|
||||
struct PeekCallbackData {
|
||||
PeekCallback* peekCb;
|
||||
bool resumed{true};
|
||||
|
||||
PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {}
|
||||
};
|
||||
|
||||
DatagramCallback* datagramCallback_{nullptr};
|
||||
|
||||
folly::F14FastMap<StreamId, ReadCallbackData> readCallbacks_;
|
||||
folly::F14FastMap<StreamId, PeekCallbackData> peekCallbacks_;
|
||||
|
||||
std::map<StreamId, QuicSocketLite::WriteCallback*> pendingWriteCallbacks_;
|
||||
QuicSocketLite::WriteCallback* connWriteCallback_{nullptr};
|
||||
|
||||
struct ByteEventDetail {
|
||||
ByteEventDetail(uint64_t offsetIn, ByteEventCallback* callbackIn)
|
||||
: offset(offsetIn), callback(callbackIn) {}
|
||||
uint64_t offset;
|
||||
ByteEventCallback* callback;
|
||||
};
|
||||
|
||||
using ByteEventMap = folly::F14FastMap<StreamId, std::deque<ByteEventDetail>>;
|
||||
ByteEventMap& getByteEventMap(const ByteEvent::Type type);
|
||||
FOLLY_NODISCARD const ByteEventMap& getByteEventMapConst(
|
||||
const ByteEvent::Type type) const;
|
||||
|
||||
ByteEventMap deliveryCallbacks_;
|
||||
ByteEventMap txCallbacks_;
|
||||
|
||||
/**
|
||||
* Checks the idle timer on write events, and if it's past the idle timeout,
|
||||
* calls the timer finctions.
|
||||
*/
|
||||
void checkIdleTimer(TimePoint now);
|
||||
struct IdleTimeoutCheck {
|
||||
std::chrono::milliseconds idleTimeoutMs{0};
|
||||
Optional<TimePoint> lastTimeIdleTimeoutScheduled_;
|
||||
bool forcedIdleTimeoutScheduled_{false};
|
||||
};
|
||||
IdleTimeoutCheck idleTimeoutCheck_;
|
||||
|
||||
LossTimeout lossTimeout_;
|
||||
ExcessWriteTimeout excessWriteTimeout_;
|
||||
IdleTimeout idleTimeout_;
|
||||
|
||||
FunctionLooper::Ptr writeLooper_;
|
||||
|
||||
Optional<std::string> exceptionCloseWhat_;
|
||||
|
||||
std::
|
||||
unique_ptr<QuicConnectionStateBase, folly::DelayedDestruction::Destructor>
|
||||
conn_;
|
||||
@@ -175,4 +396,6 @@ class QuicTransportBaseLite : virtual public QuicSocketLite {
|
||||
void updatePacketProcessorsPrewriteRequests();
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const QuicTransportBaseLite& qt);
|
||||
|
||||
} // namespace quic
|
||||
|
@@ -357,7 +357,7 @@ class TestQuicTransport
|
||||
return conn_->oneRttWriteCipher != nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<QuicTransportBase> sharedGuard() override {
|
||||
std::shared_ptr<QuicTransportBaseLite> sharedGuard() override {
|
||||
return shared_from_this();
|
||||
}
|
||||
|
||||
|
@@ -124,7 +124,7 @@ class TestQuicTransport
|
||||
return true;
|
||||
}
|
||||
|
||||
std::shared_ptr<QuicTransportBase> sharedGuard() override {
|
||||
std::shared_ptr<QuicTransportBaseLite> sharedGuard() override {
|
||||
return shared_from_this();
|
||||
}
|
||||
|
||||
|
@@ -1065,7 +1065,7 @@ bool QuicClientTransport::hasWriteCipher() const {
|
||||
return clientConn_->oneRttWriteCipher || clientConn_->zeroRttWriteCipher;
|
||||
}
|
||||
|
||||
std::shared_ptr<QuicTransportBase> QuicClientTransport::sharedGuard() {
|
||||
std::shared_ptr<QuicTransportBaseLite> QuicClientTransport::sharedGuard() {
|
||||
return shared_from_this();
|
||||
}
|
||||
|
||||
|
@@ -153,7 +153,7 @@ class QuicClientTransport
|
||||
void closeTransport() override;
|
||||
void unbindConnection() override;
|
||||
bool hasWriteCipher() const override;
|
||||
std::shared_ptr<QuicTransportBase> sharedGuard() override;
|
||||
std::shared_ptr<QuicTransportBaseLite> sharedGuard() override;
|
||||
|
||||
// QuicAsyncUDPSocket::ReadCallback
|
||||
void onReadClosed() noexcept override {}
|
||||
|
@@ -54,7 +54,6 @@ class QuicClientTransportMock : public QuicClientTransport {
|
||||
(const));
|
||||
MOCK_METHOD((bool), isTLSResumed, (), (const));
|
||||
MOCK_METHOD((ZeroRttAttemptState), getZeroRttState, ());
|
||||
MOCK_METHOD((void), closeImpl, (Optional<QuicError>, bool, bool));
|
||||
MOCK_METHOD((void), close, (Optional<QuicError>));
|
||||
MOCK_METHOD((void), writeData, ());
|
||||
MOCK_METHOD((void), closeSecondSocket, ());
|
||||
|
@@ -387,7 +387,7 @@ bool QuicServerTransport::hasReadCipher() const {
|
||||
conn_->readCodec->getOneRttReadCipher() != nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<QuicTransportBase> QuicServerTransport::sharedGuard() {
|
||||
std::shared_ptr<QuicTransportBaseLite> QuicServerTransport::sharedGuard() {
|
||||
return shared_from_this();
|
||||
}
|
||||
|
||||
|
@@ -144,7 +144,7 @@ class QuicServerTransport
|
||||
void closeTransport() override;
|
||||
void unbindConnection() override;
|
||||
bool hasWriteCipher() const override;
|
||||
std::shared_ptr<QuicTransportBase> sharedGuard() override;
|
||||
std::shared_ptr<QuicTransportBaseLite> sharedGuard() override;
|
||||
QuicConnectionStats getConnectionsStats() const override;
|
||||
|
||||
WriteResult writeBufMeta(
|
||||
|
Reference in New Issue
Block a user