You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-08 14:22:09 +03:00
fix(perf,allocator): reduce CountingAllocator step size to improve its memory consumption reaction speed.
This commit is contained in:
@@ -2221,7 +2221,7 @@ void TupleBPS::processByteStreamVector(vector<boost::shared_ptr<messageqcpp::Byt
|
|||||||
// changes made here should also be made there and vice versa.
|
// changes made here should also be made there and vice versa.
|
||||||
if (hasUMJoin || !fBPP->pmSendsFinalResult())
|
if (hasUMJoin || !fBPP->pmSendsFinalResult())
|
||||||
{
|
{
|
||||||
utils::setThreadName("BSPJoin");
|
utils::setThreadName("BPSJoin");
|
||||||
|
|
||||||
data->joinedData = RGData(data->local_outputRG);
|
data->joinedData = RGData(data->local_outputRG);
|
||||||
data->local_outputRG.setData(&data->joinedData);
|
data->local_outputRG.setData(&data->joinedData);
|
||||||
|
@@ -94,9 +94,6 @@ if (WITH_UNITTESTS)
|
|||||||
target_link_libraries(stlpoolallocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})
|
target_link_libraries(stlpoolallocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})
|
||||||
gtest_add_tests(TARGET stlpoolallocator TEST_PREFIX columnstore:)
|
gtest_add_tests(TARGET stlpoolallocator TEST_PREFIX columnstore:)
|
||||||
|
|
||||||
add_executable(oid_server_return oid-server-return.cpp)
|
|
||||||
target_link_libraries(oid_server_return ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS})
|
|
||||||
|
|
||||||
add_executable(comparators_tests comparators-tests.cpp)
|
add_executable(comparators_tests comparators-tests.cpp)
|
||||||
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
||||||
add_test(NAME columnstore:comparators_tests COMMAND comparators_tests)
|
add_test(NAME columnstore:comparators_tests COMMAND comparators_tests)
|
||||||
|
@@ -116,94 +116,36 @@ TEST_F(STLPoolAllocatorTest, VectorIntegration)
|
|||||||
vec.shrink_to_fit();
|
vec.shrink_to_fit();
|
||||||
}
|
}
|
||||||
|
|
||||||
// /**
|
|
||||||
// * Test multithreaded allocation
|
|
||||||
// */
|
|
||||||
// TEST_F(STLPoolAllocatorTest, MultithreadedAllocation)
|
|
||||||
// {
|
|
||||||
// const int THREAD_COUNT = 4;
|
|
||||||
// const size_t ALLOC_SIZE = 100;
|
|
||||||
// std::vector<std::thread> threads;
|
|
||||||
|
|
||||||
// Allocator alloc;
|
|
||||||
// uint64_t initialUsage = alloc.getMemUsage();
|
|
||||||
|
|
||||||
// // Create threads that will allocate memory
|
|
||||||
// for (int i = 0; i < THREAD_COUNT; ++i)
|
|
||||||
// {
|
|
||||||
// threads.emplace_back(
|
|
||||||
// [&alloc]()
|
|
||||||
// {
|
|
||||||
// std::vector<TestType*, STLPoolAllocator<TestType*>> ptrs(alloc);
|
|
||||||
|
|
||||||
// for (size_t j = 0; j < 10; ++j)
|
|
||||||
// {
|
|
||||||
// TestType* ptr = alloc.allocate(ALLOC_SIZE);
|
|
||||||
// for (size_t k = 0; k < ALLOC_SIZE; ++k)
|
|
||||||
// {
|
|
||||||
// alloc.construct(ptr + k, static_cast<TestType>(k));
|
|
||||||
// }
|
|
||||||
// ptrs.push_back(ptr);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Cleanup
|
|
||||||
// for (auto ptr : ptrs)
|
|
||||||
// {
|
|
||||||
// for (size_t k = 0; k < ALLOC_SIZE; ++k)
|
|
||||||
// {
|
|
||||||
// alloc.destroy(ptr + k);
|
|
||||||
// }
|
|
||||||
// alloc.deallocate(ptr, ALLOC_SIZE);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Wait for all threads to complete
|
|
||||||
// for (auto& th : threads)
|
|
||||||
// {
|
|
||||||
// th.join();
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Memory usage should be greater than initial due to vector allocations
|
|
||||||
// EXPECT_GT(alloc.getMemUsage(), initialUsage);
|
|
||||||
// }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test ResourceManager integration
|
* Test ResourceManager integration
|
||||||
*/
|
*/
|
||||||
TEST_F(STLPoolAllocatorTest, ResourceManagerIntegration)
|
TEST_F(STLPoolAllocatorTest, ResourceManagerIntegration)
|
||||||
{
|
{
|
||||||
|
using TestType = int8_t;
|
||||||
|
using Allocator = STLPoolAllocator<TestType>;
|
||||||
|
|
||||||
joblist::ResourceManager rm(true, nullptr);
|
joblist::ResourceManager rm(true, nullptr);
|
||||||
// To set the memory allowance
|
// To set the memory allowance
|
||||||
rm.setMemory(MemoryAllowance);
|
rm.setMemory(MemoryAllowance);
|
||||||
std::cout << "Memory allowance: " << MemoryAllowance << std::endl;
|
Allocator alloc(&rm, 1024, 512);
|
||||||
|
|
||||||
Allocator alloc(&rm);
|
|
||||||
|
|
||||||
std::cout << "Memory available 1 : " << rm.availableMemory() << std::endl;
|
|
||||||
// Basic allocation test with ResourceManager
|
|
||||||
TestType* ptr = alloc.allocate(1);
|
TestType* ptr = alloc.allocate(1);
|
||||||
ASSERT_NE(ptr, nullptr);
|
ASSERT_NE(ptr, nullptr);
|
||||||
std::cout << "Memory available 2 : " << rm.availableMemory() << std::endl;
|
|
||||||
|
|
||||||
|
|
||||||
alloc.construct(ptr, 42);
|
alloc.construct(ptr, 42);
|
||||||
EXPECT_EQ(*ptr, 42);
|
EXPECT_EQ(*ptr, 42);
|
||||||
|
|
||||||
alloc.destroy(ptr);
|
alloc.destroy(ptr);
|
||||||
alloc.deallocate(ptr, 1);
|
alloc.deallocate(ptr, 1);
|
||||||
std::cout << "Memory available 3 : " << rm.availableMemory() << std::endl;
|
|
||||||
|
|
||||||
TestType* ptr2 = alloc.allocate(65537);
|
TestType* ptr2 = alloc.allocate(65537);
|
||||||
ASSERT_NE(ptr2, nullptr);
|
ASSERT_NE(ptr2, nullptr);
|
||||||
alloc.construct(ptr2, 42);
|
alloc.construct(ptr2, 42);
|
||||||
EXPECT_EQ(*ptr2, 42);
|
EXPECT_EQ(*ptr2, 42);
|
||||||
std::cout << "Memory available 4 : " << rm.availableMemory() << std::endl;
|
|
||||||
|
|
||||||
|
|
||||||
alloc.destroy(ptr2);
|
alloc.destroy(ptr2);
|
||||||
alloc.deallocate(ptr2, 1);
|
alloc.deallocate(ptr2, 1);
|
||||||
std::cout << "Memory available 5 : " << rm.availableMemory() << std::endl;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@@ -36,8 +36,9 @@ namespace allocators
|
|||||||
// When a sync op hits MemoryLimitLowerBound trying to allocate more memory, it throws.
|
// When a sync op hits MemoryLimitLowerBound trying to allocate more memory, it throws.
|
||||||
// SQL operators or TBPS runtime must catch the exception and act acordingly.
|
// SQL operators or TBPS runtime must catch the exception and act acordingly.
|
||||||
|
|
||||||
const constexpr int64_t MemoryLimitLowerBound = 500 * 1024 * 1024; // WIP
|
const constexpr int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;
|
||||||
const constexpr int64_t CheckPointStepSize = 100 * 1024 * 1024; // WIP
|
// Higher values demonstrate slower response to memory limit violations.
|
||||||
|
const constexpr int64_t CheckPointStepSize = 1024;
|
||||||
|
|
||||||
// Custom Allocator that tracks allocated memory using an atomic counter
|
// Custom Allocator that tracks allocated memory using an atomic counter
|
||||||
template <typename T>
|
template <typename T>
|
||||||
|
@@ -57,19 +57,15 @@ void PoolAllocator::deallocateAll()
|
|||||||
void PoolAllocator::newBlock()
|
void PoolAllocator::newBlock()
|
||||||
{
|
{
|
||||||
capacityRemaining = allocSize;
|
capacityRemaining = allocSize;
|
||||||
std::cout << "PoolAllocator new block" << std::endl;
|
|
||||||
|
|
||||||
if (!tmpSpace || mem.size() == 0)
|
if (!tmpSpace || mem.size() == 0)
|
||||||
{
|
{
|
||||||
if (alloc)
|
if (alloc)
|
||||||
{
|
{
|
||||||
// std::cout << "PoolAllocator new block with counting alloc" << std::endl;
|
|
||||||
|
|
||||||
mem.emplace_back(boost::allocate_shared<PoolAllocatorBufType>(*alloc, allocSize));
|
mem.emplace_back(boost::allocate_shared<PoolAllocatorBufType>(*alloc, allocSize));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// std::cout << "PoolAllocator new block w/o counting alloc" << std::endl;
|
|
||||||
mem.emplace_back(boost::make_shared<PoolAllocatorBufType>(allocSize));
|
mem.emplace_back(boost::make_shared<PoolAllocatorBufType>(allocSize));
|
||||||
}
|
}
|
||||||
nextAlloc = mem.back().get();
|
nextAlloc = mem.back().get();
|
||||||
@@ -81,17 +77,14 @@ void PoolAllocator::newBlock()
|
|||||||
void* PoolAllocator::allocOOB(uint64_t size)
|
void* PoolAllocator::allocOOB(uint64_t size)
|
||||||
{
|
{
|
||||||
OOBMemInfo memInfo;
|
OOBMemInfo memInfo;
|
||||||
// std::cout << "PoolAllocator allocOOB" << std::endl;
|
|
||||||
|
|
||||||
memUsage += size;
|
memUsage += size;
|
||||||
if (alloc)
|
if (alloc)
|
||||||
{
|
{
|
||||||
// std::cout << "PoolAllocator allocOOB with counting alloc" << std::endl;
|
|
||||||
memInfo.mem = boost::allocate_shared<PoolAllocatorBufType>(*alloc, size);
|
memInfo.mem = boost::allocate_shared<PoolAllocatorBufType>(*alloc, size);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// std::cout << "PoolAllocator allocOOB w/o counting alloc" << std::endl;
|
|
||||||
memInfo.mem = boost::make_shared<PoolAllocatorBufType>(size);
|
memInfo.mem = boost::make_shared<PoolAllocatorBufType>(size);
|
||||||
}
|
}
|
||||||
memInfo.size = size;
|
memInfo.size = size;
|
||||||
|
@@ -59,7 +59,6 @@ class PoolAllocator
|
|||||||
, useLock(_useLock)
|
, useLock(_useLock)
|
||||||
, lock(false)
|
, lock(false)
|
||||||
{
|
{
|
||||||
std::cout << "PoolAllocator w/o counting allocator created" << std::endl;
|
|
||||||
}
|
}
|
||||||
PoolAllocator(allocators::CountingAllocator<PoolAllocatorBufType> alloc, unsigned windowSize = DEFAULT_WINDOW_SIZE,
|
PoolAllocator(allocators::CountingAllocator<PoolAllocatorBufType> alloc, unsigned windowSize = DEFAULT_WINDOW_SIZE,
|
||||||
bool isTmpSpace = false, bool _useLock = false)
|
bool isTmpSpace = false, bool _useLock = false)
|
||||||
@@ -72,7 +71,6 @@ class PoolAllocator
|
|||||||
, lock(false)
|
, lock(false)
|
||||||
, alloc(alloc)
|
, alloc(alloc)
|
||||||
{
|
{
|
||||||
std::cout << "PoolAllocator with counting allocator created" << std::endl;
|
|
||||||
}
|
}
|
||||||
PoolAllocator(const PoolAllocator& p)
|
PoolAllocator(const PoolAllocator& p)
|
||||||
: allocSize(p.allocSize)
|
: allocSize(p.allocSize)
|
||||||
|
@@ -38,7 +38,7 @@ namespace utils
|
|||||||
as the deleter. */
|
as the deleter. */
|
||||||
struct BoostPoolDeallocator
|
struct BoostPoolDeallocator
|
||||||
{
|
{
|
||||||
inline void operator()(void* ptr){};
|
inline void operator()(void* ptr) {};
|
||||||
};
|
};
|
||||||
|
|
||||||
/* This is an STL-compliant wrapper for PoolAllocator + an optimization for containers
|
/* This is an STL-compliant wrapper for PoolAllocator + an optimization for containers
|
||||||
@@ -62,7 +62,9 @@ class STLPoolAllocator
|
|||||||
};
|
};
|
||||||
|
|
||||||
STLPoolAllocator() throw();
|
STLPoolAllocator() throw();
|
||||||
STLPoolAllocator(joblist::ResourceManager* rm);
|
STLPoolAllocator(joblist::ResourceManager* rm,
|
||||||
|
const int64_t checkPointStepSize = allocators::CheckPointStepSize,
|
||||||
|
const int64_t memoryLimitLowerBound = allocators::MemoryLimitLowerBound);
|
||||||
STLPoolAllocator(const STLPoolAllocator&) throw();
|
STLPoolAllocator(const STLPoolAllocator&) throw();
|
||||||
STLPoolAllocator(uint32_t capacity) throw();
|
STLPoolAllocator(uint32_t capacity) throw();
|
||||||
template <class U>
|
template <class U>
|
||||||
@@ -97,17 +99,17 @@ STLPoolAllocator<T>::STLPoolAllocator() throw()
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
STLPoolAllocator<T>::STLPoolAllocator(joblist::ResourceManager* rm)
|
STLPoolAllocator<T>::STLPoolAllocator(joblist::ResourceManager* rm,
|
||||||
|
const int64_t checkPointStepSize,
|
||||||
|
const int64_t memoryLimitLowerBound)
|
||||||
{
|
{
|
||||||
if (rm)
|
if (rm)
|
||||||
{
|
{
|
||||||
// std::cout << "STLPoolAllocator with RM " << std::endl;
|
auto alloc = rm->getAllocator<PoolAllocatorBufType>(checkPointStepSize, memoryLimitLowerBound);
|
||||||
auto alloc = rm->getAllocator<PoolAllocatorBufType>(1024);
|
|
||||||
pa.reset(new PoolAllocator(alloc, DEFAULT_SIZE));
|
pa.reset(new PoolAllocator(alloc, DEFAULT_SIZE));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// std::cout << "STLPoolAllocator w/o RM " << std::endl;
|
|
||||||
pa.reset(new PoolAllocator(DEFAULT_SIZE));
|
pa.reset(new PoolAllocator(DEFAULT_SIZE));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user