You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
feat(): use boost::make_shared b/c most distros can't do allocate_shared for array types.
This commit is contained in:
@ -23,117 +23,131 @@
|
|||||||
using namespace allocators;
|
using namespace allocators;
|
||||||
|
|
||||||
// Example class to be managed by the allocator
|
// Example class to be managed by the allocator
|
||||||
struct TestClass {
|
struct TestClass
|
||||||
int value;
|
{
|
||||||
|
int value;
|
||||||
|
|
||||||
TestClass(int val) : value(val) {}
|
TestClass(int val) : value(val)
|
||||||
|
{
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024;
|
static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024;
|
||||||
|
|
||||||
// Test Fixture for AtomicCounterAllocator
|
// Test Fixture for AtomicCounterAllocator
|
||||||
class CountingAllocatorTest : public ::testing::Test {
|
class CountingAllocatorTest : public ::testing::Test
|
||||||
protected:
|
{
|
||||||
// Atomic counter to track allocated memory
|
protected:
|
||||||
std::atomic<int64_t> allocatedMemory{MemoryAllowance};
|
// Atomic counter to track allocated memory
|
||||||
|
std::atomic<int64_t> allocatedMemory{MemoryAllowance};
|
||||||
|
|
||||||
// Custom allocator instance
|
// Custom allocator instance
|
||||||
CountingAllocator<TestClass> allocator;
|
CountingAllocator<TestClass> allocator;
|
||||||
|
|
||||||
// Constructor
|
// Constructor
|
||||||
CountingAllocatorTest()
|
CountingAllocatorTest()
|
||||||
: allocatedMemory(MemoryAllowance), allocator(allocatedMemory, MemoryAllowance / 100) {}
|
: allocatedMemory(MemoryAllowance), allocator(allocatedMemory, MemoryAllowance / 100)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
// Destructor
|
// Destructor
|
||||||
~CountingAllocatorTest() override = default;
|
~CountingAllocatorTest() override = default;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Test 1: Allocation increases the counter correctly
|
// Test 1: Allocation increases the counter correctly
|
||||||
TEST_F(CountingAllocatorTest, Allocation) {
|
TEST_F(CountingAllocatorTest, Allocation)
|
||||||
const std::size_t numObjects = 5;
|
{
|
||||||
TestClass* ptr = allocator.allocate(numObjects);
|
const std::size_t numObjects = 5;
|
||||||
EXPECT_NE(ptr, nullptr);
|
TestClass* ptr = allocator.allocate(numObjects);
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
|
EXPECT_NE(ptr, nullptr);
|
||||||
allocator.deallocate(ptr, numObjects);
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
|
||||||
|
allocator.deallocate(ptr, numObjects);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test 2: Deallocation decreases the counter correctly
|
// Test 2: Deallocation decreases the counter correctly
|
||||||
TEST_F(CountingAllocatorTest, Deallocation) {
|
TEST_F(CountingAllocatorTest, Deallocation)
|
||||||
const std::size_t numObjects = 3;
|
{
|
||||||
TestClass* ptr = allocator.allocate(numObjects);
|
const std::size_t numObjects = 3;
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
|
TestClass* ptr = allocator.allocate(numObjects);
|
||||||
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
|
||||||
|
|
||||||
allocator.deallocate(ptr, numObjects);
|
allocator.deallocate(ptr, numObjects);
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test 3: Allocator equality based on shared counter
|
// Test 3: Allocator equality based on shared counter
|
||||||
TEST_F(CountingAllocatorTest, AllocatorEquality) {
|
TEST_F(CountingAllocatorTest, AllocatorEquality)
|
||||||
CountingAllocator<TestClass> allocator1(allocatedMemory);
|
{
|
||||||
CountingAllocator<TestClass> allocator2(allocatedMemory);
|
CountingAllocator<TestClass> allocator1(allocatedMemory);
|
||||||
EXPECT_TRUE(allocator1 == allocator2);
|
CountingAllocator<TestClass> allocator2(allocatedMemory);
|
||||||
|
EXPECT_TRUE(allocator1 == allocator2);
|
||||||
|
|
||||||
std::atomic<int64_t> anotherCounter(0);
|
std::atomic<int64_t> anotherCounter(0);
|
||||||
CountingAllocator<TestClass> allocator3(anotherCounter);
|
CountingAllocator<TestClass> allocator3(anotherCounter);
|
||||||
EXPECT_FALSE(allocator1 == allocator3);
|
EXPECT_FALSE(allocator1 == allocator3);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test 4: Using allocator with std::allocate_shared
|
// Test 4: Using allocator with std::allocate_shared
|
||||||
TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator) {
|
TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
|
||||||
// Create a shared_ptr using allocate_shared with the custom allocator
|
{
|
||||||
std::shared_ptr<TestClass> ptr = std::allocate_shared<TestClass>(allocator, 100);
|
// Create a shared_ptr using allocate_shared with the custom allocator
|
||||||
|
std::shared_ptr<TestClass> ptr = std::allocate_shared<TestClass>(allocator, 100);
|
||||||
|
|
||||||
// Check that the counter has increased by the size of TestClass plus control block
|
// Check that the counter has increased by the size of TestClass plus control block
|
||||||
// Exact size depends on the implementation, so we verify it's at least sizeof(TestClass)
|
// Exact size depends on the implementation, so we verify it's at least sizeof(TestClass)
|
||||||
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
|
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
|
||||||
|
|
||||||
// Reset the shared_ptr and check that the counter decreases appropriately
|
// Reset the shared_ptr and check that the counter decreases appropriately
|
||||||
ptr.reset();
|
ptr.reset();
|
||||||
// After deallocation, the counter should return to zero
|
// After deallocation, the counter should return to zero
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
||||||
|
|
||||||
auto deleter = [this](TestClass* ptr) {
|
auto deleter = [this](TestClass* ptr) { this->allocator.deallocate(ptr, 1); };
|
||||||
this->allocator.deallocate(ptr, 1);
|
ptr.reset(allocator.allocate(1), deleter);
|
||||||
};
|
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
|
||||||
ptr.reset(allocator.allocate(1), deleter);
|
|
||||||
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
|
|
||||||
|
|
||||||
ptr.reset();
|
ptr.reset();
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test 5: Thread Safety - Concurrent Allocations and Deallocations
|
// Test 5: Thread Safety - Concurrent Allocations and Deallocations
|
||||||
TEST_F(CountingAllocatorTest, ThreadSafety) {
|
TEST_F(CountingAllocatorTest, ThreadSafety)
|
||||||
const std::size_t numThreads = 100;
|
{
|
||||||
const std::size_t allocationsPerThread = 3;
|
const std::size_t numThreads = 100;
|
||||||
|
const std::size_t allocationsPerThread = 3;
|
||||||
|
|
||||||
auto worker = [this]() {
|
auto worker = [this]()
|
||||||
for (std::size_t i = 0; i < allocationsPerThread; ++i) {
|
{
|
||||||
TestClass* ptr = allocator.allocate(1);
|
for (std::size_t i = 0; i < allocationsPerThread; ++i)
|
||||||
allocator.deallocate(ptr, 1);
|
{
|
||||||
}
|
TestClass* ptr = allocator.allocate(1);
|
||||||
};
|
allocator.deallocate(ptr, 1);
|
||||||
|
|
||||||
std::vector<std::thread> threads;
|
|
||||||
// Launch multiple threads performing allocations and deallocations
|
|
||||||
for (std::size_t i = 0; i < numThreads; ++i) {
|
|
||||||
threads.emplace_back(worker);
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Wait for all threads to finish
|
std::vector<std::thread> threads;
|
||||||
for (auto& th : threads) {
|
// Launch multiple threads performing allocations and deallocations
|
||||||
th.join();
|
for (std::size_t i = 0; i < numThreads; ++i)
|
||||||
}
|
{
|
||||||
|
threads.emplace_back(worker);
|
||||||
|
}
|
||||||
|
|
||||||
// After all allocations and deallocations, the counter should be zero
|
// Wait for all threads to finish
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
for (auto& th : threads)
|
||||||
|
{
|
||||||
|
th.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// After all allocations and deallocations, the counter should be zero
|
||||||
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test 6: Allocating zero objects should not change the counter
|
// Test 6: Allocating zero objects should not change the counter
|
||||||
TEST_F(CountingAllocatorTest, AllocateZeroObjects) {
|
TEST_F(CountingAllocatorTest, AllocateZeroObjects)
|
||||||
TestClass* ptr = allocator.allocate(0);
|
{
|
||||||
EXPECT_NE(ptr, nullptr);
|
TestClass* ptr = allocator.allocate(0);
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
EXPECT_NE(ptr, nullptr);
|
||||||
allocator.deallocate(ptr, 0);
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
||||||
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
allocator.deallocate(ptr, 0);
|
||||||
|
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
|
||||||
}
|
}
|
@ -61,6 +61,7 @@ public:
|
|||||||
T* ptr = static_cast<T*>(::operator new(n * sizeof(T)));
|
T* ptr = static_cast<T*>(::operator new(n * sizeof(T)));
|
||||||
// std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
|
// std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
|
||||||
// << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n";
|
// << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n";
|
||||||
|
// std::cout << std::dec;
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,6 +88,7 @@ public:
|
|||||||
memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed);
|
memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed);
|
||||||
// std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr)
|
// std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr)
|
||||||
// << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n";
|
// << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n";
|
||||||
|
// std::cout << std::dec;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Equality operators (allocators are equal if they share the same counter)
|
// Equality operators (allocators are equal if they share the same counter)
|
||||||
|
@ -100,12 +100,12 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
|
|||||||
if ((len + 4) >= CHUNK_SIZE)
|
if ((len + 4) >= CHUNK_SIZE)
|
||||||
{
|
{
|
||||||
auto allocSize = len + sizeof(MemChunk) + 4;
|
auto allocSize = len + sizeof(MemChunk) + 4;
|
||||||
if (alloc)
|
// if (alloc)
|
||||||
{
|
// {
|
||||||
cout << "StringStore::storeString longStrings with alloc " << std::endl;
|
// cout << "StringStore::storeString longStrings with alloc " << std::endl;
|
||||||
longStrings.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, allocSize));
|
// longStrings.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, allocSize));
|
||||||
}
|
// }
|
||||||
else
|
// else
|
||||||
{
|
{
|
||||||
cout << "StringStore::storeString longStrings no alloc " << std::endl;
|
cout << "StringStore::storeString longStrings no alloc " << std::endl;
|
||||||
longStrings.emplace_back(std::make_shared<uint8_t[]>(allocSize));
|
longStrings.emplace_back(std::make_shared<uint8_t[]>(allocSize));
|
||||||
@ -128,14 +128,14 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
|
|||||||
if (alloc)
|
if (alloc)
|
||||||
{
|
{
|
||||||
cout << "StringStore::storeString with alloc " << std::endl;
|
cout << "StringStore::storeString with alloc " << std::endl;
|
||||||
mem.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
|
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
|
||||||
// std::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
|
// boost::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cout << "StringStore::storeString no alloc " << std::endl;
|
cout << "StringStore::storeString no alloc " << std::endl;
|
||||||
mem.emplace_back(std::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
|
mem.emplace_back(boost::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
|
||||||
// mem.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
|
// mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
|
||||||
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
|
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
|
||||||
}
|
}
|
||||||
// mem.push_back(newOne);
|
// mem.push_back(newOne);
|
||||||
@ -209,12 +209,12 @@ void StringStore::deserialize(ByteStream& bs)
|
|||||||
if (alloc)
|
if (alloc)
|
||||||
{
|
{
|
||||||
cout << "StringStore::deserialize with alloc " << std::endl;
|
cout << "StringStore::deserialize with alloc " << std::endl;
|
||||||
mem.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk)));
|
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk)));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cout << "StringStore::deserialize no alloc " << std::endl;
|
cout << "StringStore::deserialize no alloc " << std::endl;
|
||||||
mem.emplace_back(std::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
|
mem.emplace_back(boost::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
|
||||||
}
|
}
|
||||||
// mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
|
// mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
|
||||||
mc = (MemChunk*)mem[i].get();
|
mc = (MemChunk*)mem[i].get();
|
||||||
@ -230,7 +230,7 @@ void StringStore::deserialize(ByteStream& bs)
|
|||||||
|
|
||||||
void StringStore::clear()
|
void StringStore::clear()
|
||||||
{
|
{
|
||||||
vector<std::shared_ptr<uint8_t[]> > emptyv;
|
vector<boost::shared_ptr<uint8_t[]> > emptyv;
|
||||||
vector<std::shared_ptr<uint8_t[]> > emptyv2;
|
vector<std::shared_ptr<uint8_t[]> > emptyv2;
|
||||||
mem.swap(emptyv);
|
mem.swap(emptyv);
|
||||||
longStrings.swap(emptyv2);
|
longStrings.swap(emptyv2);
|
||||||
@ -367,7 +367,7 @@ RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>*
|
|||||||
{
|
{
|
||||||
// rowData = shared_ptr<uint8_t[]>(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize);
|
// rowData = shared_ptr<uint8_t[]>(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize);
|
||||||
// });
|
// });
|
||||||
rowData = std::allocate_shared<RGDataBufType>(*alloc, rg.getMaxDataSize());
|
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getMaxDataSize());
|
||||||
// rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]);
|
// rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]);
|
||||||
|
|
||||||
if (rg.usesStringTable())
|
if (rg.usesStringTable())
|
||||||
@ -381,7 +381,7 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
|
|||||||
if (alloc)
|
if (alloc)
|
||||||
{
|
{
|
||||||
cout << "RGData::reinit with alloc " << std::endl;
|
cout << "RGData::reinit with alloc " << std::endl;
|
||||||
rowData = std::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount));
|
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -190,7 +190,7 @@ class StringStore
|
|||||||
std::string empty_str;
|
std::string empty_str;
|
||||||
static constexpr const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2
|
static constexpr const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2
|
||||||
|
|
||||||
std::vector<std::shared_ptr<uint8_t[]>> mem;
|
std::vector<boost::shared_ptr<uint8_t[]>> mem;
|
||||||
|
|
||||||
// To store strings > 64KB (BLOB/TEXT)
|
// To store strings > 64KB (BLOB/TEXT)
|
||||||
std::vector<std::shared_ptr<uint8_t[]>> longStrings;
|
std::vector<std::shared_ptr<uint8_t[]>> longStrings;
|
||||||
@ -286,7 +286,7 @@ class RGData
|
|||||||
void clear();
|
void clear();
|
||||||
void reinit(const RowGroup& rg);
|
void reinit(const RowGroup& rg);
|
||||||
void reinit(const RowGroup& rg, uint32_t rowCount);
|
void reinit(const RowGroup& rg, uint32_t rowCount);
|
||||||
inline void setStringStore(std::shared_ptr<StringStore>& ss)
|
inline void setStringStore(boost::shared_ptr<StringStore>& ss)
|
||||||
{
|
{
|
||||||
strings = ss;
|
strings = ss;
|
||||||
}
|
}
|
||||||
@ -327,8 +327,8 @@ class RGData
|
|||||||
private:
|
private:
|
||||||
uint32_t rowSize = 0; // can't be.
|
uint32_t rowSize = 0; // can't be.
|
||||||
uint32_t columnCount = 0; // shouldn't be, but...
|
uint32_t columnCount = 0; // shouldn't be, but...
|
||||||
std::shared_ptr<RGDataBufType> rowData;
|
boost::shared_ptr<RGDataBufType> rowData;
|
||||||
std::shared_ptr<StringStore> strings;
|
boost::shared_ptr<StringStore> strings;
|
||||||
std::shared_ptr<UserDataStore> userDataStore;
|
std::shared_ptr<UserDataStore> userDataStore;
|
||||||
allocators::CountingAllocator<RGDataBufType>* alloc = nullptr;
|
allocators::CountingAllocator<RGDataBufType>* alloc = nullptr;
|
||||||
|
|
||||||
@ -1599,7 +1599,7 @@ class RowGroup : public messageqcpp::Serializeable
|
|||||||
const uint16_t& blockNum);
|
const uint16_t& blockNum);
|
||||||
inline void getLocation(uint32_t* partNum, uint16_t* segNum, uint8_t* extentNum, uint16_t* blockNum);
|
inline void getLocation(uint32_t* partNum, uint16_t* segNum, uint8_t* extentNum, uint16_t* blockNum);
|
||||||
|
|
||||||
inline void setStringStore(std::shared_ptr<StringStore>);
|
inline void setStringStore(boost::shared_ptr<StringStore>);
|
||||||
|
|
||||||
const CHARSET_INFO* getCharset(uint32_t col);
|
const CHARSET_INFO* getCharset(uint32_t col);
|
||||||
|
|
||||||
@ -1911,7 +1911,8 @@ inline uint32_t RowGroup::getStringTableThreshold() const
|
|||||||
return sTableThreshold;
|
return sTableThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void RowGroup::setStringStore(std::shared_ptr<StringStore> ss)
|
// WIP mb unused
|
||||||
|
inline void RowGroup::setStringStore(boost::shared_ptr<StringStore> ss)
|
||||||
{
|
{
|
||||||
if (useStringTable)
|
if (useStringTable)
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user