1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-04-18 17:24:03 +03:00

PriorityQueue API

Summary:
New generic PriorityQueue API

The goal is to abstract the priority mechanism and even what is a "priority" from mvfst entirely.  The queue prioritizes "Identifiers" which can be streams or datagram "flows", which is more flexible than mvfst is currently capable of.

Reviewed By: hanidamlaj

Differential Revision: D68641756

fbshipit-source-id: 0449ed7852216f2906c0ecf9c14a10cce3c308c7
This commit is contained in:
Alan Frindell 2025-02-09 18:40:06 -08:00 committed by Facebook GitHub Bot
parent d4d827ca25
commit ac34f7e952
5 changed files with 316 additions and 0 deletions

View File

@ -84,6 +84,7 @@ add_subdirectory(happyeyeballs)
add_subdirectory(logging)
add_subdirectory(loss)
add_subdirectory(observer)
add_subdirectory(priority)
add_subdirectory(samples)
add_subdirectory(server)
add_subdirectory(state)

View File

@ -0,0 +1,6 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
add_subdirectory(test)

View File

@ -0,0 +1,226 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <glog/logging.h>
#include <quic/common/Optional.h>
#include <array>
namespace quic {
/*
* Generic Priority Queue interface for QUIC and HTTP stream egress
*
* Example usage:
*
* buildPacket(queue) {
* auto txn = queue.beginTransaction();
* while (!queue.empty() && spaceLeftInPacket) {
* auto nextID = queue.peekNextScheduledID();
* previousWritten = addBytesToPacket(packet, nextID, &spaceLeftInPacket);
* if (!streamWritable(nextID)) {
* queue.erase(nextID);
* } else {
* queue.consume(previousWritten);
* }
* }
* }
*
* sendBuiltPackets(queue, Transaction txn, ...) {
* sent = send(...);
* if (sent) {
* queue.commit(std::move(txn));
* } else {
* queue.rollback(std::move(txn));
* }
*/
class PriorityQueue {
public:
// Generic Identifier for either a QUIC stream or arbitrary datagram flow ID
// Since QUIC streams are limited to 2^62-1, we can use the high order bits
// of value to indicate the type.
struct Identifier {
// For now, this is restricted to 2 bits. However, there's plenty of
// type space under 0x40 because datagram flow IDs are 32 bits.
// clang-format off
enum class Type : uint8_t {
STREAM = 0x00, // 0000 0000
DATAGRAM = 0x40, // 0100 0000
// Unused = 0x80, // 1000 0000
UNINITIALIZED = 0xC0, // 1100 0000
};
// clang-format on
static constexpr uint8_t kTypeShift = 56;
static constexpr uint64_t kTypeMask =
static_cast<uint64_t>(Type::UNINITIALIZED) << kTypeShift;
Identifier() = default;
static Identifier fromStreamID(uint64_t streamID) {
CHECK_LT(streamID, 1LLU << 62);
return Identifier(streamID);
}
static Identifier fromDatagramFlowID(uint32_t flowID) {
return Identifier((uint64_t(Type::DATAGRAM) << kTypeShift) | flowID);
}
[[nodiscard]] Type getType() const {
return Type((value & kTypeMask) >> kTypeShift);
}
[[nodiscard]] bool isStreamID() const {
return getType() == Type::STREAM;
}
[[nodiscard]] bool isDatagramFlowID() const {
return getType() == Type::DATAGRAM;
}
[[nodiscard]] bool isInitialized() const {
return getType() != Type::UNINITIALIZED;
}
[[nodiscard]] uint64_t asStreamID() const {
CHECK(isStreamID());
return value & ~kTypeMask;
}
[[nodiscard]] uint32_t asDatagramFlowID() const {
CHECK(isDatagramFlowID());
return uint32_t(value); // truncating the top works
}
[[nodiscard]] uint64_t asUint64() const {
return value & ~kTypeMask;
}
bool operator==(const Identifier& other) const {
return value == other.value;
}
struct hash {
size_t operator()(const Identifier& id) const {
return std::hash<uint64_t>()(id.value);
}
};
private:
explicit Identifier(uint64_t v) : value(v) {}
uint64_t value{uint64_t(Type::UNINITIALIZED) << kTypeShift};
};
// Abstract class representing priority. Concrete implementations of the queue
// will define their own priority structure.
class Priority {
public:
Priority() : storage_{kUninitialized} {}
virtual ~Priority() = default;
[[nodiscard]] bool isInitialized() const {
return storage_ != kUninitialized;
}
protected:
using StorageType = std::array<uint8_t, 16>;
template <typename T>
T& getPriority() {
static_assert(
std::is_trivially_copyable<T>::value, "T must be trivially copyable");
static_assert(sizeof(T) <= sizeof(StorageType), "T must fit in storage_");
return *reinterpret_cast<T*>(storage_.data());
}
template <typename T>
const T& getPriority() const {
static_assert(
std::is_trivially_copyable<T>::value, "T must be trivially copyable");
static_assert(sizeof(T) <= sizeof(StorageType), "T must fit in storage_");
return *reinterpret_cast<const T*>(storage_.data());
}
private:
// clang-format off
static constexpr StorageType kUninitialized = {
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
// clang-format on
StorageType storage_;
};
virtual ~PriorityQueue() = default;
// Returns true if the queue contains ID, false otherwise
[[nodiscard]] virtual bool contains(Identifier id) const = 0;
// Returns true if the queue contains no elements, false otherwise
[[nodiscard]] virtual bool empty() const = 0;
// Compare two Priority's for equality
[[nodiscard]] virtual bool equalPriority(
const Priority& p1,
const Priority& p2) const = 0;
// Add the given id to the priority queue with the given priority. If it
// already exists in the queue, update it to the specified priority.
virtual void insertOrUpdate(Identifier id, Priority priority) = 0;
// Update the priority of id if it exists in the queue, otherwise no-op
virtual void updateIfExist(Identifier id, Priority priority) = 0;
// Remove the ID from the queue
virtual void erase(Identifier id) = 0;
// Remove all entries from the queue
virtual void clear() = 0;
// Return the highest priority identifier in the queue. It is an error to
// call this if the queue is empty, returns an uninitialized Identifier.
// For stateful queues (eg: a round-robin queue), this method can mutate the
// state, such that the next call to getNextScheduledID returns some other
// value.
//
// previousConsumed indicates how many resources the previously returned ID
// consumed. This can be used by a stateful queue that wants to ensure
// fairness of resource usage before advancing.
virtual Identifier getNextScheduledID(
quic::Optional<uint64_t> previousConsumed) = 0;
// Return the highest priority identifier in the queue, but does not
// mutate any state. Calling this repeatedly will return the same value.
// It is an error to call this on an empty queue.
[[nodiscard]] virtual Identifier peekNextScheduledID() const = 0;
virtual void consume(quic::Optional<uint64_t> consumed) = 0;
class Transaction;
// Begin a transaction with the queue. Transactions are optional, but are
// useful for conditionally erasing/dequeuing elements with the ability to
// rollback (reinsert them).
virtual Transaction beginTransaction() = 0;
// Commit the current transaction. All ID erasures since beginTransaction
// become permanent.
virtual void commitTransaction(Transaction&&) = 0;
// Rollback the current transaction All IDs erased since beginTransaction
// are re-inserted at their previous priority level.
virtual void rollbackTransaction(Transaction&&) = 0;
protected:
Transaction makeTransaction();
};
class PriorityQueue::Transaction {
public:
~Transaction() = default;
Transaction(Transaction&&) = default;
Transaction& operator=(Transaction&&) = default;
Transaction(const Transaction&) = delete;
Transaction& operator=(const Transaction&) = delete;
private:
friend class PriorityQueue;
Transaction() = default;
};
inline PriorityQueue::Transaction PriorityQueue::makeTransaction() {
return {};
}
} // namespace quic

View File

@ -0,0 +1,19 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# Always install this header, even when not building tests.
# It can be used by dependent projects to build their own tests
if(NOT BUILD_TESTS)
return()
endif()
quic_add_test(TARGET PriorityQueueTest
SOURCES
PriorityQueueTest.cpp
DEPENDS
Folly::folly
mvfst_constants
)

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <quic/priority/PriorityQueue.h>
#include <unordered_map>
namespace {
using quic::PriorityQueue;
TEST(PriorityQueueIdentifier, All) {
PriorityQueue::Identifier uninit;
EXPECT_TRUE(
uninit.getType() == PriorityQueue::Identifier::Type::UNINITIALIZED);
EXPECT_FALSE(uninit.isInitialized());
EXPECT_TRUE(uninit == PriorityQueue::Identifier());
auto id1 = PriorityQueue::Identifier::fromStreamID(7);
EXPECT_TRUE(id1.getType() == PriorityQueue::Identifier::Type::STREAM);
EXPECT_TRUE(id1.isInitialized());
EXPECT_TRUE(id1.isStreamID());
EXPECT_FALSE(id1.isDatagramFlowID());
EXPECT_EQ(id1.asStreamID(), 7);
EXPECT_FALSE(id1 == uninit);
auto idBig = PriorityQueue::Identifier::fromStreamID(1LLU << 57);
EXPECT_TRUE(idBig.getType() == PriorityQueue::Identifier::Type::STREAM);
EXPECT_TRUE(idBig.isInitialized());
EXPECT_TRUE(idBig.isStreamID());
EXPECT_FALSE(idBig.isDatagramFlowID());
EXPECT_EQ(idBig.asStreamID(), 1LLU << 57);
EXPECT_FALSE(idBig == uninit);
auto id2 = PriorityQueue::Identifier::fromDatagramFlowID(7);
EXPECT_TRUE(id2.getType() == PriorityQueue::Identifier::Type::DATAGRAM);
EXPECT_TRUE(id2.isInitialized());
EXPECT_FALSE(id2.isStreamID());
EXPECT_TRUE(id2.isDatagramFlowID());
EXPECT_EQ(id2.asDatagramFlowID(), 7);
EXPECT_FALSE(id2 == uninit);
EXPECT_FALSE(id1 == id2);
EXPECT_TRUE(id2 == PriorityQueue::Identifier::fromDatagramFlowID(7));
std::unordered_map<
PriorityQueue::Identifier,
uint64_t,
PriorityQueue::Identifier::hash>
m;
// id1 and id2 have the same numeric value but different type, they hash and
// compare differently
m[id1] = 99;
m[id2] = 100;
EXPECT_EQ(m[id1], 99);
EXPECT_EQ(m[id2], 100);
}
} // namespace