1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-07-29 03:41:11 +03:00

HTTPPriorityQueue

Summary:
This is a reimplementation of the HTTP priority queue with the new API. The new design is:

A single vector-based heap for sequential streams across all levels and an array of 8 RoundRobin's that hold incremental streams.

Getting the highest priority element doesn't require iterating through empty levels - it's right at the front of `heap_` or `roundRobins_[lowestRoundRobin_]`.

There's an index map which incremental streams always use, the value is an index into roundRobins_.  When there are a large number of sequential streams in the heap_, we also index those (with their array index).

The benchmarks (next diff) show that this is 2x faster for 8 streams and 20% slower for large numbers of sequential streams (96).  The break-even point is 40 streams.

Reviewed By: mjoras

Differential Revision: D68641759

fbshipit-source-id: 30ea5c719e998ead20d8762ee5ddad10111ea7e5
This commit is contained in:
Alan Frindell
2025-02-20 11:13:36 -08:00
committed by Facebook GitHub Bot
parent 484898f61b
commit f9e9ad0443
7 changed files with 930 additions and 1 deletions

View File

@ -31,3 +31,18 @@ mvfst_cpp_library(
"//quic/common:optional",
],
)
mvfst_cpp_library(
name = "http_priority_queue",
srcs = [
"HTTPPriorityQueue.cpp",
],
headers = [
"HTTPPriorityQueue.h",
],
exported_deps = [
":priority_queue",
":round_robin",
"//folly/container:f14_hash",
],
)

View File

