1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-08-08 09:42:06 +03:00

Implement setStreamGroupRtxPolicy()

Summary: Implement setStreamGroupRtxPolicy()

Reviewed By: mjoras

Differential Revision: D39698378

fbshipit-source-id: c6fc5a9a9ab7f322775565f6df629103b36b2a36
This commit is contained in:
Konstantin Tsoy
2023-02-23 21:31:24 -08:00
committed by Facebook GitHub Bot
parent b4cb994fd8
commit aa0fd83223
8 changed files with 134 additions and 6 deletions

View File

@@ -293,6 +293,7 @@ enum class LocalErrorCode : uint32_t {
CALLBACK_ALREADY_INSTALLED = 0x4000001A, CALLBACK_ALREADY_INSTALLED = 0x4000001A,
KNOB_FRAME_UNSUPPORTED = 0x4000001B, KNOB_FRAME_UNSUPPORTED = 0x4000001B,
PACER_NOT_AVAILABLE = 0x4000001C, PACER_NOT_AVAILABLE = 0x4000001C,
RTX_POLICIES_LIMIT_EXCEEDED = 0x4000001D,
}; };
enum class QuicNodeType : bool { enum class QuicNodeType : bool {

View File

@@ -123,6 +123,8 @@ folly::StringPiece toString(LocalErrorCode code) {
return "Knob Frame Not Supported"; return "Knob Frame Not Supported";
case LocalErrorCode::PACER_NOT_AVAILABLE: case LocalErrorCode::PACER_NOT_AVAILABLE:
return "Pacer not available"; return "Pacer not available";
case LocalErrorCode::RTX_POLICIES_LIMIT_EXCEEDED:
return "Retransmission policies limit exceeded";
default: default:
break; break;
} }

View File

@@ -1424,7 +1424,7 @@ class QuicSocket {
virtual folly::Expected<folly::Unit, LocalErrorCode> virtual folly::Expected<folly::Unit, LocalErrorCode>
setStreamGroupRetransmissionPolicy( setStreamGroupRetransmissionPolicy(
StreamGroupId groupId, StreamGroupId groupId,
QuicStreamGroupRetransmissionPolicy policy) noexcept = 0; std::optional<QuicStreamGroupRetransmissionPolicy> policy) noexcept = 0;
protected: protected:
/** /**

View File

@@ -3811,11 +3811,24 @@ bool QuicTransportBase::checkCustomRetransmissionProfilesEnabled() const {
folly::Expected<folly::Unit, LocalErrorCode> folly::Expected<folly::Unit, LocalErrorCode>
QuicTransportBase::setStreamGroupRetransmissionPolicy( QuicTransportBase::setStreamGroupRetransmissionPolicy(
StreamGroupId /* groupId */, StreamGroupId groupId,
QuicStreamGroupRetransmissionPolicy /* policy */) noexcept { std::optional<QuicStreamGroupRetransmissionPolicy> policy) noexcept {
// Reset the policy to default one.
if (policy == std::nullopt) {
conn_->retransmissionPolicies.erase(groupId);
return folly::unit;
}
if (!checkCustomRetransmissionProfilesEnabled()) { if (!checkCustomRetransmissionProfilesEnabled()) {
return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION); return folly::makeUnexpected(LocalErrorCode::INVALID_OPERATION);
} }
if (conn_->retransmissionPolicies.size() >=
conn_->transportSettings.advertisedMaxStreamGroups) {
return folly::makeUnexpected(LocalErrorCode::RTX_POLICIES_LIMIT_EXCEEDED);
}
conn_->retransmissionPolicies.emplace(groupId, *policy);
return folly::unit; return folly::unit;
} }

View File

