1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-11-09 10:00:57 +03:00
Files
mvfst/quic/api/QuicTransportBase.h
Konstantin Tsoy 21ea1990ab Add a useSplitConnectionCallbacks to mvfst
Summary:
This change just adds a (currently no-op) flag that will be used in diffs up the stack.

The idea here is that I'll add split QUIC connection callback interfaces that will live side by side with the existing single monolithic callback for now. We will experiment with split callbacks on a small scale to see that there are no regressions and then will phase out the old callback gradually.

This flag is to control which callback(s) to use.

Reviewed By: mjoras

Differential Revision: D30399667

fbshipit-source-id: 8fc4e4a005e93cf6d48a987f49edee33b90dbbf1
2021-08-31 18:27:40 -07:00

888 lines
28 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
#pragma once
#include <quic/QuicConstants.h>
#include <quic/QuicException.h>
#include <quic/api/QuicSocket.h>
#include <quic/common/FunctionLooper.h>
#include <quic/common/Timers.h>
#include <quic/congestion_control/CongestionControllerFactory.h>
#include <quic/congestion_control/Copa.h>
#include <quic/congestion_control/NewReno.h>
#include <quic/congestion_control/QuicCubic.h>
#include <quic/state/StateData.h>
#include <folly/ExceptionWrapper.h>
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/HHWheelTimer.h>
namespace quic {
/**
 * Close state tracked by a transport. QuicTransportBase::closeState_ is
 * initialized to OPEN.
 */
enum class CloseState {
  OPEN,
  GRACEFUL_CLOSING,
  CLOSED,
};
/**
* Base class for the QUIC Transport. Implements common behavior for both
* clients and servers. QuicTransportBase assumes the following:
* 1. It is intended to be sub-classed and used via the subclass directly.
* 2. Assumes that the sub-class manages its ownership via a shared_ptr.
* This is needed in order for QUIC to be able to live beyond the lifetime
* of the object that holds it to send graceful close messages to the peer.
*/
class QuicTransportBase : public QuicSocket {
public:
QuicTransportBase(
folly::EventBase* evb,
std::unique_ptr<folly::AsyncUDPSocket> socket,
bool useSplitConnectionCallbacks = false);
~QuicTransportBase() override;
void setPacingTimer(TimerHighRes::SharedPtr pacingTimer) noexcept;
folly::EventBase* getEventBase() const override;
folly::Optional<ConnectionId> getClientConnectionId() const override;
folly::Optional<ConnectionId> getServerConnectionId() const override;
folly::Optional<ConnectionId> getClientChosenDestConnectionId()
const override;
const folly::SocketAddress& getPeerAddress() const override;
const folly::SocketAddress& getOriginalPeerAddress() const override;
const folly::SocketAddress& getLocalAddress() const override;
const std::shared_ptr<QLogger> getQLogger() const;
// QuicSocket interface
bool good() const override;
bool replaySafe() const override;
bool error() const override;
void close(
folly::Optional<std::pair<QuicErrorCode, std::string>> error) override;
void closeGracefully() override;
void closeNow(
folly::Optional<std::pair<QuicErrorCode, std::string>> error) override;
folly::Expected<size_t, LocalErrorCode> getStreamReadOffset(
StreamId id) const override;
folly::Expected<size_t, LocalErrorCode> getStreamWriteOffset(
StreamId id) const override;
folly::Expected<size_t, LocalErrorCode> getStreamWriteBufferedBytes(
StreamId id) const override;
TransportInfo getTransportInfo() const override;
folly::Expected<StreamTransportInfo, LocalErrorCode> getStreamTransportInfo(
StreamId id) const override;
folly::Optional<std::string> getAppProtocol() const override;
void setReceiveWindow(StreamId id, size_t recvWindowSize) override;
void setSendBuffer(StreamId id, size_t maxUnacked, size_t maxUnsent) override;
uint64_t getConnectionBufferAvailable() const override;
uint64_t bufferSpaceAvailable() const;
folly::Expected<QuicSocket::FlowControlState, LocalErrorCode>
getConnectionFlowControl() const override;
folly::Expected<QuicSocket::FlowControlState, LocalErrorCode>
getStreamFlowControl(StreamId id) const override;
folly::Expected<folly::Unit, LocalErrorCode> setConnectionFlowControlWindow(
uint64_t windowSize) override;
folly::Expected<folly::Unit, LocalErrorCode> setStreamFlowControlWindow(
StreamId id,
uint64_t windowSize) override;
folly::Expected<folly::Unit, LocalErrorCode> setReadCallback(
StreamId id,
ReadCallback* cb,
folly::Optional<ApplicationErrorCode> err =
GenericApplicationErrorCode::NO_ERROR) override;
void unsetAllReadCallbacks() override;
void unsetAllPeekCallbacks() override;
void unsetAllDeliveryCallbacks() override;
folly::Expected<folly::Unit, LocalErrorCode> pauseRead(StreamId id) override;
folly::Expected<folly::Unit, LocalErrorCode> resumeRead(StreamId id) override;
folly::Expected<folly::Unit, LocalErrorCode> stopSending(
StreamId id,
ApplicationErrorCode error) override;
folly::Expected<std::pair<Buf, bool>, LocalErrorCode> read(
StreamId id,
size_t maxLen) override;
folly::Expected<folly::Unit, LocalErrorCode> setPeekCallback(
StreamId id,
PeekCallback* cb) override;
folly::Expected<folly::Unit, LocalErrorCode> pausePeek(StreamId id) override;
folly::Expected<folly::Unit, LocalErrorCode> resumePeek(StreamId id) override;
folly::Expected<folly::Unit, LocalErrorCode> peek(
StreamId id,
const folly::Function<void(StreamId id, const folly::Range<PeekIterator>&)
const>& peekCallback) override;
folly::Expected<folly::Unit, LocalErrorCode> consume(
StreamId id,
size_t amount) override;
folly::Expected<
folly::Unit,
std::pair<LocalErrorCode, folly::Optional<uint64_t>>>
consume(StreamId id, uint64_t offset, size_t amount) override;
folly::Expected<StreamId, LocalErrorCode> createBidirectionalStream(
bool replaySafe = true) override;
folly::Expected<StreamId, LocalErrorCode> createUnidirectionalStream(
bool replaySafe = true) override;
uint64_t getNumOpenableBidirectionalStreams() const override;
uint64_t getNumOpenableUnidirectionalStreams() const override;
bool isClientStream(StreamId stream) noexcept override;
bool isServerStream(StreamId stream) noexcept override;
bool isUnidirectionalStream(StreamId stream) noexcept override;
bool isBidirectionalStream(StreamId stream) noexcept override;
folly::Expected<folly::Unit, LocalErrorCode> notifyPendingWriteOnStream(
StreamId id,
WriteCallback* wcb) override;
folly::Expected<folly::Unit, LocalErrorCode> notifyPendingWriteOnConnection(
WriteCallback* wcb) override;
folly::Expected<folly::Unit, LocalErrorCode> unregisterStreamWriteCallback(
StreamId id) override;
WriteResult writeChain(
StreamId id,
Buf data,
bool eof,
ByteEventCallback* cb = nullptr) override;
// TODO: Maybe I should virtualize DSR related APIs and only implement in
// QuicServerTransport
WriteResult writeBufMeta(
StreamId id,
const BufferMeta& data,
bool eof,
ByteEventCallback* cb = nullptr) override;
WriteResult setDSRPacketizationRequestSender(
StreamId id,
std::unique_ptr<DSRPacketizationRequestSender> sender) override;
folly::Expected<folly::Unit, LocalErrorCode> registerDeliveryCallback(
StreamId id,
uint64_t offset,
ByteEventCallback* cb) override;
folly::Optional<LocalErrorCode> shutdownWrite(StreamId id) override;
folly::Expected<folly::Unit, LocalErrorCode> resetStream(
StreamId id,
ApplicationErrorCode errorCode) override;
folly::Expected<folly::Unit, LocalErrorCode> maybeResetStreamFromReadError(
StreamId id,
QuicErrorCode error) override;
void sendPing(PingCallback* callback, std::chrono::milliseconds pingTimeout)
override;
// Returns a read-only, non-owning pointer to the connection state held in
// conn_. Ownership stays with this transport.
const QuicConnectionStateBase* getState() const override {
  const QuicConnectionStateBase* state = conn_.get();
  return state;
}
// Interface with the Transport layer when data is available.
// This is invoked when new data is received from the UDP socket.
virtual void onNetworkData(
const folly::SocketAddress& peer,
NetworkData&& data) noexcept;
virtual void setSupportedVersions(const std::vector<QuicVersion>& versions);
void setConnectionCallback(ConnectionCallback* callback) final;
void setEarlyDataAppParamsFunctions(
folly::Function<bool(const folly::Optional<std::string>&, const Buf&)
const> validator,
folly::Function<Buf()> getter) final;
bool isDetachable() override;
void detachEventBase() override;
void attachEventBase(folly::EventBase* evb) override;
folly::Optional<LocalErrorCode> setControlStream(StreamId id) override;
/**
* Set the initial flow control window for the connection.
*/
void setTransportSettings(TransportSettings transportSettings) override;
/**
* Sets the maximum pacing rate in Bytes per second to be used
* if pacing is enabled.
*/
folly::Expected<folly::Unit, LocalErrorCode> setMaxPacingRate(
uint64_t maxRateBytesPerSec) override;
/**
* Set a "knob". This will emit a knob frame to the peer, which the peer
* application can act on by e.g. changing transport settings during the
* connection.
*/
folly::Expected<folly::Unit, LocalErrorCode>
setKnob(uint64_t knobSpace, uint64_t knobId, Buf knobBlob) override;
/**
* Can Knob Frames be exchanged with the peer on this connection?
*/
FOLLY_NODISCARD bool isKnobSupported() const override;
/**
* Set factory to create specific congestion controller instances
* for a given connection.
* Deletes current congestion controller instance, to create new controller
* call setCongestionControl() or setTransportSettings().
*/
virtual void setCongestionControllerFactory(
std::shared_ptr<CongestionControllerFactory> factory);
/**
* Retrieve the transport settings
*/
const TransportSettings& getTransportSettings() const override;
// Subclass API.
/**
* Invoked when a new packet is read from the network.
* peer is the address of the peer that was in the packet.
* The sub-class may throw an exception if there was an error in processing
* the packet in which case the connection will be closed.
*/
virtual void onReadData(
const folly::SocketAddress& peer,
NetworkDataSingle&& networkData) = 0;
/**
* Invoked when we have to write some data to the wire.
* The subclass may use this to start writing data to the socket.
* It may also throw an exception in case of an error in which case the
* connection will be closed.
*/
virtual void writeData() = 0;
/**
* closeTransport is invoked on the sub-class when the transport is closed.
* The sub-class may clean up any state during this call. The transport
* may still be draining after this call.
*/
virtual void closeTransport() = 0;
/**
* Invoked after the drain timeout has exceeded and the connection state will
* be destroyed.
*/
virtual void unbindConnection() = 0;
/**
* Returns whether or not the connection has a write cipher. This will be used
* to decide to return the onTransportReady() callbacks.
*/
virtual bool hasWriteCipher() const = 0;
/**
* Returns a shared_ptr which can be used as a guard to keep this
* object alive.
*/
virtual std::shared_ptr<QuicTransportBase> sharedGuard() = 0;
folly::Expected<folly::Unit, LocalErrorCode> setStreamPriority(
StreamId id,
PriorityLevel level,
bool incremental) override;
folly::Expected<Priority, LocalErrorCode> getStreamPriority(
StreamId id) override;
/**
* Invoke onCanceled on all the delivery callbacks registered for streamId.
*/
void cancelDeliveryCallbacksForStream(StreamId id) override;
/**
* Invoke onCanceled on all the delivery callbacks registered for streamId for
* offsets lower than the offset provided.
*/
void cancelDeliveryCallbacksForStream(StreamId id, uint64_t offset) override;
/**
* Register a callback to be invoked when the stream offset was transmitted.
*
* Currently, an offset is considered "transmitted" if it has been written to
* the underlying UDP socket, indicating that it has passed through
* congestion control and pacing. In the future, this callback may be
* triggered by socket/NIC software or hardware timestamps.
*/
folly::Expected<folly::Unit, LocalErrorCode> registerTxCallback(
const StreamId id,
const uint64_t offset,
ByteEventCallback* cb) override;
/**
* Register a byte event to be triggered when specified event type occurs for
* the specified stream and offset.
*/
folly::Expected<folly::Unit, LocalErrorCode> registerByteEventCallback(
const ByteEvent::Type type,
const StreamId id,
const uint64_t offset,
ByteEventCallback* cb) override;
/**
* Cancel byte event callbacks for given stream.
*
* If an offset is provided, cancels only callbacks with an offset less than
* or equal to the provided offset, otherwise cancels all callbacks.
*/
void cancelByteEventCallbacksForStream(
const StreamId id,
const folly::Optional<uint64_t>& offset = folly::none) override;
/**
* Cancel byte event callbacks for given type and stream.
*
* If an offset is provided, cancels only callbacks with an offset less than
* or equal to the provided offset, otherwise cancels all callbacks.
*/
void cancelByteEventCallbacksForStream(
const ByteEvent::Type type,
const StreamId id,
const folly::Optional<uint64_t>& offset = folly::none) override;
/**
* Cancel all byte event callbacks of all streams.
*/
void cancelAllByteEventCallbacks() override;
/**
* Cancel all byte event callbacks of all streams of the given type.
*/
void cancelByteEventCallbacks(const ByteEvent::Type type) override;
/**
* Reset or send a stop sending on all non-control streams. Leaves the
* connection otherwise unmodified. Note this will also trigger the
* onStreamWriteError and readError callbacks immediately.
*/
void resetNonControlStreams(
ApplicationErrorCode error,
folly::StringPiece errorMsg) override;
/**
* Get the number of pending byte events for the given stream.
*/
FOLLY_NODISCARD size_t
getNumByteEventCallbacksForStream(const StreamId id) const override;
/**
* Get the number of pending byte events of specified type for given stream.
*/
FOLLY_NODISCARD size_t getNumByteEventCallbacksForStream(
const ByteEvent::Type type,
const StreamId id) const override;
// Timeout functions
// Timer callback that forwards loss-timer expiry to the owning transport.
//
// Holds a non-owning pointer to the transport; nothing in this class extends
// the transport's lifetime.
class LossTimeout : public folly::HHWheelTimer::Callback {
 public:
  ~LossTimeout() override = default;

