1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(): first cleanup

This commit is contained in:
drrtuy
2025-02-14 13:32:02 +00:00
parent 4c1d9bceb7
commit 3dfc8cd454
11 changed files with 18 additions and 151 deletions

View File

@ -23,15 +23,18 @@
#include <cstddef>
#include <cstdlib>
#include <iostream>
namespace allocators
{
// WIP placement
// This is an aggregating custom allocator that tracks the memory usage using
// a globally unique atomic counter.
// It is supposed to receive a pointer to an atomic from a singleton entity, e.g. ResourceManager.
// NB The atomic provides an upper hard limit for the memory usage and not the usage counter.
// The allocator counts the allocated size locally and syncs the accumulated difference
// with the global counter every CheckPointStepSize (100MB by default), on both allocation and deallocation.
// When a sync op hits MemoryLimitLowerBound trying to allocate more memory, it throws.
// SQL operators or TBPS runtime must catch the exception and act accordingly.
// const constexpr std::uint64_t CounterUpdateUnitSize = 4 * 1024 * 1024;
const constexpr std::int64_t MemoryLimitLowerBound = 500 * 1024 * 1024; // WIP
const constexpr std::int64_t CheckPointStepSize = 100 * 1024 * 1024; // WIP
@ -57,38 +60,24 @@ class CountingAllocator
void changeLocalAndGlobalMemoryLimits(const int64_t sizeChange)
{
// This routine must be used for mem allocation accounting path only!
// The case Current > last checkpoint(we deallocated mem since the last checkpoint), sizeIncrease is
// negative b/c we now move into the opposite direction. The case Last Checkpoint > Current (we allocated
// The case CurrentCheckpoint > LastCheckpoint(we deallocated mem since the last checkpoint), sizeIncrease is
// negative b/c we now move into the opposite direction. The case Last > Current (we allocated
// mem since the last checkpoint), sizeIncrease is positive
int64_t sizeChangeWDirection =
(currentLocalMemoryUsage_ <= lastMemoryLimitCheckpoint_) ? -sizeChange : sizeChange;
int64_t diffSinceLastCheckPoint = int_distance(currentLocalMemoryUsage_, lastMemoryLimitCheckpoint_);
if (needCheckPoint(sizeChangeWDirection, diffSinceLastCheckPoint, checkPointStepSize_))
{
// std::cout << "changeLocalAndGlobalMemoryLimits " << sizeChange << " bytes at "
// << " diffSinceLastCheckPoint " << diffSinceLastCheckPoint << ". current timit: " << std::dec
// << memoryLimit_->load() << std::hex << " bytes.\n";
// std::cout << std::dec;
// auto lastMemoryLimitCheckpointDiff = diffSinceLastCheckPoint + sizeChangeWDirection;
int64_t lastMemoryLimitCheckpointDiff = (currentLocalMemoryUsage_ <= lastMemoryLimitCheckpoint_)
? sizeChange - diffSinceLastCheckPoint
: sizeChange + diffSinceLastCheckPoint;
assert(lastMemoryLimitCheckpointDiff > 0);
// {
// std::cout << "[Allocate::changeLocalAndGlobalMemoryLimits!!!] lastMemoryLimitCheckpoint_ "
// << lastMemoryLimitCheckpoint_ << " currentLocalMemoryUsage_ " << currentLocalMemoryUsage_
// << " sizeChangeWDirection " << sizeChangeWDirection << " lastMemoryLimitCheckpointDiff " << lastMemoryLimitCheckpointDiff
// << std::endl;
// }
// lastMemoryLimitCheckpointDiff sign signifies a direction we move allocating memory.
auto currentGlobalMemoryLimit =
memoryLimit_->fetch_sub(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
if (currentGlobalMemoryLimit < memoryLimitLowerBound_)
{
memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
// ? what to do with local counters here
throw std::bad_alloc();
}
lastMemoryLimitCheckpoint_ += lastMemoryLimitCheckpointDiff;
@ -123,9 +112,6 @@ class CountingAllocator
changeLocalAndGlobalMemoryLimits(sizeAllocated);
T* ptr = static_cast<T*>(::operator new(sizeAllocated));
// std::cout << "[Allocate] non-array " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
// << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
// std::cout << std::dec;
return ptr;
}
@ -138,8 +124,6 @@ class CountingAllocator
changeLocalAndGlobalMemoryLimits(sizeAllocated);
T ptr = static_cast<T>(::operator new[](n));
// std::cout << "[Allocate] array " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
// << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
return ptr;
}
@ -150,10 +134,6 @@ class CountingAllocator
int64_t sizeToDeallocate = n * sizeof(T);
// std::cout << "[Deallocate start] " << sizeToDeallocate << " bytes from " << static_cast<void*>(ptr)
// << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
// std::cout << std::dec;
int64_t sizeChangeWDirection =
(currentLocalMemoryUsage_ >= lastMemoryLimitCheckpoint_) ? -sizeToDeallocate : sizeToDeallocate;
int64_t diffSinceLastCheckPoint = int_distance(currentLocalMemoryUsage_, lastMemoryLimitCheckpoint_);
@ -161,33 +141,17 @@ class CountingAllocator
if (needCheckPoint(sizeChangeWDirection, diffSinceLastCheckPoint, checkPointStepSize_))
{
// Invariant is lastMemoryLimitCheckpoint_ >= currentLocalMemoryUsage_ - sizeToDeallocate
// and lastMemoryLimitCheckpoint_ value must be negative.
// int64_t lastMemoryLimitCheckpointDiff =
// labs(lastMemoryLimitCheckpoint_ - currentLocalMemoryUsage_ - sizeToDeallocate);
// auto lastMemoryLimitCheckpointDiff = diffSinceLastCheckPoint + sizeChangeWDirection;
int64_t lastMemoryLimitCheckpointDiff =
(currentLocalMemoryUsage_ >= lastMemoryLimitCheckpoint_)
? sizeToDeallocate - (currentLocalMemoryUsage_ - lastMemoryLimitCheckpoint_)
: diffSinceLastCheckPoint + sizeToDeallocate;
assert(lastMemoryLimitCheckpointDiff > 0);
// std::cout << "[Deallocate checkpoint!!!] lastMemoryLimitCheckpoint_ " << lastMemoryLimitCheckpoint_
// << " currentLocalMemoryUsage_ " << currentLocalMemoryUsage_ << " sizeChangeWDirection "
// << sizeChangeWDirection << " lastMemoryLimitCheckpointDiff " << lastMemoryLimitCheckpointDiff
// << std::endl;
// assert(lastMemoryLimitCheckpointDiff < 0);
memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
lastMemoryLimitCheckpoint_ -= (lastMemoryLimitCheckpoint_ == 0) ? 0 : lastMemoryLimitCheckpointDiff;
}
currentLocalMemoryUsage_ = currentLocalMemoryUsage_ - sizeToDeallocate;
// std::cout << "[Deallocate end] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr)
// << ". current timit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
// std::cout << std::dec;
}
// Equality operators (allocators are equal if they share the same counter)

View File

@ -69,8 +69,6 @@ void FixedAllocator::setAllocSize(uint allocSize)
void FixedAllocator::newBlock()
{
// boost::shared_ptr<FixedAllocatorBufType> next;
capacityRemaining = elementCount * elementSize;
if (!tmpSpace || mem.size() == 0)
@ -83,9 +81,6 @@ void FixedAllocator::newBlock()
{
mem.emplace_back(boost::make_shared<FixedAllocatorBufType>(elementCount * elementSize));
}
// next.reset(new uint8_t[elementCount * elementSize]);
// mem.push_back(next);
// nextAlloc = next.get();
nextAlloc = mem.back().get();
}
else

View File

@ -20,7 +20,6 @@
*
******************************************************************************************/
#include <iostream>
//#define NDEBUG
#include <cassert>
#include <boost/smart_ptr/allocate_shared_array.hpp>
@ -48,15 +47,12 @@ void PoolAllocator::deallocateAll()
capacityRemaining = 0;
nextAlloc = NULL;
memUsage = 0;
// WIP double check the space is cleaned up.
mem.clear();
oob.clear();
}
void PoolAllocator::newBlock()
{
// boost::shared_ptr<PoolAllocatorBufType[]> next;
capacityRemaining = allocSize;
if (!tmpSpace || mem.size() == 0)
@ -69,7 +65,6 @@ void PoolAllocator::newBlock()
{
mem.emplace_back(boost::make_shared<PoolAllocatorBufType>(allocSize));
}
// mem.push_back(next);
nextAlloc = mem.back().get();
}
else

