diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h
index 9bf223e63..e6c296c96 100644
--- a/dbcon/joblist/resourcemanager.h
+++ b/dbcon/joblist/resourcemanager.h
@@ -456,10 +456,12 @@ class ResourceManager
     return configuredUmMemLimit;
   }

-  template <class T>
-  allocators::CountingAllocator<T> getAllocator()
+  template <class T>
+  allocators::CountingAllocator<T> getAllocator(
+      const int64_t checkPointStepSize = allocators::CheckPointStepSize,
+      const int64_t memoryLimitLowerBound = allocators::MemoryLimitLowerBound)
   {
-    return allocators::CountingAllocator<T>(&totalUmMemLimit);
+    return allocators::CountingAllocator<T>(&totalUmMemLimit, checkPointStepSize, memoryLimitLowerBound);
   }

 private:
diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp
index f0c9f3225..92f45448c 100644
--- a/dbcon/joblist/tupleannexstep.cpp
+++ b/dbcon/joblist/tupleannexstep.cpp
@@ -376,7 +376,7 @@ void TupleAnnexStep::execute(uint32_t id)

 void TupleAnnexStep::executeNoOrderBy()
 {
-  utils::setThreadName("TASwoOrd");
+  utils::setThreadName("TNSwoOrd");
   RGData rgDataIn;
   RGData rgDataOut;
   bool more = false;
@@ -395,53 +395,69 @@ void TupleAnnexStep::executeNoOrderBy()
     sts.total_units_of_work = 1;
     postStepStartTele(sts);

-    while (more && !cancelled() && !fLimitHit)
+    if (!fConstant && fLimitCount == std::numeric_limits<uint64_t>::max())
     {
-      fRowGroupIn.setData(&rgDataIn);
-      fRowGroupIn.getRow(0, &fRowIn);
-      // Get a new output rowgroup for each input rowgroup to preserve the rids
-      rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
-      fRowGroupOut.setData(&rgDataOut);
-      fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
-      fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
-      fRowGroupOut.getRow(0, &fRowOut);
-
-      for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
+      while (more && !cancelled())
       {
-        // skip first limit-start rows
-        if (fRowsProcessed++ < fLimitStart)
+        fRowGroupIn.setData(&rgDataIn);
+        if (fRowGroupIn.getRowCount() > 0)
         {
-          fRowIn.nextRow();
-          continue;
+          fOutputDL->insert(rgDataIn);
         }
-        if (UNLIKELY(fRowsReturned >= fLimitCount))
-        {
-          fLimitHit = true;
-          fJobList->abortOnLimit((JobStep*)this);
-          continue;
-        }
-
-        if (fConstant)
-          fConstant->fillInConstants(fRowIn, fRowOut);
-        else
-          copyRow(fRowIn, &fRowOut);
-
-        fRowGroupOut.incRowCount();
-
-        if (++fRowsReturned < fLimitCount)
-        {
-          fRowOut.nextRow();
-          fRowIn.nextRow();
-        }
+        more = fInputDL->next(fInputIterator, &rgDataIn);
       }
-
-      if (fRowGroupOut.getRowCount() > 0)
+    }
+    else
+    {
+      while (more && !cancelled() && !fLimitHit)
       {
-        fOutputDL->insert(rgDataOut);
-      }
+        fRowGroupIn.setData(&rgDataIn);
+        fRowGroupIn.getRow(0, &fRowIn);
+        // Get a new output rowgroup for each input rowgroup to preserve the rids
+        rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
+        fRowGroupOut.setData(&rgDataOut);
+        fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
+        fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
+        fRowGroupOut.getRow(0, &fRowOut);

-      more = fInputDL->next(fInputIterator, &rgDataIn);
+        for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i)
+        {
+          // skip first limit-start rows
+          if (fRowsProcessed++ < fLimitStart)
+          {
+            fRowIn.nextRow();
+            continue;
+          }
+
+          if (UNLIKELY(fRowsReturned >= fLimitCount))
+          {
+            fLimitHit = true;
+            fJobList->abortOnLimit((JobStep*)this);
+            continue;
+          }
+
+          if (fConstant)
+            fConstant->fillInConstants(fRowIn, fRowOut);
+          else
+            copyRow(fRowIn, &fRowOut);
+
+          fRowGroupOut.incRowCount();
+
+          if (++fRowsReturned < fLimitCount)
+          {
+            fRowOut.nextRow();
+            fRowIn.nextRow();
+          }
+        }
+
+        if (fRowGroupOut.getRowCount() > 0)
+        {
+          fOutputDL->insert(rgDataOut);
+        }
+
+        more = fInputDL->next(fInputIterator, &rgDataIn);
+      }
     }
   }
   catch (...)
@@ -459,7 +475,7 @@ void TupleAnnexStep::executeNoOrderBy()

 void TupleAnnexStep::executeNoOrderByWithDistinct()
 {
-  utils::setThreadName("TASwoOrdDist");
+  utils::setThreadName("TNSwoOrdDist");
   scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
   vector<RGData> dataVec;
   vector<RGData> dataVecSkip;
@@ -595,7 +611,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()

 void TupleAnnexStep::executeWithOrderBy()
 {
-  utils::setThreadName("TASwOrd");
+  utils::setThreadName("TNSwOrd");
   RGData rgDataIn;
   RGData rgDataOut;
   bool more = false;
@@ -698,7 +714,7 @@ void TupleAnnexStep::executeWithOrderBy()
  */
 void TupleAnnexStep::finalizeParallelOrderByDistinct()
 {
-  utils::setThreadName("TASwParOrdDistM");
+  utils::setThreadName("TNSwParOrdDistM");
   uint64_t count = 0;
   uint64_t offset = 0;
   uint32_t rowSize = 0;
@@ -897,7 +913,7 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
  */
 void TupleAnnexStep::finalizeParallelOrderBy()
 {
-  utils::setThreadName("TASwParOrdMerge");
+  utils::setThreadName("TNSwParOrdMerge");
   uint64_t count = 0;
   uint64_t offset = 0;
   uint32_t rowSize = 0;
@@ -1069,7 +1085,7 @@ void TupleAnnexStep::finalizeParallelOrderBy()

 void TupleAnnexStep::executeParallelOrderBy(uint64_t id)
 {
-  utils::setThreadName("TASwParOrd");
+  utils::setThreadName("TNSwParOrd");
   RGData rgDataIn;
   RGData rgDataOut;
   bool more = false;
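Usage sketch for the new ResourceManager::getAllocator() overload above (illustrative, not part of
the patch; the template parameter T is reconstructed from context, and uint8_t is a placeholder
element type):

  // Checkpoint against totalUmMemLimit every 10 MiB instead of the default
  // allocators::CheckPointStepSize (100 MiB); the lower bound keeps its default.
  auto alloc = rm->getAllocator<uint8_t>(10 * 1024 * 1024);
  // Equivalent to allocators::CountingAllocator<uint8_t>(&totalUmMemLimit,
  //                                                      10 * 1024 * 1024,
  //                                                      allocators::MemoryLimitLowerBound);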
diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp
index 00d7e7c51..3d7659c8d 100644
--- a/dbcon/joblist/tuplehashjoin.cpp
+++ b/dbcon/joblist/tuplehashjoin.cpp
@@ -1403,7 +1403,6 @@ void TupleHashJoinStep::finishSmallOuterJoin()
   uint32_t smallSideCount = smallDLs.size();
   uint32_t i, j, k;
   std::shared_ptr largeNullMemory;
-  RGData joinedData;
   Row joinedBaseRow, fe2InRow, fe2OutRow;
   std::shared_ptr smallRowTemplates;
   std::shared_ptr smallNullRows;
@@ -1411,6 +1410,10 @@
   RowGroup l_outputRG = outputRG;
   RowGroup l_fe2Output = fe2Output;

+  // auto alloc = resourceManager->getAllocator(10 * 1024 * 1024);
+  // RGData joinedData(alloc);
+  RGData joinedData;
+
   joiners[lastSmallOuterJoiner]->getUnmarkedRows(&unmatched);

   if (unmatched.empty())
@@ -1724,7 +1727,8 @@ void TupleHashJoinStep::joinOneRG(
   if (!smallNullMem)
     smallNullMem = &smallNullMemory;

-  RGData joinedData;
+  auto alloc = resourceManager->getAllocator(10 * 1024 * 1024);
+  RGData joinedData(alloc);
   uint32_t matchCount, smallSideCount = tjoiners->size();
   uint32_t j, k;

@@ -1857,13 +1861,13 @@ void TupleHashJoinStep::generateJoinResultSet(const vector
   {
     smallRow.setPointer(joinerOutput[depth][i]);

-    if (UNLIKELY(l_outputRG.getRowCount() == 8192))
+    if (UNLIKELY(l_outputRG.getRowCount() == rowgroup::rgCommonSize))
     {
       uint32_t dbRoot = l_outputRG.getDBRoot();
       uint64_t baseRid = l_outputRG.getBaseRid();
       outputData.push_back(rgData);
       // Count the memory
-      if (UNLIKELY(!getMemory(l_outputRG.getMaxDataSize())))
+      if (UNLIKELY(!getMemory(l_outputRG.getSizeWithStrings())))
       {
         // MCOL-5512
         if (fe2)
@@ -1876,6 +1880,9 @@ void TupleHashJoinStep::generateJoinResultSet(const vector
         l_outputRG.initRow(&fe2InRow);
         l_fe2RG.initRow(&fe2OutRow);

+        // WIP do we remove the previously pushed (line 1824) rgData,
+        // replacing it with a new FE2 rgData added by processFE2?
+        // Generates a new RGData w/o accounting for its memory consumption
         processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &outputData, fe2.get());
       }
       // Don't let the join results buffer get out of control.
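Caller-side sketch (illustrative, not part of the patch): joinOneRG() now charges joinedData
against the UM memory limit, and per the countingallocator.h comment below, the allocator throws
once MemoryLimitLowerBound is reached. The exception type is assumed here (the header does not
name it), uint8_t is a placeholder element type, and l_outputRG stands for the output RowGroup:

  try
  {
    auto alloc = resourceManager->getAllocator<uint8_t>(10 * 1024 * 1024);
    RGData joinedData(l_outputRG, alloc);  // buffer is counted against totalUmMemLimit
  }
  catch (std::bad_alloc&)  // assumed exception type
  {
    // The SQL operator must handle the failed allocation, e.g. abort the join.
  }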
diff --git a/primitives/blockcache/stats.cpp b/primitives/blockcache/stats.cpp
index 22721cf2c..6ce64dd59 100644
--- a/primitives/blockcache/stats.cpp
+++ b/primitives/blockcache/stats.cpp
@@ -176,7 +176,7 @@ class StatMon
     sigset_t sigset;
     sigemptyset(&sigset);
     sigaddset(&sigset, SIGPIPE);
-    sigaddset(&sigset, SIGUSR1);
+    // sigaddset(&sigset, SIGUSR1);
     sigaddset(&sigset, SIGUSR2);
     pthread_sigmask(SIG_BLOCK, &sigset, 0);
   }
diff --git a/tests/counting_allocator.cpp b/tests/counting_allocator.cpp
index c3f3571d9..6a7bdfbfc 100644
--- a/tests/counting_allocator.cpp
+++ b/tests/counting_allocator.cpp
@@ -113,8 +113,8 @@ TEST_F(CountingAllocatorTest, AllocatorEquality)
 TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
 {
   // Create a shared_ptr using allocate_shared with the custom allocator
-  CountingAllocator allocatorSmallerStep(&allocatedMemory, MemoryAllowance / 100,
-                                         MemoryAllowance / 1000);
+  CountingAllocator allocatorSmallerStep(&allocatedMemory,
+                                         MemoryAllowance / 1000, MemoryAllowance / 100);
   std::shared_ptr ptr1 = std::allocate_shared(allocatorSmallerStep, 100);
   std::shared_ptr ptr2 = std::allocate_shared(allocatorSmallerStep, 100);
   std::shared_ptr ptr3 = std::allocate_shared(allocatorSmallerStep, 100);
@@ -164,8 +164,8 @@ TEST_F(CountingAllocatorTest, ThreadSafety)
   auto worker = [this]()
   {
     std::vector ptrs;
-    CountingAllocator allocatorLocal(&allocatedMemory, MemoryAllowance / 100,
-                                     MemoryAllowance / 1000);
+    CountingAllocator allocatorLocal(&allocatedMemory, MemoryAllowance / 1000,
+                                     MemoryAllowance / 100);
     for (std::size_t i = 0; i < allocationsPerThread; ++i)
     {
       ptrs.push_back(allocatorLocal.allocate(1));
diff --git a/tests/poolallocator.cpp b/tests/poolallocator.cpp
index 66795d2a7..e0ec5fae4 100644
--- a/tests/poolallocator.cpp
+++ b/tests/poolallocator.cpp
@@ -213,7 +213,9 @@ class PoolallocatorTest : public ::testing::Test
   CountingAllocator allocator;

   // Constructor
-  PoolallocatorTest() : allocatedMemory(MemoryAllowance), allocator(&allocatedMemory, MemoryAllowance / 100, MemoryAllowance / 1000)
+  PoolallocatorTest()
+   : allocatedMemory(MemoryAllowance)
+   , allocator(&allocatedMemory, MemoryAllowance / 1000, MemoryAllowance / 100)
   {
   }
diff --git a/utils/common/countingallocator.h b/utils/common/countingallocator.h
index 89c6176c1..f49f3132c 100644
--- a/utils/common/countingallocator.h
+++ b/utils/common/countingallocator.h
@@ -35,8 +35,8 @@ namespace allocators

 // When a sync op hits MemoryLimitLowerBound trying to allocate more memory, it throws.
 // SQL operators or TBPS runtime must catch the exception and act accordingly.
-const constexpr std::int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;  // WIP
-const constexpr std::int64_t CheckPointStepSize = 100 * 1024 * 1024;     // WIP
+const constexpr int64_t MemoryLimitLowerBound = 500 * 1024 * 1024;  // WIP
+const constexpr int64_t CheckPointStepSize = 100 * 1024 * 1024;     // WIP

 // Custom Allocator that tracks allocated memory using an atomic counter
 template <typename T>
@@ -88,8 +88,8 @@ class CountingAllocator
   // Constructor accepting a reference to an atomic counter
   explicit CountingAllocator(std::atomic<int64_t>* memoryLimit,
-                             const uint64_t lowerBound = MemoryLimitLowerBound,
-                             const uint64_t checkPointStepSize = CheckPointStepSize) noexcept
+                             const int64_t checkPointStepSize = CheckPointStepSize,
+                             const int64_t lowerBound = MemoryLimitLowerBound) noexcept
    : memoryLimit_(memoryLimit), memoryLimitLowerBound_(lowerBound), checkPointStepSize_(checkPointStepSize)
   {
   }
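Note on the constructor change above: the two defaulted parameters trade places
(checkPointStepSize now precedes lowerBound) and their type changes from uint64_t to int64_t.
This is why every test call site in this patch swaps its last two arguments; the old argument
order would still compile but silently misassign both values, since the parameters share a type.
A minimal sketch (uint8_t is a placeholder element type):

  std::atomic<int64_t> limit{1024 * 1024 * 1024};
  // New order: (memoryLimit, checkPointStepSize, lowerBound).
  allocators::CountingAllocator<uint8_t> alloc(&limit,
                                               /*checkPointStepSize=*/100 * 1024 * 1024,
                                               /*lowerBound=*/500 * 1024 * 1024);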
diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp
index 01bfca90f..46ab43c10 100644
--- a/utils/rowgroup/rowgroup.cpp
+++ b/utils/rowgroup/rowgroup.cpp
@@ -305,6 +305,12 @@ void UserDataStore::deserialize(ByteStream& bs)
   return;
 }

+
+RGData::RGData(allocators::CountingAllocator& _alloc) : RGData()
+{
+  alloc = _alloc;
+}
+
 RGData::RGData(const RowGroup& rg, uint32_t rowCount)
 {
   RGDataSizeType s = rg.getDataSize(rowCount);
diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h
index aa028158c..a9face207 100644
--- a/utils/rowgroup/rowgroup.h
+++ b/utils/rowgroup/rowgroup.h
@@ -262,6 +262,7 @@ class RGData
 {
  public:
   RGData() = default;  // useless unless followed by an = or a deserialize operation
+  RGData(allocators::CountingAllocator&);
   RGData(const RowGroup& rg, uint32_t rowCount);  // allocates memory for rowData
   explicit RGData(const RowGroup& rg);
   explicit RGData(const RowGroup& rg, allocators::CountingAllocator& alloc);
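Usage sketch for the new allocator-only RGData constructor (illustrative, not part of the patch):
it lets an RGData carry a counting allocator before any buffer exists, so later (re)allocations
can be charged to the memory limit. The element type and variable names are assumed:

  auto alloc = rm->getAllocator<uint8_t>();  // placeholder element type
  rowgroup::RGData rgData(alloc);            // delegates to RGData(), then stores the allocator
  rgData.reinit(outputRowGroup, rowCount);   // subsequent buffers are counted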