diff --git a/quic/priority/BUCK b/quic/priority/BUCK index 536b77049..362eca5e4 100644 --- a/quic/priority/BUCK +++ b/quic/priority/BUCK @@ -31,3 +31,18 @@ mvfst_cpp_library( "//quic/common:optional", ], ) + +mvfst_cpp_library( + name = "http_priority_queue", + srcs = [ + "HTTPPriorityQueue.cpp", + ], + headers = [ + "HTTPPriorityQueue.h", + ], + exported_deps = [ + ":priority_queue", + ":round_robin", + "//folly/container:f14_hash", + ], +) diff --git a/quic/priority/CMakeLists.txt b/quic/priority/CMakeLists.txt index 3111865a3..dbbb63b5a 100644 --- a/quic/priority/CMakeLists.txt +++ b/quic/priority/CMakeLists.txt @@ -42,4 +42,43 @@ install( DESTINATION ${CMAKE_INSTALL_LIBDIR} ) +add_library( + mvfst_http_priority_queue + HTTPPriorityQueue.cpp +) + +target_include_directories( + mvfst_http_priority_queue PUBLIC + $ + $ +) + +target_compile_options( + mvfst_http_priority_queue + PRIVATE + ${_QUIC_COMMON_COMPILE_OPTIONS} +) + +target_link_libraries( + mvfst_http_priority_queue PUBLIC + Folly::folly +) + +file( + GLOB_RECURSE QUIC_API_HEADERS_TOINSTALL + RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} + *.h +) +list(FILTER QUIC_API_HEADERS_TOINSTALL EXCLUDE REGEX test/) +foreach(header ${QUIC_API_HEADERS_TOINSTALL}) + get_filename_component(header_dir ${header} DIRECTORY) + install(FILES ${header} DESTINATION include/quic/priority/${header_dir}) +endforeach() + +install( + TARGETS mvfst_http_priority_queue + EXPORT mvfst-exports + DESTINATION ${CMAKE_INSTALL_LIBDIR} +) + add_subdirectory(test) diff --git a/quic/priority/HTTPPriorityQueue.cpp b/quic/priority/HTTPPriorityQueue.cpp new file mode 100644 index 000000000..2159e574a --- /dev/null +++ b/quic/priority/HTTPPriorityQueue.cpp @@ -0,0 +1,312 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +namespace { +constexpr size_t kBuildIndexThreshold = 100; +constexpr size_t kDestroyIndexThreshold = 50; +} // namespace + +namespace quic { + +quic::Optional HTTPPriorityQueue::find( + Identifier id) const { + auto it = indexMap_.find(id); + if (it != indexMap_.end()) { + return FindResult{it->second, it}; + } + if (!useIndexMapForSequential_) { + // linear search the heap + for (size_t i = 0; i < heap_.size(); i++) { + auto& elem = heap_[i]; + if (!elem.priority->incremental && elem.identifier == id) { + return FindResult{IndexMapElem{false, i}, indexMap_.end()}; + } + } + } + return quic::none; +} + +void HTTPPriorityQueue::addIndex(Identifier id, IndexMapElem indexElem) { + if (useIndexMapForSequential_ || indexElem.incremental) { + indexMap_[id] = indexElem; + } +} + +void HTTPPriorityQueue::removeIndex(IndexMap::const_iterator it) { + if (it != indexMap_.end() && + (useIndexMapForSequential_ || it->second.incremental)) { + indexMap_.erase(it); + } +} + +void HTTPPriorityQueue::buildSequentialIndex() { + for (size_t i = 0; i < heap_.size(); i++) { + auto& elem = heap_[i]; + if (!elem.priority->incremental) { + addIndex(elem.identifier, {false, i}); + } + } +} + +void HTTPPriorityQueue::destroySequentialIndex() { + for (auto it = indexMap_.begin(); it != indexMap_.end();) { + if (it->second.incremental) { + ++it; + } else { + it = indexMap_.erase(it); + } + } + useIndexMapForSequential_ = false; +} + +void HTTPPriorityQueue::insertOrUpdate( + Identifier id, + PriorityQueue::Priority basePriority) { + Priority priority(basePriority); + auto findResult = find(id); + if (findResult) { + if (updateInSequential(findResult->elem, priority)) { + return; + } else { + // moving in/out of a RR, just erase + eraseImpl(id, findResult->elem); + removeIndex(findResult->indexIt); + } + } + if (!priority->paused) { + insert(id, priority); + } +} + +void HTTPPriorityQueue::updateIfExist( + Identifier id, + PriorityQueue::Priority basePriority) { + Priority priority(basePriority); + auto findResult = find(id); + if (!findResult) { + return; + } + if (!updateInSequential(findResult->elem, priority)) { + // moving in/out of a RR/paused, just erase + bool wasIncremental = findResult->elem.incremental; + eraseImpl(id, findResult->elem); + if (priority->paused) { + removeIndex(findResult->indexIt); + return; + } + if (wasIncremental && !priority->incremental && + !useIndexMapForSequential_) { + removeIndex(findResult->indexIt); + } // else don't need removeIndex -- it will get updated + insert(id, priority); + } +} + +void HTTPPriorityQueue::erase(Identifier id) { + auto findResult = find(id); + if (findResult) { + Priority priority(0, false, 0); + if (findResult->elem.incremental) { + priority = Priority(findResult->elem.index, true, 0); + } else { + priority = heap_[findResult->elem.index].priority; + } + if (hasOpenTransaction_) { + erased_.emplace_back(std::move(priority), id); + } + eraseImpl(id, findResult->elem); + removeIndex(findResult->indexIt); + } + if (useIndexMapForSequential_ && heap_.size() < kDestroyIndexThreshold) { + destroySequentialIndex(); + } +} + +void HTTPPriorityQueue::clear() { + heap_.clear(); + indexMap_.clear(); + useIndexMapForSequential_ = false; + for (auto& rr : roundRobins_) { + rr.clear(); + } +} + +const HTTPPriorityQueue::Element* FOLLY_NULLABLE +HTTPPriorityQueue::top() const { + uint8_t topPri = roundRobins_.size(); + const Element* topElem = nullptr; + if (!heap_.empty()) { + topElem = &heap_.front(); + topPri = topElem->priority->urgency; + } + if (lowestRoundRobin_ < topPri && !roundRobins_[lowestRoundRobin_].empty()) { + return nullptr; + } + CHECK(topElem) << "Empty"; + return topElem; +} + +quic::PriorityQueue::Identifier HTTPPriorityQueue::getNextScheduledID( + quic::Optional previousConsumed) { + auto elem = top(); + if (elem) { + return elem->identifier; + } else { + return roundRobins_[lowestRoundRobin_].getNext(previousConsumed); + } +} + +quic::PriorityQueue::Identifier HTTPPriorityQueue::peekNextScheduledID() const { + auto elem = top(); + if (elem) { + return elem->identifier; + } else { + return roundRobins_[lowestRoundRobin_].peekNext(); + } +} + +void HTTPPriorityQueue::consume(quic::Optional consumed) { + auto elem = top(); + if (!elem) { + roundRobins_[lowestRoundRobin_].consume(consumed); + } +} + +HTTPPriorityQueue::Priority HTTPPriorityQueue::headPriority() const { + auto elem = top(); + if (elem) { + return elem->priority; + } else { + return {lowestRoundRobin_, true}; + } +} + +void HTTPPriorityQueue::heapifyUp(size_t index) { + while (index > 0) { + size_t parentIndex = (index - 1) / 2; + if (heap_[parentIndex] <= heap_[index]) { + break; + } + // Swap elements and update index map + std::swap(heap_[parentIndex], heap_[index]); + assignIndex(heap_[parentIndex], parentIndex); + assignIndex(heap_[index], index); + index = parentIndex; + } +} + +void HTTPPriorityQueue::heapifyDown(size_t index) { + while (true) { + size_t smallest = index; + size_t leftChildIndex = 2 * index + 1; + size_t rightChildIndex = 2 * index + 2; + + if (leftChildIndex < heap_.size() && + heap_[leftChildIndex] < heap_[smallest]) { + smallest = leftChildIndex; + } + + if (rightChildIndex < heap_.size() && + heap_[rightChildIndex] < heap_[smallest]) { + smallest = rightChildIndex; + } + + if (smallest == index) { + break; + } + + // Swap elements and update index map + std::swap(heap_[smallest], heap_[index]); + assignIndex(heap_[smallest], smallest); + assignIndex(heap_[index], index); + index = smallest; + } +} + +void HTTPPriorityQueue::assignIndex(Element& element, size_t index) { + CHECK(!element.priority->incremental); + addIndex(element.identifier, {false, index}); +} + +void HTTPPriorityQueue::insert(Identifier id, const Priority& priority) { + if (!useIndexMapForSequential_ && heap_.size() >= kBuildIndexThreshold) { + useIndexMapForSequential_ = true; + buildSequentialIndex(); + } + if (priority->incremental) { + auto& rr = roundRobins_[priority->urgency]; + rr.insert(id); + roundRobinElements_++; + addIndex(id, {true, priority->urgency}); + if (priority->urgency < lowestRoundRobin_) { + lowestRoundRobin_ = priority->urgency; + } + } else { + heap_.emplace_back(priority, id); + auto index = heap_.size() - 1; + addIndex(id, {false, index}); + heapifyUp(index); + } +} + +bool HTTPPriorityQueue::updateInSequential( + IndexMapElem indexElem, + Priority priority) { + if (priority->paused) { + return false; + } + if (indexElem.incremental || priority->incremental) { + if (indexElem.incremental && priority->incremental) { + return indexElem.index == priority->urgency; + } + return false; + } + auto index = indexElem.index; + auto& elem = heap_[index]; + if (elem.priority == priority) { + return true; // no-op + } + std::swap(elem.priority, priority); + if (elem.priority < priority) { + heapifyUp(index); + } else { + heapifyDown(index); + } + return true; +} + +void HTTPPriorityQueue::eraseImpl(Identifier id, IndexMapElem indexElem) { + auto index = indexElem.index; + if (indexElem.incremental) { + auto& rr = roundRobins_[index]; + rr.erase(id); + roundRobinElements_--; + if (index == lowestRoundRobin_ && rr.empty()) { + while (lowestRoundRobin_ < roundRobins_.size() && + roundRobins_[lowestRoundRobin_].empty()) { + lowestRoundRobin_++; + } + } + } else { + auto lastIndex = heap_.size() - 1; + std::swap(heap_[index], heap_[lastIndex]); + assignIndex(heap_[index], index); + heap_.pop_back(); + + if (index != lastIndex) { + if (index > 0 && heap_[index] < heap_[(index - 1) / 2]) { + heapifyUp(index); + } else { + heapifyDown(index); + } + } // special case, erasing the last element + } +} + +} // namespace quic diff --git a/quic/priority/HTTPPriorityQueue.h b/quic/priority/HTTPPriorityQueue.h new file mode 100644 index 000000000..9116650c5 --- /dev/null +++ b/quic/priority/HTTPPriorityQueue.h @@ -0,0 +1,243 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +#include + +namespace quic { + +class HTTPPriorityQueue : public quic::PriorityQueue { + // Element in the IndexMap. If incremental is true, index applies to + // roundRobins_, otherwise it applies to heap_. + struct IndexMapElem { + bool incremental : 1; + uint64_t index : 63; + }; + using IndexMap = + folly::F14ValueMap; + + public: + class Priority : public quic::PriorityQueue::Priority { + public: + using OrderId = uint64_t; + struct HTTPPriority { + uint8_t urgency : 3; + bool paused : 1; + bool incremental : 1; + OrderId order : 59; + }; + + // TODO: change default priority to (3, false, false, 0) to match spec + static constexpr HTTPPriority kDefaultPriority{3, false, true, 0}; + + /*implicit*/ Priority(const PriorityQueue::Priority& basePriority) + : PriorityQueue::Priority(basePriority) { + if (!isInitialized()) { + getFields() = kDefaultPriority; + } + } + + Priority(uint8_t u, bool i, OrderId o = 0) { + auto& fields = getFields(); + fields.urgency = u; + fields.incremental = i; + fields.order = (i ? 0 : o); + fields.paused = false; + } + + enum Paused { PAUSED }; + /* implicit */ Priority(Paused) : Priority(0, false, 0) { + getFields().paused = true; + } + + Priority(const Priority&) = default; + Priority& operator=(const Priority&) = default; + Priority(Priority&&) = default; + Priority& operator=(Priority&&) = default; + ~Priority() override = default; + + const HTTPPriority* operator->() const { + return &getFields(); + } + + bool operator==(const Priority& other) const { + auto asUint64 = toUint64(); + auto otherAsUint64 = other.toUint64(); + if (asUint64 == otherAsUint64) { + return true; + } + // The only other way to be equal is if one is initialized and the other + // is default + const static uint64_t kDefaultUint64 = Priority( + kDefaultPriority.urgency, + kDefaultPriority.incremental, + kDefaultPriority.order) + .toUint64(); + return (kDefaultUint64 == otherAsUint64 && !isInitialized()) || + (asUint64 == kDefaultUint64 && !other.isInitialized()); + } + + bool operator<(const Priority& other) const { + return toUint64() < other.toUint64(); + } + + [[nodiscard]] uint64_t toUint64() const { + auto& fields = getFields(); + return ( + (uint64_t(fields.urgency) << 61) | (uint64_t(fields.paused) << 60) | + (uint64_t(fields.incremental) << 59) | fields.order); + } + + [[nodiscard]] const HTTPPriority& getFields() const { + return getPriority(); + } + + private: + HTTPPriority& getFields() { + return getPriority(); + } + }; + + void advanceAfterNext(size_t n) { + for (auto& rr : roundRobins_) { + rr.advanceAfterNext(n); + } + } + + void advanceAfterBytes(uint64_t bytes) { + for (auto& rr : roundRobins_) { + rr.advanceAfterBytes(bytes); + } + } + + [[nodiscard]] bool empty() const override { + return heap_.empty() && roundRobinElements_ == 0; + } + + [[nodiscard]] bool equalPriority( + const PriorityQueue::Priority& p1, + const PriorityQueue::Priority& p2) const override { + return static_cast(p1) == + static_cast(p2); + } + + [[nodiscard]] bool contains(Identifier id) const override { + return find(id) != quic::none; + } + + void insertOrUpdate(Identifier id, PriorityQueue::Priority priority) override; + + void updateIfExist(Identifier id, PriorityQueue::Priority priority) override; + + void erase(Identifier id) override; + + void clear() override; + + Identifier getNextScheduledID( + quic::Optional previousConsumed) override; + + [[nodiscard]] Identifier peekNextScheduledID() const override; + + void consume(quic::Optional consumed) override; + + // Note: transactions only reinsert erased transactions at previous priority + // they don't undo inserts, updates, or consume. + Transaction beginTransaction() override { + if (hasOpenTransaction_) { + rollbackTransaction(makeTransaction()); + } + hasOpenTransaction_ = true; + return makeTransaction(); + } + + void commitTransaction(Transaction&&) override { + if (hasOpenTransaction_) { + hasOpenTransaction_ = false; + erased_.clear(); + } + } + + void rollbackTransaction(Transaction&&) override { + if (hasOpenTransaction_) { + for (auto& e : erased_) { + insert(e.identifier, e.priority); + } + erased_.clear(); + hasOpenTransaction_ = false; + } + } + + [[nodiscard]] Priority headPriority() const; + + private: + // Heap Element. If priority.incremental is true, then Identifier is + // uninitialized - the element is a placeholder for the RoundRobin at + // roundRobins_[priority.urgency]. + // + // In the current design, there are no elements with Incremental priority in + // the heap. + struct Element { + Element(Priority p, Identifier i) : priority(std::move(p)), identifier(i) {} + Priority priority; + Identifier identifier; + + bool operator<(const Element& other) const { + if (priority < other.priority) { + return true; + } + if (other.priority < priority || other.priority->incremental) { + return false; + } + // sequential priorities are equal + return identifier.asUint64() < other.identifier.asUint64(); + } + bool operator<=(const Element& other) const { + return !(other < *this); + } + }; + + struct FindResult { + IndexMapElem elem; + IndexMap::const_iterator indexIt; + }; + [[nodiscard]] quic::Optional find(Identifier id) const; + void addIndex(Identifier id, IndexMapElem indexElem); + void removeIndex(IndexMap::const_iterator it); + void buildSequentialIndex(); + void destroySequentialIndex(); + + void heapifyUp(size_t index); + void heapifyDown(size_t index); + void assignIndex(Element& element, size_t index); + void insert(Identifier id, const Priority& priority); + bool updateInSequential(IndexMapElem indexElem, Priority priority); + void eraseImpl(Identifier id, IndexMapElem indexElem); + + [[nodiscard]] const Element* FOLLY_NULLABLE top() const; + + // Holds sequential elements + std::vector heap_; + // Map from id -> RoundRobin or Heap Index + IndexMap indexMap_; + // Holds incremental elements + std::array roundRobins_; + // Holds erased elements from the current transaction + std::vector erased_; + // Count of Round Robin elements in the Queue + uint32_t roundRobinElements_{0}; + // The index of the first non-empty RoundRobin, or roundRobins_.size() + uint8_t lowestRoundRobin_{uint8_t(roundRobins_.size())}; + bool hasOpenTransaction_{false}; + bool useIndexMapForSequential_{false}; +}; + +} // namespace quic diff --git a/quic/priority/test/BUCK b/quic/priority/test/BUCK index 2dd604c58..1cb29c1f4 100644 --- a/quic/priority/test/BUCK +++ b/quic/priority/test/BUCK @@ -25,3 +25,14 @@ mvfst_cpp_test( "//quic/priority:round_robin", ], ) + +mvfst_cpp_test( + name = "http_priority_queue_test", + srcs = ["HTTPPriorityQueueTest.cpp"], + headers = [], + deps = [ + "//folly/portability:gmock", + "//folly/portability:gtest", + "//quic/priority:http_priority_queue", + ], +) diff --git a/quic/priority/test/CMakeLists.txt b/quic/priority/test/CMakeLists.txt index 736b85f5e..893f28a53 100644 --- a/quic/priority/test/CMakeLists.txt +++ b/quic/priority/test/CMakeLists.txt @@ -14,8 +14,9 @@ quic_add_test(TARGET PriorityQueueTest SOURCES PriorityQueueTest.cpp RoundRobinTests.cpp + HTTPPriorityQueueTest.cpp DEPENDS Folly::folly - mvfst_constants mvfst_round_robin + mvfst_http_priority_queue ) diff --git a/quic/priority/test/HTTPPriorityQueueTest.cpp b/quic/priority/test/HTTPPriorityQueueTest.cpp new file mode 100644 index 000000000..cd7f3a8e8 --- /dev/null +++ b/quic/priority/test/HTTPPriorityQueueTest.cpp @@ -0,0 +1,308 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include + +namespace { + +using namespace quic; +using Identifier = quic::PriorityQueue::Identifier; +class HTTPPriorityQueueTest : public testing::Test { + protected: + HTTPPriorityQueue queue_; +}; + +TEST_F(HTTPPriorityQueueTest, EmptyQueue) { + EXPECT_TRUE(queue_.empty()); +} + +TEST_F(HTTPPriorityQueueTest, Compare) { + std::vector pris = { + PriorityQueue::Priority(), + {0, false}, + {0, false, 1}, + {0, true}, + {7, false}, + {7, false, std::numeric_limits::max()}, + {7, true}, + {HTTPPriorityQueue::Priority::PAUSED}}; + for (size_t i = 0; i < pris.size(); ++i) { + for (size_t j = 0; j < pris.size(); ++j) { + EXPECT_TRUE( + (i == j && pris[i] == pris[j]) || (i != j && !(pris[i] == pris[j]))); + EXPECT_TRUE( + (i == j && queue_.equalPriority(pris[i], pris[j])) || + (i != j && !queue_.equalPriority(pris[i], pris[j]))); + } + } + // TODO: will change when default changes + EXPECT_EQ(pris[0], HTTPPriorityQueue::Priority(3, true)); +} + +TEST_F(HTTPPriorityQueueTest, InsertSingleElement) { + auto id = Identifier::fromStreamID(1); + auto priority = HTTPPriorityQueue::Priority(0, false); + queue_.insertOrUpdate(id, priority); + EXPECT_FALSE(queue_.empty()); + EXPECT_EQ(queue_.getNextScheduledID(quic::none), id); +} + +TEST_F(HTTPPriorityQueueTest, InsertMultipleElements) { + auto id1 = Identifier::fromStreamID(1); + auto id2 = Identifier::fromStreamID(2); + auto priority1 = HTTPPriorityQueue::Priority(0, false); + auto priority2 = HTTPPriorityQueue::Priority(1, false); + queue_.insertOrUpdate(id1, priority1); + queue_.insertOrUpdate(id2, priority2); + EXPECT_EQ(queue_.getNextScheduledID(quic::none), id1); + EXPECT_EQ(queue_.getNextScheduledID(quic::none), id1); + queue_.erase(id1); + EXPECT_EQ(queue_.getNextScheduledID(quic::none), id2); +} + +TEST_F(HTTPPriorityQueueTest, UpdatePriority) { + auto id = Identifier::fromStreamID(1); + auto priority = HTTPPriorityQueue::Priority(0, false); + queue_.insertOrUpdate(id, priority); + auto newPriority = HTTPPriorityQueue::Priority(1, false); + queue_.updateIfExist(id, newPriority); + EXPECT_EQ(queue_.getNextScheduledID(quic::none), id); + queue_.updateIfExist(id, newPriority); + EXPECT_EQ(queue_.getNextScheduledID(quic::none), id); +} + +TEST_F(HTTPPriorityQueueTest, EraseElement) { + auto id = Identifier::fromStreamID(1); + auto priority = HTTPPriorityQueue::Priority(0, false); + queue_.insertOrUpdate(id, priority); + queue_.erase(id); + EXPECT_TRUE(queue_.empty()); + queue_.updateIfExist(id, HTTPPriorityQueue::Priority(0, false)); +} + +TEST_F(HTTPPriorityQueueTest, HeapUpOnErase) { + std::vector items{1, 4, 2, 5, 6, 3}; + for (auto i : items) { + // identical priority and order, sort by stream ID + queue_.insertOrUpdate( + Identifier::fromStreamID(i), HTTPPriorityQueue::Priority(0, false, 0)); + } + queue_.erase(Identifier::fromStreamID(5)); // swaps with 3 which moves up + for (auto i = 1; i < 7; i++) { + if (i == 5) { + continue; + } + EXPECT_EQ(queue_.getNextScheduledID(quic::none).asUint64(), i); + queue_.erase(Identifier::fromStreamID(i)); + } + EXPECT_TRUE(queue_.empty()); +} + +TEST_F(HTTPPriorityQueueTest, UpdateIncrementalToNonIncremental) { + auto id = Identifier::fromStreamID(1); + auto priority = HTTPPriorityQueue::Priority(7, true); + queue_.insertOrUpdate(id, priority); + auto id2 = Identifier::fromStreamID(2); + queue_.insertOrUpdate(id2, HTTPPriorityQueue::Priority(0, true)); + + // Update from incremental to non-incremental (updateIfExist) + queue_.updateIfExist(id, HTTPPriorityQueue::Priority(0, false)); + EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id); + queue_.erase(id); + EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id2); + // Update from incremental to non-incremental (insertOrUpdate) + queue_.insertOrUpdate(id2, HTTPPriorityQueue::Priority(0, false)); + EXPECT_TRUE(queue_.headPriority() == HTTPPriorityQueue::Priority(0, false)); +} + +TEST_F(HTTPPriorityQueueTest, UpdateNonIncrementalToIncremental) { + auto id = Identifier::fromStreamID(1); + auto priority = HTTPPriorityQueue::Priority(0, false); + queue_.insertOrUpdate(id, priority); + auto id2 = Identifier::fromStreamID(2); + queue_.insertOrUpdate(id2, HTTPPriorityQueue::Priority(0, true)); + + // Update from non-incremental to incremental + priority = HTTPPriorityQueue::Priority(0, true); + queue_.updateIfExist(id, priority); + EXPECT_TRUE(queue_.contains(id)); + EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id2); + EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id); +} + +TEST_F(HTTPPriorityQueueTest, UpdateIncrementalUrgency) { + auto id = Identifier::fromStreamID(1); + auto priority = HTTPPriorityQueue::Priority(0, true); + queue_.insertOrUpdate(id, priority); + + // Update urgency of incremental priority from 0 -> 1 + priority = HTTPPriorityQueue::Priority(1, true); + queue_.updateIfExist(id, priority); + EXPECT_TRUE(queue_.contains(id)); + EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id); + EXPECT_TRUE(queue_.headPriority() == HTTPPriorityQueue::Priority(1, true)); +} + +TEST_F(HTTPPriorityQueueTest, InsertOrUpdateNoOp) { + auto id = Identifier::fromStreamID(1); + queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, true)); + queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, true)); + + // Update urgency of incremental priority from 0 -> 1 + queue_.updateIfExist(id, HTTPPriorityQueue::Priority(1, true)); + EXPECT_TRUE(queue_.contains(id)); + EXPECT_TRUE(queue_.getNextScheduledID(quic::none) == id); + EXPECT_TRUE(queue_.headPriority() == HTTPPriorityQueue::Priority(1, true)); +} + +TEST_F(HTTPPriorityQueueTest, PeekAndClear) { + for (size_t i = 0; i < 16; i++) { + queue_.insertOrUpdate( + Identifier::fromStreamID(i), HTTPPriorityQueue::Priority(i / 2, i % 2)); + } + EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(0)); + EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(0)); + queue_.erase(Identifier::fromStreamID(0)); + EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(1)); + EXPECT_EQ(queue_.peekNextScheduledID(), Identifier::fromStreamID(1)); + queue_.clear(); +} + +TEST_F(HTTPPriorityQueueTest, DoubleBeginTransaction) { + auto txn = queue_.beginTransaction(); + queue_.insertOrUpdate( + Identifier::fromStreamID(0), HTTPPriorityQueue::Priority(7, false)); + // begin without commit/rollback => rollback + txn = queue_.beginTransaction(); + queue_.insertOrUpdate( + Identifier::fromStreamID(1), HTTPPriorityQueue::Priority(0, false)); + queue_.rollbackTransaction(std::move(txn)); + EXPECT_TRUE(queue_.contains(Identifier::fromStreamID(0))); + EXPECT_TRUE(queue_.contains(Identifier::fromStreamID(1))); +} + +TEST_F(HTTPPriorityQueueTest, InsertWithoutTransaction) { + // no txn, erase not rollbackable. + queue_.insertOrUpdate( + Identifier::fromStreamID(0), HTTPPriorityQueue::Priority(7, false)); + queue_.erase(Identifier::fromStreamID(0)); + // beginTransaction here + auto txn = queue_.beginTransaction(); + queue_.insertOrUpdate( + Identifier::fromStreamID(1), HTTPPriorityQueue::Priority(0, false)); + queue_.rollbackTransaction(std::move(txn)); + EXPECT_FALSE(queue_.contains(Identifier::fromStreamID(0))); + EXPECT_TRUE(queue_.contains(Identifier::fromStreamID(1))); +} + +TEST_F(HTTPPriorityQueueTest, Paused) { + auto id = Identifier::fromStreamID(0); + // insert paused -> nope + HTTPPriorityQueue::Priority paused(HTTPPriorityQueue::Priority::PAUSED); + queue_.insertOrUpdate(id, paused); + EXPECT_TRUE(queue_.empty()); + + // update unpaused(seq) -> paused: deleted + queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, false)); + EXPECT_EQ(queue_.peekNextScheduledID(), id); + queue_.updateIfExist(id, paused); + EXPECT_TRUE(queue_.empty()); + + // update from paused to unpaused: no-op -- is this right? + queue_.updateIfExist(id, HTTPPriorityQueue::Priority(0, true)); + EXPECT_TRUE(queue_.empty()); + + // update unpaused(rr) -> paused: deleted + queue_.insertOrUpdate(id, HTTPPriorityQueue::Priority(0, true)); + EXPECT_EQ(queue_.peekNextScheduledID(), id); + queue_.updateIfExist(id, paused); + EXPECT_TRUE(queue_.empty()); +} + +TEST_F(HTTPPriorityQueueTest, ComplexOperations) { + std::vector ids; + auto txn = queue_.beginTransaction(); + // Insert elements with different priorities + for (int i = 0; i < 20; ++i) { + // every 4th stream has same pri, and those 5 streams are in reverse stream + // ID order, 6 and 14 are RR. + auto priority = HTTPPriorityQueue::Priority(i % 4, i % 8 == 6, 20 - i); + ids.push_back(priority); + queue_.insertOrUpdate(Identifier::fromStreamID(i), priority); + } + + // Update some priorities to shuffle their position + auto setPriority = [&](size_t index, + const HTTPPriorityQueue::Priority& pri) mutable { + ids[index] = pri; + queue_.updateIfExist(Identifier::fromStreamID(index), pri); + }; + setPriority(5, HTTPPriorityQueue::Priority(0, false)); + setPriority(10, HTTPPriorityQueue::Priority(3, false)); + setPriority(15, HTTPPriorityQueue::Priority(1, false)); + + // Erase some elements + queue_.erase(Identifier::fromStreamID(5)); // highest pri + queue_.erase(Identifier::fromStreamID(7)); // lowest pri + queue_.erase(Identifier::fromStreamID(12)); + queue_.commitTransaction(std::move(txn)); + + // Call getNextScheduledID + erase until the queue is empty + HTTPPriorityQueue::Priority lastPriority = queue_.headPriority(); + // clang-format off + std::list expectedOrder{ + /*u=0*/ 16, 8, 4, 0, + /*u=1*/ 15, 17, 13, 9, 1, + /*u=2*/ 18, 2, + /*u=2,i*/ 6, 14, + /*u=3*/ 10, 19, 11, 3 + }; + // clang-format on + + txn = queue_.beginTransaction(); + while (!queue_.empty()) { + // priorities should not decrease + auto headPriority = queue_.headPriority(); + CHECK(lastPriority == headPriority || lastPriority < headPriority); + lastPriority = headPriority; + auto nextId = queue_.peekNextScheduledID(); + queue_.consume(quic::none); + CHECK_EQ(nextId.asUint64(), expectedOrder.front()); + expectedOrder.pop_front(); + auto expectedPri = ids[nextId.asUint64()]; + CHECK(expectedPri == headPriority); + queue_.erase(nextId); + CHECK(!queue_.contains(nextId)); + } + queue_.rollbackTransaction(std::move(txn)); + EXPECT_FALSE(queue_.empty()); + EXPECT_TRUE( + queue_.headPriority() == HTTPPriorityQueue::Priority(0, false, 4)); +} + +TEST_F(HTTPPriorityQueueTest, IndexEverything) { + // Insert elements with different priorities + for (int i = 1; i < 200; ++i) { + auto priority = HTTPPriorityQueue::Priority(i % 8, i % 16 == 0, 200 - i); + queue_.insertOrUpdate(Identifier::fromStreamID(i), priority); + } + // 3 is a generator for the prime set modulo 199 + size_t g = 3; + size_t id = g; + for (size_t x = 0; x < 199; x++) { + queue_.erase(Identifier::fromStreamID(id)); + id = (id * g) % 199; + } + // The only number left is 199 + EXPECT_EQ(queue_.peekNextScheduledID().asUint64(), 199); + queue_.erase(Identifier::fromStreamID(199)); + EXPECT_TRUE(queue_.empty()); +} +} // namespace