mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(): zerocopy TNS case and JOIN results RGData with CountingAllocator

drrtuy
2025-02-17 21:38:10 +00:00
parent 3dfc8cd454
commit a4c4d33ee7
9 changed files with 96 additions and 62 deletions
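
In short, per the diff below: TupleAnnexStep::executeNoOrderBy() gains a zero-copy fast path that forwards each input RGData straight to the output datalist when the step has no constant columns and no LIMIT; TupleHashJoinStep::joinOneRG() now allocates its join-result RGData through a CountingAllocator obtained from the ResourceManager; the CountingAllocator constructor's defaulted arguments swap order (checkpoint step size now precedes the memory-limit lower bound), with call sites and tests updated to match; and the TupleAnnexStep worker-thread names are renamed from TAS* to TNS*.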

View File

@@ -456,10 +456,12 @@ class ResourceManager
     return configuredUmMemLimit;
   }

-  template<typename T>
-  allocators::CountingAllocator<T> getAllocator()
+  template <typename T>
+  allocators::CountingAllocator<T> getAllocator(
+      const int64_t checkPointStepSize = allocators::CheckPointStepSize,
+      const int64_t memoryLimitLowerBound = allocators::MemoryLimitLowerBound)
   {
-    return allocators::CountingAllocator<T>(&totalUmMemLimit);
+    return allocators::CountingAllocator<T>(&totalUmMemLimit, checkPointStepSize, memoryLimitLowerBound);
   }

 private:
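
A minimal usage sketch of the extended getAllocator() overload; rm (a ResourceManager pointer), the vector use, and the buffer size are illustrative assumptions, while RGDataBufType and the 10 MiB step mirror the joinOneRG() call site further down:

// Hypothetical call site: charge allocations against the UM memory budget,
// checkpointing the shared atomic counter every 10 MiB.
auto alloc = rm->getAllocator<RGDataBufType>(10 * 1024 * 1024);
std::vector<RGDataBufType, allocators::CountingAllocator<RGDataBufType>> buf(alloc);
buf.resize(4096);  // this allocation is now visible to the resource manager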

View File

@@ -376,7 +376,7 @@ void TupleAnnexStep::execute(uint32_t id)
 void TupleAnnexStep::executeNoOrderBy()
 {
-  utils::setThreadName("TASwoOrd");
+  utils::setThreadName("TNSwoOrd");

   RGData rgDataIn;
   RGData rgDataOut;
   bool more = false;
@@ -395,53 +395,69 @@ void TupleAnnexStep::executeNoOrderBy()
     sts.total_units_of_work = 1;
     postStepStartTele(sts);

-    while (more && !cancelled() && !fLimitHit)
-    {
-      fRowGroupIn.setData(&rgDataIn);
-      fRowGroupIn.getRow(0, &fRowIn);
-      // Get a new output rowgroup for each input rowgroup to preserve the rids
-      rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
-      fRowGroupOut.setData(&rgDataOut);
-      fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
-      fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
-      fRowGroupOut.getRow(0, &fRowOut);
-
-      for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
-      {
-        // skip first limit-start rows
-        if (fRowsProcessed++ < fLimitStart)
-        {
-          fRowIn.nextRow();
-          continue;
-        }
-
-        if (UNLIKELY(fRowsReturned >= fLimitCount))
-        {
-          fLimitHit = true;
-          fJobList->abortOnLimit((JobStep*)this);
-          continue;
-        }
-
-        if (fConstant)
-          fConstant->fillInConstants(fRowIn, fRowOut);
-        else
-          copyRow(fRowIn, &fRowOut);
-
-        fRowGroupOut.incRowCount();
-
-        if (++fRowsReturned < fLimitCount)
-        {
-          fRowOut.nextRow();
-          fRowIn.nextRow();
-        }
-      }
-
-      if (fRowGroupOut.getRowCount() > 0)
-      {
-        fOutputDL->insert(rgDataOut);
-      }
-
-      more = fInputDL->next(fInputIterator, &rgDataIn);
-    }
+    if (!fConstant && fLimitCount == std::numeric_limits<uint64_t>::max())
+    {
+      while (more && !cancelled())
+      {
+        fRowGroupIn.setData(&rgDataIn);
+        if (fRowGroupIn.getRowCount() > 0)
+        {
+          fOutputDL->insert(rgDataIn);
+        }
+        more = fInputDL->next(fInputIterator, &rgDataIn);
+      }
+    }
+    else
+    {
+      while (more && !cancelled() && !fLimitHit)
+      {
+        fRowGroupIn.setData(&rgDataIn);
+        fRowGroupIn.getRow(0, &fRowIn);
+        // Get a new output rowgroup for each input rowgroup to preserve the rids
+        rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
+        fRowGroupOut.setData(&rgDataOut);
+        fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
+        fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
+        fRowGroupOut.getRow(0, &fRowOut);
+
+        for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
+        {
+          // skip first limit-start rows
+          if (fRowsProcessed++ < fLimitStart)
+          {
+            fRowIn.nextRow();
+            continue;
+          }
+
+          if (UNLIKELY(fRowsReturned >= fLimitCount))
+          {
+            fLimitHit = true;
+            fJobList->abortOnLimit((JobStep*)this);
+            continue;
+          }
+
+          if (fConstant)
+            fConstant->fillInConstants(fRowIn, fRowOut);
+          else
+            copyRow(fRowIn, &fRowOut);
+
+          fRowGroupOut.incRowCount();
+
+          if (++fRowsReturned < fLimitCount)
+          {
+            fRowOut.nextRow();
+            fRowIn.nextRow();
+          }
+        }
+
+        if (fRowGroupOut.getRowCount() > 0)
+        {
+          fOutputDL->insert(rgDataOut);
+        }
+
+        more = fInputDL->next(fInputIterator, &rgDataIn);
+      }
+    }
   }
   catch (...)
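
The fast path above is gated on "no constants to fill in and no LIMIT", with "no LIMIT" encoded as fLimitCount == std::numeric_limits<uint64_t>::max(); only then is the input RGData byte-for-byte what the step would produce, so it can be forwarded without the reinit()/copyRow() pass. A standalone sketch of that predicate (the helper name is hypothetical, not in the commit):

#include <cstdint>
#include <limits>

// Hypothetical restatement of the new dispatch in executeNoOrderBy():
// forwarding is only safe when the output would equal the input.
inline bool canForwardInputRGData(bool hasConstants, uint64_t limitCount)
{
  return !hasConstants && limitCount == std::numeric_limits<uint64_t>::max();
}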
@@ -459,7 +475,7 @@ void TupleAnnexStep::executeNoOrderBy()
 void TupleAnnexStep::executeNoOrderByWithDistinct()
 {
-  utils::setThreadName("TASwoOrdDist");
+  utils::setThreadName("TNSwoOrdDist");
   scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
   vector<RGData> dataVec;
   vector<RGData> dataVecSkip;
@@ -595,7 +611,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
 void TupleAnnexStep::executeWithOrderBy()
 {
-  utils::setThreadName("TASwOrd");
+  utils::setThreadName("TNSwOrd");
   RGData rgDataIn;
   RGData rgDataOut;
   bool more = false;
@@ -698,7 +714,7 @@ void TupleAnnexStep::executeWithOrderBy()
 */
 void TupleAnnexStep::finalizeParallelOrderByDistinct()
 {
-  utils::setThreadName("TASwParOrdDistM");
+  utils::setThreadName("TNSwParOrdDistM");
   uint64_t count = 0;
   uint64_t offset = 0;
   uint32_t rowSize = 0;
@@ -897,7 +913,7 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
 */
 void TupleAnnexStep::finalizeParallelOrderBy()
 {
-  utils::setThreadName("TASwParOrdMerge");
+  utils::setThreadName("TNSwParOrdMerge");
   uint64_t count = 0;
   uint64_t offset = 0;
   uint32_t rowSize = 0;
@@ -1069,7 +1085,7 @@ void TupleAnnexStep::finalizeParallelOrderBy()
 void TupleAnnexStep::executeParallelOrderBy(uint64_t id)
 {
-  utils::setThreadName("TASwParOrd");
+  utils::setThreadName("TNSwParOrd");
   RGData rgDataIn;
   RGData rgDataOut;
   bool more = false;

View File

@@ -1403,7 +1403,6 @@ void TupleHashJoinStep::finishSmallOuterJoin()
   uint32_t smallSideCount = smallDLs.size();
   uint32_t i, j, k;
   std::shared_ptr<uint8_t[]> largeNullMemory;
-  RGData joinedData;
   Row joinedBaseRow, fe2InRow, fe2OutRow;
   std::shared_ptr<Row[]> smallRowTemplates;
   std::shared_ptr<Row[]> smallNullRows;
@@ -1411,6 +1410,10 @@ void TupleHashJoinStep::finishSmallOuterJoin()
   RowGroup l_outputRG = outputRG;
   RowGroup l_fe2Output = fe2Output;

+  // auto alloc = resourceManager->getAllocator<RGDataBufType>(10 * 1024 * 1024);
+  // RGData joinedData(alloc);
+  RGData joinedData;
+
   joiners[lastSmallOuterJoiner]->getUnmarkedRows(&unmatched);

   if (unmatched.empty())
@@ -1724,7 +1727,8 @@ void TupleHashJoinStep::joinOneRG(
   if (!smallNullMem)
     smallNullMem = &smallNullMemory;

-  RGData joinedData;
+  auto alloc = resourceManager->getAllocator<RGDataBufType>(10 * 1024 * 1024);
+  RGData joinedData(alloc);
   uint32_t matchCount, smallSideCount = tjoiners->size();
   uint32_t j, k;
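
With joinedData built from a counting allocator, the rowgroup buffers joinOneRG() materializes are charged against the UM memory budget as they are allocated, rather than being invisible to it. A rough sketch of the accounting behavior, assuming the allocator semantics described in countingallocator.h below (the 1 GiB budget is illustrative):

std::atomic<int64_t> umMemLimit{1LL << 30};  // shared budget, 1 GiB here
// Every buffer RGData allocates through this allocator decrements umMemLimit;
// per the header comment below, hitting the lower bound makes allocate()
// throw, which the join step must catch and act on.
allocators::CountingAllocator<RGDataBufType> alloc(&umMemLimit, 10 * 1024 * 1024);
RGData joinedData(alloc);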
@@ -1857,13 +1861,13 @@ void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer>
   {
     smallRow.setPointer(joinerOutput[depth][i]);

-    if (UNLIKELY(l_outputRG.getRowCount() == 8192))
+    if (UNLIKELY(l_outputRG.getRowCount() == rowgroup::rgCommonSize))
     {
       uint32_t dbRoot = l_outputRG.getDBRoot();
       uint64_t baseRid = l_outputRG.getBaseRid();
       outputData.push_back(rgData);
       // Count the memory
-      if (UNLIKELY(!getMemory(l_outputRG.getMaxDataSize())))
+      if (UNLIKELY(!getMemory(l_outputRG.getSizeWithStrings())))
       {
         // MCOL-5512
         if (fe2)
@@ -1876,6 +1880,9 @@ void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer>
       l_outputRG.initRow(&fe2InRow);
       l_fe2RG.initRow(&fe2OutRow);

+      // WIP do we remove previosuly pushed(line 1824) rgData
+      // replacing it with a new FE2 rgdata added by processFE2?
+      // Generates a new RGData w/o accounting its memory consumption
       processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &outputData, fe2.get());
     }
     // Don't let the join results buffer get out of control.

View File

@@ -176,7 +176,7 @@ class StatMon
   sigset_t sigset;
   sigemptyset(&sigset);
   sigaddset(&sigset, SIGPIPE);
-  sigaddset(&sigset, SIGUSR1);
+  // sigaddset(&sigset, SIGUSR1);
   sigaddset(&sigset, SIGUSR2);
   pthread_sigmask(SIG_BLOCK, &sigset, 0);
 }

View File

@@ -113,8 +113,8 @@ TEST_F(CountingAllocatorTest, AllocatorEquality)
 TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
 {
   // Create a shared_ptr using allocate_shared with the custom allocator
-  CountingAllocator<TestClass> allocatorSmallerStep(&allocatedMemory, MemoryAllowance / 100,
-                                                    MemoryAllowance / 1000);
+  CountingAllocator<TestClass> allocatorSmallerStep(&allocatedMemory,
+                                                    MemoryAllowance / 1000, MemoryAllowance / 100);
   std::shared_ptr<TestClass> ptr1 = std::allocate_shared<TestClass>(allocatorSmallerStep, 100);
   std::shared_ptr<TestClass> ptr2 = std::allocate_shared<TestClass>(allocatorSmallerStep, 100);
   std::shared_ptr<TestClass> ptr3 = std::allocate_shared<TestClass>(allocatorSmallerStep, 100);
@@ -164,8 +164,8 @@ TEST_F(CountingAllocatorTest, ThreadSafety)
   auto worker = [this]()
   {
     std::vector<TestClass*> ptrs;
-    CountingAllocator<TestClass> allocatorLocal(&allocatedMemory, MemoryAllowance / 100,
-                                                MemoryAllowance / 1000);
+    CountingAllocator<TestClass> allocatorLocal(&allocatedMemory, MemoryAllowance / 1000,
+                                                MemoryAllowance / 100);
     for (std::size_t i = 0; i < allocationsPerThread; ++i)
     {
       ptrs.push_back(allocatorLocal.allocate(1));

View File

@@ -213,7 +213,9 @@ class PoolallocatorTest : public ::testing::Test
   CountingAllocator<PoolAllocatorBufType> allocator;

   // Constructor
-  PoolallocatorTest() : allocatedMemory(MemoryAllowance), allocator(&allocatedMemory, MemoryAllowance / 100, MemoryAllowance / 1000)
+  PoolallocatorTest()
+   : allocatedMemory(MemoryAllowance)
+   , allocator(&allocatedMemory, MemoryAllowance / 1000, MemoryAllowance / 100)
   {
   }

View File

@@ -35,8 +35,8 @@ namespace allocators
 // When a sync op hits MemoryLimitLowerBound trying to allocate more memory, it throws.
 // SQL operators or TBPS runtime must catch the exception and act acordingly.
-const constexpr std::int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;  // WIP
-const constexpr std::int64_t CheckPointStepSize = 100 * 1024 * 1024;    // WIP
+const constexpr int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;  // WIP
+const constexpr int64_t CheckPointStepSize = 100 * 1024 * 1024;     // WIP

 // Custom Allocator that tracks allocated memory using an atomic counter
 template <typename T>
@@ -88,8 +88,8 @@ class CountingAllocator
   // Constructor accepting a reference to an atomic counter
   explicit CountingAllocator(std::atomic<int64_t>* memoryLimit,
-                             const uint64_t lowerBound = MemoryLimitLowerBound,
-                             const uint64_t checkPointStepSize = CheckPointStepSize) noexcept
+                             const int64_t checkPointStepSize = CheckPointStepSize,
+                             const int64_t lowerBound = MemoryLimitLowerBound) noexcept
    : memoryLimit_(memoryLimit), memoryLimitLowerBound_(lowerBound), checkPointStepSize_(checkPointStepSize)
   {
   }
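
Note the swap: the checkpoint step size now precedes the lower bound (and both become int64_t rather than uint64_t). Existing positional callers would silently get the two values crossed, which is why the test fixtures above exchange their MemoryAllowance / 100 and MemoryAllowance / 1000 arguments. A sketch of the new order with illustrative values:

std::atomic<int64_t> budget{500LL * 1024 * 1024};
// New parameter order: (counter, checkPointStepSize, memoryLimitLowerBound).
allocators::CountingAllocator<uint8_t> alloc(&budget,
                                             /*checkPointStepSize=*/1 * 1024 * 1024,
                                             /*memoryLimitLowerBound=*/50 * 1024 * 1024);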

View File

@@ -305,6 +305,12 @@ void UserDataStore::deserialize(ByteStream& bs)
   return;
 }

+RGData::RGData(allocators::CountingAllocator<RGDataBufType>& _alloc) : RGData()
+{
+  alloc = _alloc;
+}
+
 RGData::RGData(const RowGroup& rg, uint32_t rowCount)
 {
   RGDataSizeType s = rg.getDataSize(rowCount);

View File

@@ -262,6 +262,7 @@ class RGData
 {
  public:
   RGData() = default;  // useless unless followed by an = or a deserialize operation
+  RGData(allocators::CountingAllocator<RGDataBufType>&);
   RGData(const RowGroup& rg, uint32_t rowCount);  // allocates memory for rowData
   explicit RGData(const RowGroup& rg);
   explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& alloc);
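
The new allocator-only constructor binds the allocator without allocating a buffer, presumably so that a later reinit() (as in executeNoOrderBy() above) or deserialize allocates the row data through it. A hedged sketch of that intended pattern; rm, rowCount, and fRowGroupOut stand in for real call-site state:

auto alloc = rm->getAllocator<RGDataBufType>();
RGData rgDataOut(alloc);                   // allocator bound, no buffer yet
rgDataOut.reinit(fRowGroupOut, rowCount);  // buffer charged to the UM budget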