@ -42,4 +42,43 @@ install(
DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
add_library(
mvfst_http_priority_queue
HTTPPriorityQueue.cpp
)
target_include_directories(
mvfst_http_priority_queue PUBLIC
$<BUILD_INTERFACE:${QUIC_FBCODE_ROOT}>
$<INSTALL_INTERFACE:include/>
)
target_compile_options(
mvfst_http_priority_queue
PRIVATE
${_QUIC_COMMON_COMPILE_OPTIONS}
)
target_link_libraries(
mvfst_http_priority_queue PUBLIC
Folly::folly
)
file(
GLOB_RECURSE QUIC_API_HEADERS_TOINSTALL
RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
*.h
)
list(FILTER QUIC_API_HEADERS_TOINSTALL EXCLUDE REGEX test/)
foreach(header ${QUIC_API_HEADERS_TOINSTALL})
get_filename_component(header_dir ${header} DIRECTORY)
install(FILES ${header} DESTINATION include/quic/priority/${header_dir})
endforeach()
install(
TARGETS mvfst_http_priority_queue
EXPORT mvfst-exports
DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
add_subdirectory(test)

View File

@ -0,0 +1,312 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <quic/priority/HTTPPriorityQueue.h>
namespace {
constexpr size_t kBuildIndexThreshold = 100;
constexpr size_t kDestroyIndexThreshold = 50;
} // namespace
namespace quic {
quic::Optional<HTTPPriorityQueue::FindResult> HTTPPriorityQueue::find(
Identifier id) const {
auto it = indexMap_.find(id);
if (it != indexMap_.end()) {
return FindResult{it->second, it};
}
if (!useIndexMapForSequential_) {
// linear search the heap
for (size_t i = 0; i < heap_.size(); i++) {
auto& elem = heap_[i];
if (!elem.priority->incremental && elem.identifier == id) {
return FindResult{IndexMapElem{false, i}, indexMap_.end()};
}
}
}
return quic::none;
}
void HTTPPriorityQueue::addIndex(Identifier id, IndexMapElem indexElem) {
if (useIndexMapForSequential_ || indexElem.incremental) {
indexMap_[id] = indexElem;
}
}
void HTTPPriorityQueue::removeIndex(IndexMap::const_iterator it) {
if (it != indexMap_.end() &&
(useIndexMapForSequential_ || it->second.incremental)) {
indexMap_.erase(it);
}
}
void HTTPPriorityQueue::buildSequentialIndex() {
for (size_t i = 0; i < heap_.size(); i++) {
auto& elem = heap_[i];
if (!elem.priority->incremental) {
addIndex(elem.identifier, {false, i});
}
}
}
void HTTPPriorityQueue::destroySequentialIndex() {
for (auto it = indexMap_.begin(); it != indexMap_.end();) {
if (it->second.incremental) {
++it;
} else {
it = indexMap_.erase(it);
}
}
useIndexMapForSequential_ = false;
}
void HTTPPriorityQueue::insertOrUpdate(
Identifier id,
PriorityQueue::Priority basePriority) {
Priority priority(basePriority);
auto findResult = find(id);
if (findResult) {
if (updateInSequential(findResult->elem, priority)) {
return;
} else {
// moving in/out of a RR, just erase
eraseImpl(id, findResult->elem);
removeIndex(findResult->indexIt);
}
}
if (!priority->paused) {
insert(id, priority);
}
}
void HTTPPriorityQueue::updateIfExist(
Identifier id,
PriorityQueue::Priority basePriority) {
Priority priority(basePriority);
auto findResult = find(id);
if (!findResult) {
return;
}
if (!updateInSequential(findResult->elem, priority)) {
// moving in/out of a RR/paused, just erase
bool wasIncremental = findResult->elem.incremental;
eraseImpl(id, findResult->elem);
if (priority->paused) {
removeIndex(findResult->indexIt);
return;
}
if (wasIncremental && !priority->incremental &&
!useIndexMapForSequential_) {
removeIndex(findResult->indexIt);
} // else don't need removeIndex -- it will get updated
insert(id, priority);
}
}
void HTTPPriorityQueue::erase(Identifier id) {
auto findResult = find(id);
if (findResult) {
Priority priority(0, false, 0);
if (findResult->elem.incremental) {
priority = Priority(findResult->elem.index, true, 0);
} else {
priority = heap_[findResult->elem.index].priority;
}
if (hasOpenTransaction_) {
erased_.emplace_back(std::move(priority), id);
}
eraseImpl(id, findResult->elem);
removeIndex(findResult->indexIt);
}
if (useIndexMapForSequential_ && heap_.size() < kDestroyIndexThreshold) {
destroySequentialIndex();
}
}
void HTTPPriorityQueue::clear() {
heap_.clear();
indexMap_.clear();
useIndexMapForSequential_ = false;
for (auto& rr : roundRobins_) {
rr.clear();
}
}
const HTTPPriorityQueue::Element* FOLLY_NULLABLE
HTTPPriorityQueue::top() const {
uint8_t topPri = roundRobins_.size();
const Element* topElem = nullptr;
if (!heap_.empty()) {
topElem = &heap_.front();
topPri = topElem->priority->urgency;
}
if (lowestRoundRobin_ < topPri && !roundRobins_[lowestRoundRobin_].empty()) {
return nullptr;
}
CHECK(topElem) << "Empty";
return topElem;
}
quic::PriorityQueue::Identifier HTTPPriorityQueue::getNextScheduledID(
quic::Optional<uint64_t> previousConsumed) {
auto elem = top();
if (elem) {
return elem->identifier;
} else {
return roundRobins_[lowestRoundRobin_].getNext(previousConsumed);
}
}
quic::PriorityQueue::Identifier HTTPPriorityQueue::peekNextScheduledID() const {
auto elem = top();
if (elem) {
return elem->identifier;
} else {
return roundRobins_[lowestRoundRobin_].peekNext();
}
}
void HTTPPriorityQueue::consume(quic::Optional<uint64_t> consumed) {
auto elem = top();
if (!elem) {
roundRobins_[lowestRoundRobin_].consume(consumed);
}
}
HTTPPriorityQueue::Priority HTTPPriorityQueue::headPriority() const {
auto elem = top();
if (elem) {
return elem->priority;
} else {
return {lowestRoundRobin_, true};
}
}
void HTTPPriorityQueue::heapifyUp(size_t index) {
while (index > 0) {
size_t parentIndex = (index - 1) / 2;
if (heap_[parentIndex] <= heap_[index]) {
break;
}
// Swap elements and update index map
std::swap(heap_[parentIndex], heap_[index]);
assignIndex(heap_[parentIndex], parentIndex);
assignIndex(heap_[index], index);
index = parentIndex;
}
}
void HTTPPriorityQueue::heapifyDown(size_t index) {
while (true) {
size_t smallest = index;
size_t leftChildIndex = 2 * index + 1;
size_t rightChildIndex = 2 * index + 2;
if (leftChildIndex < heap_.size() &&
heap_[leftChildIndex] < heap_[smallest]) {
smallest = leftChildIndex;
}
if (rightChildIndex < heap_.size() &&
heap_[rightChildIndex] < heap_[smallest]) {
smallest = rightChildIndex;
}
if (smallest == index) {
break;
}
// Swap elements and update index map
std::swap(heap_[smallest], heap_[index]);
assignIndex(heap_[smallest], smallest);
assignIndex(heap_[index], index);
index = smallest;
}
}
void HTTPPriorityQueue::assignIndex(Element& element, size_t index) {
CHECK(!element.priority->incremental);
addIndex(element.identifier, {false, index});
}
void HTTPPriorityQueue::insert(Identifier id, const Priority& priority) {
if (!useIndexMapForSequential_ && heap_.size() >= kBuildIndexThreshold) {
useIndexMapForSequential_ = true;
buildSequentialIndex();
}
if (priority->incremental) {
auto& rr = roundRobins_[priority->urgency];
rr.insert(id);
roundRobinElements_++;
addIndex(id, {true, priority->urgency});
if (priority->urgency < lowestRoundRobin_) {
lowestRoundRobin_ = priority->urgency;
}
} else {
heap_.emplace_back(priority, id);
auto index = heap_.size() - 1;
addIndex(id, {false, index});
heapifyUp(index);
}
}
bool HTTPPriorityQueue::updateInSequential(
IndexMapElem indexElem,
Priority priority) {
if (priority->paused) {
return false;
}
if (indexElem.incremental || priority->incremental) {
if (indexElem.incremental && priority->incremental) {
return indexElem.index == priority->urgency;
}
return false;
}
auto index = indexElem.index;
auto& elem = heap_[index];
if (elem.priority == priority) {
return true; // no-op
}
std::swap(elem.priority, priority);
if (elem.priority < priority) {
heapifyUp(index);
} else {
heapifyDown(index);
}
return true;
}
void HTTPPriorityQueue::eraseImpl(Identifier id, IndexMapElem indexElem) {
auto index = indexElem.index;
if (indexElem.incremental) {
auto& rr = roundRobins_[index];
rr.erase(id);
roundRobinElements_--;
if (index == lowestRoundRobin_ && rr.empty()) {
while (lowestRoundRobin_ < roundRobins_.size() &&
roundRobins_[lowestRoundRobin_].empty()) {
lowestRoundRobin_++;
}
}
} else {
auto lastIndex = heap_.size() - 1;
std::swap(heap_[index], heap_[lastIndex]);
assignIndex(heap_[index], index);
heap_.pop_back();
if (index != lastIndex) {
if (index > 0 && heap_[index] < heap_[(index - 1) / 2]) {
heapifyUp(index);
} else {
heapifyDown(index);
}
} // special case, erasing the last element
}
}
} // namespace quic

View File

@ -0,0 +1,243 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/container/F14Map.h>
#include <quic/priority/PriorityQueue.h>
#include <quic/priority/RoundRobin.h>
#include <utility>
namespace quic {
class HTTPPriorityQueue : public quic::PriorityQueue {
// Element in the IndexMap. If incremental is true, index applies to
// roundRobins_, otherwise it applies to heap_.
struct IndexMapElem {
bool incremental : 1;
uint64_t index : 63;
};
using IndexMap =
folly::F14ValueMap<Identifier, IndexMapElem, Identifier::hash>;
public:
class Priority : public quic::PriorityQueue::Priority {
public:
using OrderId = uint64_t;
struct HTTPPriority {
uint8_t urgency : 3;
bool paused : 1;
bool incremental : 1;
OrderId order : 59;
};
// TODO: change default priority to (3, false, false, 0) to match spec
static constexpr HTTPPriority kDefaultPriority{3, false, true, 0};
/*implicit*/ Priority(const PriorityQueue::Priority& basePriority)
: PriorityQueue::Priority(basePriority) {
if (!isInitialized()) {
getFields() = kDefaultPriority;
}
}
Priority(uint8_t u, bool i, OrderId o = 0) {
auto& fields = getFields();
fields.urgency = u;
fields.incremental = i;
fields.order = (i ? 0 : o);
fields.paused = false;
}
enum Paused { PAUSED };
/* implicit */ Priority(Paused) : Priority(0, false, 0) {
getFields().paused = true;
}
Priority(const Priority&) = default;
Priority& operator=(const Priority&) = default;
Priority(Priority&&) = default;
Priority& operator=(Priority&&) = default;
~Priority() override = default;
const HTTPPriority* operator->() const {
return &getFields();
}
bool operator==(const Priority& other) const {
auto asUint64 = toUint64();
auto otherAsUint64 = other.toUint64();
if (asUint64 == otherAsUint64) {
return true;
}
// The only other way to be equal is if one is initialized and the other
// is default
const static uint64_t kDefaultUint64 = Priority(
kDefaultPriority.urgency,
kDefaultPriority.incremental,
kDefaultPriority.order)
.toUint64();
return (kDefaultUint64 == otherAsUint64 && !isInitialized()) ||
(asUint64 == kDefaultUint64 && !other.isInitialized());
}
bool operator<(const Priority& other) const {
return toUint64() < other.toUint64();
}
[[nodiscard]] uint64_t toUint64() const {
auto& fields = getFields();
return (
(uint64_t(fields.urgency) << 61) | (uint64_t(fields.paused) << 60) |
(uint64_t(fields.incremental) << 59) | fields.order);
}
[[nodiscard]] const HTTPPriority& getFields() const {
return getPriority<HTTPPriority>();
}
private:
HTTPPriority& getFields() {
return getPriority<HTTPPriority>();
}
};
void advanceAfterNext(size_t n) {
for (auto& rr : roundRobins_) {
rr.advanceAfterNext(n);
}
}
void advanceAfterBytes(uint64_t bytes) {
for (auto& rr : roundRobins_) {
rr.advanceAfterBytes(bytes);
}
}
[[nodiscard]] bool empty() const override {
return heap_.empty() && roundRobinElements_ == 0;
}
[[nodiscard]] bool equalPriority(
const PriorityQueue::Priority& p1,
const PriorityQueue::Priority& p2) const override {
return static_cast<const HTTPPriorityQueue::Priority&>(p1) ==
static_cast<const HTTPPriorityQueue::Priority&>(p2);
}
[[nodiscard]] bool contains(Identifier id) const override {
return find(id) != quic::none;
}
void insertOrUpdate(Identifier id, PriorityQueue::Priority priority) override;
void updateIfExist(Identifier id, PriorityQueue::Priority priority) override;
void erase(Identifier id) override;
void clear() override;
Identifier getNextScheduledID(
quic::Optional<uint64_t> previousConsumed) override;
[[nodiscard]] Identifier peekNextScheduledID() const override;
void consume(quic::Optional<uint64_t> consumed) override;
// Note: transactions only reinsert erased transactions at previous priority
// they don't undo inserts, updates, or consume.
Transaction beginTransaction() override {
if (hasOpenTransaction_) {
rollbackTransaction(makeTransaction());
}
hasOpenTransaction_ = true;
return makeTransaction();
}
void commitTransaction(Transaction&&) override {
if (hasOpenTransaction_) {
hasOpenTransaction_ = false;
erased_.clear();
}
}
void rollbackTransaction(Transaction&&) override {
if (hasOpenTransaction_) {
for (auto& e : erased_) {
insert(e.identifier, e.priority);
}
erased_.clear();
hasOpenTransaction_ = false;
}
}
[[nodiscard]] Priority headPriority() const;
private:
// Heap Element. If priority.incremental is true, then Identifier is
// uninitialized - the element is a placeholder for the RoundRobin at
// roundRobins_[priority.urgency].
//
// In the current design, there are no elements with Incremental priority in
// the heap.
struct Element {
Element(Priority p, Identifier i) : priority(std::move(p)), identifier(i) {}
Priority priority;
Identifier identifier;
bool operator<(const Element& other) const {
if (priority < other.priority) {
return true;
}
if (other.priority < priority || other.priority->incremental) {
return false;
}
// sequential priorities are equal
return identifier.asUint64() < other.identifier.asUint64();
}
bool operator<=(const Element& other) const {
return !(other < *this);
}
};
struct FindResult {
IndexMapElem elem;
IndexMap::const_iterator indexIt;
};
[[nodiscard]] quic::Optional<FindResult> find(Identifier id) const;
void addIndex(Identifier id, IndexMapElem indexElem);
void removeIndex(IndexMap::const_iterator it);
void buildSequentialIndex();
void destroySequentialIndex();
void heapifyUp(size_t index);
void heapifyDown(size_t index);
void assignIndex(Element& element, size_t index);
void insert(Identifier id, const Priority& priority);
bool updateInSequential(IndexMapElem indexElem, Priority priority);
void eraseImpl(Identifier id, IndexMapElem indexElem);
[[nodiscard]] const Element* FOLLY_NULLABLE top() const;
// Holds sequential elements
std::vector<Element> heap_;
// Map from id -> RoundRobin or Heap Index
IndexMap indexMap_;
// Holds incremental elements
std::array<RoundRobin, 8> roundRobins_;
// Holds erased elements from the current transaction
std::vector<Element> erased_;
// Count of Round Robin elements in the Queue
uint32_t roundRobinElements_{0};
// The index of the first non-empty RoundRobin, or roundRobins_.size()
uint8_t lowestRoundRobin_{uint8_t(roundRobins_.size())};
bool hasOpenTransaction_{false};
bool useIndexMapForSequential_{false};
};
} // namespace quic

View File

@ -25,3 +25,14 @@ mvfst_cpp_test(
"//quic/priority:round_robin",
],
)
mvfst_cpp_test(
name = "http_priority_queue_test",
srcs = ["HTTPPriorityQueueTest.cpp"],
headers = [],
deps = [
"//folly/portability:gmock",
"//folly/portability:gtest",
"//quic/priority:http_priority_queue",
],
)

View File

@ -14,8 +14,9 @@ quic_add_test(TARGET PriorityQueueTest
SOURCES
PriorityQueueTest.cpp
RoundRobinTests.cpp
HTTPPriorityQueueTest.cpp
DEPENDS
Folly::folly
mvfst_constants
mvfst_round_robin
mvfst_http_priority_queue
)

View File

@ -0,0 +1,308 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <quic/priority/HTTPPriorityQueue.h>
#include <list>
namespace {
using namespace quic;
using Identifier = quic::PriorityQueue::Identifier;
class HTTPPriorityQueueTest : public testing::Test {
protected:
HTTPPriorityQueue queue_;
};
TEST_F(HTTPPriorityQueueTest, EmptyQueue) {
EXPECT_TRUE(queue_.empty());
}
TEST_F(HTTPPriorityQueueTest, Compare) {
std::vector<HTTPPriorityQueue::Priority> pris = {
PriorityQueue::Priority(),
{0, false},
{0, false, 1},
{0, true},
{7, false},
{7, false, std::numeric_limits<uint32_t>::max()},
{7, true},
{HTTPPriorityQueue::Priority::PAUSED}};
for (size_t i = 0; i < pris.size(); ++i) {
for (size_t j = 0; j < pris.size(); ++j) {
EXPECT_TRUE(
(i == j && pris[i] == pris[j]) || (i != j && !(pris[i] == pris[j])));
EXPECT_TRUE(
(i == j && queue_.equalPriority(pris[i], pris[j])) ||
(i != j && !queue_.equalPriority(pris[i], pris[j])));
}
}
// TODO: will change when default changes
EXPECT_EQ(pris[0], HTTPPriorityQueue::Priority(3, true));
}
TEST_F(HTTPPriorityQueueTest, InsertSingleElement) {
auto id = Identifier::fromStreamID(1);
auto priority = HTTPPriorityQueue::Priority(0, false);
queue_.insertOrUpdate(id, priority);
EXPECT_FALSE(queue_.empty());
EXPECT_EQ(queue_.getNextScheduledID(quic::none), id);
}
TEST_F(HTTPPriorityQueueTest, InsertMultipleElements) {
auto id1 = Identifier::fromStreamID(1);
auto id2 = Identifier::fromStreamID(2);
auto priority1 = HTTPPriorityQueue::Priority(0, false);
auto priority2 = HTTPPriorityQueue::Priority(1, false);
queue_.insertOrUpdate(id1, priority1);
queue_.insertOrUpdate(id2, priority2);
EXPECT_EQ(queue_.getNextScheduledID(quic::none), id1);
EXPECT_EQ(queue_.getNextScheduledID(quic::none), id1);
queue_.erase(id1);
EXPECT_EQ(queue_.getNextScheduledID(quic::none), id2);
}
TEST_F(HTTPPriorityQueueTest, UpdatePriority) {
auto id = Identifier::fromStreamID(1);
auto priority = HTTPPriorityQueue::Priority(0, false);
queue_.insertOrUpdate(id, priority);
auto newPriority = HTTPPriorityQueue::Priority(1, false);
queue_.updateIfExist(id, newPriority);
EXPECT_EQ(queue_.getNextScheduledID(quic::none), id);
queue_.updateIfExist(id, newPriority);
EXPECT_EQ(queue_.getNextScheduledID(quic::none), id);
}
TEST_F(HTTPPriorityQueueTest, EraseElement) {
auto id = Identifier::fromStreamID(1);
auto priority = HTTPPriorityQueue::Priority(0, false);
queue_.insertOrUpdate(id, priority);
queue_.erase(id);
EXPECT_TRUE(queue_.empty());
queue_.updateIfExist(id, HTTPPriorityQueue::Priority(0, false));
}
TEST_F(HTTPPriorityQueueTest, HeapUpOnErase) {
std::vector<size_t> items{1, 4, 2, 5, 6, 3};
for (auto i : items) {
// identical priority and order, sort by stream ID
queue_.insertOrUpdate(
Identifier::fromStreamID(i), HTTPPriorityQueue::Priority(0, false, 0));
}
queue_.erase(Identifier::fromStreamID(5)); // swaps with 3 which moves up
for (auto i = 1; i < 7; i++) {
if (i == 5) {
continue;
}
EXPECT_EQ(queue_.getNextScheduledID(quic::none).asUint64(), i);
queue_.erase(Identifier::fromStreamID(i));
}
EXPECT_TRUE(queue_.empty());
}
TEST_F(HTTPPriorityQueueTest, UpdateIncrementalToNonIncremental) {
auto id = Identifier::fromStreamID(1);
auto priority = HTTPPriorityQueue::Priority(7, true);
queue_.insertOrUpdate(id, priority);
auto id2 = Identifier::fromStreamID(2);
queue_.insertOrUpdate(id2, HTTPPriorityQueue::Priority(0, true));
// Update from incremental to non-incremental (updateIfExist)
queue_.updateIfExist(id, HTTPPriorityQueue::Priority(0, false));
EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id);
queue_.erase(id);
EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id2);
// Update from incremental to non-incremental (insertOrUpdate)
queue_.insertOrUpdate(id2, HTTPPriorityQueue::Priority(0, false));
EXPECT_TRUE(queue_.headPriority() == HTTPPriorityQueue::Priority(0, false));
}
TEST_F(HTTPPriorityQueueTest, UpdateNonIncrementalToIncremental) {
auto id = Identifier::fromStreamID(1);
auto priority = HTTPPriorityQueue::Priority(0, false);
queue_.insertOrUpdate(id, priority);
auto id2 = Identifier::fromStreamID(2);
queue_.insertOrUpdate(id2, HTTPPriorityQueue::Priority(0, true));
// Update from non-incremental to incremental
priority = HTTPPriorityQueue::Priority(0, true);
queue_.updateIfExist(id, priority);
EXPECT_TRUE(queue_.contains(id));
EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id2);
EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id);
}
TEST_F(HTTPPriorityQueueTest, UpdateIncrementalUrgency) {
auto id = Identifier::fromStreamID(1);
auto priority = HTTPPriorityQueue::Priority(0, true);
queue_.insertOrUpdate(id, priority);
// Update urgency of incremental priority from 0 -> 1
priority = HTTPPriorityQueue::Priority(1, true);
queue_.updateIfExist(id, priority);
EXPECT_TRUE(queue_.contains(id));
EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id);
EXPECT_TRUE(queue_.headPriority() == HTTPPriorityQueue::Priority(1, true));
}
TEST_F(HTTPPriorityQueueTest, InsertOrUpdateNoOp) {
auto id = Identifier::fromStreamID(1);
queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, true));
queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, true));
// Update urgency of incremental priority from 0 -> 1
queue_.updateIfExist(id, HTTPPriorityQueue::Priority(1, true));
EXPECT_TRUE(queue_.contains(id));
EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id);
EXPECT_TRUE(queue_.headPriority() == HTTPPriorityQueue::Priority(1, true));
}
TEST_F(HTTPPriorityQueueTest, PeekAndClear) {
for (size_t i = 0; i < 16; i++) {
queue_.insertOrUpdate(
Identifier::fromStreamID(i), HTTPPriorityQueue::Priority(i / 2, i % 2));
}
EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(0));
EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(0));
queue_.erase(Identifier::fromStreamID(0));
EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(1));
EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(1));
queue_.clear();
}
TEST_F(HTTPPriorityQueueTest, DoubleBeginTransaction) {
auto txn = queue_.beginTransaction();
queue_.insertOrUpdate(
Identifier::fromStreamID(0), HTTPPriorityQueue::Priority(7, false));
// begin without commit/rollback => rollback
txn = queue_.beginTransaction();
queue_.insertOrUpdate(
Identifier::fromStreamID(1), HTTPPriorityQueue::Priority(0, false));
queue_.rollbackTransaction(std::move(txn));
EXPECT_TRUE(queue_.contains(Identifier::fromStreamID(0)));
EXPECT_TRUE(queue_.contains(Identifier::fromStreamID(1)));
}
TEST_F(HTTPPriorityQueueTest, InsertWithoutTransaction) {
// no txn, erase not rollbackable.
queue_.insertOrUpdate(
Identifier::fromStreamID(0), HTTPPriorityQueue::Priority(7, false));
queue_.erase(Identifier::fromStreamID(0));
// beginTransaction here
auto txn = queue_.beginTransaction();
queue_.insertOrUpdate(
Identifier::fromStreamID(1), HTTPPriorityQueue::Priority(0, false));
queue_.rollbackTransaction(std::move(txn));
EXPECT_FALSE(queue_.contains(Identifier::fromStreamID(0)));
EXPECT_TRUE(queue_.contains(Identifier::fromStreamID(1)));
}
TEST_F(HTTPPriorityQueueTest, Paused) {
auto id = Identifier::fromStreamID(0);
// insert paused -> nope
HTTPPriorityQueue::Priority paused(HTTPPriorityQueue::Priority::PAUSED);
queue_.insertOrUpdate(id, paused);
EXPECT_TRUE(queue_.empty());
// update unpaused(seq) -> paused: deleted
queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, false));
EXPECT_EQ(queue_.peekNextScheduledID(), id);
queue_.updateIfExist(id, paused);
EXPECT_TRUE(queue_.empty());
// update from paused to unpaused: no-op -- is this right?
queue_.updateIfExist(id, HTTPPriorityQueue::Priority(0, true));
EXPECT_TRUE(queue_.empty());
// update unpaused(rr) -> paused: deleted
queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, true));
EXPECT_EQ(queue_.peekNextScheduledID(), id);
queue_.updateIfExist(id, paused);
EXPECT_TRUE(queue_.empty());
}
TEST_F(HTTPPriorityQueueTest, ComplexOperations) {
std::vector<HTTPPriorityQueue::Priority> ids;
auto txn = queue_.beginTransaction();
// Insert elements with different priorities
for (int i = 0; i < 20; ++i) {
// every 4th stream has same pri, and those 5 streams are in reverse stream
// ID order, 6 and 14 are RR.
auto priority = HTTPPriorityQueue::Priority(i % 4, i % 8 == 6, 20 - i);
ids.push_back(priority);
queue_.insertOrUpdate(Identifier::fromStreamID(i), priority);
}
// Update some priorities to shuffle their position
auto setPriority = [&](size_t index,
const HTTPPriorityQueue::Priority& pri) mutable {
ids[index] = pri;
queue_.updateIfExist(Identifier::fromStreamID(index), pri);
};
setPriority(5, HTTPPriorityQueue::Priority(0, false));
setPriority(10, HTTPPriorityQueue::Priority(3, false));
setPriority(15, HTTPPriorityQueue::Priority(1, false));
// Erase some elements
queue_.erase(Identifier::fromStreamID(5)); // highest pri
queue_.erase(Identifier::fromStreamID(7)); // lowest pri
queue_.erase(Identifier::fromStreamID(12));
queue_.commitTransaction(std::move(txn));
// Call getNextScheduledID + erase until the queue is empty
HTTPPriorityQueue::Priority lastPriority = queue_.headPriority();
// clang-format off
std::list<size_t> expectedOrder{
/*u=0*/ 16, 8, 4, 0,
/*u=1*/ 15, 17, 13, 9, 1,
/*u=2*/ 18, 2,
/*u=2,i*/ 6, 14,
/*u=3*/ 10, 19, 11, 3
};
// clang-format on
txn = queue_.beginTransaction();
while (!queue_.empty()) {
// priorities should not decrease
auto headPriority = queue_.headPriority();
CHECK(lastPriority == headPriority || lastPriority < headPriority);
lastPriority = headPriority;
auto nextId = queue_.peekNextScheduledID();
queue_.consume(quic::none);
CHECK_EQ(nextId.asUint64(), expectedOrder.front());
expectedOrder.pop_front();
auto expectedPri = ids[nextId.asUint64()];
CHECK(expectedPri == headPriority);
queue_.erase(nextId);
CHECK(!queue_.contains(nextId));
}
queue_.rollbackTransaction(std::move(txn));
EXPECT_FALSE(queue_.empty());
EXPECT_TRUE(
queue_.headPriority() == HTTPPriorityQueue::Priority(0, false, 4));
}
TEST_F(HTTPPriorityQueueTest, IndexEverything) {
// Insert elements with different priorities
for (int i = 1; i < 200; ++i) {
auto priority = HTTPPriorityQueue::Priority(i % 8, i % 16 == 0, 200 - i);
queue_.insertOrUpdate(Identifier::fromStreamID(i), priority);
}
// 3 is a generator for the prime set modulo 199
size_t g = 3;
size_t id = g;
for (size_t x = 0; x < 199; x++) {
queue_.erase(Identifier::fromStreamID(id));
id = (id * g) % 199;
}
// The only number left is 199
EXPECT_EQ(queue_.peekNextScheduledID().asUint64(), 199);
queue_.erase(Identifier::fromStreamID(199));
EXPECT_TRUE(queue_.empty());
}
} // namespace