1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

feat(RM,allocators): use atomic counters math abstraction

This commit is contained in:
drrtuy
2025-03-17 14:32:50 +00:00
parent b14613a66b
commit 729db829a2
4 changed files with 25 additions and 13 deletions

View File

@@ -347,23 +347,23 @@ bool ResourceManager::userPriorityEnabled() const
// If both have space, return true. // If both have space, return true.
bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit, bool patience) bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit, bool patience)
{ {
bool ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); bool ret1 = (atomicops::atomicSubRef(totalUmMemLimit, amount) >= 0);
bool ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; bool ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1;
uint32_t retryCounter = 0, maxRetries = 20; // 10s delay uint32_t retryCounter = 0, maxRetries = 20; // 10s delay
while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries) while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries)
{ {
totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); atomicops::atomicAddRef(totalUmMemLimit, amount);
sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0;
usleep(500000); usleep(500000);
ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); ret1 = (atomicops::atomicSubRef(totalUmMemLimit, amount) >= 0);
ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1;
} }
if (!(ret1 && ret2)) if (!(ret1 && ret2))
{ {
// If we didn't get any memory, restore the counters. // If we didn't get any memory, restore the counters.
totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); atomicops::atomicAddRef(totalUmMemLimit, amount);
sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0;
} }
return (ret1 && ret2); return (ret1 && ret2);
@@ -372,20 +372,20 @@ bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t>& sess
// The amount type is unsafe if amount close to max<int64_t> that is unrealistic in 2024. // The amount type is unsafe if amount close to max<int64_t> that is unrealistic in 2024.
bool ResourceManager::getMemory(int64_t amount, bool patience) bool ResourceManager::getMemory(int64_t amount, bool patience)
{ {
bool ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); bool ret1 = (atomicops::atomicSubRef(totalUmMemLimit, amount) >= 0);
uint32_t retryCounter = 0, maxRetries = 20; // 10s delay uint32_t retryCounter = 0, maxRetries = 20; // 10s delay
while (patience && !ret1 && retryCounter++ < maxRetries) while (patience && !ret1 && retryCounter++ < maxRetries)
{ {
totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); atomicops::atomicAddRef(totalUmMemLimit, amount);
usleep(500000); usleep(500000);
ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); ret1 = (atomicops::atomicSubRef(totalUmMemLimit, amount) >= 0);
} }
if (!ret1) if (!ret1)
{ {
// If we didn't get any memory, restore the counters. // If we didn't get any memory, restore the counters.
totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); atomicops::atomicAddRef(totalUmMemLimit, amount);
} }
return ret1; return ret1;
} }

View File

@@ -327,11 +327,11 @@ class ResourceManager
bool getMemory(int64_t amount, bool patience = true); bool getMemory(int64_t amount, bool patience = true);
inline void returnMemory(int64_t amount) inline void returnMemory(int64_t amount)
{ {
totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); atomicops::atomicAddRef(totalUmMemLimit, amount);
} }
inline void returnMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit) inline void returnMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit)
{ {
totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); atomicops::atomicAddRef(totalUmMemLimit, amount);
sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0;
} }
inline int64_t availableMemory() const inline int64_t availableMemory() const

View File

@@ -22,6 +22,7 @@
#include <unistd.h> #include <unistd.h>
#include <stdint.h> #include <stdint.h>
#include <sched.h> #include <sched.h>
#include <atomic>
/* /*
This is an attempt to wrap the differneces between Windows and Linux around atomic ops. This is an attempt to wrap the differneces between Windows and Linux around atomic ops.
@@ -30,6 +31,16 @@ Boost has something in interprocess::ipcdetail, but it doesn't have 64-bit API's
namespace atomicops namespace atomicops
{ {
// Atomic operations for atomic<int64_t> references
inline int64_t atomicAddRef(std::atomic<int64_t>& ref, int64_t val)
{
return ref.fetch_add(val, std::memory_order_relaxed);
}
inline int64_t atomicSubRef(std::atomic<int64_t>& ref, int64_t val)
{
return ref.fetch_sub(val, std::memory_order_relaxed);
}
// Returns the resulting, incremented value // Returns the resulting, incremented value
template <typename T> template <typename T>
inline T atomicInc(volatile T* mem) inline T atomicInc(volatile T* mem)

View File

@@ -22,6 +22,7 @@
#include <atomic> #include <atomic>
#include <cstddef> #include <cstddef>
#include <cstdlib> #include <cstdlib>
#include "atomicops.h"
namespace allocators namespace allocators
{ {
@@ -74,10 +75,10 @@ class CountingAllocator
assert(lastMemoryLimitCheckpointDiff > 0); assert(lastMemoryLimitCheckpointDiff > 0);
auto currentGlobalMemoryLimit = auto currentGlobalMemoryLimit =
memoryLimit_->fetch_sub(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed); atomicops::atomicSubRef(*memoryLimit_, lastMemoryLimitCheckpointDiff);
if (currentGlobalMemoryLimit < memoryLimitLowerBound_) if (currentGlobalMemoryLimit < memoryLimitLowerBound_)
{ {
memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed); atomicops::atomicAddRef(*memoryLimit_, lastMemoryLimitCheckpointDiff);
throw std::bad_alloc(); throw std::bad_alloc();
} }
lastMemoryLimitCheckpoint_ += lastMemoryLimitCheckpointDiff; lastMemoryLimitCheckpoint_ += lastMemoryLimitCheckpointDiff;
@@ -147,7 +148,7 @@ class CountingAllocator
: diffSinceLastCheckPoint + sizeToDeallocate; : diffSinceLastCheckPoint + sizeToDeallocate;
assert(lastMemoryLimitCheckpointDiff > 0); assert(lastMemoryLimitCheckpointDiff > 0);
memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed); atomicops::atomicAddRef(*memoryLimit_, lastMemoryLimitCheckpointDiff);
lastMemoryLimitCheckpoint_ -= (lastMemoryLimitCheckpoint_ == 0) ? 0 : lastMemoryLimitCheckpointDiff; lastMemoryLimitCheckpoint_ -= (lastMemoryLimitCheckpoint_ == 0) ? 0 : lastMemoryLimitCheckpointDiff;
} }