
feat(): aggregating CountingAllocator

drrtuy committed 2025-02-07 16:52:43 +00:00
parent cf9b5eb32c
commit bb4cd40ca4
6 changed files with 300 additions and 139 deletions
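
The change set replaces per-allocation updates of the shared atomic memory budget with per-allocator aggregation: each CountingAllocator now tracks its own local usage and only debits or credits the global atomic counter once the accumulated change crosses a checkpoint step. A minimal usage sketch of the resulting API (constructor arguments as in the header below; the budget values are illustrative assumptions):

    #include <atomic>
    #include <cstdint>
    #include "countingallocator.h"

    int main()
    {
      using namespace allocators;
      // Shared budget, decremented as memory is claimed; illustrative 1 GiB allowance.
      std::atomic<int64_t> memoryLimit{1LL << 30};
      // Args: global counter, the lower bound the budget may not fall below,
      // and the checkpoint step that batches updates to the atomic.
      CountingAllocator<int64_t> alloc(&memoryLimit, 100 * 1024 * 1024, 4 * 1024 * 1024);
      int64_t* p = alloc.allocate(1024);  // counted locally; the atomic is touched only at checkpoints
      alloc.deallocate(p, 1024);
      return 0;
    }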

View File

@@ -220,7 +220,7 @@ class QszMonThd
 };
 #endif
-#define DUMP_CACHE_CONTENTS
+// #define DUMP_CACHE_CONTENTS
 #ifdef DUMP_CACHE_CONTENTS
 void* waitForSIGUSR1(void* p)
 {

View File

@@ -17,25 +17,28 @@
 #include <gtest/gtest.h>
 #include <atomic>
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <thread>
+#include <boost/smart_ptr/allocate_shared_array.hpp>

 #include "countingallocator.h"
-#include "rowgroup.h"

 using namespace allocators;

 // Example class to be managed by the allocator
 struct TestClass
 {
-  int value;
+  int value[1024];
   TestClass(int val) : value(val)
   {
   }
 };

-static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024;
+static const constexpr int64_t MemoryAllowance = 1 * 1024 * 1024;
+static const constexpr int64_t MemoryLimitStep = MemoryAllowance / 100;

 // Test Fixture for AtomicCounterAllocator
 class CountingAllocatorTest : public ::testing::Test
@@ -49,7 +52,8 @@ class CountingAllocatorTest : public ::testing::Test
   // Constructor
   CountingAllocatorTest()
-   : allocatedMemory(MemoryAllowance), allocator(&allocatedMemory, MemoryAllowance / 100)
+   : allocatedMemory(MemoryAllowance)
+   , allocator(&allocatedMemory, MemoryAllowance / 100, MemoryAllowance / 100)
  {
  }
@@ -63,7 +67,15 @@ TEST_F(CountingAllocatorTest, Allocation)
   const std::size_t numObjects = 5;
   TestClass* ptr = allocator.allocate(numObjects);
   EXPECT_NE(ptr, nullptr);
-  EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
+  if (MemoryLimitStep > numObjects * static_cast<int64_t>(sizeof(TestClass)))
+  {
+    EXPECT_EQ(allocatedMemory.load() - allocator.getCurrentLocalMemoryUsage(),
+              MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
+  }
+  else
+  {
+    EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
+  }
   allocator.deallocate(ptr, numObjects);
 }
@@ -72,8 +84,15 @@ TEST_F(CountingAllocatorTest, Deallocation)
 {
   const std::size_t numObjects = 3;
   TestClass* ptr = allocator.allocate(numObjects);
-  EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
+  if (MemoryLimitStep > numObjects * static_cast<int64_t>(sizeof(TestClass)))
+  {
+    EXPECT_EQ(allocatedMemory.load() - allocator.getCurrentLocalMemoryUsage(),
+              MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
+  }
+  else
+  {
+    EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast<int64_t>(sizeof(TestClass)));
+  }
   allocator.deallocate(ptr, numObjects);
   EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
 }
@@ -94,34 +113,42 @@ TEST_F(CountingAllocatorTest, AllocatorEquality)
 TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
 {
   // Create a shared_ptr using allocate_shared with the custom allocator
-  std::shared_ptr<TestClass> ptr = std::allocate_shared<TestClass>(allocator, 100);
+  CountingAllocator<TestClass> allocatorSmallerStep(&allocatedMemory, MemoryAllowance / 100,
+                                                    MemoryAllowance / 1000);
+  std::shared_ptr<TestClass> ptr1 = std::allocate_shared<TestClass>(allocatorSmallerStep, 100);
+  std::shared_ptr<TestClass> ptr2 = std::allocate_shared<TestClass>(allocatorSmallerStep, 100);
+  std::shared_ptr<TestClass> ptr3 = std::allocate_shared<TestClass>(allocatorSmallerStep, 100);

   // Check that the counter has increased by the size of TestClass plus control block
   // Exact size depends on the implementation, so we verify it's at least sizeof(TestClass)
-  EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
+  EXPECT_LE(allocatedMemory.load(), MemoryAllowance - 3 * static_cast<int64_t>(sizeof(TestClass)));

   // Reset the shared_ptr and check that the counter decreases appropriately
-  ptr.reset();
+  ptr1.reset();
+  ptr2.reset();
+  ptr3.reset();

   // After deallocation, the counter should return to zero
   EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);

-  auto deleter = [this](TestClass* ptr) { this->allocator.deallocate(ptr, 1); };
-  ptr.reset(allocator.allocate(1), deleter);
-  EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
-  ptr.reset();
+  // auto deleter = [&allocatorSmallerStep](TestClass* ptr) { allocatorSmallerStep.deallocate(ptr, 1); };
+  // ptr1.reset(allocatorSmallerStep.allocate(3), deleter);
+  // EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast<int64_t>(sizeof(TestClass)));
+  ptr1.reset();
   EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);

-  size_t allocSize = 16ULL * rowgroup::rgCommonSize;
-  auto buf = boost::allocate_shared<rowgroup::RGDataBufType>(allocator, allocSize);
+  using RGDataBufType = uint8_t[];
+  size_t allocSize = 16ULL * 8192;
+  auto buf = boost::allocate_shared<RGDataBufType>(allocator, allocSize);
   EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);
   buf.reset();
   EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);

-  CountingAllocator<rowgroup::RGDataBufType> allocator1(&allocatedMemory, MemoryAllowance / 100);
-  std::optional<CountingAllocator<rowgroup::RGDataBufType>> allocator2(allocator1);
-  auto buf1 = boost::allocate_shared<rowgroup::RGDataBufType>(*allocator2, allocSize);
+  CountingAllocator<RGDataBufType> allocator1(&allocatedMemory, MemoryAllowance / 100, MemoryAllowance / 100);
+  std::optional<CountingAllocator<RGDataBufType>> allocator2(allocator1);
+  auto buf1 = boost::allocate_shared<RGDataBufType>(*allocator2, allocSize);
   EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);
   buf1.reset();
@@ -136,11 +163,27 @@ TEST_F(CountingAllocatorTest, ThreadSafety)
   auto worker = [this]()
   {
+    std::vector<TestClass*> ptrs;
+    CountingAllocator<TestClass> allocatorLocal(&allocatedMemory, MemoryAllowance / 100,
+                                                MemoryAllowance / 1000);
     for (std::size_t i = 0; i < allocationsPerThread; ++i)
     {
-      TestClass* ptr = allocator.allocate(1);
-      allocator.deallocate(ptr, 1);
+      ptrs.push_back(allocatorLocal.allocate(1));
     }
+    int64_t usedMemory = allocationsPerThread * sizeof(TestClass);
+    EXPECT_EQ(allocatorLocal.getCurrentLocalMemoryUsage(), allocationsPerThread * sizeof(TestClass));
+    EXPECT_GE(usedMemory - allocatorLocal.getlastMemoryLimitCheckpoint(), 0LL);
+    EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocatorLocal.getlastMemoryLimitCheckpoint());
+    for (auto* ptr : ptrs)
+    {
+      allocatorLocal.deallocate(ptr, 1);
+    }
+    EXPECT_EQ(allocatorLocal.getCurrentLocalMemoryUsage(), 0);
+    EXPECT_EQ(allocatorLocal.getlastMemoryLimitCheckpoint(), 0);
+    EXPECT_GE(allocatedMemory.load(), allocationsPerThread * sizeof(TestClass));
   };

   std::vector<std::thread> threads;
@@ -156,7 +199,7 @@ TEST_F(CountingAllocatorTest, ThreadSafety)
     th.join();
   }

-  // After all allocations and deallocations, the counter should be zero
+  // After all allocations and deallocations, the counter should be zero minus the remainder
   EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
 }
@@ -172,8 +215,8 @@ TEST_F(CountingAllocatorTest, AllocateZeroObjects)
 TEST_F(CountingAllocatorTest, CopyAssignable)
 {
   CountingAllocator<TestClass> allocator1(&allocatedMemory);
   CountingAllocator<TestClass> allocator2(&allocatedMemory);
   allocator1 = allocator2;
   EXPECT_EQ(allocator1, allocator2);
 }
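
Why the Allocation and Deallocation tests branch on MemoryLimitStep, with worked numbers (assuming a 4-byte int, so sizeof(TestClass) == 4096):

    MemoryAllowance = 1 * 1024 * 1024       = 1048576 bytes
    MemoryLimitStep = MemoryAllowance / 100 = 10485 bytes
    Allocation test request = 5 * 4096      = 20480 bytes

Since 20480 > 10485, the request crosses a checkpoint: the allocator debits the global counter immediately and the else branch applies (allocatedMemory.load() == 1048576 - 20480). A request below the step would leave the global counter untouched, which is why the if branch subtracts getCurrentLocalMemoryUsage() before comparing.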

View File

@@ -17,99 +17,217 @@
 #pragma once

+#include <cassert>
 #include <cstdint>
+#include <limits>
+#include <memory>
 #include <atomic>
 #include <cstddef>
+#include <cstdlib>
+#include <utility>
 #include <iostream>

 namespace allocators
 {
+// WIP placement
 // const constexpr std::uint64_t CounterUpdateUnitSize = 4 * 1024 * 1024;
-const constexpr std::int64_t MemoryLimitLowerBound = 100 * 1024 * 1024;  // WIP
+const constexpr std::int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;  // WIP
+const constexpr std::int64_t CheckPointStepSize = 100 * 1024 * 1024;     // WIP

 // Custom Allocator that tracks allocated memory using an atomic counter
 template <typename T>
-class CountingAllocator {
+class CountingAllocator
+{
  public:
   using value_type = T;

+  bool needCheckPoint(const int64_t sizeChange, const int64_t diffSinceLastCheckPoint,
+                      const int64_t checkPointStepSize)
+  {
+    return std::llabs(sizeChange + diffSinceLastCheckPoint) > checkPointStepSize;
+  }
+
+  int64_t int_distance(const int64_t x, const int64_t y)
+  {
+    return (x > y) ? x - y : y - x;
+  }
+
+  // INVARIANT: sizeChange > 0
+  void changeLocalAndGlobalMemoryLimits(const int64_t sizeChange)
+  {
+    // This routine must be used for the mem allocation accounting path only!
+    // The case Current > last checkpoint (we deallocated mem since the last checkpoint): sizeChange is
+    // negative b/c we now move in the opposite direction. The case Last Checkpoint > Current (we
+    // allocated mem since the last checkpoint): sizeChange is positive.
+    int64_t sizeChangeWDirection =
+        (currentLocalMemoryUsage_ <= lastMemoryLimitCheckpoint_) ? -sizeChange : sizeChange;
+    int64_t diffSinceLastCheckPoint = int_distance(currentLocalMemoryUsage_, lastMemoryLimitCheckpoint_);
+    if (needCheckPoint(sizeChangeWDirection, diffSinceLastCheckPoint, checkPointStepSize_))
+    {
+      // lastMemoryLimitCheckpointDiff sign signifies the direction we move allocating memory.
+      int64_t lastMemoryLimitCheckpointDiff = (currentLocalMemoryUsage_ <= lastMemoryLimitCheckpoint_)
+                                                  ? sizeChange - diffSinceLastCheckPoint
+                                                  : sizeChange + diffSinceLastCheckPoint;
+      assert(lastMemoryLimitCheckpointDiff > 0);
+      auto currentGlobalMemoryLimit =
+          memoryLimit_->fetch_sub(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
+      if (currentGlobalMemoryLimit < memoryLimitLowerBound_)
+      {
+        memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
+        // ? what to do with local counters here
+        throw std::bad_alloc();
+      }
+      lastMemoryLimitCheckpoint_ += lastMemoryLimitCheckpointDiff;
+    }
+    currentLocalMemoryUsage_ += sizeChange;
+  }
+
   // Constructor accepting a reference to an atomic counter
-  explicit CountingAllocator(std::atomic<int64_t>* memoryLimit, const uint64_t lowerBound = MemoryLimitLowerBound) noexcept
-   : memoryLimit_(memoryLimit), memoryLimitLowerBound(lowerBound) {}
+  explicit CountingAllocator(std::atomic<int64_t>* memoryLimit,
+                             const uint64_t lowerBound = MemoryLimitLowerBound,
+                             const uint64_t checkPointStepSize = CheckPointStepSize) noexcept
+   : memoryLimit_(memoryLimit), memoryLimitLowerBound_(lowerBound), checkPointStepSize_(checkPointStepSize)
+  {
+  }

   // Copy constructor (template to allow conversion between different types)
   template <typename U>
   CountingAllocator(const CountingAllocator<U>& other) noexcept
-   : memoryLimit_(other.memoryLimit_), memoryLimitLowerBound(other.memoryLimitLowerBound) {}
+   : memoryLimit_(other.memoryLimit_)
+   , memoryLimitLowerBound_(other.memoryLimitLowerBound_)
+   , checkPointStepSize_(other.checkPointStepSize_)
+  {
+  }

   // Allocate memory for n objects of type T
   template <typename U = T>
-  typename std::enable_if<!std::is_array<U>::value, U*>::type
-  allocate(std::size_t n)
+  typename std::enable_if<!std::is_array<U>::value, U*>::type allocate(std::size_t n)
   {
-    auto memCounted = memoryLimit_->fetch_sub(n * sizeof(T), std::memory_order_relaxed);
-    if (memCounted < memoryLimitLowerBound) {
-      memoryLimit_->fetch_add(n * sizeof(T), std::memory_order_relaxed);
-      throw std::bad_alloc();
-    }
-
-    T* ptr = static_cast<T*>(::operator new(n * sizeof(T)));
-    // std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
-    //           << ". current limit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
-    // std::cout << std::dec;
+    auto sizeAllocated = n * sizeof(T);
+    changeLocalAndGlobalMemoryLimits(sizeAllocated);
+    T* ptr = static_cast<T*>(::operator new(sizeAllocated));
+    // std::cout << "[Allocate] non-array " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
+    //           << ". current limit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
+    // std::cout << std::dec;
     return ptr;
   }

   template <typename U = T>
-  typename std::enable_if<std::is_array<U>::value, typename std::remove_extent<U>::type*>::type
-  allocate(std::size_t n)
+  typename std::enable_if<std::is_array<U>::value, typename std::remove_extent<U>::type*>::type allocate(
+      std::size_t n)
   {
-    auto memCounted = memoryLimit_->fetch_sub(n * sizeof(T), std::memory_order_relaxed);
-    if (memCounted < memoryLimitLowerBound) {
-      memoryLimit_->fetch_add(n * sizeof(T), std::memory_order_relaxed);
-      throw std::bad_alloc();
-    }
-
-    T ptr = static_cast<T>(::operator new[](n));
-    // std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
-    //           << ". current limit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
+    auto sizeAllocated = n * sizeof(T);
+    changeLocalAndGlobalMemoryLimits(sizeAllocated);
+    T ptr = static_cast<T>(::operator new[](n));
+    // std::cout << "[Allocate] array " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
+    //           << ". current limit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
     return ptr;
   }

   // Deallocate memory for n objects of type T
   void deallocate(T* ptr, std::size_t n) noexcept
   {
     ::operator delete(ptr);
-    memoryLimit_->fetch_add(n * sizeof(T), std::memory_order_relaxed);
-    // std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr)
-    //           << ". current limit: " << std::dec << memoryLimit_->load() << std::hex << " bytes.\n";
-    // std::cout << std::dec;
+    int64_t sizeToDeallocate = n * sizeof(T);
+    int64_t sizeChangeWDirection =
+        (currentLocalMemoryUsage_ >= lastMemoryLimitCheckpoint_) ? -sizeToDeallocate : sizeToDeallocate;
+    int64_t diffSinceLastCheckPoint = int_distance(currentLocalMemoryUsage_, lastMemoryLimitCheckpoint_);
+    if (needCheckPoint(sizeChangeWDirection, diffSinceLastCheckPoint, checkPointStepSize_))
+    {
+      // Invariant: lastMemoryLimitCheckpoint_ >= currentLocalMemoryUsage_ - sizeToDeallocate.
+      int64_t lastMemoryLimitCheckpointDiff =
+          (currentLocalMemoryUsage_ >= lastMemoryLimitCheckpoint_)
+              ? sizeToDeallocate - (currentLocalMemoryUsage_ - lastMemoryLimitCheckpoint_)
+              : diffSinceLastCheckPoint + sizeToDeallocate;
+      assert(lastMemoryLimitCheckpointDiff > 0);
+      memoryLimit_->fetch_add(lastMemoryLimitCheckpointDiff, std::memory_order_relaxed);
+      lastMemoryLimitCheckpoint_ -= (lastMemoryLimitCheckpoint_ == 0) ? 0 : lastMemoryLimitCheckpointDiff;
+    }
+    currentLocalMemoryUsage_ = currentLocalMemoryUsage_ - sizeToDeallocate;
   }

   // Equality operators (allocators are equal if they share the same counter)
   template <typename U>
   bool operator==(const CountingAllocator<U>& other) const noexcept
   {
     return memoryLimit_ == other.memoryLimit_;
   }

   template <typename U>
   bool operator!=(const CountingAllocator<U>& other) const noexcept
   {
     return !(*this == other);
   }

+  int64_t getMemoryLimitLowerBound() const noexcept
+  {
+    return memoryLimitLowerBound_;
+  }
+
+  int64_t getlastMemoryLimitCheckpoint() const noexcept
+  {
+    return lastMemoryLimitCheckpoint_;
+  }
+
+  int64_t getCurrentLocalMemoryUsage() const noexcept
+  {
+    return currentLocalMemoryUsage_;
+  }
+
  private:
   std::atomic<int64_t>* memoryLimit_ = nullptr;
-  int64_t memoryLimitLowerBound = 0;
+  int64_t memoryLimitLowerBound_ = MemoryLimitLowerBound;
+  int64_t checkPointStepSize_ = CheckPointStepSize;
+  int64_t lastMemoryLimitCheckpoint_ = 0;
+  int64_t currentLocalMemoryUsage_ = 0;

   // Grant access to other instances of CountingAllocator with different types
   template <typename U>
   friend class CountingAllocator;
 };
 }  // namespace allocators
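
A hand trace of the checkpoint bookkeeping may help; the numbers are assumptions chosen for illustration (step = 100 bytes, global budget starts at 1000; local = currentLocalMemoryUsage_, ckpt = lastMemoryLimitCheckpoint_, global = *memoryLimit_):

    allocate(60)   : |-60 + 0|  <= 100, no checkpoint           -> local 60,  ckpt 0,   global 1000
    allocate(60)   : |+60 + 60| >  100, diff = 60 + 60 = 120    -> local 120, ckpt 120, global 880
    deallocate(30) : |-30 + 0|  <= 100, no checkpoint           -> local 90,  ckpt 120, global 880
    deallocate(60) : |+60 + 30| <= 100, no checkpoint           -> local 30,  ckpt 120, global 880
    deallocate(30) : |+30 + 90| >  100, diff = 90 + 30 = 120    -> local 0,   ckpt 0,   global 1000

Only two of the five operations touch the shared atomic; the other three are absorbed by the allocator-local counters, which is the aggregation this commit introduces.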

View File

@@ -102,7 +102,7 @@ STLPoolAllocator<T>::STLPoolAllocator(joblist::ResourceManager* rm)
   if (rm)
   {
     auto alloc = rm->getAllocator<PoolAllocatorBufType>();
-    pa.reset(new PoolAllocator(alloc));
+    pa.reset(new PoolAllocator(alloc, DEFAULT_SIZE));
   }
   else
   {

View File

@@ -67,9 +67,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
     // _pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      STLPoolAllocator<pair<const long double, Row::Pointer>> alloc(resourceManager_);
+      // STLPoolAllocator<pair<const long double, Row::Pointer>> alloc(resourceManager_);
       // _pool[i] = alloc.getPoolAllocator();
-      // auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
+      auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
       ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
     }
   }
@@ -79,9 +79,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
     _pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      STLPoolAllocator<pair<const int64_t, Row::Pointer>> alloc(resourceManager_);
+      // STLPoolAllocator<pair<const int64_t, Row::Pointer>> alloc(resourceManager_);
       // _pool[i] = alloc.getPoolAllocator();
-      // auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
+      auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
       sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
     }
   }
@@ -91,9 +91,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
     _pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      STLPoolAllocator<pair<const int64_t, uint8_t*>> alloc(resourceManager_);
+      // STLPoolAllocator<pair<const int64_t, uint8_t*>> alloc(resourceManager_);
       // _pool[i] = alloc.getPoolAllocator();
-      // auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
+      auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
       h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
     }
   }
@@ -184,9 +184,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
     ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]);
     for (i = 0; i < bucketCount; i++)
     {
-      STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
+      // STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
       // _pool[i] = alloc.getPoolAllocator();
-      // auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
+      auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
       ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
     }
     m_bucketLocks.reset(new boost::mutex[bucketCount]);
@@ -1833,9 +1833,9 @@ void TupleJoiner::clearData()
   for (uint i = 0; i < bucketCount; i++)
   {
-    STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
+    // STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
     // _pool[i] = alloc.getPoolAllocator();
-    // auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
+    auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
     if (typelessJoin)
       ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
     else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)

View File

@@ -478,37 +478,37 @@ class TupleJoiner
   }

  private:
-  typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
-                                  utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*> > >
-      hash_t;
-  typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
-                                  utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
-      sthash_t;
-  typedef std::unordered_multimap<
-      TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
-      utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
-      typelesshash_t;
-  // MCOL-1822 Add support for Long Double AVG/SUM small side
-  typedef std::unordered_multimap<
-      long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
-      utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
-      ldhash_t;
+  // typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
+  //                                 utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*> > >
+  //     hash_t;
+  // typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
+  //                                 utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
+  //     sthash_t;
+  // typedef std::unordered_multimap<
+  //     TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
+  //     utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
+  //     typelesshash_t;
+  // // MCOL-1822 Add support for Long Double AVG/SUM small side
+  // typedef std::unordered_multimap<
+  //     long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
+  //     utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
+  //     ldhash_t;
-  // typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
-  //                                 allocators::CountingAllocator<std::pair<const int64_t, uint8_t*> > >
-  //     hash_t;
-  // typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
-  //                                 allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
-  //     sthash_t;
-  // typedef std::unordered_multimap<
-  //     TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
-  //     allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
-  //     typelesshash_t;
-  // // MCOL-1822 Add support for Long Double AVG/SUM small side
-  // typedef std::unordered_multimap<
-  //     long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
-  //     allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
-  //     ldhash_t;
+  typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
+                                  allocators::CountingAllocator<std::pair<const int64_t, uint8_t*> > >
+      hash_t;
+  typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
+                                  allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
+      sthash_t;
+  typedef std::unordered_multimap<
+      TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
+      allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
+      typelesshash_t;
+  // MCOL-1822 Add support for Long Double AVG/SUM small side
+  typedef std::unordered_multimap<
+      long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
+      allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
+      ldhash_t;
   typedef hash_t::iterator iterator;
   typedef typelesshash_t::iterator thIterator;
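
For context, a self-contained sketch of the container shape these typedefs now produce, using a standalone budget and std::hash in place of the joiner's ResourceManager::getAllocator and custom hasher (the atomic and its values are assumptions for illustration):

    #include <atomic>
    #include <cstdint>
    #include <functional>
    #include <unordered_map>
    #include <utility>
    #include "countingallocator.h"

    int main()
    {
      std::atomic<int64_t> budget{1LL << 30};  // shared joiner memory budget
      using Pair = std::pair<const int64_t, uint8_t*>;
      allocators::CountingAllocator<Pair> alloc(&budget);
      // Mirrors hash_t: a multimap keyed by int64_t whose node allocations are
      // debited against the shared budget through the counting allocator.
      std::unordered_multimap<int64_t, uint8_t*, std::hash<int64_t>, std::equal_to<int64_t>,
                              allocators::CountingAllocator<Pair>>
          h(10, std::hash<int64_t>(), std::equal_to<int64_t>(), alloc);
      uint8_t byte = 0;
      h.emplace(42, &byte);
      return 0;
    }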