mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-04-18 17:24:03 +03:00
Summary: This will be needed by logging components - eg QLog Reviewed By: hanidamlaj Differential Revision: D72476485 fbshipit-source-id: 3b7f3e81e87fc6cc2a252293208863d33bd5df50
328 lines
8.6 KiB
C++
328 lines
8.6 KiB
C++
/*
|
|
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
*
|
|
* This source code is licensed under the MIT license found in the
|
|
* LICENSE file in the root directory of this source tree.
|
|
*/
|
|
|
|
#include <quic/priority/HTTPPriorityQueue.h>
|
|
|
|
namespace {
|
|
constexpr size_t kBuildIndexThreshold = 100;
|
|
constexpr size_t kDestroyIndexThreshold = 50;
|
|
} // namespace
|
|
|
|
namespace quic {
|
|
PriorityQueue::PriorityLogFields HTTPPriorityQueue::toLogFields(
|
|
const PriorityQueue::Priority& pri) const {
|
|
// This is defined by the QLOG schema
|
|
auto httpPri = static_cast<const HTTPPriorityQueue::Priority&>(pri);
|
|
if (httpPri->paused) {
|
|
return {{"paused", "true"}};
|
|
}
|
|
PriorityLogFields result;
|
|
result.reserve(3);
|
|
result.emplace_back("urgency", std::to_string(httpPri->urgency));
|
|
result.emplace_back("incremental", httpPri->incremental ? "true" : "false");
|
|
result.emplace_back("order", std::to_string(httpPri->order));
|
|
|
|
return result;
|
|
}
|
|
|
|
quic::Optional<HTTPPriorityQueue::FindResult> HTTPPriorityQueue::find(
|
|
Identifier id) const {
|
|
auto it = indexMap_.find(id);
|
|
if (it != indexMap_.end()) {
|
|
return FindResult{it->second, it};
|
|
}
|
|
if (!useIndexMapForSequential_) {
|
|
// linear search the heap
|
|
for (size_t i = 0; i < heap_.size(); i++) {
|
|
auto& elem = heap_[i];
|
|
if (!elem.priority->incremental && elem.identifier == id) {
|
|
return FindResult{IndexMapElem{false, i}, indexMap_.end()};
|
|
}
|
|
}
|
|
}
|
|
return quic::none;
|
|
}
|
|
|
|
void HTTPPriorityQueue::addIndex(Identifier id, IndexMapElem indexElem) {
|
|
if (useIndexMapForSequential_ || indexElem.incremental) {
|
|
indexMap_[id] = indexElem;
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::removeIndex(IndexMap::const_iterator it) {
|
|
if (it != indexMap_.end() &&
|
|
(useIndexMapForSequential_ || it->second.incremental)) {
|
|
indexMap_.erase(it);
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::buildSequentialIndex() {
|
|
for (size_t i = 0; i < heap_.size(); i++) {
|
|
auto& elem = heap_[i];
|
|
if (!elem.priority->incremental) {
|
|
addIndex(elem.identifier, {false, i});
|
|
}
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::destroySequentialIndex() {
|
|
for (auto it = indexMap_.begin(); it != indexMap_.end();) {
|
|
if (it->second.incremental) {
|
|
++it;
|
|
} else {
|
|
it = indexMap_.erase(it);
|
|
}
|
|
}
|
|
useIndexMapForSequential_ = false;
|
|
}
|
|
|
|
void HTTPPriorityQueue::insertOrUpdate(
|
|
Identifier id,
|
|
PriorityQueue::Priority basePriority) {
|
|
Priority priority(basePriority);
|
|
auto findResult = find(id);
|
|
if (findResult) {
|
|
if (updateInSequential(findResult->elem, priority)) {
|
|
return;
|
|
} else {
|
|
// moving in/out of a RR, just erase
|
|
eraseImpl(id, findResult->elem);
|
|
removeIndex(findResult->indexIt);
|
|
}
|
|
}
|
|
if (!priority->paused) {
|
|
insert(id, priority);
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::updateIfExist(
|
|
Identifier id,
|
|
PriorityQueue::Priority basePriority) {
|
|
Priority priority(basePriority);
|
|
auto findResult = find(id);
|
|
if (!findResult) {
|
|
return;
|
|
}
|
|
if (!updateInSequential(findResult->elem, priority)) {
|
|
// moving in/out of a RR/paused, just erase
|
|
bool wasIncremental = findResult->elem.incremental;
|
|
eraseImpl(id, findResult->elem);
|
|
if (priority->paused) {
|
|
removeIndex(findResult->indexIt);
|
|
return;
|
|
}
|
|
if (wasIncremental && !priority->incremental &&
|
|
!useIndexMapForSequential_) {
|
|
removeIndex(findResult->indexIt);
|
|
} // else don't need removeIndex -- it will get updated
|
|
insert(id, priority);
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::erase(Identifier id) {
|
|
auto findResult = find(id);
|
|
if (findResult) {
|
|
Priority priority(0, false, 0);
|
|
if (findResult->elem.incremental) {
|
|
priority = Priority(findResult->elem.index, true, 0);
|
|
} else {
|
|
priority = heap_[findResult->elem.index].priority;
|
|
}
|
|
if (hasOpenTransaction_) {
|
|
erased_.emplace_back(std::move(priority), id);
|
|
}
|
|
eraseImpl(id, findResult->elem);
|
|
removeIndex(findResult->indexIt);
|
|
}
|
|
if (useIndexMapForSequential_ && heap_.size() < kDestroyIndexThreshold) {
|
|
destroySequentialIndex();
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::clear() {
|
|
heap_.clear();
|
|
indexMap_.clear();
|
|
useIndexMapForSequential_ = false;
|
|
for (auto& rr : roundRobins_) {
|
|
rr.clear();
|
|
}
|
|
}
|
|
|
|
const HTTPPriorityQueue::Element* FOLLY_NULLABLE
|
|
HTTPPriorityQueue::top() const {
|
|
uint8_t topPri = roundRobins_.size();
|
|
const Element* topElem = nullptr;
|
|
if (!heap_.empty()) {
|
|
topElem = &heap_.front();
|
|
topPri = topElem->priority->urgency;
|
|
}
|
|
if (lowestRoundRobin_ < topPri && !roundRobins_[lowestRoundRobin_].empty()) {
|
|
return nullptr;
|
|
}
|
|
CHECK(topElem) << "Empty";
|
|
return topElem;
|
|
}
|
|
|
|
quic::PriorityQueue::Identifier HTTPPriorityQueue::getNextScheduledID(
|
|
quic::Optional<uint64_t> previousConsumed) {
|
|
auto elem = top();
|
|
if (elem) {
|
|
return elem->identifier;
|
|
} else {
|
|
return roundRobins_[lowestRoundRobin_].getNext(previousConsumed);
|
|
}
|
|
}
|
|
|
|
quic::PriorityQueue::Identifier HTTPPriorityQueue::peekNextScheduledID() const {
|
|
auto elem = top();
|
|
if (elem) {
|
|
return elem->identifier;
|
|
} else {
|
|
return roundRobins_[lowestRoundRobin_].peekNext();
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::consume(quic::Optional<uint64_t> consumed) {
|
|
auto elem = top();
|
|
if (!elem) {
|
|
roundRobins_[lowestRoundRobin_].consume(consumed);
|
|
}
|
|
}
|
|
|
|
HTTPPriorityQueue::Priority HTTPPriorityQueue::headPriority() const {
|
|
auto elem = top();
|
|
if (elem) {
|
|
return elem->priority;
|
|
} else {
|
|
return {lowestRoundRobin_, true};
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::heapifyUp(size_t index) {
|
|
while (index > 0) {
|
|
size_t parentIndex = (index - 1) / 2;
|
|
if (heap_[parentIndex] <= heap_[index]) {
|
|
break;
|
|
}
|
|
// Swap elements and update index map
|
|
std::swap(heap_[parentIndex], heap_[index]);
|
|
assignIndex(heap_[parentIndex], parentIndex);
|
|
assignIndex(heap_[index], index);
|
|
index = parentIndex;
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::heapifyDown(size_t index) {
|
|
while (true) {
|
|
size_t smallest = index;
|
|
size_t leftChildIndex = 2 * index + 1;
|
|
size_t rightChildIndex = 2 * index + 2;
|
|
|
|
if (leftChildIndex < heap_.size() &&
|
|
heap_[leftChildIndex] < heap_[smallest]) {
|
|
smallest = leftChildIndex;
|
|
}
|
|
|
|
if (rightChildIndex < heap_.size() &&
|
|
heap_[rightChildIndex] < heap_[smallest]) {
|
|
smallest = rightChildIndex;
|
|
}
|
|
|
|
if (smallest == index) {
|
|
break;
|
|
}
|
|
|
|
// Swap elements and update index map
|
|
std::swap(heap_[smallest], heap_[index]);
|
|
assignIndex(heap_[smallest], smallest);
|
|
assignIndex(heap_[index], index);
|
|
index = smallest;
|
|
}
|
|
}
|
|
|
|
void HTTPPriorityQueue::assignIndex(Element& element, size_t index) {
|
|
CHECK(!element.priority->incremental);
|
|
addIndex(element.identifier, {false, index});
|
|
}
|
|
|
|
void HTTPPriorityQueue::insert(Identifier id, const Priority& priority) {
|
|
if (!useIndexMapForSequential_ && heap_.size() >= kBuildIndexThreshold) {
|
|
useIndexMapForSequential_ = true;
|
|
buildSequentialIndex();
|
|
}
|
|
if (priority->incremental) {
|
|
auto& rr = roundRobins_[priority->urgency];
|
|
rr.insert(id);
|
|
roundRobinElements_++;
|
|
addIndex(id, {true, priority->urgency});
|
|
if (priority->urgency < lowestRoundRobin_) {
|
|
lowestRoundRobin_ = priority->urgency;
|
|
}
|
|
} else {
|
|
heap_.emplace_back(priority, id);
|
|
auto index = heap_.size() - 1;
|
|
addIndex(id, {false, index});
|
|
heapifyUp(index);
|
|
}
|
|
}
|
|
|
|
bool HTTPPriorityQueue::updateInSequential(
|
|
IndexMapElem indexElem,
|
|
Priority priority) {
|
|
if (priority->paused) {
|
|
return false;
|
|
}
|
|
if (indexElem.incremental || priority->incremental) {
|
|
if (indexElem.incremental && priority->incremental) {
|
|
return indexElem.index == priority->urgency;
|
|
}
|
|
return false;
|
|
}
|
|
auto index = indexElem.index;
|
|
auto& elem = heap_[index];
|
|
if (elem.priority == priority) {
|
|
return true; // no-op
|
|
}
|
|
std::swap(elem.priority, priority);
|
|
if (elem.priority < priority) {
|
|
heapifyUp(index);
|
|
} else {
|
|
heapifyDown(index);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void HTTPPriorityQueue::eraseImpl(Identifier id, IndexMapElem indexElem) {
|
|
auto index = indexElem.index;
|
|
if (indexElem.incremental) {
|
|
auto& rr = roundRobins_[index];
|
|
rr.erase(id);
|
|
roundRobinElements_--;
|
|
if (index == lowestRoundRobin_ && rr.empty()) {
|
|
while (lowestRoundRobin_ < roundRobins_.size() &&
|
|
roundRobins_[lowestRoundRobin_].empty()) {
|
|
lowestRoundRobin_++;
|
|
}
|
|
}
|
|
} else {
|
|
auto lastIndex = heap_.size() - 1;
|
|
std::swap(heap_[index], heap_[lastIndex]);
|
|
assignIndex(heap_[index], index);
|
|
heap_.pop_back();
|
|
|
|
if (index != lastIndex) {
|
|
if (index > 0 && heap_[index] < heap_[(index - 1) / 2]) {
|
|
heapifyUp(index);
|
|
} else {
|
|
heapifyDown(index);
|
|
}
|
|
} // special case, erasing the last element
|
|
}
|
|
}
|
|
|
|
} // namespace quic
|