1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

feat(): dangling pointer/ref issue has been solved for both RGData and BS

This commit is contained in:
drrtuy
2024-12-13 15:03:56 +00:00
parent fac7dfa552
commit 1c297b9e9e
9 changed files with 56 additions and 67 deletions

View File

@@ -263,7 +263,7 @@ ENDIF()
MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-deprecated-copy" DEBUG RELEASE RELWITHDEBINFO MINSIZEREL) MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-deprecated-copy" DEBUG RELEASE RELWITHDEBINFO MINSIZEREL)
MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-deprecated-declarations" DEBUG RELEASE RELWITHDEBINFO MINSIZEREL) MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-deprecated-declarations" DEBUG RELEASE RELWITHDEBINFO MINSIZEREL)
MY_CHECK_AND_SET_COMPILER_FLAG("-Werror -Wall -Wextra") MY_CHECK_AND_SET_COMPILER_FLAG("-Wall -Wextra")
SET (ENGINE_LDFLAGS "-Wl,--no-as-needed -Wl,--add-needed") SET (ENGINE_LDFLAGS "-Wl,--no-as-needed -Wl,--add-needed")
SET (ENGINE_DT_LIB datatypes) SET (ENGINE_DT_LIB datatypes)
SET (ENGINE_COMMON_LIBS messageqcpp loggingcpp configcpp idbboot boost_thread xml2 pthread rt ${ENGINE_DT_LIB}) SET (ENGINE_COMMON_LIBS messageqcpp loggingcpp configcpp idbboot boost_thread xml2 pthread rt ${ENGINE_DT_LIB})

View File

@@ -241,8 +241,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
uint8_t tmp8; uint8_t tmp8;
uint16_t tmp16; uint16_t tmp16;
Command::CommandType type; Command::CommandType type;
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "initBPP availableMemory: " << cnt << std::endl;
bs.advance(sizeof(ISMPacketHeader)); // skip the header bs.advance(sizeof(ISMPacketHeader)); // skip the header
bs >> tmp8; bs >> tmp8;
@@ -848,8 +846,6 @@ int BatchPrimitiveProcessor::endOfJoiner()
{ {
endOfJoinerRan = true; endOfJoinerRan = true;
pthread_mutex_unlock(&objLock); pthread_mutex_unlock(&objLock);
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "endOfJoiner availableMemory: " << cnt << std::endl;
return 0; return 0;
} }
@@ -892,8 +888,6 @@ int BatchPrimitiveProcessor::endOfJoiner()
endOfJoinerRan = true; endOfJoinerRan = true;
pthread_mutex_unlock(&objLock); pthread_mutex_unlock(&objLock);
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "endOfJoiner availableMemory: " << cnt << std::endl;
return 0; return 0;
} }
@@ -2217,7 +2211,7 @@ int BatchPrimitiveProcessor::operator()()
cpDataFromDictScan = false; cpDataFromDictScan = false;
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>(); auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(&allocator)); messageqcpp::SBS bs(new ByteStream(allocator));
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute"); stopwatch->start("BPP() execute");
@@ -2283,14 +2277,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (ot == ROW_GROUP && !outRowGroupData) if (ot == ROW_GROUP && !outRowGroupData)
{ {
// outputRG.setUseStringTable(true); // outputRG.setUseStringTable(true);
outRowGroupData.reset(new RGData(outputRG, &allocator)); outRowGroupData.reset(new RGData(outputRG, allocator));
outputRG.setData(outRowGroupData.get()); outputRG.setData(outRowGroupData.get());
} }
if (fe1 && !fe1Data) if (fe1 && !fe1Data)
{ {
// fe1Input.setUseStringTable(true); // fe1Input.setUseStringTable(true);
fe1Data.reset(new RGData(fe1Input, &allocator)); fe1Data.reset(new RGData(fe1Input, allocator));
// fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]); // fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
fe1Input.setData(fe1Data.get()); fe1Input.setData(fe1Data.get());
} }
@@ -2298,14 +2292,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (fe2 && !fe2Data) if (fe2 && !fe2Data)
{ {
// fe2Output.setUseStringTable(true); // fe2Output.setUseStringTable(true);
fe2Data.reset(new RGData(fe2Output, &allocator)); fe2Data.reset(new RGData(fe2Output, allocator));
fe2Output.setData(fe2Data.get()); fe2Output.setData(fe2Data.get());
} }
if (getTupleJoinRowGroupData && !joinedRGMem) if (getTupleJoinRowGroupData && !joinedRGMem)
{ {
// joinedRG.setUseStringTable(true); // joinedRG.setUseStringTable(true);
joinedRGMem.reset(new RGData(joinedRG, &allocator)); joinedRGMem.reset(new RGData(joinedRG, allocator));
joinedRG.setData(joinedRGMem.get()); joinedRG.setData(joinedRGMem.get());
} }
} }

View File

