diff --git a/quic/api/QuicSocket.h b/quic/api/QuicSocket.h index eca66f17c..c9724df7a 100644 --- a/quic/api/QuicSocket.h +++ b/quic/api/QuicSocket.h @@ -292,40 +292,6 @@ class QuicSocket : virtual public QuicSocketLite { virtual folly::Expected getStreamPriority( StreamId id) = 0; - /** - * ===== Read API ==== - */ - - /** - * Callback class for receiving data on a stream - */ - class ReadCallback { - public: - virtual ~ReadCallback() = default; - - /** - * Called from the transport layer when there is data, EOF or an error - * available to read on the given stream ID - */ - virtual void readAvailable(StreamId id) noexcept = 0; - - /* - * Same as above, but called on streams within a group. - */ - virtual void readAvailableWithGroup(StreamId, StreamGroupId) noexcept {} - - /** - * Called from the transport layer when there is an error on the stream. - */ - virtual void readError(StreamId id, QuicError error) noexcept = 0; - - /** - * Same as above, but called on streams within a group. - */ - virtual void - readErrorWithGroup(StreamId, StreamGroupId, QuicError) noexcept {} - }; - /** * Set the read callback for the given stream. Note that read callback is * expected to be set all the time. Removing read callback indicates that @@ -433,28 +399,6 @@ class QuicSocket : virtual public QuicSocketLite { * }; */ - using PeekIterator = CircularDeque::const_iterator; - class PeekCallback { - public: - virtual ~PeekCallback() = default; - - /** - * Called from the transport layer when there is new data available to - * peek on a given stream. - * Callback can be called multiple times and it is up to application to - * de-dupe already peeked ranges. - */ - virtual void onDataAvailable( - StreamId id, - const folly::Range& peekData) noexcept = 0; - - /** - * Called from the transport layer during peek time when there is an error - * on the stream. - */ - virtual void peekError(StreamId id, QuicError error) noexcept = 0; - }; - virtual folly::Expected setPeekCallback( StreamId id, PeekCallback* cb) = 0; @@ -600,122 +544,6 @@ class QuicSocket : virtual public QuicSocketLite { virtual StreamDirectionality getStreamDirectionality( StreamId stream) noexcept = 0; - /** - * Callback class for receiving write readiness notifications - */ - class WriteCallback { - public: - virtual ~WriteCallback() = default; - - /** - * Invoked when stream is ready to write after notifyPendingWriteOnStream - * has previously been called. - * - * maxToSend represents the amount of data that the transport layer expects - * to write to the network during this event loop, eg: - * min(remaining flow control, remaining send buffer space) - */ - virtual void onStreamWriteReady( - StreamId /* id */, - uint64_t /* maxToSend */) noexcept {} - - /** - * Invoked when connection is ready to write after - * notifyPendingWriteOnConnection has previously been called. - * - * maxToSend represents the amount of data that the transport layer expects - * to write to the network during this event loop, eg: - * min(remaining flow control, remaining send buffer space) - */ - virtual void onConnectionWriteReady(uint64_t /* maxToSend */) noexcept {} - - /** - * Invoked when a connection is being torn down after - * notifyPendingWriteOnStream has been called - */ - virtual void onStreamWriteError( - StreamId /* id */, - QuicError /* error */) noexcept {} - - /** - * Invoked when a connection is being torn down after - * notifyPendingWriteOnConnection has been called - */ - virtual void onConnectionWriteError(QuicError - /* error */) noexcept {} - }; - - /** - * Inform the transport that there is data to write on this connection - * An app shouldn't mix connection and stream calls to this API - * Use this if the app wants to do prioritization. - */ - virtual folly::Expected - notifyPendingWriteOnConnection(WriteCallback* wcb) = 0; - - /** - * Inform the transport that there is data to write on a given stream. - * An app shouldn't mix connection and stream calls to this API - * Use the Connection call if the app wants to do prioritization. - */ - virtual folly::Expected - notifyPendingWriteOnStream(StreamId id, WriteCallback* wcb) = 0; - - virtual folly::Expected - unregisterStreamWriteCallback(StreamId) = 0; - - /** - * Structure used to communicate TX and ACK/Delivery notifications. - */ - struct ByteEvent { - enum class Type { ACK = 1, TX = 2 }; - static constexpr std::array kByteEventTypes = { - {Type::ACK, Type::TX}}; - - StreamId id{0}; - uint64_t offset{0}; - Type type; - - // sRTT at time of event - // TODO(bschlinker): Deprecate, caller can fetch transport state if - // desired. - std::chrono::microseconds srtt{0us}; - }; - - /** - * Structure used to communicate cancellation of a ByteEvent. - * - * According to Dictionary.com, cancellation is more frequent in American - * English than cancellation. Yet in American English, the preferred style is - * typically not to double the final L, so cancel generally becomes canceled. - */ - using ByteEventCancellation = ByteEvent; - - /** - * Callback class for receiving byte event (TX/ACK) notifications. - */ - class ByteEventCallback { - public: - virtual ~ByteEventCallback() = default; - - /** - * Invoked when a byte event has been successfully registered. - * Since this is a convenience notification and not a mandatory callback, - * not marking this as pure virtual. - */ - virtual void onByteEventRegistered(ByteEvent /* byteEvent */) {} - - /** - * Invoked when the byte event has occurred. - */ - virtual void onByteEvent(ByteEvent byteEvent) = 0; - - /** - * Invoked if byte event is canceled due to reset, shutdown, or other error. - */ - virtual void onByteEventCanceled(ByteEventCancellation cancellation) = 0; - }; - /** * Callback class for receiving ack notifications */ @@ -772,21 +600,6 @@ class QuicSocket : virtual public QuicSocketLite { const uint64_t offset, ByteEventCallback* cb) = 0; - /** - * Register a byte event to be triggered when specified event type occurs for - * the specified stream and offset. - * - * If the registration fails, the callback (ByteEventCallback* cb) will NEVER - * be invoked for anything. If the registration succeeds, the callback is - * guaranteed to receive an onByteEventRegistered() notification. - */ - virtual folly::Expected - registerByteEventCallback( - const ByteEvent::Type type, - const StreamId id, - const uint64_t offset, - ByteEventCallback* cb) = 0; - /** * Cancel byte event callbacks for given stream. * @@ -1099,16 +912,6 @@ class QuicSocket : virtual public QuicSocketLite { * or loss notification support for Datagram. */ - class DatagramCallback { - public: - virtual ~DatagramCallback() = default; - - /** - * Notifies the DatagramCallback that datagrams are available for read. - */ - virtual void onDatagramsAvailable() noexcept = 0; - }; - /** * Set the read callback for Datagrams */ diff --git a/quic/api/QuicSocketLite.h b/quic/api/QuicSocketLite.h index 1f44af320..e2f5396aa 100644 --- a/quic/api/QuicSocketLite.h +++ b/quic/api/QuicSocketLite.h @@ -230,6 +230,233 @@ class QuicSocketLite { receiveWindowMaxOffset(receiveWindowMaxOffsetIn) {} }; + /** + * ===== Read API ==== + */ + + /** + * Callback class for receiving data on a stream + */ + class ReadCallback { + public: + virtual ~ReadCallback() = default; + + /** + * Called from the transport layer when there is data, EOF or an error + * available to read on the given stream ID + */ + virtual void readAvailable(StreamId id) noexcept = 0; + + /* + * Same as above, but called on streams within a group. + */ + virtual void readAvailableWithGroup(StreamId, StreamGroupId) noexcept {} + + /** + * Called from the transport layer when there is an error on the stream. + */ + virtual void readError(StreamId id, QuicError error) noexcept = 0; + + /** + * Same as above, but called on streams within a group. + */ + virtual void + readErrorWithGroup(StreamId, StreamGroupId, QuicError) noexcept {} + }; + + /** + * ===== Peek/Consume API ===== + */ + + /** + * Usage: + * class Application { + * void onNewBidirectionalStream(StreamId id) { + * socket_->setPeekCallback(id, this); + * } + * + * virtual void onDataAvailable( + * StreamId id, + * const folly::Range& peekData) noexcept override + * { + * auto amount = tryInterpret(peekData); + * if (amount) { + * socket_->consume(id, amount); + * } + * } + * }; + */ + + using PeekIterator = CircularDeque::const_iterator; + class PeekCallback { + public: + virtual ~PeekCallback() = default; + + /** + * Called from the transport layer when there is new data available to + * peek on a given stream. + * Callback can be called multiple times and it is up to application to + * de-dupe already peeked ranges. + */ + virtual void onDataAvailable( + StreamId id, + const folly::Range& peekData) noexcept = 0; + + /** + * Called from the transport layer during peek time when there is an error + * on the stream. + */ + virtual void peekError(StreamId id, QuicError error) noexcept = 0; + }; + + /** + * Structure used to communicate TX and ACK/Delivery notifications. + */ + struct ByteEvent { + enum class Type { ACK = 1, TX = 2 }; + static constexpr std::array kByteEventTypes = { + {Type::ACK, Type::TX}}; + + StreamId id{0}; + uint64_t offset{0}; + Type type; + + // sRTT at time of event + // TODO(bschlinker): Deprecate, caller can fetch transport state if + // desired. + std::chrono::microseconds srtt{0us}; + }; + + /** + * Structure used to communicate cancellation of a ByteEvent. + * + * According to Dictionary.com, cancellation is more frequent in American + * English than cancellation. Yet in American English, the preferred style is + * typically not to double the final L, so cancel generally becomes canceled. + */ + using ByteEventCancellation = ByteEvent; + + /** + * Callback class for receiving byte event (TX/ACK) notifications. + */ + class ByteEventCallback { + public: + virtual ~ByteEventCallback() = default; + + /** + * Invoked when a byte event has been successfully registered. + * Since this is a convenience notification and not a mandatory callback, + * not marking this as pure virtual. + */ + virtual void onByteEventRegistered(ByteEvent /* byteEvent */) {} + + /** + * Invoked when the byte event has occurred. + */ + virtual void onByteEvent(ByteEvent byteEvent) = 0; + + /** + * Invoked if byte event is canceled due to reset, shutdown, or other error. + */ + virtual void onByteEventCanceled(ByteEventCancellation cancellation) = 0; + }; + + /** + * Register a byte event to be triggered when specified event type occurs for + * the specified stream and offset. + * + * If the registration fails, the callback (ByteEventCallback* cb) will NEVER + * be invoked for anything. If the registration succeeds, the callback is + * guaranteed to receive an onByteEventRegistered() notification. + */ + virtual folly::Expected + registerByteEventCallback( + const ByteEvent::Type type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) = 0; + + /** + * ===== Datagram API ===== + * + * Datagram support is experimental. Currently there isn't delivery callback + * or loss notification support for Datagram. + */ + + class DatagramCallback { + public: + virtual ~DatagramCallback() = default; + + /** + * Notifies the DatagramCallback that datagrams are available for read. + */ + virtual void onDatagramsAvailable() noexcept = 0; + }; + + /** + * Callback class for receiving write readiness notifications + */ + class WriteCallback { + public: + virtual ~WriteCallback() = default; + + /** + * Invoked when stream is ready to write after notifyPendingWriteOnStream + * has previously been called. + * + * maxToSend represents the amount of data that the transport layer expects + * to write to the network during this event loop, eg: + * min(remaining flow control, remaining send buffer space) + */ + virtual void onStreamWriteReady( + StreamId /* id */, + uint64_t /* maxToSend */) noexcept {} + + /** + * Invoked when connection is ready to write after + * notifyPendingWriteOnConnection has previously been called. + * + * maxToSend represents the amount of data that the transport layer expects + * to write to the network during this event loop, eg: + * min(remaining flow control, remaining send buffer space) + */ + virtual void onConnectionWriteReady(uint64_t /* maxToSend */) noexcept {} + + /** + * Invoked when a connection is being torn down after + * notifyPendingWriteOnStream has been called + */ + virtual void onStreamWriteError( + StreamId /* id */, + QuicError /* error */) noexcept {} + + /** + * Invoked when a connection is being torn down after + * notifyPendingWriteOnConnection has been called + */ + virtual void onConnectionWriteError(QuicError + /* error */) noexcept {} + }; + + /** + * Inform the transport that there is data to write on this connection + * An app shouldn't mix connection and stream calls to this API + * Use this if the app wants to do prioritization. + */ + virtual folly::Expected + notifyPendingWriteOnConnection(WriteCallback* wcb) = 0; + + /** + * Inform the transport that there is data to write on a given stream. + * An app shouldn't mix connection and stream calls to this API + * Use the Connection call if the app wants to do prioritization. + */ + virtual folly::Expected + notifyPendingWriteOnStream(StreamId id, WriteCallback* wcb) = 0; + + virtual folly::Expected + unregisterStreamWriteCallback(StreamId) = 0; + /** * Application can invoke this function to signal the transport to * initiate migration. diff --git a/quic/api/QuicTransportBase.cpp b/quic/api/QuicTransportBase.cpp index 2ffc891ce..722c4cd41 100644 --- a/quic/api/QuicTransportBase.cpp +++ b/quic/api/QuicTransportBase.cpp @@ -51,14 +51,11 @@ QuicTransportBase::QuicTransportBase( std::move(evb), std::move(socket), useConnectionEndWithErrorCallback), - lossTimeout_(this), ackTimeout_(this), pathValidationTimeout_(this), - idleTimeout_(this), keepaliveTimeout_(this), drainTimeout_(this), pingTimeout_(this), - excessWriteTimeout_(this), readLooper_(new FunctionLooper( evb_, [this]() { invokeReadDataAndCallbacks(); }, @@ -66,11 +63,7 @@ QuicTransportBase::QuicTransportBase( peekLooper_(new FunctionLooper( evb_, [this]() { invokePeekDataAndCallbacks(); }, - LooperType::PeekLooper)), - writeLooper_(new FunctionLooper( - evb_, - [this]() { pacedWriteDataToSocket(); }, - LooperType::WriteLooper)) { + LooperType::PeekLooper)) { writeLooper_->setPacingFunction([this]() -> auto { if (isConnectionPaced(*conn_)) { return conn_->pacer->getTimeUntilNextWrite(); @@ -85,22 +78,6 @@ QuicTransportBase::QuicTransportBase( } } -void QuicTransportBase::scheduleTimeout( - QuicTimerCallback* callback, - std::chrono::milliseconds timeout) { - if (evb_) { - evb_->scheduleTimeout(callback, timeout); - } -} - -void QuicTransportBase::cancelTimeout(QuicTimerCallback* callback) { - callback->cancelTimerCallback(); -} - -bool QuicTransportBase::isTimeoutScheduled(QuicTimerCallback* callback) const { - return callback->isTimerCallbackScheduled(); -} - void QuicTransportBase::setPacingTimer( QuicTimer::SharedPtr pacingTimer) noexcept { if (pacingTimer) { @@ -757,63 +734,6 @@ QuicTransportBase::pauseOrResumeRead(StreamId id, bool resume) { return folly::unit; } -void QuicTransportBase::invokeReadDataAndCallbacks() { - auto self = sharedGuard(); - SCOPE_EXIT { - self->checkForClosedStream(); - self->updateReadLooper(); - self->updateWriteLooper(true); - }; - // Need a copy since the set can change during callbacks. - std::vector readableStreamsCopy; - const auto& readableStreams = self->conn_->streamManager->readableStreams(); - readableStreamsCopy.reserve(readableStreams.size()); - std::copy( - readableStreams.begin(), - readableStreams.end(), - std::back_inserter(readableStreamsCopy)); - if (self->conn_->transportSettings.orderedReadCallbacks) { - std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end()); - } - for (StreamId streamId : readableStreamsCopy) { - auto callback = self->readCallbacks_.find(streamId); - if (callback == self->readCallbacks_.end()) { - // Stream doesn't have a read callback set, skip it. - continue; - } - auto readCb = callback->second.readCb; - auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); - if (readCb && stream->streamReadError) { - self->conn_->streamManager->readableStreams().erase(streamId); - readCallbacks_.erase(callback); - // if there is an error on the stream - it's not readable anymore, so - // we cannot peek into it as well. - self->conn_->streamManager->peekableStreams().erase(streamId); - peekCallbacks_.erase(streamId); - VLOG(10) << "invoking read error callbacks on stream=" << streamId << " " - << *this; - if (!stream->groupId) { - readCb->readError(streamId, QuicError(*stream->streamReadError)); - } else { - readCb->readErrorWithGroup( - streamId, *stream->groupId, QuicError(*stream->streamReadError)); - } - } else if ( - readCb && callback->second.resumed && stream->hasReadableData()) { - VLOG(10) << "invoking read callbacks on stream=" << streamId << " " - << *this; - if (!stream->groupId) { - readCb->readAvailable(streamId); - } else { - readCb->readAvailableWithGroup(streamId, *stream->groupId); - } - } - } - if (self->datagramCallback_ && !conn_->datagramState.readBuffer.empty()) { - self->datagramCallback_->onDatagramsAvailable(); - } -} - void QuicTransportBase::updateReadLooper() { if (closeState_ != CloseState::OPEN) { VLOG(10) << "Stopping read looper " << *this; @@ -909,61 +829,6 @@ QuicTransportBase::pauseOrResumePeek(StreamId id, bool resume) { return folly::unit; } -void QuicTransportBase::invokePeekDataAndCallbacks() { - auto self = sharedGuard(); - SCOPE_EXIT { - self->checkForClosedStream(); - self->updatePeekLooper(); - self->updateWriteLooper(true); - }; - // TODO: add protection from calling "consume" in the middle of the peek - - // one way is to have a peek counter that is incremented when peek calblack - // is called and decremented when peek is done. once counter transitions - // to 0 we can execute "consume" calls that were done during "peek", for that, - // we would need to keep stack of them. - std::vector peekableStreamsCopy; - const auto& peekableStreams = self->conn_->streamManager->peekableStreams(); - peekableStreamsCopy.reserve(peekableStreams.size()); - std::copy( - peekableStreams.begin(), - peekableStreams.end(), - std::back_inserter(peekableStreamsCopy)); - VLOG(10) << __func__ - << " peekableListCopy.size()=" << peekableStreamsCopy.size(); - for (StreamId streamId : peekableStreamsCopy) { - auto callback = self->peekCallbacks_.find(streamId); - // This is a likely bug. Need to think more on whether events can - // be dropped - // remove streamId from list of peekable - as opposed to "read", "peek" is - // only called once per streamId and not on every EVB loop until application - // reads the data. - self->conn_->streamManager->peekableStreams().erase(streamId); - if (callback == self->peekCallbacks_.end()) { - VLOG(10) << " No peek callback for stream=" << streamId; - continue; - } - auto peekCb = callback->second.peekCb; - auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); - if (peekCb && stream->streamReadError) { - VLOG(10) << "invoking peek error callbacks on stream=" << streamId << " " - << *this; - peekCb->peekError(streamId, QuicError(*stream->streamReadError)); - } else if ( - peekCb && !stream->streamReadError && stream->hasPeekableData()) { - VLOG(10) << "invoking peek callbacks on stream=" << streamId << " " - << *this; - - peekDataFromQuicStream( - *stream, - [&](StreamId id, const folly::Range& peekRange) { - peekCb->onDataAvailable(id, peekRange); - }); - } else { - VLOG(10) << "Not invoking peek callbacks on stream=" << streamId; - } - } -} - void QuicTransportBase::invokeStreamsAvailableCallbacks() { if (conn_->streamManager->consumeMaxLocalBidirectionalStreamIdIncreased()) { // check in case new streams were created in preceding callbacks @@ -1905,34 +1770,6 @@ void QuicTransportBase::onNetworkData( } } -void QuicTransportBase::checkIdleTimer(TimePoint now) { - if (closeState_ == CloseState::CLOSED) { - return; - } - if (!idleTimeout_.isTimerCallbackScheduled()) { - return; - } - if (!idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_.has_value()) { - return; - } - if (idleTimeoutCheck_.forcedIdleTimeoutScheduled_) { - return; - } - - if ((now - *idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_) >= - idleTimeoutCheck_.idleTimeoutMs) { - // Call timer expiration async. - idleTimeoutCheck_.forcedIdleTimeoutScheduled_ = true; - runOnEvbAsync([](auto self) { - if (!self->good() || self->closeState_ == CloseState::CLOSED) { - // The connection was probably closed. - return; - } - self->idleTimeout_.timeoutExpired(); - }); - } -} - void QuicTransportBase::setIdleTimer() { if (closeState_ == CloseState::CLOSED) { return; @@ -2070,114 +1907,6 @@ StreamDirectionality QuicTransportBase::getStreamDirectionality( return quic::getStreamDirectionality(stream); } -folly::Expected -QuicTransportBase::notifyPendingWriteOnConnection( - QuicSocket::WriteCallback* wcb) { - if (closeState_ != CloseState::OPEN) { - return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); - } - if (connWriteCallback_ != nullptr) { - return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK); - } - // Assign the write callback before going into the loop so that if we close - // the connection while we are still scheduled, the write callback will get - // an error synchronously. - connWriteCallback_ = wcb; - runOnEvbAsync([](auto self) { - if (!self->connWriteCallback_) { - // The connection was probably closed. - return; - } - auto connWritableBytes = self->maxWritableOnConn(); - if (connWritableBytes != 0) { - auto connWriteCallback = self->connWriteCallback_; - self->connWriteCallback_ = nullptr; - connWriteCallback->onConnectionWriteReady(connWritableBytes); - } - }); - return folly::unit; -} - -folly::Expected -QuicTransportBase::unregisterStreamWriteCallback(StreamId id) { - if (!conn_->streamManager->streamExists(id)) { - return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); - } - if (pendingWriteCallbacks_.find(id) == pendingWriteCallbacks_.end()) { - return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); - } - pendingWriteCallbacks_.erase(id); - return folly::unit; -} - -folly::Expected -QuicTransportBase::notifyPendingWriteOnStream( - StreamId id, - QuicSocket::WriteCallback* wcb) { - if (isReceivingStream(conn_->nodeType, id)) { - return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); - } - if (closeState_ != CloseState::OPEN) { - return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); - } - if (!conn_->streamManager->streamExists(id)) { - return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); - } - auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); - if (!stream->writable()) { - return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); - } - - if (wcb == nullptr) { - return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK); - } - // Add the callback to the pending write callbacks so that if we are closed - // while we are scheduled in the loop, the close will error out the - // callbacks. - auto wcbEmplaceResult = pendingWriteCallbacks_.emplace(id, wcb); - if (!wcbEmplaceResult.second) { - if ((wcbEmplaceResult.first)->second != wcb) { - return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK); - } else { - return folly::makeUnexpected(LocalErrorCode::CALLBACK_ALREADY_INSTALLED); - } - } - runOnEvbAsync([id](auto self) { - auto wcbIt = self->pendingWriteCallbacks_.find(id); - if (wcbIt == self->pendingWriteCallbacks_.end()) { - // the connection was probably closed. - return; - } - auto writeCallback = wcbIt->second; - if (!self->conn_->streamManager->streamExists(id)) { - self->pendingWriteCallbacks_.erase(wcbIt); - writeCallback->onStreamWriteError( - id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS)); - return; - } - auto stream = CHECK_NOTNULL(self->conn_->streamManager->getStream(id)); - if (!stream->writable()) { - self->pendingWriteCallbacks_.erase(wcbIt); - writeCallback->onStreamWriteError( - id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS)); - return; - } - auto maxCanWrite = self->maxWritableOnStream(*stream); - if (maxCanWrite != 0) { - self->pendingWriteCallbacks_.erase(wcbIt); - writeCallback->onStreamWriteReady(id, maxCanWrite); - } - }); - return folly::unit; -} - -uint64_t QuicTransportBase::maxWritableOnStream( - const QuicStreamState& stream) const { - auto connWritableBytes = maxWritableOnConn(); - auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream); - return std::min(streamFlowControlBytes, connWritableBytes); -} - QuicSocket::WriteResult QuicTransportBase::writeChain( StreamId id, Buf data, @@ -2263,110 +1992,6 @@ QuicTransportBase::registerTxCallback( return registerByteEventCallback(ByteEvent::Type::TX, id, offset, cb); } -folly::Expected -QuicTransportBase::registerByteEventCallback( - const ByteEvent::Type type, - const StreamId id, - const uint64_t offset, - ByteEventCallback* cb) { - if (isReceivingStream(conn_->nodeType, id)) { - return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); - } - if (closeState_ != CloseState::OPEN) { - return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); - } - [[maybe_unused]] auto self = sharedGuard(); - if (!conn_->streamManager->streamExists(id)) { - return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); - } - if (!cb) { - return folly::unit; - } - - ByteEventMap& byteEventMap = getByteEventMap(type); - auto byteEventMapIt = byteEventMap.find(id); - if (byteEventMapIt == byteEventMap.end()) { - byteEventMap.emplace( - id, - std::initializer_list::type::mapped_type::value_type>( - {{offset, cb}})); - } else { - // Keep ByteEvents for the same stream sorted by offsets: - auto pos = std::upper_bound( - byteEventMapIt->second.begin(), - byteEventMapIt->second.end(), - offset, - [&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; }); - if (pos != byteEventMapIt->second.begin()) { - auto matchingEvent = std::find_if( - byteEventMapIt->second.begin(), - pos, - [offset, cb](const ByteEventDetail& p) { - return ((p.offset == offset) && (p.callback == cb)); - }); - if (matchingEvent != pos) { - // ByteEvent has been already registered for the same type, id, - // offset and for the same recipient, return an INVALID_OPERATION - // error to prevent duplicate registrations. - return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); - } - } - byteEventMapIt->second.emplace(pos, offset, cb); - } - auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); - - // Notify recipients that the registration was successful. - cb->onByteEventRegistered(ByteEvent{id, offset, type}); - - // if the callback is already ready, we still insert, but schedule to - // process - Optional maxOffsetReady; - switch (type) { - case ByteEvent::Type::ACK: - maxOffsetReady = getLargestDeliverableOffset(*stream); - break; - case ByteEvent::Type::TX: - maxOffsetReady = getLargestWriteOffsetTxed(*stream); - break; - } - if (maxOffsetReady.has_value() && (offset <= *maxOffsetReady)) { - runOnEvbAsync([id, cb, offset, type](auto selfObj) { - if (selfObj->closeState_ != CloseState::OPEN) { - // Close will error out all byte event callbacks. - return; - } - - auto& byteEventMapL = selfObj->getByteEventMap(type); - auto streamByteEventCbIt = byteEventMapL.find(id); - if (streamByteEventCbIt == byteEventMapL.end()) { - return; - } - - // This is scheduled to run in the future (during the next iteration of - // the event loop). It is possible that the ByteEventDetail list gets - // mutated between the time it was scheduled to now when we are ready to - // run it. Look at the current outstanding ByteEvents for this stream ID - // and confirm that our ByteEvent's offset and recipient callback are - // still present. - auto pos = std::find_if( - streamByteEventCbIt->second.begin(), - streamByteEventCbIt->second.end(), - [offset, cb](const ByteEventDetail& p) { - return ((p.offset == offset) && (p.callback == cb)); - }); - // if our byteEvent is not present, it must have been delivered already. - if (pos == streamByteEventCbIt->second.end()) { - return; - } - streamByteEventCbIt->second.erase(pos); - - cb->onByteEvent(ByteEvent{id, offset, type}); - }); - } - return folly::unit; -} - Optional QuicTransportBase::shutdownWrite(StreamId id) { if (isReceivingStream(conn_->nodeType, id)) { return LocalErrorCode::INVALID_OPERATION; @@ -2541,37 +2166,6 @@ void QuicTransportBase::sendPing(std::chrono::milliseconds pingTimeout) { } } -void QuicTransportBase::lossTimeoutExpired() noexcept { - CHECK_NE(closeState_, CloseState::CLOSED); - // onLossDetectionAlarm will set packetToSend in pending events - [[maybe_unused]] auto self = sharedGuard(); - try { - onLossDetectionAlarm(*conn_, markPacketLoss); - if (conn_->qLogger) { - conn_->qLogger->addTransportStateUpdate(kLossTimeoutExpired); - } - pacedWriteDataToSocket(); - } catch (const QuicTransportException& ex) { - VLOG(4) << __func__ << " " << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(ex.errorCode()), - std::string("lossTimeoutExpired() error"))); - } catch (const QuicInternalException& ex) { - VLOG(4) << __func__ << " " << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(ex.errorCode()), - std::string("lossTimeoutExpired() error"))); - } catch (const std::exception& ex) { - VLOG(4) << __func__ << " " << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), - std::string("lossTimeoutExpired() error"))); - } -} - void QuicTransportBase::ackTimeoutExpired() noexcept { CHECK_NE(closeState_, CloseState::CLOSED); VLOG(10) << __func__ << " " << *this; @@ -2587,13 +2181,6 @@ void QuicTransportBase::pingTimeoutExpired() noexcept { } } -void QuicTransportBase::excessWriteTimeoutExpired() noexcept { - auto writeDataReason = shouldWriteData(*conn_); - if (writeDataReason != WriteDataReason::NO_WRITE) { - pacedWriteDataToSocket(); - } -} - void QuicTransportBase::pathValidationTimeoutExpired() noexcept { CHECK(conn_->outstandingPathValidation); @@ -2611,40 +2198,12 @@ void QuicTransportBase::pathValidationTimeoutExpired() noexcept { std::string("Path validation timed out"))); } -void QuicTransportBase::idleTimeoutExpired(bool drain) noexcept { - VLOG(4) << __func__ << " " << *this; - [[maybe_unused]] auto self = sharedGuard(); - // idle timeout is expired, just close the connection and drain or - // send connection close immediately depending on 'drain' - DCHECK_NE(closeState_, CloseState::CLOSED); - uint64_t numOpenStreans = conn_->streamManager->streamCount(); - auto localError = - drain ? LocalErrorCode::IDLE_TIMEOUT : LocalErrorCode::SHUTTING_DOWN; - closeImpl( - quic::QuicError( - QuicErrorCode(localError), - folly::to( - toString(localError), - ", num non control streams: ", - numOpenStreans - conn_->streamManager->numControlStreams())), - drain /* drainConnection */, - !drain /* sendCloseImmediately */); -} - void QuicTransportBase::keepaliveTimeoutExpired() noexcept { [[maybe_unused]] auto self = sharedGuard(); conn_->pendingEvents.sendPing = true; updateWriteLooper(true, conn_->transportSettings.inlineWriteAfterRead); } -void QuicTransportBase::scheduleLossTimeout(std::chrono::milliseconds timeout) { - if (closeState_ == CloseState::CLOSED) { - return; - } - timeout = timeMax(timeout, evb_->getTimerTickInterval()); - scheduleTimeout(&lossTimeout_, timeout); -} - void QuicTransportBase::scheduleAckTimeout() { if (closeState_ == CloseState::CLOSED) { return; @@ -2713,10 +2272,6 @@ void QuicTransportBase::schedulePathValidationTimeout() { } } -void QuicTransportBase::cancelLossTimeout() { - cancelTimeout(&lossTimeout_); -} - bool QuicTransportBase::isLossTimeoutScheduled() { return isTimeoutScheduled(&lossTimeout_); } @@ -2970,32 +2525,6 @@ QuicTransportBase::readDatagramBufs(size_t atMost) { return retDatagrams; } -void QuicTransportBase::writeSocketDataAndCatch() { - [[maybe_unused]] auto self = sharedGuard(); - try { - writeSocketData(); - processCallbacksAfterWriteData(); - } catch (const QuicTransportException& ex) { - VLOG(4) << __func__ << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(ex.errorCode()), - std::string("writeSocketDataAndCatch() error"))); - } catch (const QuicInternalException& ex) { - VLOG(4) << __func__ << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(ex.errorCode()), - std::string("writeSocketDataAndCatch() error"))); - } catch (const std::exception& ex) { - VLOG(4) << __func__ << " error=" << ex.what() << " " << *this; - exceptionCloseWhat_ = ex.what(); - closeImpl(QuicError( - QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), - std::string("writeSocketDataAndCatch() error"))); - } -} - void QuicTransportBase::setTransportSettings( TransportSettings transportSettings) { if (conn_->nodeType == QuicNodeType::Client) { @@ -3324,21 +2853,6 @@ Optional QuicTransportBase::setControlStream(StreamId id) { return none; } -void QuicTransportBase::runOnEvbAsync( - folly::Function)> func) { - auto evb = getEventBase(); - evb->runInLoop( - [self = sharedGuard(), func = std::move(func), evb]() mutable { - if (self->getEventBase() != evb) { - // The eventbase changed between scheduling the loop and invoking - // the callback, ignore this - return; - } - func(std::move(self)); - }, - true); -} - void QuicTransportBase::onSocketWritable() noexcept { // Remove the writable callback. socket_->pauseWrite(); @@ -3349,83 +2863,6 @@ void QuicTransportBase::onSocketWritable() noexcept { writeLooper_->run(true /* thisIteration */); } -void QuicTransportBase::maybeStopWriteLooperAndArmSocketWritableEvent() { - if (!socket_ || (closeState_ == CloseState::CLOSED)) { - return; - } - if (conn_->transportSettings.useSockWritableEvents && - !socket_->isWritableCallbackSet()) { - // Check if all data has been written and we're not limited by flow - // control/congestion control. - auto writeReason = shouldWriteData(*conn_); - bool haveBufferToRetry = writeReason == WriteDataReason::BUFFERED_WRITE; - bool haveNewDataToWrite = - (writeReason != WriteDataReason::NO_WRITE) && !haveBufferToRetry; - bool haveCongestionControlWindow = true; - if (conn_->congestionController) { - haveCongestionControlWindow = - conn_->congestionController->getWritableBytes() > 0; - } - bool haveFlowControlWindow = getSendConnFlowControlBytesAPI(*conn_) > 0; - bool connHasWriteWindow = - haveCongestionControlWindow && haveFlowControlWindow; - if (haveBufferToRetry || (haveNewDataToWrite && connHasWriteWindow)) { - // Re-arm the write event and stop the write - // looper. - socket_->resumeWrite(this); - writeLooper_->stop(); - } - } -} - -void QuicTransportBase::pacedWriteDataToSocket() { - [[maybe_unused]] auto self = sharedGuard(); - SCOPE_EXIT { - self->maybeStopWriteLooperAndArmSocketWritableEvent(); - }; - - if (!isConnectionPaced(*conn_)) { - // Not paced and connection is still open, normal write. Even if pacing is - // previously enabled and then gets disabled, and we are here due to a - // timeout, we should do a normal write to flush out the residue from - // pacing write. - writeSocketDataAndCatch(); - - if (conn_->transportSettings.scheduleTimerForExcessWrites) { - // If we still have data to write, yield the event loop now but schedule a - // timeout to come around and write again as soon as possible. - auto writeDataReason = shouldWriteData(*conn_); - if (writeDataReason != WriteDataReason::NO_WRITE && - !excessWriteTimeout_.isTimerCallbackScheduled()) { - scheduleTimeout(&excessWriteTimeout_, 0ms); - } - } - return; - } - - // We are in the middle of a pacing interval. Leave it be. - if (writeLooper_->isPacingScheduled()) { - // The next burst is already scheduled. Since the burst size doesn't - // depend on much data we currently have in buffer at all, no need to - // change anything. - return; - } - - // Do a burst write before waiting for an interval. This will also call - // updateWriteLooper, but inside FunctionLooper we will ignore that. - writeSocketDataAndCatch(); -} - -void QuicTransportBase::describe(std::ostream& os) const { - CHECK(conn_); - os << *conn_; -} - -std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt) { - qt.describe(os); - return os; -} - inline std::ostream& operator<<( std::ostream& os, const CloseState& closeState) { @@ -3453,30 +2890,6 @@ QuicTransportBase::maybeResetStreamFromReadError( return folly::Expected(folly::unit); } -QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMap( - const ByteEvent::Type type) { - switch (type) { - case ByteEvent::Type::ACK: - return deliveryCallbacks_; - case ByteEvent::Type::TX: - return txCallbacks_; - } - LOG(FATAL) << "Unhandled case in getByteEventMap"; - folly::assume_unreachable(); -} - -const QuicTransportBase::ByteEventMap& QuicTransportBase::getByteEventMapConst( - const ByteEvent::Type type) const { - switch (type) { - case ByteEvent::Type::ACK: - return deliveryCallbacks_; - case ByteEvent::Type::TX: - return txCallbacks_; - } - LOG(FATAL) << "Unhandled case in getByteEventMapConst"; - folly::assume_unreachable(); -} - void QuicTransportBase::onTransportKnobs(Buf knobBlob) { // Not yet implemented, VLOG(4) << "Received transport knobs: " diff --git a/quic/api/QuicTransportBase.h b/quic/api/QuicTransportBase.h index 36f97ce73..cd2a101c2 100644 --- a/quic/api/QuicTransportBase.h +++ b/quic/api/QuicTransportBase.h @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -37,8 +36,7 @@ namespace quic { */ class QuicTransportBase : public QuicSocket, public QuicTransportBaseLite, - QuicStreamPrioritiesObserver, - QuicAsyncUDPSocket::WriteCallback { + QuicStreamPrioritiesObserver { public: QuicTransportBase( std::shared_ptr evb, @@ -47,10 +45,6 @@ class QuicTransportBase : public QuicSocket, ~QuicTransportBase() override; - void scheduleTimeout( - QuicTimerCallback* callback, - std::chrono::milliseconds timeout); - void setPacingTimer(QuicTimer::SharedPtr pacingTimer) noexcept; Optional getClientConnectionId() const override; @@ -155,16 +149,6 @@ class QuicTransportBase : public QuicSocket, StreamDirectionality getStreamDirectionality( StreamId stream) noexcept override; - folly::Expected notifyPendingWriteOnStream( - StreamId id, - QuicSocket::WriteCallback* wcb) override; - - folly::Expected notifyPendingWriteOnConnection( - QuicSocket::WriteCallback* wcb) override; - - folly::Expected unregisterStreamWriteCallback( - StreamId id) override; - WriteResult writeChain( StreamId id, Buf data, @@ -282,12 +266,6 @@ class QuicTransportBase : public QuicSocket, */ virtual void unbindConnection() = 0; - /** - * Returns a shared_ptr which can be used as a guard to keep this - * object alive. - */ - virtual std::shared_ptr sharedGuard() = 0; - folly::Expected setStreamPriority( StreamId id, Priority priority) override; @@ -319,16 +297,6 @@ class QuicTransportBase : public QuicSocket, const uint64_t offset, ByteEventCallback* cb) override; - /** - * Register a byte event to be triggered when specified event type occurs for - * the specified stream and offset. - */ - folly::Expected registerByteEventCallback( - const ByteEvent::Type type, - const StreamId id, - const uint64_t offset, - ByteEventCallback* cb) override; - /** * Cancel byte event callbacks for given stream. * @@ -404,28 +372,6 @@ class QuicTransportBase : public QuicSocket, */ virtual void createBufAccessor(size_t /* capacity */) {} - // Timeout functions - class LossTimeout : public QuicTimerCallback { - public: - ~LossTimeout() override = default; - - explicit LossTimeout(QuicTransportBase* transport) - : transport_(transport) {} - - void timeoutExpired() noexcept override { - transport_->lossTimeoutExpired(); - } - - virtual void callbackCanceled() noexcept override { - // ignore. this usually means that the eventbase is dying, so we will be - // canceled anyway - return; - } - - private: - QuicTransportBase* transport_; - }; - class AckTimeout : public QuicTimerCallback { public: ~AckTimeout() override = default; @@ -466,26 +412,6 @@ class QuicTransportBase : public QuicSocket, QuicTransportBase* transport_; }; - class ExcessWriteTimeout : public QuicTimerCallback { - public: - ~ExcessWriteTimeout() override = default; - - explicit ExcessWriteTimeout(QuicTransportBase* transport) - : transport_(transport) {} - - void timeoutExpired() noexcept override { - transport_->excessWriteTimeoutExpired(); - } - - void callbackCanceled() noexcept override { - // Do nothing. - return; - } - - private: - QuicTransportBase* transport_; - }; - class PathValidationTimeout : public QuicTimerCallback { public: ~PathValidationTimeout() override = default; @@ -507,27 +433,6 @@ class QuicTransportBase : public QuicSocket, QuicTransportBase* transport_; }; - class IdleTimeout : public QuicTimerCallback { - public: - ~IdleTimeout() override = default; - - explicit IdleTimeout(QuicTransportBase* transport) - : transport_(transport) {} - - void timeoutExpired() noexcept override { - transport_->idleTimeoutExpired(true /* drain */); - } - - void callbackCanceled() noexcept override { - // skip drain when canceling the timeout, to avoid scheduling a new - // drain timeout - transport_->idleTimeoutExpired(false /* drain */); - } - - private: - QuicTransportBase* transport_; - }; - class KeepaliveTimeout : public QuicTimerCallback { public: ~KeepaliveTimeout() override = default; @@ -564,8 +469,6 @@ class QuicTransportBase : public QuicSocket, QuicTransportBase* transport_; }; - void scheduleLossTimeout(std::chrono::milliseconds timeout) override; - void cancelLossTimeout() override; bool isLossTimeoutScheduled() override; // TODO: make this const again // If you don't set it, the default is Cubic @@ -576,8 +479,6 @@ class QuicTransportBase : public QuicSocket, void setThrottlingSignalProvider( std::shared_ptr) override; - void describe(std::ostream& os) const; - virtual void setQLogger(std::shared_ptr qLogger); void setLoopDetectorCallback(std::shared_ptr callback) { @@ -651,13 +552,11 @@ class QuicTransportBase : public QuicSocket, void updateCongestionControlSettings( const TransportSettings& transportSettings); void updateSocketTosSettings(uint8_t dscpValue); - void processCallbacksAfterWriteData(); + void processCallbacksAfterWriteData() override; void processCallbacksAfterNetworkData(); - void invokeReadDataAndCallbacks(); - void invokePeekDataAndCallbacks(); void invokeStreamsAvailableCallbacks(); - void updateReadLooper(); - void updatePeekLooper(); + void updateReadLooper() override; + void updatePeekLooper() override; void updateWriteLooper(bool thisIteration, bool runInline = false) override; void handlePingCallbacks(); void handleKnobCallbacks(); @@ -680,13 +579,10 @@ class QuicTransportBase : public QuicSocket, void cleanupAckEventState(); - void runOnEvbAsync( - folly::Function)> func); - void closeImpl( Optional error, bool drainConnection = true, - bool sendCloseImmediately = true); + bool sendCloseImmediately = true) override; void closeUdpSocket(); folly::Expected pauseOrResumeRead( StreamId id, @@ -694,7 +590,7 @@ class QuicTransportBase : public QuicSocket, folly::Expected pauseOrResumePeek( StreamId id, bool resume); - void checkForClosedStream(); + void checkForClosedStream() override; folly::Expected setReadCallbackInternal( StreamId id, ReadCallback* cb, @@ -706,35 +602,11 @@ class QuicTransportBase : public QuicSocket, bool bidirectional, const OptionalIntegral& streamGroupId = std::nullopt); - /** - * A wrapper around writeSocketData - * - * writeSocketDataAndCatch protects writeSocketData in a try-catch. It also - * dispatch the next write loop. - */ - void writeSocketDataAndCatch(); - - /** - * Paced write data to socket when connection is paced. - * - * Whether connection is paced will be decided by TransportSettings and - * congection controller. When the connection is paced, this function writes - * out a burst size of packets and let the writeLooper schedule a callback to - * write another burst after a pacing interval if there are more data to - * write. When the connection isn't paced, this function does a normal write. - */ - void pacedWriteDataToSocket(); - - uint64_t maxWritableOnStream(const QuicStreamState&) const; - - void lossTimeoutExpired() noexcept; void ackTimeoutExpired() noexcept; void pathValidationTimeoutExpired() noexcept; - void idleTimeoutExpired(bool drain) noexcept; void keepaliveTimeoutExpired() noexcept; void drainTimeoutExpired() noexcept; void pingTimeoutExpired() noexcept; - void excessWriteTimeoutExpired() noexcept; void setIdleTimer() override; void scheduleAckTimeout() override; @@ -759,18 +631,6 @@ class QuicTransportBase : public QuicSocket, */ virtual void onTransportKnobs(Buf knobBlob); - struct ByteEventDetail { - ByteEventDetail(uint64_t offsetIn, ByteEventCallback* callbackIn) - : offset(offsetIn), callback(callbackIn) {} - uint64_t offset; - ByteEventCallback* callback; - }; - - using ByteEventMap = folly::F14FastMap>; - ByteEventMap& getByteEventMap(const ByteEvent::Type type); - FOLLY_NODISCARD const ByteEventMap& getByteEventMapConst( - const ByteEvent::Type type) const; - /** * Helper function that calls passed function for each ByteEvent type. * @@ -796,52 +656,22 @@ class QuicTransportBase : public QuicSocket, */ Optional getAdditionalCmsgsForAsyncUDPSocket(); - struct ReadCallbackData { - ReadCallback* readCb; - bool resumed{true}; - bool deliveredEOM{false}; - - ReadCallbackData(ReadCallback* readCallback) : readCb(readCallback) {} - }; - - struct PeekCallbackData { - PeekCallback* peekCb; - bool resumed{true}; - - PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {} - }; - - folly::F14FastMap readCallbacks_; - folly::F14FastMap peekCallbacks_; - - ByteEventMap deliveryCallbacks_; - ByteEventMap txCallbacks_; - - DatagramCallback* datagramCallback_{nullptr}; PingCallback* pingCallback_{nullptr}; - QuicSocket::WriteCallback* connWriteCallback_{nullptr}; - std::map pendingWriteCallbacks_; bool handshakeDoneNotified_{false}; - LossTimeout lossTimeout_; AckTimeout ackTimeout_; PathValidationTimeout pathValidationTimeout_; - IdleTimeout idleTimeout_; KeepaliveTimeout keepaliveTimeout_; DrainTimeout drainTimeout_; PingTimeout pingTimeout_; - ExcessWriteTimeout excessWriteTimeout_; FunctionLooper::Ptr readLooper_; FunctionLooper::Ptr peekLooper_; - FunctionLooper::Ptr writeLooper_; // TODO: This is silly. We need a better solution. // Uninitialied local address as a fallback answer when socket isn't bound. folly::SocketAddress localFallbackAddress; - Optional exceptionCloseWhat_; - uint64_t qlogRefcnt_{0}; // Priority level threshold for background streams @@ -908,10 +738,6 @@ class QuicTransportBase : public QuicSocket, }; protected: - void cancelTimeout(QuicTimerCallback* callback); - - bool isTimeoutScheduled(QuicTimerCallback* callback) const; - /** * Helper function to validate that the number of ECN packet marks match the * expected value, depending on the ECN state of the connection. @@ -934,19 +760,6 @@ class QuicTransportBase : public QuicSocket, uint64_t packetLimit); void onSocketWritable() noexcept override; - void maybeStopWriteLooperAndArmSocketWritableEvent(); - - /** - * Checks the idle timer on write events, and if it's past the idle timeout, - * calls the timer finctions. - */ - void checkIdleTimer(TimePoint now); - struct IdleTimeoutCheck { - std::chrono::milliseconds idleTimeoutMs{0}; - Optional lastTimeIdleTimeoutScheduled_; - bool forcedIdleTimeoutScheduled_{false}; - }; - IdleTimeoutCheck idleTimeoutCheck_; private: /** @@ -972,6 +785,4 @@ class QuicTransportBase : public QuicSocket, [[nodiscard]] bool checkCustomRetransmissionProfilesEnabled() const; }; -std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt); - } // namespace quic diff --git a/quic/api/QuicTransportBaseLite.cpp b/quic/api/QuicTransportBaseLite.cpp index 816840660..29cfab093 100644 --- a/quic/api/QuicTransportBaseLite.cpp +++ b/quic/api/QuicTransportBaseLite.cpp @@ -18,6 +18,228 @@ constexpr auto APP_NO_ERROR = quic::GenericApplicationErrorCode::NO_ERROR; namespace quic { +inline std::ostream& operator<<( + std::ostream& os, + const CloseState& closeState) { + switch (closeState) { + case CloseState::OPEN: + os << "OPEN"; + break; + case CloseState::GRACEFUL_CLOSING: + os << "GRACEFUL_CLOSING"; + break; + case CloseState::CLOSED: + os << "CLOSED"; + break; + } + return os; +} + +folly::Expected +QuicTransportBaseLite::notifyPendingWriteOnConnection( + QuicSocket::WriteCallback* wcb) { + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + if (connWriteCallback_ != nullptr) { + return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK); + } + // Assign the write callback before going into the loop so that if we close + // the connection while we are still scheduled, the write callback will get + // an error synchronously. + connWriteCallback_ = wcb; + runOnEvbAsync([](auto self) { + if (!self->connWriteCallback_) { + // The connection was probably closed. + return; + } + auto connWritableBytes = self->maxWritableOnConn(); + if (connWritableBytes != 0) { + auto connWriteCallback = self->connWriteCallback_; + self->connWriteCallback_ = nullptr; + connWriteCallback->onConnectionWriteReady(connWritableBytes); + } + }); + return folly::unit; +} + +folly::Expected +QuicTransportBaseLite::unregisterStreamWriteCallback(StreamId id) { + if (!conn_->streamManager->streamExists(id)) { + return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); + } + if (pendingWriteCallbacks_.find(id) == pendingWriteCallbacks_.end()) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + pendingWriteCallbacks_.erase(id); + return folly::unit; +} + +folly::Expected +QuicTransportBaseLite::notifyPendingWriteOnStream( + StreamId id, + QuicSocket::WriteCallback* wcb) { + if (isReceivingStream(conn_->nodeType, id)) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + if (!conn_->streamManager->streamExists(id)) { + return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); + } + auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); + if (!stream->writable()) { + return folly::makeUnexpected(LocalErrorCode::STREAM_CLOSED); + } + + if (wcb == nullptr) { + return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK); + } + // Add the callback to the pending write callbacks so that if we are closed + // while we are scheduled in the loop, the close will error out the + // callbacks. + auto wcbEmplaceResult = pendingWriteCallbacks_.emplace(id, wcb); + if (!wcbEmplaceResult.second) { + if ((wcbEmplaceResult.first)->second != wcb) { + return folly::makeUnexpected(LocalErrorCode::INVALID_WRITE_CALLBACK); + } else { + return folly::makeUnexpected(LocalErrorCode::CALLBACK_ALREADY_INSTALLED); + } + } + runOnEvbAsync([id](auto self) { + auto wcbIt = self->pendingWriteCallbacks_.find(id); + if (wcbIt == self->pendingWriteCallbacks_.end()) { + // the connection was probably closed. + return; + } + auto writeCallback = wcbIt->second; + if (!self->conn_->streamManager->streamExists(id)) { + self->pendingWriteCallbacks_.erase(wcbIt); + writeCallback->onStreamWriteError( + id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS)); + return; + } + auto stream = CHECK_NOTNULL(self->conn_->streamManager->getStream(id)); + if (!stream->writable()) { + self->pendingWriteCallbacks_.erase(wcbIt); + writeCallback->onStreamWriteError( + id, QuicError(LocalErrorCode::STREAM_NOT_EXISTS)); + return; + } + auto maxCanWrite = self->maxWritableOnStream(*stream); + if (maxCanWrite != 0) { + self->pendingWriteCallbacks_.erase(wcbIt); + writeCallback->onStreamWriteReady(id, maxCanWrite); + } + }); + return folly::unit; +} + +folly::Expected +QuicTransportBaseLite::registerByteEventCallback( + const ByteEvent::Type type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) { + if (isReceivingStream(conn_->nodeType, id)) { + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + if (closeState_ != CloseState::OPEN) { + return folly::makeUnexpected(LocalErrorCode::CONNECTION_CLOSED); + } + [[maybe_unused]] auto self = sharedGuard(); + if (!conn_->streamManager->streamExists(id)) { + return folly::makeUnexpected(LocalErrorCode::STREAM_NOT_EXISTS); + } + if (!cb) { + return folly::unit; + } + + ByteEventMap& byteEventMap = getByteEventMap(type); + auto byteEventMapIt = byteEventMap.find(id); + if (byteEventMapIt == byteEventMap.end()) { + byteEventMap.emplace( + id, + std::initializer_list::type::mapped_type::value_type>( + {{offset, cb}})); + } else { + // Keep ByteEvents for the same stream sorted by offsets: + auto pos = std::upper_bound( + byteEventMapIt->second.begin(), + byteEventMapIt->second.end(), + offset, + [&](uint64_t o, const ByteEventDetail& p) { return o < p.offset; }); + if (pos != byteEventMapIt->second.begin()) { + auto matchingEvent = std::find_if( + byteEventMapIt->second.begin(), + pos, + [offset, cb](const ByteEventDetail& p) { + return ((p.offset == offset) && (p.callback == cb)); + }); + if (matchingEvent != pos) { + // ByteEvent has been already registered for the same type, id, + // offset and for the same recipient, return an INVALID_OPERATION + // error to prevent duplicate registrations. + return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); + } + } + byteEventMapIt->second.emplace(pos, offset, cb); + } + auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(id)); + + // Notify recipients that the registration was successful. + cb->onByteEventRegistered(ByteEvent{id, offset, type}); + + // if the callback is already ready, we still insert, but schedule to + // process + Optional maxOffsetReady; + switch (type) { + case ByteEvent::Type::ACK: + maxOffsetReady = getLargestDeliverableOffset(*stream); + break; + case ByteEvent::Type::TX: + maxOffsetReady = getLargestWriteOffsetTxed(*stream); + break; + } + if (maxOffsetReady.has_value() && (offset <= *maxOffsetReady)) { + runOnEvbAsync([id, cb, offset, type](auto selfObj) { + if (selfObj->closeState_ != CloseState::OPEN) { + // Close will error out all byte event callbacks. + return; + } + + auto& byteEventMapL = selfObj->getByteEventMap(type); + auto streamByteEventCbIt = byteEventMapL.find(id); + if (streamByteEventCbIt == byteEventMapL.end()) { + return; + } + + // This is scheduled to run in the future (during the next iteration of + // the event loop). It is possible that the ByteEventDetail list gets + // mutated between the time it was scheduled to now when we are ready to + // run it. Look at the current outstanding ByteEvents for this stream ID + // and confirm that our ByteEvent's offset and recipient callback are + // still present. + auto pos = std::find_if( + streamByteEventCbIt->second.begin(), + streamByteEventCbIt->second.end(), + [offset, cb](const ByteEventDetail& p) { + return ((p.offset == offset) && (p.callback == cb)); + }); + // if our byteEvent is not present, it must have been delivered already. + if (pos == streamByteEventCbIt->second.end()) { + return; + } + streamByteEventCbIt->second.erase(pos); + + cb->onByteEvent(ByteEvent{id, offset, type}); + }); + } + return folly::unit; +} + bool QuicTransportBaseLite::good() const { return closeState_ == CloseState::OPEN && hasWriteCipher() && !error(); } @@ -45,6 +267,13 @@ void QuicTransportBaseLite::setConnectionCallback( connCallback_ = callback; } +uint64_t QuicTransportBaseLite::maxWritableOnStream( + const QuicStreamState& stream) const { + auto connWritableBytes = maxWritableOnConn(); + auto streamFlowControlBytes = getSendStreamFlowControlBytesAPI(stream); + return std::min(streamFlowControlBytes, connWritableBytes); +} + void QuicTransportBaseLite::processConnectionSetupCallbacks( QuicError&& cancelCode) { // connSetupCallback_ could be null if start() was never @@ -73,6 +302,58 @@ void QuicTransportBaseLite::processConnectionCallbacks(QuicError&& cancelCode) { } } +QuicTransportBaseLite::ByteEventMap& QuicTransportBaseLite::getByteEventMap( + const ByteEvent::Type type) { + switch (type) { + case ByteEvent::Type::ACK: + return deliveryCallbacks_; + case ByteEvent::Type::TX: + return txCallbacks_; + } + LOG(FATAL) << "Unhandled case in getByteEventMap"; + folly::assume_unreachable(); +} + +const QuicTransportBaseLite::ByteEventMap& +QuicTransportBaseLite::getByteEventMapConst(const ByteEvent::Type type) const { + switch (type) { + case ByteEvent::Type::ACK: + return deliveryCallbacks_; + case ByteEvent::Type::TX: + return txCallbacks_; + } + LOG(FATAL) << "Unhandled case in getByteEventMapConst"; + folly::assume_unreachable(); +} + +void QuicTransportBaseLite::checkIdleTimer(TimePoint now) { + if (closeState_ == CloseState::CLOSED) { + return; + } + if (!idleTimeout_.isTimerCallbackScheduled()) { + return; + } + if (!idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_.has_value()) { + return; + } + if (idleTimeoutCheck_.forcedIdleTimeoutScheduled_) { + return; + } + + if ((now - *idleTimeoutCheck_.lastTimeIdleTimeoutScheduled_) >= + idleTimeoutCheck_.idleTimeoutMs) { + // Call timer expiration async. + idleTimeoutCheck_.forcedIdleTimeoutScheduled_ = true; + runOnEvbAsync([](auto self) { + if (!self->good() || self->closeState_ == CloseState::CLOSED) { + // The connection was probably closed. + return; + } + self->idleTimeout_.timeoutExpired(); + }); + } +} + folly::Expected QuicTransportBaseLite::getStreamTransportInfo(StreamId id) const { if (!conn_->streamManager->streamExists(id)) { @@ -121,6 +402,114 @@ QuicTransportBaseLite::getStreamFlowControl(StreamId id) const { stream->flowControlState.advertisedMaxOffset); } +void QuicTransportBaseLite::runOnEvbAsync( + folly::Function)> func) { + auto evb = getEventBase(); + evb->runInLoop( + [self = sharedGuard(), func = std::move(func), evb]() mutable { + if (self->getEventBase() != evb) { + // The eventbase changed between scheduling the loop and invoking + // the callback, ignore this + return; + } + func(std::move(self)); + }, + true); +} + +void QuicTransportBaseLite::maybeStopWriteLooperAndArmSocketWritableEvent() { + if (!socket_ || (closeState_ == CloseState::CLOSED)) { + return; + } + if (conn_->transportSettings.useSockWritableEvents && + !socket_->isWritableCallbackSet()) { + // Check if all data has been written and we're not limited by flow + // control/congestion control. + auto writeReason = shouldWriteData(*conn_); + bool haveBufferToRetry = writeReason == WriteDataReason::BUFFERED_WRITE; + bool haveNewDataToWrite = + (writeReason != WriteDataReason::NO_WRITE) && !haveBufferToRetry; + bool haveCongestionControlWindow = true; + if (conn_->congestionController) { + haveCongestionControlWindow = + conn_->congestionController->getWritableBytes() > 0; + } + bool haveFlowControlWindow = getSendConnFlowControlBytesAPI(*conn_) > 0; + bool connHasWriteWindow = + haveCongestionControlWindow && haveFlowControlWindow; + if (haveBufferToRetry || (haveNewDataToWrite && connHasWriteWindow)) { + // Re-arm the write event and stop the write + // looper. + socket_->resumeWrite(this); + writeLooper_->stop(); + } + } +} + +void QuicTransportBaseLite::writeSocketDataAndCatch() { + [[maybe_unused]] auto self = sharedGuard(); + try { + writeSocketData(); + processCallbacksAfterWriteData(); + } catch (const QuicTransportException& ex) { + VLOG(4) << __func__ << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(ex.errorCode()), + std::string("writeSocketDataAndCatch() error"))); + } catch (const QuicInternalException& ex) { + VLOG(4) << __func__ << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(ex.errorCode()), + std::string("writeSocketDataAndCatch() error"))); + } catch (const std::exception& ex) { + VLOG(4) << __func__ << " error=" << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), + std::string("writeSocketDataAndCatch() error"))); + } +} + +void QuicTransportBaseLite::pacedWriteDataToSocket() { + [[maybe_unused]] auto self = sharedGuard(); + SCOPE_EXIT { + self->maybeStopWriteLooperAndArmSocketWritableEvent(); + }; + + if (!isConnectionPaced(*conn_)) { + // Not paced and connection is still open, normal write. Even if pacing is + // previously enabled and then gets disabled, and we are here due to a + // timeout, we should do a normal write to flush out the residue from + // pacing write. + writeSocketDataAndCatch(); + + if (conn_->transportSettings.scheduleTimerForExcessWrites) { + // If we still have data to write, yield the event loop now but schedule a + // timeout to come around and write again as soon as possible. + auto writeDataReason = shouldWriteData(*conn_); + if (writeDataReason != WriteDataReason::NO_WRITE && + !excessWriteTimeout_.isTimerCallbackScheduled()) { + scheduleTimeout(&excessWriteTimeout_, 0ms); + } + } + return; + } + + // We are in the middle of a pacing interval. Leave it be. + if (writeLooper_->isPacingScheduled()) { + // The next burst is already scheduled. Since the burst size doesn't + // depend on much data we currently have in buffer at all, no need to + // change anything. + return; + } + + // Do a burst write before waiting for an interval. This will also call + // updateWriteLooper, but inside FunctionLooper we will ignore that. + writeSocketDataAndCatch(); +} + void QuicTransportBaseLite::writeSocketData() { if (socket_) { ++(conn_->writeCount); // incremented on each write (or write attempt) @@ -231,6 +620,68 @@ void QuicTransportBaseLite::writeSocketData() { updateWriteLooper(false); } +void QuicTransportBaseLite::cancelTimeout(QuicTimerCallback* callback) { + callback->cancelTimerCallback(); +} + +void QuicTransportBaseLite::excessWriteTimeoutExpired() noexcept { + auto writeDataReason = shouldWriteData(*conn_); + if (writeDataReason != WriteDataReason::NO_WRITE) { + pacedWriteDataToSocket(); + } +} + +void QuicTransportBaseLite::lossTimeoutExpired() noexcept { + CHECK_NE(closeState_, CloseState::CLOSED); + // onLossDetectionAlarm will set packetToSend in pending events + [[maybe_unused]] auto self = sharedGuard(); + try { + onLossDetectionAlarm(*conn_, markPacketLoss); + if (conn_->qLogger) { + conn_->qLogger->addTransportStateUpdate(kLossTimeoutExpired); + } + pacedWriteDataToSocket(); + } catch (const QuicTransportException& ex) { + VLOG(4) << __func__ << " " << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(ex.errorCode()), + std::string("lossTimeoutExpired() error"))); + } catch (const QuicInternalException& ex) { + VLOG(4) << __func__ << " " << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(ex.errorCode()), + std::string("lossTimeoutExpired() error"))); + } catch (const std::exception& ex) { + VLOG(4) << __func__ << " " << ex.what() << " " << *this; + exceptionCloseWhat_ = ex.what(); + closeImpl(QuicError( + QuicErrorCode(TransportErrorCode::INTERNAL_ERROR), + std::string("lossTimeoutExpired() error"))); + } +} + +void QuicTransportBaseLite::idleTimeoutExpired(bool drain) noexcept { + VLOG(4) << __func__ << " " << *this; + [[maybe_unused]] auto self = sharedGuard(); + // idle timeout is expired, just close the connection and drain or + // send connection close immediately depending on 'drain' + DCHECK_NE(closeState_, CloseState::CLOSED); + uint64_t numOpenStreans = conn_->streamManager->streamCount(); + auto localError = + drain ? LocalErrorCode::IDLE_TIMEOUT : LocalErrorCode::SHUTTING_DOWN; + closeImpl( + quic::QuicError( + QuicErrorCode(localError), + folly::to( + toString(localError), + ", num non control streams: ", + numOpenStreans - conn_->streamManager->numControlStreams())), + drain /* drainConnection */, + !drain /* sendCloseImmediately */); +} + bool QuicTransportBaseLite::processCancelCode(const QuicError& cancelCode) { bool noError = false; switch (cancelCode.code.type()) { @@ -268,6 +719,144 @@ uint64_t QuicTransportBaseLite::maxWritableOnConn() const { return ret; } +void QuicTransportBaseLite::scheduleTimeout( + QuicTimerCallback* callback, + std::chrono::milliseconds timeout) { + if (evb_) { + evb_->scheduleTimeout(callback, timeout); + } +} + +void QuicTransportBaseLite::scheduleLossTimeout( + std::chrono::milliseconds timeout) { + if (closeState_ == CloseState::CLOSED) { + return; + } + timeout = timeMax(timeout, evb_->getTimerTickInterval()); + scheduleTimeout(&lossTimeout_, timeout); +} + +void QuicTransportBaseLite::cancelLossTimeout() { + cancelTimeout(&lossTimeout_); +} + +bool QuicTransportBaseLite::isTimeoutScheduled( + QuicTimerCallback* callback) const { + return callback->isTimerCallbackScheduled(); +} + +void QuicTransportBaseLite::invokeReadDataAndCallbacks() { + auto self = sharedGuard(); + SCOPE_EXIT { + self->checkForClosedStream(); + self->updateReadLooper(); + self->updateWriteLooper(true); + }; + // Need a copy since the set can change during callbacks. + std::vector readableStreamsCopy; + const auto& readableStreams = self->conn_->streamManager->readableStreams(); + readableStreamsCopy.reserve(readableStreams.size()); + std::copy( + readableStreams.begin(), + readableStreams.end(), + std::back_inserter(readableStreamsCopy)); + if (self->conn_->transportSettings.orderedReadCallbacks) { + std::sort(readableStreamsCopy.begin(), readableStreamsCopy.end()); + } + for (StreamId streamId : readableStreamsCopy) { + auto callback = self->readCallbacks_.find(streamId); + if (callback == self->readCallbacks_.end()) { + // Stream doesn't have a read callback set, skip it. + continue; + } + auto readCb = callback->second.readCb; + auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); + if (readCb && stream->streamReadError) { + self->conn_->streamManager->readableStreams().erase(streamId); + readCallbacks_.erase(callback); + // if there is an error on the stream - it's not readable anymore, so + // we cannot peek into it as well. + self->conn_->streamManager->peekableStreams().erase(streamId); + peekCallbacks_.erase(streamId); + VLOG(10) << "invoking read error callbacks on stream=" << streamId << " " + << *this; + if (!stream->groupId) { + readCb->readError(streamId, QuicError(*stream->streamReadError)); + } else { + readCb->readErrorWithGroup( + streamId, *stream->groupId, QuicError(*stream->streamReadError)); + } + } else if ( + readCb && callback->second.resumed && stream->hasReadableData()) { + VLOG(10) << "invoking read callbacks on stream=" << streamId << " " + << *this; + if (!stream->groupId) { + readCb->readAvailable(streamId); + } else { + readCb->readAvailableWithGroup(streamId, *stream->groupId); + } + } + } + if (self->datagramCallback_ && !conn_->datagramState.readBuffer.empty()) { + self->datagramCallback_->onDatagramsAvailable(); + } +} + +void QuicTransportBaseLite::invokePeekDataAndCallbacks() { + auto self = sharedGuard(); + SCOPE_EXIT { + self->checkForClosedStream(); + self->updatePeekLooper(); + self->updateWriteLooper(true); + }; + // TODO: add protection from calling "consume" in the middle of the peek - + // one way is to have a peek counter that is incremented when peek calblack + // is called and decremented when peek is done. once counter transitions + // to 0 we can execute "consume" calls that were done during "peek", for that, + // we would need to keep stack of them. + std::vector peekableStreamsCopy; + const auto& peekableStreams = self->conn_->streamManager->peekableStreams(); + peekableStreamsCopy.reserve(peekableStreams.size()); + std::copy( + peekableStreams.begin(), + peekableStreams.end(), + std::back_inserter(peekableStreamsCopy)); + VLOG(10) << __func__ + << " peekableListCopy.size()=" << peekableStreamsCopy.size(); + for (StreamId streamId : peekableStreamsCopy) { + auto callback = self->peekCallbacks_.find(streamId); + // This is a likely bug. Need to think more on whether events can + // be dropped + // remove streamId from list of peekable - as opposed to "read", "peek" is + // only called once per streamId and not on every EVB loop until application + // reads the data. + self->conn_->streamManager->peekableStreams().erase(streamId); + if (callback == self->peekCallbacks_.end()) { + VLOG(10) << " No peek callback for stream=" << streamId; + continue; + } + auto peekCb = callback->second.peekCb; + auto stream = CHECK_NOTNULL(conn_->streamManager->getStream(streamId)); + if (peekCb && stream->streamReadError) { + VLOG(10) << "invoking peek error callbacks on stream=" << streamId << " " + << *this; + peekCb->peekError(streamId, QuicError(*stream->streamReadError)); + } else if ( + peekCb && !stream->streamReadError && stream->hasPeekableData()) { + VLOG(10) << "invoking peek callbacks on stream=" << streamId << " " + << *this; + + peekDataFromQuicStream( + *stream, + [&](StreamId id, const folly::Range& peekRange) { + peekCb->onDataAvailable(id, peekRange); + }); + } else { + VLOG(10) << "Not invoking peek callbacks on stream=" << streamId; + } + } +} + void QuicTransportBaseLite::updatePacketProcessorsPrewriteRequests() { folly::SocketCmsgMap cmsgs; for (const auto& pp : conn_->packetProcessors) { @@ -286,4 +875,14 @@ void QuicTransportBaseLite::updatePacketProcessorsPrewriteRequests() { conn_->socketCmsgsState.targetWriteCount = conn_->writeCount; } +void QuicTransportBaseLite::describe(std::ostream& os) const { + CHECK(conn_); + os << *conn_; +} + +std::ostream& operator<<(std::ostream& os, const QuicTransportBaseLite& qt) { + qt.describe(os); + return os; +} + } // namespace quic diff --git a/quic/api/QuicTransportBaseLite.h b/quic/api/QuicTransportBaseLite.h index 4ef5e8acc..d1406ced7 100644 --- a/quic/api/QuicTransportBaseLite.h +++ b/quic/api/QuicTransportBaseLite.h @@ -8,12 +8,14 @@ #pragma once #include +#include namespace quic { enum class CloseState { OPEN, GRACEFUL_CLOSING, CLOSED }; -class QuicTransportBaseLite : virtual public QuicSocketLite { +class QuicTransportBaseLite : virtual public QuicSocketLite, + QuicAsyncUDPSocket::WriteCallback { public: QuicTransportBaseLite( std::shared_ptr evb, @@ -21,7 +23,14 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { bool useConnectionEndWithErrorCallback) : evb_(evb), socket_(std::move(socket)), - useConnectionEndWithErrorCallback_(useConnectionEndWithErrorCallback) {} + useConnectionEndWithErrorCallback_(useConnectionEndWithErrorCallback), + lossTimeout_(this), + excessWriteTimeout_(this), + idleTimeout_(this), + writeLooper_(new FunctionLooper( + evb_, + [this]() { pacedWriteDataToSocket(); }, + LooperType::WriteLooper)) {} /** * Invoked when we have to write some data to the wire. @@ -31,6 +40,26 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { */ virtual void writeData() = 0; + folly::Expected notifyPendingWriteOnStream( + StreamId id, + QuicSocketLite::WriteCallback* wcb) override; + + folly::Expected notifyPendingWriteOnConnection( + QuicSocketLite::WriteCallback* wcb) override; + + folly::Expected unregisterStreamWriteCallback( + StreamId id) override; + + /** + * Register a byte event to be triggered when specified event type occurs for + * the specified stream and offset. + */ + folly::Expected registerByteEventCallback( + const ByteEvent::Type type, + const StreamId id, + const uint64_t offset, + ByteEventCallback* cb) override; + bool good() const override; bool error() const override; @@ -54,6 +83,8 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { void setSendBuffer(StreamId, size_t /*maxUnacked*/, size_t /*maxUnsent*/) override {} + uint64_t maxWritableOnStream(const QuicStreamState&) const; + [[nodiscard]] std::shared_ptr getEventBase() const override; folly::Expected getStreamTransportInfo( @@ -74,15 +105,76 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { [[nodiscard]] uint64_t maxWritableOnConn() const override; - virtual void scheduleLossTimeout(std::chrono::milliseconds /* timeout */) { - // TODO: Fill this in from QuicTransportBase and remove the "virtual" - // qualifier - } + void scheduleTimeout( + QuicTimerCallback* callback, + std::chrono::milliseconds timeout); - virtual void cancelLossTimeout() { - // TODO: Fill this in from QuicTransportBase and remove the "virtual" - // qualifier - } + class ExcessWriteTimeout : public QuicTimerCallback { + public: + ~ExcessWriteTimeout() override = default; + + explicit ExcessWriteTimeout(QuicTransportBaseLite* transport) + : transport_(transport) {} + + void timeoutExpired() noexcept override { + transport_->excessWriteTimeoutExpired(); + } + + void callbackCanceled() noexcept override { + // Do nothing. + return; + } + + private: + QuicTransportBaseLite* transport_; + }; + + // Timeout functions + class LossTimeout : public QuicTimerCallback { + public: + ~LossTimeout() override = default; + + explicit LossTimeout(QuicTransportBaseLite* transport) + : transport_(transport) {} + + void timeoutExpired() noexcept override { + transport_->lossTimeoutExpired(); + } + + virtual void callbackCanceled() noexcept override { + // ignore. this usually means that the eventbase is dying, so we will be + // canceled anyway + return; + } + + private: + QuicTransportBaseLite* transport_; + }; + + class IdleTimeout : public QuicTimerCallback { + public: + ~IdleTimeout() override = default; + + explicit IdleTimeout(QuicTransportBaseLite* transport) + : transport_(transport) {} + + void timeoutExpired() noexcept override { + transport_->idleTimeoutExpired(true /* drain */); + } + + void callbackCanceled() noexcept override { + // skip drain when canceling the timeout, to avoid scheduling a new + // drain timeout + transport_->idleTimeoutExpired(false /* drain */); + } + + private: + QuicTransportBaseLite* transport_; + }; + + void scheduleLossTimeout(std::chrono::milliseconds timeout); + + void cancelLossTimeout(); virtual bool isLossTimeoutScheduled() { // TODO: Fill this in from QuicTransportBase and remove the "virtual" @@ -90,7 +182,34 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { return false; } + /** + * Returns a shared_ptr which can be used as a guard to keep this + * object alive. + */ + virtual std::shared_ptr sharedGuard() = 0; + + void describe(std::ostream& os) const; + protected: + /** + * A wrapper around writeSocketData + * + * writeSocketDataAndCatch protects writeSocketData in a try-catch. It also + * dispatch the next write loop. + */ + void writeSocketDataAndCatch(); + + /** + * Paced write data to socket when connection is paced. + * + * Whether connection is paced will be decided by TransportSettings and + * congection controller. When the connection is paced, this function writes + * out a burst size of packets and let the writeLooper schedule a callback to + * write another burst after a pacing interval if there are more data to + * write. When the connection isn't paced, this function does a normal write. + */ + void pacedWriteDataToSocket(); + /** * write data to socket * @@ -101,6 +220,17 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { */ void writeSocketData(); + virtual void closeImpl( + Optional /* error */, + bool /* drainConnection */ = true, + bool /* sendCloseImmediately */ = true) { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + void runOnEvbAsync( + folly::Function)> func); + virtual void updateWriteLooper( bool /* thisIteration */, bool /* runInline */ = false) { @@ -108,6 +238,34 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { // qualifier } + virtual void updateReadLooper() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + virtual void updatePeekLooper() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + void maybeStopWriteLooperAndArmSocketWritableEvent(); + + virtual void checkForClosedStream() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + + void cancelTimeout(QuicTimerCallback* callback); + + void excessWriteTimeoutExpired() noexcept; + void lossTimeoutExpired() noexcept; + void idleTimeoutExpired(bool drain) noexcept; + + bool isTimeoutScheduled(QuicTimerCallback* callback) const; + + void invokeReadDataAndCallbacks(); + void invokePeekDataAndCallbacks(); + // Helpers to notify all registered observers about specific events during // socket write (if enabled in the observer's config). virtual void notifyStartWritingFromAppRateLimited() { @@ -126,6 +284,11 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { // qualifier } + virtual void processCallbacksAfterWriteData() { + // TODO: Fill this in from QuicTransportBase and remove the "virtual" + // qualifier + } + virtual void setIdleTimer() { // TODO: Fill this in from QuicTransportBase and remove the "virtual" // qualifier @@ -161,6 +324,64 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { bool transportReadyNotified_{false}; + struct ReadCallbackData { + ReadCallback* readCb; + bool resumed{true}; + bool deliveredEOM{false}; + + ReadCallbackData(ReadCallback* readCallback) : readCb(readCallback) {} + }; + + struct PeekCallbackData { + PeekCallback* peekCb; + bool resumed{true}; + + PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {} + }; + + DatagramCallback* datagramCallback_{nullptr}; + + folly::F14FastMap readCallbacks_; + folly::F14FastMap peekCallbacks_; + + std::map pendingWriteCallbacks_; + QuicSocketLite::WriteCallback* connWriteCallback_{nullptr}; + + struct ByteEventDetail { + ByteEventDetail(uint64_t offsetIn, ByteEventCallback* callbackIn) + : offset(offsetIn), callback(callbackIn) {} + uint64_t offset; + ByteEventCallback* callback; + }; + + using ByteEventMap = folly::F14FastMap>; + ByteEventMap& getByteEventMap(const ByteEvent::Type type); + FOLLY_NODISCARD const ByteEventMap& getByteEventMapConst( + const ByteEvent::Type type) const; + + ByteEventMap deliveryCallbacks_; + ByteEventMap txCallbacks_; + + /** + * Checks the idle timer on write events, and if it's past the idle timeout, + * calls the timer finctions. + */ + void checkIdleTimer(TimePoint now); + struct IdleTimeoutCheck { + std::chrono::milliseconds idleTimeoutMs{0}; + Optional lastTimeIdleTimeoutScheduled_; + bool forcedIdleTimeoutScheduled_{false}; + }; + IdleTimeoutCheck idleTimeoutCheck_; + + LossTimeout lossTimeout_; + ExcessWriteTimeout excessWriteTimeout_; + IdleTimeout idleTimeout_; + + FunctionLooper::Ptr writeLooper_; + + Optional exceptionCloseWhat_; + std:: unique_ptr conn_; @@ -175,4 +396,6 @@ class QuicTransportBaseLite : virtual public QuicSocketLite { void updatePacketProcessorsPrewriteRequests(); }; +std::ostream& operator<<(std::ostream& os, const QuicTransportBaseLite& qt); + } // namespace quic diff --git a/quic/api/test/QuicTransportBaseTest.cpp b/quic/api/test/QuicTransportBaseTest.cpp index a92568b70..53c17401c 100644 --- a/quic/api/test/QuicTransportBaseTest.cpp +++ b/quic/api/test/QuicTransportBaseTest.cpp @@ -357,7 +357,7 @@ class TestQuicTransport return conn_->oneRttWriteCipher != nullptr; } - std::shared_ptr sharedGuard() override { + std::shared_ptr sharedGuard() override { return shared_from_this(); } diff --git a/quic/api/test/TestQuicTransport.h b/quic/api/test/TestQuicTransport.h index 5f3bfa37c..43de5d57d 100644 --- a/quic/api/test/TestQuicTransport.h +++ b/quic/api/test/TestQuicTransport.h @@ -124,7 +124,7 @@ class TestQuicTransport return true; } - std::shared_ptr sharedGuard() override { + std::shared_ptr sharedGuard() override { return shared_from_this(); } diff --git a/quic/client/QuicClientTransport.cpp b/quic/client/QuicClientTransport.cpp index 2403e5ce6..8cd315ca9 100644 --- a/quic/client/QuicClientTransport.cpp +++ b/quic/client/QuicClientTransport.cpp @@ -1065,7 +1065,7 @@ bool QuicClientTransport::hasWriteCipher() const { return clientConn_->oneRttWriteCipher || clientConn_->zeroRttWriteCipher; } -std::shared_ptr QuicClientTransport::sharedGuard() { +std::shared_ptr QuicClientTransport::sharedGuard() { return shared_from_this(); } diff --git a/quic/client/QuicClientTransport.h b/quic/client/QuicClientTransport.h index 71165412b..9b7becb3b 100644 --- a/quic/client/QuicClientTransport.h +++ b/quic/client/QuicClientTransport.h @@ -153,7 +153,7 @@ class QuicClientTransport void closeTransport() override; void unbindConnection() override; bool hasWriteCipher() const override; - std::shared_ptr sharedGuard() override; + std::shared_ptr sharedGuard() override; // QuicAsyncUDPSocket::ReadCallback void onReadClosed() noexcept override {} diff --git a/quic/client/test/QuicClientTransportMock.h b/quic/client/test/QuicClientTransportMock.h index f3f88a15a..b3c901112 100644 --- a/quic/client/test/QuicClientTransportMock.h +++ b/quic/client/test/QuicClientTransportMock.h @@ -54,7 +54,6 @@ class QuicClientTransportMock : public QuicClientTransport { (const)); MOCK_METHOD((bool), isTLSResumed, (), (const)); MOCK_METHOD((ZeroRttAttemptState), getZeroRttState, ()); - MOCK_METHOD((void), closeImpl, (Optional, bool, bool)); MOCK_METHOD((void), close, (Optional)); MOCK_METHOD((void), writeData, ()); MOCK_METHOD((void), closeSecondSocket, ()); diff --git a/quic/server/QuicServerTransport.cpp b/quic/server/QuicServerTransport.cpp index 348fc25a1..e92894427 100644 --- a/quic/server/QuicServerTransport.cpp +++ b/quic/server/QuicServerTransport.cpp @@ -387,7 +387,7 @@ bool QuicServerTransport::hasReadCipher() const { conn_->readCodec->getOneRttReadCipher() != nullptr; } -std::shared_ptr QuicServerTransport::sharedGuard() { +std::shared_ptr QuicServerTransport::sharedGuard() { return shared_from_this(); } diff --git a/quic/server/QuicServerTransport.h b/quic/server/QuicServerTransport.h index 8467045cf..ba413c326 100644 --- a/quic/server/QuicServerTransport.h +++ b/quic/server/QuicServerTransport.h @@ -144,7 +144,7 @@ class QuicServerTransport void closeTransport() override; void unbindConnection() override; bool hasWriteCipher() const override; - std::shared_ptr sharedGuard() override; + std::shared_ptr sharedGuard() override; QuicConnectionStats getConnectionsStats() const override; WriteResult writeBufMeta(