1
0
mirror of https://github.com/facebookincubator/mvfst.git synced 2025-04-18 17:24:03 +03:00
mvfst/quic/common/FunctionLooper.cpp
Konstantin Tsoy 476607c217 Clean up inline writes param
Summary: Clean up inline writes param

Differential Revision: D68977602

fbshipit-source-id: 4edb4ca08605fab76b56e706c2ccc85836018594
2025-02-04 23:51:44 -08:00

184 lines
4.7 KiB
C++

/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <folly/ScopeGuard.h>
#include <glog/logging.h>
#include <quic/common/FunctionLooper.h>
namespace quic {
using namespace std::chrono_literals;
FunctionLooper::FunctionLooper(
std::shared_ptr<QuicEventBase> evb,
folly::Function<void()>&& func,
LooperType type)
: evb_(std::move(evb)),
func_(std::move(func)),
type_(type),
running_(false),
inLoopBody_(false),
fireLoopEarly_(false) {
CHECK(func_);
}
// Replaces pacingTimer_ with the given timer (moved in).
// When pacingTimer_ is set, schedulePacingTimeout() schedules on it in
// preference to evb_, and run() refuses to reschedule from inside the loop
// body.
void FunctionLooper::setPacingTimer(QuicTimer::SharedPtr pacingTimer) noexcept {
pacingTimer_ = std::move(pacingTimer);
}
// Installs the function that schedulePacingTimeout() calls to obtain the delay
// before the next run (a zero delay means "do not schedule a timeout").
// pacingFunc must be non-empty: CHECK(pacingFunc) aborts otherwise.
void FunctionLooper::setPacingFunction(
folly::Function<std::chrono::microseconds()>&& pacingFunc) {
CHECK(pacingFunc);
pacingFunc_ = std::move(pacingFunc);
}
// Shared body of runLoopCallback() and timeoutExpired(): invokes func_ once
// and, if the looper is still running afterwards, arranges the next run.
//
// inLoopBody_ is true for the duration of this call (reset on every exit path
// by SCOPE_EXIT) so that run() can tell it is being called from inside func_.
void FunctionLooper::commonLoopBody() noexcept {
  inLoopBody_ = true;
  SCOPE_EXIT {
    inLoopBody_ = false;
  };
  const bool wasRunning = running_;
  func_();
  // func_ may have called stop() or run() on us, so running_ is re-read below
  // instead of relying on the value captured before the call.
  VLOG(10) << __func__ << ": " << type_ << " hasBeenRunning=" << wasRunning
           << " running_=" << running_;
  // Still running: prefer a pacing timeout; if none was scheduled, fall back
  // to a plain loop callback on the event base.
  if (running_ && !schedulePacingTimeout()) {
    evb_->runInLoop(this);
  }
}
// Tries to arm a pacing timeout for the next run.
//
// Returns true when a timeout was scheduled (on pacingTimer_ if set, otherwise
// on evb_). Returns false when there is no pacing function, nothing to
// schedule on, a timer callback is already pending, or the pacing function
// reports a zero delay.
//
// Side effect: nextPacingTime_ is updated whenever the pacing function returns
// a non-zero delay.
bool FunctionLooper::schedulePacingTimeout() noexcept {
  if (!pacingFunc_) {
    return false;
  }
  if (!pacingTimer_ && !evb_) {
    return false;
  }
  if (isTimerCallbackScheduled()) {
    return false;
  }
  auto delay = pacingFunc_();
  if (delay == 0us) {
    return false;
  }
  nextPacingTime_ = Clock::now() + delay;
  if (pacingTimer_) {
    pacingTimer_->scheduleTimeout(this, delay);
    return true;
  }
  if (evb_) {
    evb_->scheduleTimeoutHighRes(this, delay);
    return true;
  }
  return false;
}
// Loop-callback entry point: runs the shared loop body.
// A DestructorGuard on this object is held for the whole call; presumably so
// that func_ cannot destroy the looper while commonLoopBody() is still
// executing — confirm against folly::DelayedDestruction.
void FunctionLooper::runLoopCallback() noexcept {
  folly::DelayedDestruction::DestructorGuard keepAlive(this);
  commonLoopBody();
}
// Marks the looper as running and, unless a run is already pending, schedules
// this object on evb_ via runInLoop().
//
// thisIteration is forwarded to evb_->runInLoop(); it is forced to false when
// a pending pacing timer is cancelled in favour of the loop callback.
//
// running_ is always set, but nothing new is scheduled when:
//   - a pacing timer is installed and we are inside commonLoopBody();
//   - a loop callback is already scheduled;
//   - a pacing function is set, a timer callback is pending and
//     fireLoopEarly_ is false;
//   - a pacing timer callback is pending and its deadline is more than 1ms
//     away.
void FunctionLooper::run(bool thisIteration) noexcept {
  VLOG(10) << __func__ << ": " << type_;
  running_ = true;
  // Caller can call run() in func_. But if we are in pacing mode, we should
  // prevent such loop.
  if (pacingTimer_ && inLoopBody_) {
    VLOG(4) << __func__ << ": " << type_
            << " in loop body and using pacing - not rescheduling";
    return;
  }
  if (isLoopCallbackScheduled() ||
      (!fireLoopEarly_ && pacingFunc_ && isTimerCallbackScheduled())) {
    VLOG(10) << __func__ << ": " << type_ << " already scheduled";
    return;
  }
  // If we are pacing, we're about to write again, if it's close, just write
  // now.
  if (pacingFunc_ && isTimerCallbackScheduled()) {
    const auto now = Clock::now();
    // Keep microsecond resolution here. Casting to milliseconds truncates
    // toward zero, which would let a deadline almost 2ms away pass the
    // "within 1ms" check below and fire the write early.
    const auto timeUntilWrite = nextPacingTime_ < now
        ? 0us
        : std::chrono::duration_cast<std::chrono::microseconds>(
              nextPacingTime_ - now);
    if (timeUntilWrite > 1ms) {
      // Too far away: leave the pacing timer in charge.
      return;
    }
    cancelTimerCallback();
    // The next loop is good enough
    thisIteration = false;
  }
  evb_->runInLoop(this, thisIteration);
}
// Stops the looper: clears running_ and cancels whatever is pending.
// The loop callback is only cancelled while an event base is attached
// (evb_ non-null); the timer callback is cancelled unconditionally.
void FunctionLooper::stop() noexcept {
VLOG(10) << __func__ << ": " << type_;
running_ = false;
if (evb_) {
cancelLoopCallback();
}
cancelTimerCallback();
}
// Returns running_, which run() sets and stop() clears.
bool FunctionLooper::isRunning() const {
return running_;
}
// True when a pacing function is installed and a timer callback is currently
// scheduled for this looper.
bool FunctionLooper::isPacingScheduled() {
  if (!pacingFunc_) {
    return false;
  }
  return isTimerCallbackScheduled();
}
// Reports whether this looper's loop callback is scheduled, by forwarding to
// the QuicEventBaseLoopCallback implementation (explicitly qualified, since
// this method has the same name).
bool FunctionLooper::isLoopCallbackScheduled() {
return QuicEventBaseLoopCallback::isLoopCallbackScheduled();
}
// Attaches an event base to a looper that currently has none.
// Debug checks (DCHECK): evb_ must be null, evb must be non-null, and
// evb->isInEventBaseThread() must hold. Nothing is scheduled here; only evb_
// is updated.
void FunctionLooper::attachEventBase(std::shared_ptr<QuicEventBase> evb) {
VLOG(10) << __func__ << ": " << type_;
DCHECK(!evb_);
DCHECK(evb && evb->isInEventBaseThread());
evb_ = std::move(evb);
}
// Detaches from the current event base: stops the looper (which clears
// running_ and cancels pending callbacks), then resets evb_ to null.
// Debug check (DCHECK): evb_ must be set and evb_->isInEventBaseThread() must
// hold.
void FunctionLooper::detachEventBase() {
VLOG(10) << __func__ << ": " << type_;
DCHECK(evb_ && evb_->isInEventBaseThread());
stop();
// NOTE(review): stop() has already called cancelTimerCallback(); this call
// repeats it.
cancelTimerCallback();
evb_ = nullptr;
}
// Timer entry point: runs the same loop body as runLoopCallback().
// A DestructorGuard on this object is held for the whole call; presumably so
// that func_ cannot destroy the looper while commonLoopBody() is still
// executing — confirm against folly::DelayedDestruction.
void FunctionLooper::timeoutExpired() noexcept {
  folly::DelayedDestruction::DestructorGuard keepAlive(this);
  commonLoopBody();
}
// Intentionally a no-op: the looper takes no action when its callback is
// cancelled.
void FunctionLooper::callbackCanceled() noexcept {}
// Returns the pacing timer's tick interval, or std::nullopt when no pacing
// timer has been set.
OptionalMicros FunctionLooper::getTimerTickInterval() noexcept {
  if (!pacingTimer_) {
    return std::nullopt;
  }
  return pacingTimer_->getTickInterval();
}
// Streams a human-readable name for a LooperType: "ReadLooper", "PeekLooper"
// or "WriteLooper"; any other value is written as "unknown".
// Returns the stream so that insertions can be chained.
std::ostream& operator<<(std::ostream& out, const LooperType& rhs) {
  const char* label = "unknown";
  switch (rhs) {
    case LooperType::ReadLooper:
      label = "ReadLooper";
      break;
    case LooperType::PeekLooper:
      label = "PeekLooper";
      break;
    case LooperType::WriteLooper:
      label = "WriteLooper";
      break;
    default:
      break;
  }
  return out << label;
}
} // namespace quic