@@ -15,9 +15,11 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */ MA 02110-1301, USA. */
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <memory>
#include <atomic> #include <atomic>
#include <cstddef> #include <cstddef>
#include <memory>
#include <thread>
#include "countingallocator.h" #include "countingallocator.h"
#include "rowgroup.h" #include "rowgroup.h"
@@ -116,6 +118,14 @@ TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
buf.reset(); buf.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
CountingAllocator<rowgroup::RGDataBufType> allocator1(&allocatedMemory, MemoryAllowance / 100);
std::optional<CountingAllocator<rowgroup::RGDataBufType>> allocator2(allocator1);
auto buf1 = boost::allocate_shared<rowgroup::RGDataBufType>(*allocator2, allocSize);
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);
buf1.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
} }
// Test 5: Thread Safety - Concurrent Allocations and Deallocations // Test 5: Thread Safety - Concurrent Allocations and Deallocations
@@ -158,4 +168,12 @@ TEST_F(CountingAllocatorTest, AllocateZeroObjects)
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
allocator.deallocate(ptr, 0); allocator.deallocate(ptr, 0);
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
}
TEST_F(CountingAllocatorTest, CopyAssignable)
{
CountingAllocator<TestClass> allocator1(&allocatedMemory);
CountingAllocator<TestClass> allocator2(&allocatedMemory);
allocator1 = allocator2;
EXPECT_EQ(allocator1, allocator2);
} }

View File

@@ -364,25 +364,6 @@ class RGDataTest : public ::testing::Test
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL,
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL}, execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL},
{65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8}); {65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8});
// rgD = rowgroup::RGData(rg, &alloc);
// rg.setData(&rgD);
// rg.initRow(&r);
// rg.getRow(0, &r);
// for (size_t i = 0; i < sValueVector.size(); i++)
// {
// // setStringField
// r.setBinaryField_offset(&sValueVector[i], sizeof(sValueVector[0]), offsets[0]);
// r.setBinaryField_offset(&anotherValueVector[i], sizeof(anotherValueVector[0]), offsets[1]);
// r.setIntField(s64ValueVector[i], 2);
// r.setIntField(s32ValueVector[i], 3);
// r.setIntField(s16ValueVector[i], 4);
// r.setIntField(s8ValueVector[i], 5);
// r.nextRow(rowSize);
// }
// rowCount = sValueVector.size();
} }
// void TearDown() override {} // void TearDown() override {}
@@ -397,7 +378,7 @@ class RGDataTest : public ::testing::Test
TEST_F(RGDataTest, AllocData) TEST_F(RGDataTest, AllocData)
{ {
std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl; std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl;
rgD = rowgroup::RGData(rg, &alloc); rgD = rowgroup::RGData(rg, alloc);
rg.setData(&rgD); rg.setData(&rgD);
rg.initRow(&r); rg.initRow(&r);
rg.getRow(0, &r); rg.getRow(0, &r);

View File

@@ -44,8 +44,7 @@ public:
// Copy constructor (template to allow conversion between different types) // Copy constructor (template to allow conversion between different types)
template <typename U> template <typename U>
CountingAllocator(const CountingAllocator<U>& other) noexcept CountingAllocator(const CountingAllocator<U>& other) noexcept
: memoryLimit_(other.memoryLimit_) {} : memoryLimit_(other.memoryLimit_), memoryLimitLowerBound(other.memoryLimitLowerBound) {}
// Allocate memory for n objects of type T // Allocate memory for n objects of type T
template <typename U = T> template <typename U = T>

View File

@@ -101,7 +101,7 @@ ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0)
} }
// WIP remove this one, replacing the allocator arg with a default nullptr. // WIP remove this one, replacing the allocator arg with a default nullptr.
ByteStream::ByteStream(allocators::CountingAllocator<uint8_t>* allocator, uint32_t initSize) ByteStream::ByteStream(allocators::CountingAllocator<uint8_t>& allocator, uint32_t initSize)
: fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0), allocator(allocator) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0), allocator(allocator)
{ {
if (initSize > 0) if (initSize > 0)

View File

@@ -19,6 +19,7 @@
*/ */
#pragma once #pragma once
#include <optional>
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <sys/types.h> #include <sys/types.h>
@@ -78,7 +79,7 @@ class ByteStream : public Serializeable
* default ctor * default ctor
*/ */
EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best
explicit ByteStream(allocators::CountingAllocator<BSBufType>* alloc, uint32_t initSize = 8192); explicit ByteStream(allocators::CountingAllocator<BSBufType>& alloc, uint32_t initSize = 8192);
/** /**
* ctor with a uint8_t array and len initializer * ctor with a uint8_t array and len initializer
*/ */
@@ -476,7 +477,7 @@ class ByteStream : public Serializeable
uint32_t fMaxLen; // how big fBuf is currently uint32_t fMaxLen; // how big fBuf is currently
// Stores `long strings`. // Stores `long strings`.
std::vector<rowgroup::StringStoreBufSPType> longStrings; std::vector<rowgroup::StringStoreBufSPType> longStrings;
allocators::CountingAllocator<BSBufType>* allocator = nullptr; std::optional<allocators::CountingAllocator<BSBufType>> allocator = {};
}; };
template <int W, typename T = void> template <int W, typename T = void>