@@ -655,12 +655,21 @@ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver {
void appendCmsgs(const folly::SocketOptionMap& options); void appendCmsgs(const folly::SocketOptionMap& options);
/** /**
* Sets stream group retransmission policy. * Sets the policy per stream group id.
* If policy == std::nullopt, the policy is removed for corresponding stream
* group id (reset to the default rtx policy).
*/ */
folly::Expected<folly::Unit, LocalErrorCode> folly::Expected<folly::Unit, LocalErrorCode>
setStreamGroupRetransmissionPolicy( setStreamGroupRetransmissionPolicy(
StreamGroupId groupId, StreamGroupId groupId,
QuicStreamGroupRetransmissionPolicy policy) noexcept override; std::optional<QuicStreamGroupRetransmissionPolicy> policy) noexcept
override;
[[nodiscard]] const folly::
F14FastMap<StreamGroupId, QuicStreamGroupRetransmissionPolicy>&
getStreamGroupRetransmissionPolicies() const {
return conn_->retransmissionPolicies;
}
protected: protected:
void updateCongestionControlSettings( void updateCongestionControlSettings(

View File

@@ -366,7 +366,7 @@ class MockQuicSocket : public QuicSocket {
MOCK_METHOD( MOCK_METHOD(
(folly::Expected<folly::Unit, LocalErrorCode>), (folly::Expected<folly::Unit, LocalErrorCode>),
setStreamGroupRetransmissionPolicy, setStreamGroupRetransmissionPolicy,
(StreamGroupId, QuicStreamGroupRetransmissionPolicy), (StreamGroupId, std::optional<QuicStreamGroupRetransmissionPolicy>),
(noexcept)); (noexcept));
}; };
} // namespace quic } // namespace quic

View File

@@ -4454,6 +4454,105 @@ TEST_P(
res = transport->setStreamGroupRetransmissionPolicy(groupId, policy); res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.hasError()); EXPECT_TRUE(res.hasError());
EXPECT_EQ(res.error(), LocalErrorCode::INVALID_OPERATION); EXPECT_EQ(res.error(), LocalErrorCode::INVALID_OPERATION);
EXPECT_EQ(1, transport->getStreamGroupRetransmissionPolicies().size());
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestStreamGroupRetransmissionPolicyReset) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
transport->getConnectionState().streamManager->refreshTransportSettings(
transportSettings);
const StreamGroupId groupId = 0x00;
QuicStreamGroupRetransmissionPolicy policy;
// Add the policy.
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Reset allowed.
res = transport->setStreamGroupRetransmissionPolicy(groupId, std::nullopt);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 0);
// Add the policy back.
res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Reset allowed even if custom policies are disabled.
transportSettings.advertisedMaxStreamGroups = 0;
res = transport->setStreamGroupRetransmissionPolicy(groupId, std::nullopt);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 0);
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestStreamGroupRetransmissionPolicyAddRemove) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 16;
transport->setTransportSettings(transportSettings);
transport->getConnectionState().streamManager->refreshTransportSettings(
transportSettings);
// Add a policy.
const StreamGroupId groupId = 0x00;
const QuicStreamGroupRetransmissionPolicy policy;
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Add another one.
const StreamGroupId groupId2 = 0x04;
const QuicStreamGroupRetransmissionPolicy policy2;
res = transport->setStreamGroupRetransmissionPolicy(groupId2, policy2);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 2);
// Remove second policy.
res = transport->setStreamGroupRetransmissionPolicy(groupId2, std::nullopt);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Remove first policy.
res = transport->setStreamGroupRetransmissionPolicy(groupId, std::nullopt);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 0);
transport.reset();
}
TEST_P(
QuicTransportImplTestWithGroups,
TestStreamGroupRetransmissionPolicyMaxLimit) {
auto transportSettings = transport->getTransportSettings();
transportSettings.advertisedMaxStreamGroups = 1;
transport->setTransportSettings(transportSettings);
transport->getConnectionState().streamManager->refreshTransportSettings(
transportSettings);
// Add a policy.
const StreamGroupId groupId = 0x00;
const QuicStreamGroupRetransmissionPolicy policy;
auto res = transport->setStreamGroupRetransmissionPolicy(groupId, policy);
EXPECT_TRUE(res.hasValue());
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
// Try adding another one; should be over the limit.
const StreamGroupId groupId2 = 0x04;
res = transport->setStreamGroupRetransmissionPolicy(groupId2, policy);
EXPECT_TRUE(res.hasError());
EXPECT_EQ(res.error(), LocalErrorCode::RTX_POLICIES_LIMIT_EXCEEDED);
EXPECT_EQ(transport->getStreamGroupRetransmissionPolicies().size(), 1);
transport.reset(); transport.reset();
} }

View File

@@ -26,6 +26,7 @@
#include <quic/state/PacketEvent.h> #include <quic/state/PacketEvent.h>
#include <quic/state/PendingPathRateLimiter.h> #include <quic/state/PendingPathRateLimiter.h>
#include <quic/state/QuicConnectionStats.h> #include <quic/state/QuicConnectionStats.h>
#include <quic/state/QuicStreamGroupRetransmissionPolicy.h>
#include <quic/state/QuicStreamManager.h> #include <quic/state/QuicStreamManager.h>
#include <quic/state/QuicTransportStatsCallback.h> #include <quic/state/QuicTransportStatsCallback.h>
#include <quic/state/StreamData.h> #include <quic/state/StreamData.h>
@@ -723,6 +724,9 @@ struct QuicConnectionStateBase : public folly::DelayedDestruction {
maybePeerAckReceiveTimestampsConfig; maybePeerAckReceiveTimestampsConfig;
bool peerAdvertisedKnobFrameSupport{false}; bool peerAdvertisedKnobFrameSupport{false};
// Retransmission policies map.
folly::F14FastMap<StreamGroupId, QuicStreamGroupRetransmissionPolicy>
retransmissionPolicies;
}; };
std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st); std::ostream& operator<<(std::ostream& os, const QuicConnectionStateBase& st);