  explicit LossTimeout(QuicTransportBase* transport)
      : transport_(transport) {}

  // Delegates to QuicTransportBase::lossTimeoutExpired().
  void timeoutExpired() noexcept override {
    transport_->lossTimeoutExpired();
  }

  // Deliberately a no-op: cancellation usually means that the eventbase is
  // dying, so we will be canceled anyway.
  void callbackCanceled() noexcept override {}

 private:
  QuicTransportBase* transport_;
};
// Timer callback that forwards ack-timer expiry to the owning transport.
//
// Holds a non-owning pointer to the transport; nothing in this class extends
// the transport's lifetime.
class AckTimeout : public folly::HHWheelTimer::Callback {
 public:
  ~AckTimeout() override = default;

  explicit AckTimeout(QuicTransportBase* transport) : transport_(transport) {}

  // Delegates to QuicTransportBase::ackTimeoutExpired().
  void timeoutExpired() noexcept override {
    transport_->ackTimeoutExpired();
  }

  // Deliberately a no-op: cancellation usually means that the eventbase is
  // dying, so we will be canceled anyway.
  void callbackCanceled() noexcept override {}

 private:
  QuicTransportBase* transport_;
};
class PingTimeout : public folly::HHWheelTimer::Callback {
public:
~PingTimeout() override = default;
explicit PingTimeout(QuicTransportBase* transport)
: transport_(transport) {}
void timeoutExpired() noexcept override {
transport_->pingTimeoutExpired();
}
void callbackCanceled() noexcept override {
// ignore, as this happens only when event base dies
return;
}
private:
QuicTransportBase* transport_;
};
// Timer callback that forwards path-validation-timer expiry to the owning
// transport.
//
// Holds a non-owning pointer to the transport; nothing in this class extends
// the transport's lifetime.
class PathValidationTimeout : public folly::HHWheelTimer::Callback {
 public:
  ~PathValidationTimeout() override = default;

