1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(): dangling pointer/ref issue has been solved for both RGData and BS

This commit is contained in:
drrtuy
2024-12-13 15:03:56 +00:00
parent 397b3ff729
commit a6de8ec1ac
8 changed files with 59 additions and 69 deletions

View File

@ -241,8 +241,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
uint8_t tmp8;
uint16_t tmp16;
Command::CommandType type;
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "initBPP availableMemory: " << cnt << std::endl;
bs.advance(sizeof(ISMPacketHeader)); // skip the header
bs >> tmp8;
@ -848,8 +846,6 @@ int BatchPrimitiveProcessor::endOfJoiner()
{
endOfJoinerRan = true;
pthread_mutex_unlock(&objLock);
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "endOfJoiner availableMemory: " << cnt << std::endl;
return 0;
}
@ -892,8 +888,6 @@ int BatchPrimitiveProcessor::endOfJoiner()
endOfJoinerRan = true;
pthread_mutex_unlock(&objLock);
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "endOfJoiner availableMemory: " << cnt << std::endl;
return 0;
}
@ -2234,7 +2228,7 @@ int BatchPrimitiveProcessor::operator()()
cpDataFromDictScan = false;
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(&allocator));
messageqcpp::SBS bs(new ByteStream(allocator));
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute");
@ -2300,14 +2294,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (ot == ROW_GROUP && !outRowGroupData)
{
// outputRG.setUseStringTable(true);
outRowGroupData.reset(new RGData(outputRG, &allocator));
outRowGroupData.reset(new RGData(outputRG, allocator));
outputRG.setData(outRowGroupData.get());
}
if (fe1 && !fe1Data)
{
// fe1Input.setUseStringTable(true);
fe1Data.reset(new RGData(fe1Input, &allocator));
fe1Data.reset(new RGData(fe1Input, allocator));
// fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
fe1Input.setData(fe1Data.get());
}
@ -2315,14 +2309,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (fe2 && !fe2Data)
{
// fe2Output.setUseStringTable(true);
fe2Data.reset(new RGData(fe2Output, &allocator));
fe2Data.reset(new RGData(fe2Output, allocator));
fe2Output.setData(fe2Data.get());
}
if (getTupleJoinRowGroupData && !joinedRGMem)
{
// joinedRG.setUseStringTable(true);
joinedRGMem.reset(new RGData(joinedRG, &allocator));
joinedRGMem.reset(new RGData(joinedRG, allocator));
joinedRG.setData(joinedRGMem.get());
}
}

View File

@ -15,9 +15,11 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <gtest/gtest.h>
#include <memory>
#include <atomic>
#include <cstddef>
#include <memory>
#include <thread>
#include "countingallocator.h"
#include "rowgroup.h"
@ -116,6 +118,14 @@ TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
buf.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
CountingAllocator<rowgroup::RGDataBufType> allocator1(&allocatedMemory, MemoryAllowance / 100);
std::optional<CountingAllocator<rowgroup::RGDataBufType>> allocator2(allocator1);
auto buf1 = boost::allocate_shared<rowgroup::RGDataBufType>(*allocator2, allocSize);
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);
buf1.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
}
// Test 5: Thread Safety - Concurrent Allocations and Deallocations
@ -159,3 +169,11 @@ TEST_F(CountingAllocatorTest, AllocateZeroObjects)
allocator.deallocate(ptr, 0);
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
}
TEST_F(CountingAllocatorTest, CopyAssignable)
{
CountingAllocator<TestClass> allocator1(&allocatedMemory);
CountingAllocator<TestClass> allocator2(&allocatedMemory);
allocator1 = allocator2;
EXPECT_EQ(allocator1, allocator2);
}

View File