View File

@@ -48,29 +48,13 @@ namespace rowgroup
{ {
using cscType = execplan::CalpontSystemCatalog::ColDataType; using cscType = execplan::CalpontSystemCatalog::ColDataType;
StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc) : StringStore() StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType> alloc) : StringStore()
{ {
this->alloc = alloc; this->alloc = alloc;
} }
StringStore::~StringStore() StringStore::~StringStore()
{ {
#if 0
// for mem usage debugging
uint32_t i;
uint64_t inUse = 0, allocated = 0;
for (i = 0; i < mem.size(); i++)
{
MemChunk* tmp = (MemChunk*) mem.back().get();
inUse += tmp->currentSize;
allocated += tmp->capacity;
}
if (allocated > 0)
cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse / (float) allocated << endl;
#endif
} }
uint64_t StringStore::storeString(const uint8_t* data, uint32_t len) uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
@@ -376,15 +360,16 @@ RGData::RGData(const RowGroup& rg)
#endif #endif
} }
RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc) : alloc(alloc) RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& _alloc) : alloc(_alloc)
{ {
// rowData = shared_ptr<uint8_t[]>(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize); rowData = boost::allocate_shared<RGDataBufType>(alloc.value(), rg.getMaxDataSize());
// });
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getMaxDataSize());
// rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]); // rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]);
if (rg.usesStringTable()) if (rg.usesStringTable())
strings.reset(new StringStore(alloc)); {
allocators::CountingAllocator<StringStoreBufType> ssAlloc = _alloc;
strings.reset(new StringStore(ssAlloc));
}
userDataStore.reset(); userDataStore.reset();
@@ -413,7 +398,18 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
userDataStore.reset(); userDataStore.reset();
if (rg.usesStringTable()) if (rg.usesStringTable())
strings.reset(new StringStore(alloc)); {
if (alloc)
{
allocators::CountingAllocator<StringStoreBufType> ssAlloc = alloc.value();
strings.reset(new StringStore(ssAlloc));
}
else
{
strings.reset(new StringStore());
}
}
else else
strings.reset(); strings.reset();

View File

@@ -136,7 +136,7 @@ class StringStore
{ {
public: public:
StringStore() = default; StringStore() = default;
StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc); StringStore(allocators::CountingAllocator<StringStoreBufType> alloc);
StringStore(const StringStore&) = delete; StringStore(const StringStore&) = delete;
StringStore(StringStore&&) = delete; StringStore(StringStore&&) = delete;
StringStore& operator=(const StringStore&) = delete; StringStore& operator=(const StringStore&) = delete;
@@ -187,11 +187,11 @@ class StringStore
std::vector<boost::shared_ptr<uint8_t[]>> mem; std::vector<boost::shared_ptr<uint8_t[]>> mem;
// To store strings > 64KB (BLOB/TEXT) // To store strings > 64KB (BLOB/TEXT)
std::vector<StringStoreBufSPType> longStrings; std::vector<boost::shared_ptr<uint8_t[]>> longStrings;
bool empty = true; bool empty = true;
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex; boost::mutex fMutex;
allocators::CountingAllocator<StringStoreBufType>* alloc = nullptr; std::optional<allocators::CountingAllocator<StringStoreBufType>> alloc {};
}; };
// Where we store user data for UDA(n)F // Where we store user data for UDA(n)F
@@ -264,8 +264,8 @@ class RGData
RGData() = default; // useless unless followed by an = or a deserialize operation RGData() = default; // useless unless followed by an = or a deserialize operation
RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData
explicit RGData(const RowGroup& rg); explicit RGData(const RowGroup& rg);
explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc); explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& alloc);
RGData& operator=(const RGData&) = default; RGData& operator=(const RGData& rhs) = default;
RGData& operator=(RGData&&) = default; RGData& operator=(RGData&&) = default;
RGData(const RGData&) = default; RGData(const RGData&) = default;
RGData(RGData&&) = default; RGData(RGData&&) = default;
@@ -327,7 +327,7 @@ class RGData
boost::shared_ptr<RGDataBufType> rowData; boost::shared_ptr<RGDataBufType> rowData;
boost::shared_ptr<StringStore> strings; boost::shared_ptr<StringStore> strings;
std::shared_ptr<UserDataStore> userDataStore; std::shared_ptr<UserDataStore> userDataStore;
allocators::CountingAllocator<RGDataBufType>* alloc = nullptr; std::optional<allocators::CountingAllocator<RGDataBufType>> alloc = {};
// Need sig to support backward compat. RGData can deserialize both forms. // Need sig to support backward compat. RGData can deserialize both forms.
static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data