/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace quic { enum class CloseState { OPEN, GRACEFUL_CLOSING, CLOSED }; /** * Base class for the QUIC Transport. Implements common behavior for both * clients and servers. QuicTransportBase assumes the following: * 1. It is intended to be sub-classed and used via the subclass directly. * 2. Assumes that the sub-class manages its ownership via a shared_ptr. * This is needed in order for QUIC to be able to live beyond the lifetime * of the object that holds it to send graceful close messages to the peer. */ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver, QuicAsyncUDPSocket::WriteCallback { public: QuicTransportBase( std::shared_ptr evb, std::unique_ptr socket, bool useConnectionEndWithErrorCallback = false); ~QuicTransportBase() override; void scheduleTimeout( QuicTimerCallback* callback, std::chrono::milliseconds timeout); void setPacingTimer(QuicTimer::SharedPtr pacingTimer) noexcept; [[nodiscard]] std::shared_ptr getEventBase() const override; Optional getClientConnectionId() const override; Optional getServerConnectionId() const override; Optional getClientChosenDestConnectionId() const override; const folly::SocketAddress& getPeerAddress() const override; const folly::SocketAddress& getOriginalPeerAddress() const override; const folly::SocketAddress& getLocalAddress() const override; const std::shared_ptr getQLogger() const; // QuicSocket interface bool good() const override; bool replaySafe() const override; bool error() const override; void close(Optional error) override; void closeGracefully() override; void closeNow(Optional error) override; folly::Expected getStreamReadOffset( StreamId id) const override; folly::Expected getStreamWriteOffset( StreamId id) const override; folly::Expected getStreamWriteBufferedBytes( StreamId id) const override; TransportInfo getTransportInfo() const override; folly::Expected getStreamTransportInfo( StreamId id) const override; Optional getAppProtocol() const override; void setReceiveWindow(StreamId, size_t /*recvWindowSize*/) override {} void setSendBuffer(StreamId, size_t /*maxUnacked*/, size_t /*maxUnsent*/) override {} uint64_t getConnectionBufferAvailable() const override; uint64_t bufferSpaceAvailable() const; folly::Expected getConnectionFlowControl() const override; folly::Expected getStreamFlowControl(StreamId id) const override; folly::Expected getMaxWritableOnStream( StreamId id) const override; [[nodiscard]] uint64_t maxWritableOnConn() const override; folly::Expected setConnectionFlowControlWindow( uint64_t windowSize) override; folly::Expected setStreamFlowControlWindow( StreamId id, uint64_t windowSize) override; folly::Expected setReadCallback( StreamId id, ReadCallback* cb, Optional err = GenericApplicationErrorCode::NO_ERROR) override; void unsetAllReadCallbacks() override; void unsetAllPeekCallbacks() override; void unsetAllDeliveryCallbacks() override; folly::Expected pauseRead(StreamId id) override; folly::Expected resumeRead(StreamId id) override; folly::Expected stopSending( StreamId id, ApplicationErrorCode error) override; folly::Expected, LocalErrorCode> read( StreamId id, size_t maxLen) override; folly::Expected setPeekCallback( StreamId id, PeekCallback* cb) override; folly::Expected pausePeek(StreamId id) override; folly::Expected resumePeek(StreamId id) override; folly::Expected peek( StreamId id, const folly::Function&) const>& peekCallback) override; folly::Expected consume( StreamId id, size_t amount) override; folly::Expected>> consume(StreamId id, uint64_t offset, size_t amount) override; folly::Expected createBidirectionalStream( bool replaySafe = true) override; folly::Expected createUnidirectionalStream( bool replaySafe = true) override; folly::Expected createBidirectionalStreamGroup() override; folly::Expected createUnidirectionalStreamGroup() override; folly::Expected createBidirectionalStreamInGroup( StreamGroupId groupId) override; folly::Expected createUnidirectionalStreamInGroup( StreamGroupId groupId) override; uint64_t getNumOpenableBidirectionalStreams() const override; uint64_t getNumOpenableUnidirectionalStreams() const override; bool isClientStream(StreamId stream) noexcept override; bool isServerStream(StreamId stream) noexcept override; StreamInitiator getStreamInitiator(StreamId stream) noexcept override; bool isUnidirectionalStream(StreamId stream) noexcept override; bool isBidirectionalStream(StreamId stream) noexcept override; StreamDirectionality getStreamDirectionality( StreamId stream) noexcept override; folly::Expected notifyPendingWriteOnStream( StreamId id, QuicSocket::WriteCallback* wcb) override; folly::Expected notifyPendingWriteOnConnection( QuicSocket::WriteCallback* wcb) override; folly::Expected unregisterStreamWriteCallback( StreamId id) override; WriteResult writeChain( StreamId id, Buf data, bool eof, ByteEventCallback* cb = nullptr) override; folly::Expected registerDeliveryCallback( StreamId id, uint64_t offset, ByteEventCallback* cb) override; Optional shutdownWrite(StreamId id) override; folly::Expected resetStream( StreamId id, ApplicationErrorCode errorCode) override; folly::Expected maybeResetStreamFromReadError( StreamId id, QuicErrorCode error) override; folly::Expected setPingCallback( PingCallback* cb) override; void sendPing(std::chrono::milliseconds pingTimeout) override; const QuicConnectionStateBase* getState() const override { return conn_.get(); } // Interface with the Transport layer when data is available. // This is invoked when new data is received from the UDP socket. virtual void onNetworkData( const folly::SocketAddress& peer, NetworkData&& data) noexcept; virtual void setSupportedVersions(const std::vector& versions); virtual void setAckRxTimestampsEnabled(bool enableAckRxTimestamps); void setConnectionSetupCallback( folly::MaybeManagedPtr callback) final; void setConnectionCallback( folly::MaybeManagedPtr callback) final; void setEarlyDataAppParamsFunctions( folly::Function&, const Buf&) const> validator, folly::Function getter) final; bool isDetachable() override; void detachEventBase() override; void attachEventBase(std::shared_ptr evb) override; Optional setControlStream(StreamId id) override; /** * Set the initial flow control window for the connection. */ void setTransportSettings(TransportSettings transportSettings) override; /** * Sets the maximum pacing rate in Bytes per second to be used * if pacing is enabled. */ folly::Expected setMaxPacingRate( uint64_t maxRateBytesPerSec) override; /** * Set a "knob". This will emit a knob frame to the peer, which the peer * application can act on by e.g. changing transport settings during the * connection. */ folly::Expected setKnob(uint64_t knobSpace, uint64_t knobId, Buf knobBlob) override; /** * Can Knob Frames be exchanged with the peer on this connection? */ FOLLY_NODISCARD bool isKnobSupported() const override; /** * Set factory to create specific congestion controller instances * for a given connection. * Deletes current congestion controller instance, to create new controller * call setCongestionControl() or setTransportSettings(). */ virtual void setCongestionControllerFactory( std::shared_ptr factory); /** * Retrieve the transport settings */ const TransportSettings& getTransportSettings() const override; // Subclass API. /** * Invoked when a new packet is read from the network. * peer is the address of the peer that was in the packet. * The sub-class may throw an exception if there was an error in processing * the packet in which case the connection will be closed. */ virtual void onReadData( const folly::SocketAddress& peer, ReceivedUdpPacket&& udpPacket) = 0; /** * Invoked when we have to write some data to the wire. * The subclass may use this to start writing data to the socket. * It may also throw an exception in case of an error in which case the * connection will be closed. */ virtual void writeData() = 0; /** * closeTransport is invoked on the sub-class when the transport is closed. * The sub-class may clean up any state during this call. The transport * may still be draining after this call. */ virtual void closeTransport() = 0; /** * Invoked after the drain timeout has exceeded and the connection state will * be destroyed. */ virtual void unbindConnection() = 0; /** * Returns whether or not the connection has a write cipher. This will be used * to decide to return the onTransportReady() callbacks. */ virtual bool hasWriteCipher() const = 0; /** * Returns a shared_ptr which can be used as a guard to keep this * object alive. */ virtual std::shared_ptr sharedGuard() = 0; folly::Expected setStreamPriority( StreamId id, Priority priority) override; folly::Expected getStreamPriority( StreamId id) override; /** * Invoke onCanceled on all the delivery callbacks registered for streamId. */ void cancelDeliveryCallbacksForStream(StreamId id) override; /** * Invoke onCanceled on all the delivery callbacks registered for streamId for * offsets lower than the offset provided. */ void cancelDeliveryCallbacksForStream(StreamId id, uint64_t offset) override; /** * Register a callback to be invoked when the stream offset was transmitted. * * Currently, an offset is considered "transmitted" if it has been written to * to the underlying UDP socket, indicating that it has passed through * congestion control and pacing. In the future, this callback may be * triggered by socket/NIC software or hardware timestamps. */ folly::Expected registerTxCallback( const StreamId id, const uint64_t offset, ByteEventCallback* cb) override; /** * Register a byte event to be triggered when specified event type occurs for * the specified stream and offset. */ folly::Expected registerByteEventCallback( const ByteEvent::Type type, const StreamId id, const uint64_t offset, ByteEventCallback* cb) override; /** * Cancel byte event callbacks for given stream. * * If an offset is provided, cancels only callbacks with an offset less than * or equal to the provided offset, otherwise cancels all callbacks. */ void cancelByteEventCallbacksForStream( const StreamId id, const Optional& offset = none) override; /** * Cancel byte event callbacks for given type and stream. * * If an offset is provided, cancels only callbacks with an offset less than * or equal to the provided offset, otherwise cancels all callbacks. */ void cancelByteEventCallbacksForStream( const ByteEvent::Type type, const StreamId id, const Optional& offset = none) override; /** * Cancel all byte event callbacks of all streams. */ void cancelAllByteEventCallbacks() override; /** * Cancel all byte event callbacks of all streams of the given type. */ void cancelByteEventCallbacks(const ByteEvent::Type type) override; /** * Reset or send a stop sending on all non-control streams. Leaves the * connection otherwise unmodified. Note this will also trigger the * onStreamWriteError and readError callbacks immediately. */ void resetNonControlStreams( ApplicationErrorCode error, folly::StringPiece errorMsg) override; /** * Get the number of pending byte events for the given stream. */ FOLLY_NODISCARD size_t getNumByteEventCallbacksForStream(const StreamId id) const override; /** * Get the number of pending byte events of specified type for given stream. */ FOLLY_NODISCARD size_t getNumByteEventCallbacksForStream( const ByteEvent::Type type, const StreamId id) const override; /* * Set the background mode priority threshold and the target bw utilization * factor to use when in background mode. * * If all streams have equal or lower priority compares to the threshold * (value >= threshold), the connection is considered to be in background * mode. */ void setBackgroundModeParameters( PriorityLevel maxBackgroundPriority, float backgroundUtilizationFactor); /* * Disable background mode by clearing all related parameters. */ void clearBackgroundModeParameters(); /* * Creates buf accessor for use with in-place batch writer. */ virtual void createBufAccessor(size_t /* capacity */) {} // Timeout functions class LossTimeout : public QuicTimerCallback { public: ~LossTimeout() override = default; explicit LossTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->lossTimeoutExpired(); } virtual void callbackCanceled() noexcept override { // ignore. this usually means that the eventbase is dying, so we will be // canceled anyway return; } private: QuicTransportBase* transport_; }; class AckTimeout : public QuicTimerCallback { public: ~AckTimeout() override = default; explicit AckTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->ackTimeoutExpired(); } virtual void callbackCanceled() noexcept override { // ignore. this usually means that the eventbase is dying, so we will be // canceled anyway return; } private: QuicTransportBase* transport_; }; class PingTimeout : public QuicTimerCallback { public: ~PingTimeout() override = default; explicit PingTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->pingTimeoutExpired(); } void callbackCanceled() noexcept override { // ignore, as this happens only when event base dies return; } private: QuicTransportBase* transport_; }; class ExcessWriteTimeout : public QuicTimerCallback { public: ~ExcessWriteTimeout() override = default; explicit ExcessWriteTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->excessWriteTimeoutExpired(); } void callbackCanceled() noexcept override { // Do nothing. return; } private: QuicTransportBase* transport_; }; class PathValidationTimeout : public QuicTimerCallback { public: ~PathValidationTimeout() override = default; explicit PathValidationTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->pathValidationTimeoutExpired(); } virtual void callbackCanceled() noexcept override { // ignore. this usually means that the eventbase is dying, so we will be // canceled anyway return; } private: QuicTransportBase* transport_; }; class IdleTimeout : public QuicTimerCallback { public: ~IdleTimeout() override = default; explicit IdleTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->idleTimeoutExpired(true /* drain */); } void callbackCanceled() noexcept override { // skip drain when canceling the timeout, to avoid scheduling a new // drain timeout transport_->idleTimeoutExpired(false /* drain */); } private: QuicTransportBase* transport_; }; class KeepaliveTimeout : public QuicTimerCallback { public: ~KeepaliveTimeout() override = default; explicit KeepaliveTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->keepaliveTimeoutExpired(); } void callbackCanceled() noexcept override { // Specifically do nothing since if we got canceled we shouldn't write. } private: QuicTransportBase* transport_; }; // DrainTimeout is a bit different from other timeouts. It needs to hold a // shared_ptr to the transport, since if a DrainTimeout is scheduled, // transport cannot die. class DrainTimeout : public QuicTimerCallback { public: ~DrainTimeout() override = default; explicit DrainTimeout(QuicTransportBase* transport) : transport_(transport) {} void timeoutExpired() noexcept override { transport_->drainTimeoutExpired(); } private: QuicTransportBase* transport_; }; void scheduleLossTimeout(std::chrono::milliseconds timeout); void cancelLossTimeout(); bool isLossTimeoutScheduled(); // TODO: make this const again // If you don't set it, the default is Cubic void setCongestionControl(CongestionControlType type) override; void addPacketProcessor( std::shared_ptr packetProcessor) override; void setThrottlingSignalProvider( std::shared_ptr) override; void describe(std::ostream& os) const; virtual void setQLogger(std::shared_ptr qLogger); void setLoopDetectorCallback(std::shared_ptr callback) { conn_->loopDetectorCallback = std::move(callback); } virtual void cancelAllAppCallbacks(const QuicError& error) noexcept; FOLLY_NODISCARD QuicConnectionStats getConnectionsStats() const override; /** * Set the read callback for Datagrams */ folly::Expected setDatagramCallback( DatagramCallback* cb) override; /** * Returns the maximum allowed Datagram payload size. * 0 means Datagram is not supported */ FOLLY_NODISCARD uint16_t getDatagramSizeLimit() const override; /** * Writes a Datagram frame. If buf is larger than the size limit returned by * getDatagramSizeLimit(), or if the write buffer is full, buf will simply be * dropped, and a LocalErrorCode will be returned to caller. */ folly::Expected writeDatagram(Buf buf) override; /** * Returns the currently available received Datagrams. * Returns all datagrams if atMost is 0. */ folly::Expected, LocalErrorCode> readDatagrams( size_t atMost = 0) override; /** * Returns the currently available received Datagram IOBufs. * Returns all datagrams if atMost is 0. */ folly::Expected, LocalErrorCode> readDatagramBufs( size_t atMost = 0) override; /** * Set control messages to be sent for socket_ write, note that it's for this * specific transport and does not change the other sockets sharing the same * fd. */ void setCmsgs(const folly::SocketCmsgMap& options); void appendCmsgs(const folly::SocketCmsgMap& options); /** * Sets the policy per stream group id. * If policy == std::nullopt, the policy is removed for corresponding stream * group id (reset to the default rtx policy). */ folly::Expected setStreamGroupRetransmissionPolicy( StreamGroupId groupId, std::optional policy) noexcept override; [[nodiscard]] const folly:: F14FastMap& getStreamGroupRetransmissionPolicies() const { return conn_->retransmissionPolicies; } protected: void updateCongestionControlSettings( const TransportSettings& transportSettings); void updateSocketTosSettings(uint8_t dscpValue); void processCallbacksAfterWriteData(); void processCallbacksAfterNetworkData(); void invokeReadDataAndCallbacks(); void invokePeekDataAndCallbacks(); void invokeStreamsAvailableCallbacks(); void updateReadLooper(); void updatePeekLooper(); void updateWriteLooper(bool thisIteration); void handlePingCallbacks(); void handleKnobCallbacks(); void handleAckEventCallbacks(); void handleCancelByteEventCallbacks(); void handleNewStreamCallbacks(std::vector& newPeerStreams); void handleNewGroupedStreamCallbacks(std::vector& newPeerStreams); void handleDeliveryCallbacks(); void handleStreamFlowControlUpdatedCallbacks( std::vector& streamStorage); void handleStreamStopSendingCallbacks(); void handleConnWritable(); /* * Observe changes in stream priorities and handle background mode. * * Implements the QuicStreamPrioritiesObserver interface */ void onStreamPrioritiesChange() override; void cleanupAckEventState(); void runOnEvbAsync( folly::Function)> func); void closeImpl( Optional error, bool drainConnection = true, bool sendCloseImmediately = true); void closeUdpSocket(); folly::Expected pauseOrResumeRead( StreamId id, bool resume); folly::Expected pauseOrResumePeek( StreamId id, bool resume); void checkForClosedStream(); folly::Expected setReadCallbackInternal( StreamId id, ReadCallback* cb, Optional err) noexcept; folly::Expected setPeekCallbackInternal( StreamId id, PeekCallback* cb) noexcept; folly::Expected createStreamInternal( bool bidirectional, const OptionalIntegral& streamGroupId = std::nullopt); /** * write data to socket * * At transport layer, this is the simplest form of write. It writes data * out to the network, and schedule necessary timers (ack, idle, loss). It is * both pacing oblivious and writeLooper oblivious. Caller needs to explicitly * invoke updateWriteLooper afterwards if that's desired. */ void writeSocketData(); /** * A wrapper around writeSocketData * * writeSocketDataAndCatch protects writeSocketData in a try-catch. It also * dispatch the next write loop. */ void writeSocketDataAndCatch(); /** * Paced write data to socket when connection is paced. * * Whether connection is paced will be decided by TransportSettings and * congection controller. When the connection is paced, this function writes * out a burst size of packets and let the writeLooper schedule a callback to * write another burst after a pacing interval if there are more data to * write. When the connection isn't paced, this function does a normal write. */ void pacedWriteDataToSocket(); uint64_t maxWritableOnStream(const QuicStreamState&) const; void lossTimeoutExpired() noexcept; void ackTimeoutExpired() noexcept; void pathValidationTimeoutExpired() noexcept; void idleTimeoutExpired(bool drain) noexcept; void keepaliveTimeoutExpired() noexcept; void drainTimeoutExpired() noexcept; void pingTimeoutExpired() noexcept; void excessWriteTimeoutExpired() noexcept; void setIdleTimer(); void scheduleAckTimeout(); void schedulePathValidationTimeout(); void schedulePingTimeout( PingCallback* callback, std::chrono::milliseconds pingTimeout); void validateCongestionAndPacing(CongestionControlType& type); // Helpers to notify all registered observers about specific events during // socket write (if enabled in the observer's config). void notifyStartWritingFromAppRateLimited(); void notifyPacketsWritten( const uint64_t numPacketsWritten, const uint64_t numAckElicitingPacketsWritten, const uint64_t numBytesWritten); void notifyAppRateLimited(); /** * Callback when we receive a transport knob */ virtual void onTransportKnobs(Buf knobBlob); struct ByteEventDetail { ByteEventDetail(uint64_t offsetIn, ByteEventCallback* callbackIn) : offset(offsetIn), callback(callbackIn) {} uint64_t offset; ByteEventCallback* callback; }; using ByteEventMap = folly::F14FastMap>; ByteEventMap& getByteEventMap(const ByteEvent::Type type); FOLLY_NODISCARD const ByteEventMap& getByteEventMapConst( const ByteEvent::Type type) const; /** * Helper function that calls passed function for each ByteEvent type. * * Removes number of locations to update when a byte event is added. */ void invokeForEachByteEventType( const std::function& fn) { for (const auto& type : ByteEvent::kByteEventTypes) { fn(type); } } void invokeForEachByteEventTypeConst( const std::function& fn) const { for (const auto& type : ByteEvent::kByteEventTypes) { fn(type); } } void resetConnectionCallbacks() { connSetupCallback_ = nullptr; connCallback_ = nullptr; } bool processCancelCode(const QuicError& cancelCode); void processConnectionSetupCallbacks(QuicError&& cancelCode); void processConnectionCallbacks(QuicError&& cancelCode); /** * The callback function for AsyncUDPSocket to provide the additional cmsgs * required by this QuicSocket's packet processors. */ Optional getAdditionalCmsgsForAsyncUDPSocket(); std::shared_ptr evb_; std::unique_ptr socket_; folly::MaybeManagedPtr connSetupCallback_{nullptr}; folly::MaybeManagedPtr connCallback_{nullptr}; // A flag telling transport if the new onConnectionEnd(error) cb must be used. bool useConnectionEndWithErrorCallback_{false}; std:: unique_ptr conn_; struct ReadCallbackData { ReadCallback* readCb; bool resumed{true}; bool deliveredEOM{false}; ReadCallbackData(ReadCallback* readCallback) : readCb(readCallback) {} }; struct PeekCallbackData { PeekCallback* peekCb; bool resumed{true}; PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {} }; folly::F14FastMap readCallbacks_; folly::F14FastMap peekCallbacks_; ByteEventMap deliveryCallbacks_; ByteEventMap txCallbacks_; DatagramCallback* datagramCallback_{nullptr}; PingCallback* pingCallback_{nullptr}; QuicSocket::WriteCallback* connWriteCallback_{nullptr}; std::map pendingWriteCallbacks_; CloseState closeState_{CloseState::OPEN}; bool transportReadyNotified_{false}; bool handshakeDoneNotified_{false}; LossTimeout lossTimeout_; AckTimeout ackTimeout_; PathValidationTimeout pathValidationTimeout_; IdleTimeout idleTimeout_; KeepaliveTimeout keepaliveTimeout_; DrainTimeout drainTimeout_; PingTimeout pingTimeout_; ExcessWriteTimeout excessWriteTimeout_; FunctionLooper::Ptr readLooper_; FunctionLooper::Ptr peekLooper_; FunctionLooper::Ptr writeLooper_; // TODO: This is silly. We need a better solution. // Uninitialied local address as a fallback answer when socket isn't bound. folly::SocketAddress localFallbackAddress; Optional exceptionCloseWhat_; uint64_t qlogRefcnt_{0}; // Priority level threshold for background streams // If all streams have equal or lower priority to the threshold // (value >= threshold), the connection is considered to be in background // mode. Optional backgroundPriorityThreshold_; Optional backgroundUtilizationFactor_; /** * Container for use in QuicTransportBase implementations. * * Contains a SocketObserverContainer and hands out weak or raw pointers. * * Weak pointers are used to meet the needs of QuicConnectionStateBase: * - QuicConnectionStateBase needs a pointer to the SocketObserverContainer * so that loss / ACK / other processing logic can access the observer * container and send the observers notifications. There may not be a * SocketObserverContainer if the QuicTransportBase implementation does * not support it. * * - A SocketObserverContainer must not outlive the instance of the * QuicTransportBase implementation that it is associated with. This is * because observers are notified that the object being observed has been * destroyed when the container is destroyed, and thus if the container * outlives the lifetime of the transport, then the observers will think * the transport is still alive when it is in fact dead. * - By storing a weak pointer to the SocketObserverContainer in the * QuicConnectionStateBase, we provide access to the observer container * without extending its lifetime. In parallel, because it is a managed * pointer, we avoid the possibility of dereferencing a stale pointer * (e.g., a pointer pointing to an object that has since been destroyed). * * We store a shared_ptr inside of this container and then distribute weak_ptr * to reduce the risk of a shared_ptr mistakenly * being held elsewhere. */ class WrappedSocketObserverContainer { public: explicit WrappedSocketObserverContainer(QuicSocket* socket) : observerContainer_( std::make_shared(socket)) {} [[nodiscard]] SocketObserverContainer* getPtr() const { return observerContainer_.get(); } [[nodiscard]] std::weak_ptr getWeakPtr() const { return observerContainer_; } // deleted constructors (unnecessary, difficult to safely support) WrappedSocketObserverContainer(const WrappedSocketObserverContainer&) = delete; WrappedSocketObserverContainer(WrappedSocketObserverContainer&&) = delete; WrappedSocketObserverContainer& operator=( const WrappedSocketObserverContainer&) = delete; WrappedSocketObserverContainer& operator=( WrappedSocketObserverContainer&& rhs) = delete; private: std::shared_ptr observerContainer_; }; protected: void cancelTimeout(QuicTimerCallback* callback); bool isTimeoutScheduled(QuicTimerCallback* callback) const; /** * Helper function to validate that the number of ECN packet marks match the * expected value, depending on the ECN state of the connection. * * If ECN is enabled, this function validates it's working correctly. If ECN * is not enabled or has already failed validation, this function does * nothing. */ void validateECNState(); WriteQuicDataResult handleInitialWriteDataCommon( const ConnectionId& srcConnId, const ConnectionId& dstConnId, uint64_t packetLimit, const std::string& token = ""); WriteQuicDataResult handleHandshakeWriteDataCommon( const ConnectionId& srcConnId, const ConnectionId& dstConnId, uint64_t packetLimit); void onSocketWritable() noexcept override; void maybeStopWriteLooperAndArmSocketWritableEvent(); /** * Checks the idle timer on write events, and if it's past the idle timeout, * calls the timer finctions. */ void checkIdleTimer(TimePoint now); struct IdleTimeoutCheck { std::chrono::milliseconds idleTimeoutMs{0}; Optional lastTimeIdleTimeoutScheduled_; bool forcedIdleTimeoutScheduled_{false}; }; IdleTimeoutCheck idleTimeoutCheck_; private: /** * Helper functions to handle new streams. */ void handleNewStreams(std::vector& newPeerStreams); void handleNewGroupedStreams(std::vector& newPeerStreams); /** * Helper to log new stream event to observer. */ void logStreamOpenEvent(StreamId streamId); /** * Helper to check if using custom retransmission profiles is feasible. * Custom retransmission profiles are only applicable when stream groups are * enabled, i.e. advertisedMaxStreamGroups in transport settings is > 0. */ [[nodiscard]] bool checkCustomRetransmissionProfilesEnabled() const; /** * Helper function to collect prewrite requests from the PacketProcessors * Currently this collects cmsgs to be written. The Cmsgs will be stored in * the connection state and passed to AsyncUDPSocket in the next * additionalCmsgs callback */ void updatePacketProcessorsPrewriteRequests(); }; std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt); } // namespace quic