@ -367,25 +367,6 @@ class RGDataTest : public ::testing::Test
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL,
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL},
{65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8});
// rgD = rowgroup::RGData(rg, &alloc);
// rg.setData(&rgD);
// rg.initRow(&r);
// rg.getRow(0, &r);
// for (size_t i = 0; i < sValueVector.size(); i++)
// {
// // setStringField
// r.setBinaryField_offset(&sValueVector[i], sizeof(sValueVector[0]), offsets[0]);
// r.setBinaryField_offset(&anotherValueVector[i], sizeof(anotherValueVector[0]), offsets[1]);
// r.setIntField(s64ValueVector[i], 2);
// r.setIntField(s32ValueVector[i], 3);
// r.setIntField(s16ValueVector[i], 4);
// r.setIntField(s8ValueVector[i], 5);
// r.nextRow(rowSize);
// }
// rowCount = sValueVector.size();
}
// void TearDown() override {}
@ -400,7 +381,7 @@ class RGDataTest : public ::testing::Test
TEST_F(RGDataTest, AllocData)
{
std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl;
rgD = rowgroup::RGData(rg, &alloc);
rgD = rowgroup::RGData(rg, alloc);
rg.setData(&rgD);
rg.initRow(&r);
rg.getRow(0, &r);

View File

@ -44,8 +44,7 @@ public:
// Copy constructor (template to allow conversion between different types)
template <typename U>
CountingAllocator(const CountingAllocator<U>& other) noexcept
: memoryLimit_(other.memoryLimit_) {}
: memoryLimit_(other.memoryLimit_), memoryLimitLowerBound(other.memoryLimitLowerBound) {}
// Allocate memory for n objects of type T
template <typename U = T>

View File

@ -100,7 +100,8 @@ ByteStream::ByteStream(BSSizeType initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(
growBuf(initSize);
}
ByteStream::ByteStream(allocators::CountingAllocator<uint8_t>* allocator, uint32_t initSize)
// WIP remove this one, replacing the allocator arg with a default nullptr.
ByteStream::ByteStream(allocators::CountingAllocator<uint8_t>& allocator, uint32_t initSize)
: fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0), allocator(allocator)
{
if (initSize > 0)

View File

@ -19,6 +19,7 @@
*/
#pragma once
#include <optional>
#include <string>
#include <iostream>
#include <sys/types.h>
@ -80,7 +81,7 @@ class ByteStream : public Serializeable
* default ctor
*/
EXPORT explicit ByteStream(BSSizeType initSize = 8192); // multiples of pagesize are best
explicit ByteStream(allocators::CountingAllocator<BSBufType>* alloc, uint32_t initSize = 8192);
explicit ByteStream(allocators::CountingAllocator<BSBufType>& alloc, uint32_t initSize = 8192);
/**
* ctor with a uint8_t array and len initializer
*/
@ -486,7 +487,7 @@ class ByteStream : public Serializeable
BSSizeType fMaxLen; // how big fBuf is currently
// Stores `long strings`.
std::vector<rowgroup::StringStoreBufSPType> longStrings;
allocators::CountingAllocator<BSBufType>* allocator = nullptr;
std::optional<allocators::CountingAllocator<BSBufType>> allocator = {};
};
template <int W, typename T = void>

View File

@ -48,29 +48,13 @@ namespace rowgroup
{
using cscType = execplan::CalpontSystemCatalog::ColDataType;
StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc) : StringStore()
StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType> alloc) : StringStore()
{
this->alloc = alloc;
}
StringStore::~StringStore()
{
#if 0
// for mem usage debugging
uint32_t i;
uint64_t inUse = 0, allocated = 0;
for (i = 0; i < mem.size(); i++)
{
MemChunk* tmp = (MemChunk*) mem.back().get();
inUse += tmp->currentSize;
allocated += tmp->capacity;
}
if (allocated > 0)
cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse / (float) allocated << endl;
#endif
}
uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
@ -363,15 +347,16 @@ columnCount = rg.getColumnCount();
}
RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc) : alloc(alloc)
RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& _alloc) : alloc(_alloc)
{
// rowData = shared_ptr<uint8_t[]>(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize);
// });
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getMaxDataSize());
rowData = boost::allocate_shared<RGDataBufType>(alloc.value(), rg.getMaxDataSize());
// rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]);
if (rg.usesStringTable())
strings.reset(new StringStore(alloc));
{
allocators::CountingAllocator<StringStoreBufType> ssAlloc = _alloc;
strings.reset(new StringStore(ssAlloc));
}
userDataStore.reset();
}
@ -392,7 +377,18 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
userDataStore.reset();
if (rg.usesStringTable())
strings.reset(new StringStore(alloc));
{
if (alloc)
{
allocators::CountingAllocator<StringStoreBufType> ssAlloc = alloc.value();
strings.reset(new StringStore(ssAlloc));
}
else
{
strings.reset(new StringStore());
}
}
else
strings.reset();
columnCount = rg.getColumnCount();

View File

@ -140,7 +140,7 @@ class StringStore
{
public:
StringStore() = default;
StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc);
StringStore(allocators::CountingAllocator<StringStoreBufType> alloc);
StringStore(const StringStore&) = delete;
StringStore(StringStore&&) = delete;
StringStore& operator=(const StringStore&) = delete;
@ -194,11 +194,11 @@ class StringStore
std::vector<boost::shared_ptr<uint8_t[]>> mem;
// To store strings > 64KB (BLOB/TEXT)
std::vector<StringStoreBufSPType> longStrings;
std::vector<boost::shared_ptr<uint8_t[]>> longStrings;
bool empty = true;
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex;
allocators::CountingAllocator<StringStoreBufType>* alloc = nullptr;
std::optional<allocators::CountingAllocator<StringStoreBufType>> alloc {};
};
// Where we store user data for UDA(n)F
@ -268,8 +268,8 @@ class RGData
RGData() = default; // useless unless followed by an = or a deserialize operation
RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData
explicit RGData(const RowGroup& rg);
explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc);
RGData& operator=(const RGData&) = default;
explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& alloc);
RGData& operator=(const RGData& rhs) = default;
RGData& operator=(RGData&&) = default;
RGData(const RGData&) = default;
RGData(RGData&&) = default;
@ -331,7 +331,7 @@ class RGData
boost::shared_ptr<RGDataBufType> rowData;
boost::shared_ptr<StringStore> strings;
std::shared_ptr<UserDataStore> userDataStore;
allocators::CountingAllocator<RGDataBufType>* alloc = nullptr;
std::optional<allocators::CountingAllocator<RGDataBufType>> alloc = {};
// Need sig to support backward compat. RGData can deserialize both forms.
static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data
@ -1043,7 +1043,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex
if (inStringTable(colIndex))
{
std::cout << "setStringField storeString len " << length << std::endl;
// std::cout << "setStringField storeString len " << length << std::endl;
offset = strings->storeString((const uint8_t*)str.str(), length);
*((uint64_t*)&data[offsets[colIndex]]) = offset;
// cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
@ -1052,7 +1052,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex
}
else
{
std::cout << "setStringField memcpy " << std::endl;
// std::cout << "setStringField memcpy " << std::endl;
uint8_t* buf = &data[offsets[colIndex]];
memset(buf + length, 0,
offsets[colIndex + 1] - (offsets[colIndex] + length)); // needed for memcmp in equals().