View File

@ -520,7 +520,8 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
return SBS(new ByteStream(0U));
// Allocate new memory for the `long string`.
// WIP must account this allocation also.
// TODO account this allocation also despite the fact BS allocations are insignificant
// compared with structs used by SQL operators.
rowgroup::StringStoreBufSPType longString(
new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]);

View File

@ -63,7 +63,6 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
uint64_t ret = 0;
empty = false; // At least a nullptr is being stored.
// Sometimes the caller actually wants "" to be returned....... argggghhhh......
// if (len == 0)
// return numeric_limits<uint32_t>::max();
@ -80,18 +79,15 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
if (mem.size() > 0)
lastMC = (MemChunk*)mem.back().get();
// std::cout << "StringStore::storeString len " << len << std::endl;
if ((len + 4) >= CHUNK_SIZE)
{
auto allocSize = len + sizeof(MemChunk) + 4;
if (alloc)
{
// cout << "StringStore::storeString longStrings with alloc " << std::endl;
longStrings.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, allocSize));
}
else
{
// cout << "StringStore::storeString longStrings no alloc " << std::endl;
longStrings.emplace_back(boost::make_shared<uint8_t[]>(allocSize));
}
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]);
@ -107,22 +103,14 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
{
if ((lastMC == nullptr) || (lastMC->capacity - lastMC->currentSize < (len + 4)))
{
// mem usage debugging
// if (lastMC)
if (alloc)
{
// cout << "StringStore::storeString with alloc " << std::endl;
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// boost::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
}
else
{
// cout << "StringStore::storeString no alloc " << std::endl;
mem.emplace_back(boost::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
// mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
}
// mem.push_back(newOne);
lastMC = reinterpret_cast<MemChunk*>(mem.back().get());
lastMC->currentSize = 0;
lastMC->capacity = CHUNK_SIZE;
@ -187,20 +175,16 @@ void StringStore::deserialize(ByteStream& bs)
for (i = 0; i < count; i++)
{
bs >> size;
// cout << "deserializing " << size << " bytes\n";
buf = bs.buf();
if (alloc)
{
// cout << "StringStore::deserialize with alloc " << std::endl;
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk)));
}
else
{
// cout << "StringStore::deserialize no alloc " << std::endl;
mem.emplace_back(boost::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
}
// mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
mc = (MemChunk*)mem[i].get();
mc->currentSize = size;
mc->capacity = size;
@ -365,12 +349,10 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
{
if (alloc)
{
// cout << "RGData::reinit with alloc " << std::endl;
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount));
}
else
{
// cout << "RGData::reinit no alloc " << std::endl;
rowData.reset(new uint8_t[rg.getDataSize(rowCount)]);
}

View File

@ -1,6 +1,6 @@
/*
Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2019 MariaDB Corporation
Copyright (c) 2016-2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -132,10 +132,6 @@ inline T derefFromTwoVectorPtrs(const std::vector<T>* outer, const std::vector<T
return outer->operator[](outerIdx);
}
// using RGDataBufType = uint8_t[];
// using StringStoreBufType = uint8_t[];
// using StringStoreBufSPType = boost::shared_ptr<uint8_t[]>;
class StringStore
{
public:
@ -1043,7 +1039,6 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex
if (inStringTable(colIndex))
{
// std::cout << "setStringField storeString len " << length << std::endl;
offset = strings->storeString((const uint8_t*)str.str(), length);
*((uint64_t*)&data[offsets[colIndex]]) = offset;
// cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
@ -1912,7 +1907,7 @@ inline uint32_t RowGroup::getStringTableThreshold() const
return sTableThreshold;
}
// WIP mb unused
// TODO This is unused, so rm this in the dev branch.
inline void RowGroup::setStringStore(boost::shared_ptr<StringStore> ss)
{
if (useStringTable)