  explicit PathValidationTimeout(QuicTransportBase* transport)
      : transport_(transport) {}

  // Delegates to QuicTransportBase::pathValidationTimeoutExpired().
  void timeoutExpired() noexcept override {
    transport_->pathValidationTimeoutExpired();
  }

  // Deliberately a no-op: cancellation usually means that the eventbase is
  // dying, so we will be canceled anyway.
  void callbackCanceled() noexcept override {}

 private:
  QuicTransportBase* transport_;
};
class IdleTimeout : public folly::HHWheelTimer::Callback {
public:
~IdleTimeout() override = default;
explicit IdleTimeout(QuicTransportBase* transport)
: transport_(transport) {}
void timeoutExpired() noexcept override {
transport_->idleTimeoutExpired(true /* drain */);
}
void callbackCanceled() noexcept override {
// skip drain when canceling the timeout, to avoid scheduling a new
// drain timeout
transport_->idleTimeoutExpired(false /* drain */);
}
private:
QuicTransportBase* transport_;
};
// DrainTimeout is a bit different from other timeouts: if a DrainTimeout is
// scheduled, the transport cannot die.
//
// NOTE(review): this comment used to say the class needs to hold a shared_ptr
// to the transport, but the member below is a raw QuicTransportBase*. Whatever
// keeps the transport alive while draining must live outside this class —
// confirm against the code that schedules drainTimeout_.
//
// Unlike LossTimeout/AckTimeout/PingTimeout, callbackCanceled() is not
// overridden here, so the base-class behavior applies on cancellation.
class DrainTimeout : public folly::HHWheelTimer::Callback {
public:
~DrainTimeout() override = default;
explicit DrainTimeout(QuicTransportBase* transport)
: transport_(transport) {}
// Forwards timer expiry to QuicTransportBase::drainTimeoutExpired().
void timeoutExpired() noexcept override {
transport_->drainTimeoutExpired();
}
private:
// Non-owning pointer to the transport this timeout reports to.
QuicTransportBase* transport_;
};
class D6DProbeTimeout : public folly::HHWheelTimer::Callback {
public:
~D6DProbeTimeout() override = default;
explicit D6DProbeTimeout(QuicTransportBase* transport)
: transport_(transport) {}
void timeoutExpired() noexcept override {
transport_->d6dProbeTimeoutExpired();
}
private:
QuicTransportBase* transport_;
};
class D6DRaiseTimeout : public folly::HHWheelTimer::Callback {
public:
~D6DRaiseTimeout() override = default;
explicit D6DRaiseTimeout(QuicTransportBase* transport)
: transport_(transport) {}
void timeoutExpired() noexcept override {
transport_->d6dRaiseTimeoutExpired();
}
private:
QuicTransportBase* transport_;
};
class D6DTxTimeout : public folly::HHWheelTimer::Callback {
public:
~D6DTxTimeout() override = default;
explicit D6DTxTimeout(QuicTransportBase* transport)
: transport_(transport) {}
void timeoutExpired() noexcept override {
transport_->d6dTxTimeoutExpired();
}
private:
QuicTransportBase* transport_;
};
void scheduleLossTimeout(std::chrono::milliseconds timeout);
void cancelLossTimeout();
bool isLossTimeoutScheduled() const;
// If you don't set it, the default is Cubic
void setCongestionControl(CongestionControlType type) override;
void describe(std::ostream& os) const;
// Stores `logger` on the connection state (conn_->logger), replacing whatever
// was assigned there before. Dereferences conn_ unconditionally.
void setLogger(std::shared_ptr<Logger> logger) {
  auto& connState = *conn_;
  connState.logger = std::move(logger);
}
virtual void setQLogger(std::shared_ptr<QLogger> qLogger);
// Stores `callback` on the connection state (conn_->loopDetectorCallback),
// replacing whatever was assigned there before. Dereferences conn_
// unconditionally.
void setLoopDetectorCallback(std::shared_ptr<LoopDetectorCallback> callback) {
  auto& connState = *conn_;
  connState.loopDetectorCallback = std::move(callback);
}
virtual void cancelAllAppCallbacks(
const std::pair<QuicErrorCode, folly::StringPiece>& error) noexcept;
/**
* Adds an observer.
*
* Observers can tie their lifetime to aspects of this socket's lifecycle /
* lifetime and perform inspection at various states.
*
* This enables instrumentation to be added without changing / interfering
* with how the application uses the socket.
*
* @param observer Observer to add (implements Observer).
*/
void addObserver(Observer* observer) override;
/**
* Removes an observer.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
bool removeObserver(Observer* observer) override;
/**
* Returns installed observers.
*
* @return Reference to const vector with installed observers.
*/
FOLLY_NODISCARD const ObserverVec& getObservers() const override;
FOLLY_NODISCARD QuicConnectionStats getConnectionsStats() const override;
/**
* Set the read callback for Datagrams
*/
folly::Expected<folly::Unit, LocalErrorCode> setDatagramCallback(
DatagramCallback* cb) override;
/**
* Returns the maximum allowed Datagram payload size.
* 0 means Datagram is not supported
*/
FOLLY_NODISCARD uint16_t getDatagramSizeLimit() const override;
/**
* Writes a Datagram frame. If buf is larger than the size limit returned by
* getDatagramSizeLimit(), or if the write buffer is full, buf will simply be
* dropped, and a LocalErrorCode will be returned to caller.
*/
folly::Expected<folly::Unit, LocalErrorCode> writeDatagram(Buf buf) override;
/**
* Returns the currently available received Datagrams.
* Returns all datagrams if atMost is 0.
*/
folly::Expected<std::vector<Buf>, LocalErrorCode> readDatagrams(
size_t atMost = 0) override;
/**
* Set control messages to be sent for socket_ write, note that it's for this
* specific transport and does not change the other sockets sharing the same
* fd.
*/
void setCmsgs(const folly::SocketOptionMap& options);
void appendCmsgs(const folly::SocketOptionMap& options);
protected:
void updateCongestionControlSettings(
const TransportSettings& transportSettings);
void processCallbacksAfterWriteData();
void processCallbacksAfterNetworkData();
void invokeReadDataAndCallbacks();
void invokePeekDataAndCallbacks();
void invokeStreamsAvailableCallbacks();
void updateReadLooper();
void updatePeekLooper();
void updateWriteLooper(bool thisIteration);
void handlePingCallback();
void handleKnobCallbacks();
void handleCancelByteEventCallbacks();
void handleNewStreamCallbacks(std::vector<StreamId>& newPeerStreams);
void handleDeliveryCallbacks();
void handleStreamFlowControlUpdatedCallbacks(
std::vector<StreamId>& streamStorage);
void handleStreamStopSendingCallbacks();
void handleConnWritable();
void runOnEvbAsync(
folly::Function<void(std::shared_ptr<QuicTransportBase>)> func);
void closeImpl(
folly::Optional<std::pair<QuicErrorCode, std::string>> error,
bool drainConnection = true,
bool sendCloseImmediately = true);
folly::Expected<folly::Unit, LocalErrorCode> pauseOrResumeRead(
StreamId id,
bool resume);
folly::Expected<folly::Unit, LocalErrorCode> pauseOrResumePeek(
StreamId id,
bool resume);
void checkForClosedStream();
folly::Expected<folly::Unit, LocalErrorCode> setReadCallbackInternal(
StreamId id,
ReadCallback* cb,
folly::Optional<ApplicationErrorCode> err) noexcept;
folly::Expected<folly::Unit, LocalErrorCode> setPeekCallbackInternal(
StreamId id,
PeekCallback* cb) noexcept;
folly::Expected<StreamId, LocalErrorCode> createStreamInternal(
bool bidirectional);
/**
* write data to socket
*
* At transport layer, this is the simplest form of write. It writes data
* out to the network, and schedule necessary timers (ack, idle, loss). It is
* both pacing oblivious and writeLooper oblivious. Caller needs to explicitly
* invoke updateWriteLooper afterwards if that's desired.
*/
void writeSocketData();
/**
* A wrapper around writeSocketData
*
* writeSocketDataAndCatch protects writeSocketData in a try-catch. It also
* dispatches the next write loop.
*/
void writeSocketDataAndCatch();
/**
* Paced write data to socket when connection is paced.
*
* Whether the connection is paced is decided by TransportSettings and the
* congestion controller. When the connection is paced, this function writes
* out a burst of packets and lets the writeLooper schedule a callback to
* write another burst after a pacing interval if there is more data to
* write. When the connection isn't paced, this function does a normal write.
*/
void pacedWriteDataToSocket(bool fromTimer);
uint64_t maxWritableOnStream(const QuicStreamState&);
uint64_t maxWritableOnConn();
void lossTimeoutExpired() noexcept;
void ackTimeoutExpired() noexcept;
void pathValidationTimeoutExpired() noexcept;
void idleTimeoutExpired(bool drain) noexcept;
void drainTimeoutExpired() noexcept;
void pingTimeoutExpired() noexcept;
void d6dProbeTimeoutExpired() noexcept;
void d6dRaiseTimeoutExpired() noexcept;
void d6dTxTimeoutExpired() noexcept;
void setIdleTimer();
void scheduleAckTimeout();
void schedulePathValidationTimeout();
void schedulePingTimeout(
PingCallback* callback,
std::chrono::milliseconds pingTimeout);
void scheduleD6DRaiseTimeout();
void scheduleD6DProbeTimeout();
void scheduleD6DTxTimeout();
void validateCongestionAndPacing(CongestionControlType& type);
// Helpers to notify all registered observers about specific events during
// socket write (if enabled in the observer's config).
void notifyStartWritingFromAppRateLimited();
void notifyPacketsWritten();
void notifyAppRateLimited();
/**
* Callback when we receive a transport knob
*/
virtual void onTransportKnobs(Buf knobBlob);
// One registered byte event: a stream offset plus the callback registered for
// it. ByteEventMap keeps a deque of these per StreamId. The callback pointer
// is non-owning.
struct ByteEventDetail {
  uint64_t offset;
  ByteEventCallback* callback;

  ByteEventDetail(uint64_t offsetIn, ByteEventCallback* callbackIn)
      : offset{offsetIn}, callback{callbackIn} {}
};
using ByteEventMap = folly::F14FastMap<StreamId, std::deque<ByteEventDetail>>;
ByteEventMap& getByteEventMap(const ByteEvent::Type type);
FOLLY_NODISCARD const ByteEventMap& getByteEventMapConst(
const ByteEvent::Type type) const;
/**
* Helper function that calls passed function for each ByteEvent type.
*
* Removes number of locations to update when a byte event is added.
*/
void invokeForEachByteEventType(
const std::function<void(const ByteEvent::Type)>& fn) {
for (const auto& type : ByteEvent::kByteEventTypes) {
fn(type);
}
}
void invokeForEachByteEventTypeConst(
const std::function<void(const ByteEvent::Type)>& fn) const {
for (const auto& type : ByteEvent::kByteEventTypes) {
fn(type);
}
}
// Clears the stored connection callback pointer (connCallback_). The callback
// object itself is not touched; the pointer is non-owning.
void resetConnectionCallbacks() {
  connCallback_ = {};
}
std::atomic<folly::EventBase*> evb_;
std::unique_ptr<folly::AsyncUDPSocket> socket_;
ConnectionCallback* connCallback_{nullptr};
std::
unique_ptr<QuicConnectionStateBase, folly::DelayedDestruction::Destructor>
conn_;
struct ReadCallbackData {
ReadCallback* readCb;
bool resumed{true};
bool deliveredEOM{false};
ReadCallbackData(ReadCallback* readCallback) : readCb(readCallback) {}
};
struct PeekCallbackData {
PeekCallback* peekCb;
bool resumed{true};
PeekCallbackData(PeekCallback* peekCallback) : peekCb(peekCallback) {}
};
folly::F14FastMap<StreamId, ReadCallbackData> readCallbacks_;
folly::F14FastMap<StreamId, PeekCallbackData> peekCallbacks_;
ByteEventMap deliveryCallbacks_;
ByteEventMap txCallbacks_;
DatagramCallback* datagramCallback_{nullptr};
PingCallback* pingCallback_{nullptr};
WriteCallback* connWriteCallback_{nullptr};
std::map<StreamId, WriteCallback*> pendingWriteCallbacks_;
CloseState closeState_{CloseState::OPEN};
bool transportReadyNotified_{false};
bool d6dProbingStarted_{false};
LossTimeout lossTimeout_;
AckTimeout ackTimeout_;
PathValidationTimeout pathValidationTimeout_;
IdleTimeout idleTimeout_;
DrainTimeout drainTimeout_;
PingTimeout pingTimeout_;
D6DProbeTimeout d6dProbeTimeout_;
D6DRaiseTimeout d6dRaiseTimeout_;
D6DTxTimeout d6dTxTimeout_;
FunctionLooper::Ptr readLooper_;
FunctionLooper::Ptr peekLooper_;
FunctionLooper::Ptr writeLooper_;
// TODO: This is silly. We need a better solution.
// Uninitialized local address as a fallback answer when socket isn't bound.
folly::SocketAddress localFallbackAddress;
folly::Optional<std::string> exceptionCloseWhat_;
// Observers
std::shared_ptr<ObserverVec> observers_{std::make_shared<ObserverVec>()};
uint64_t qlogRefcnt_{0};
// Temp flag controlling which connection callbacks to use - old single
// callback object or two new split callback objects. Will be removed once
// mvfst has switched to the new split callbacks.
bool useSplitConnectionCallbacks_{false};
};
std::ostream& operator<<(std::ostream& os, const QuicTransportBase& qt);
} // namespace quic