diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index d8f2943b6..e5b4eb7db 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -791,7 +791,7 @@ void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector* if (in.length() == 0) { // done, return an empty RG - rgData = RGData(org, 0); + rgData = RGData(org, 0U); org.setData(&rgData); org.resetRowGroup(0); out->push_back(rgData); @@ -926,7 +926,7 @@ RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const RGData ret; rowgroup::RowGroup rg(projectionRG); - ret = RGData(rg, 0); + ret = RGData(rg, 0U); rg.setData(&ret); // rg.convertToInlineDataInPlace(); rg.resetRowGroup(0); diff --git a/dbcon/joblist/subquerystep.cpp b/dbcon/joblist/subquerystep.cpp index 7624134a3..4b8214c8c 100644 --- a/dbcon/joblist/subquerystep.cpp +++ b/dbcon/joblist/subquerystep.cpp @@ -239,7 +239,7 @@ uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs) if (fEndOfResult) { // send an empty / error band - RGData rgData(fRowGroupDeliver, 0); + RGData rgData(fRowGroupDeliver, 0U); fRowGroupDeliver.setData(&rgData); fRowGroupDeliver.resetRowGroup(0); fRowGroupDeliver.setStatus(status()); diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 673af0995..341ad7958 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -611,7 +611,7 @@ uint32_t TupleAggregateStep::nextBand_singleThread(messageqcpp::ByteStream& bs) postStepSummaryTele(sts); // send an empty / error band - RGData rgData(fRowGroupOut, 0); + RGData rgData(fRowGroupOut, 0U); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setStatus(status()); @@ -5857,7 +5857,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp else { // send an empty / error band - RGData rgData(fRowGroupOut, 0); + RGData rgData(fRowGroupOut, 0U); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setStatus(status()); diff --git a/dbcon/joblist/tupleconstantstep.cpp b/dbcon/joblist/tupleconstantstep.cpp index cef1bb9b9..af3680337 100644 --- a/dbcon/joblist/tupleconstantstep.cpp +++ b/dbcon/joblist/tupleconstantstep.cpp @@ -362,7 +362,7 @@ uint32_t TupleConstantStep::nextBand(messageqcpp::ByteStream& bs) if (fEndOfResult) { // send an empty / error band - RGData rgData(fRowGroupOut, 0); + RGData rgData(fRowGroupOut, 0U); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setStatus(status()); @@ -720,7 +720,7 @@ uint32_t TupleConstantOnlyStep::nextBand(messageqcpp::ByteStream& bs) else { // send an empty / error band - RGData rgData(fRowGroupOut, 0); + RGData rgData(fRowGroupOut, 0U); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setStatus(status()); @@ -809,7 +809,7 @@ void TupleConstantBooleanStep::run() uint32_t TupleConstantBooleanStep::nextBand(messageqcpp::ByteStream& bs) { // send an empty band - RGData rgData(fRowGroupOut, 0); + RGData rgData(fRowGroupOut, 0U); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setStatus(status()); diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 895c1e954..389e2230f 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -1590,7 +1590,7 @@ uint32_t TupleUnion::nextBand(messageqcpp::ByteStream& bs) outputRG.setData(&mem); else { - mem = RGData(outputRG, 0); + mem = RGData(outputRG, 0U); outputRG.setData(&mem); outputRG.resetRowGroup(0); outputRG.setStatus(status()); diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index cd96fbfa7..c6e30f992 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -38,6 +38,7 @@ #include #include #include +#include "rowgroup.h" #include "serviceexemgr.h" #include using namespace std; @@ -327,9 +328,14 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) typelessJoin.reset(new bool[joinerCount]); tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]); - storedKeyAllocators.reset(new PoolAllocator[joinerCount]); + // storedKeyAllocators.reset(new PoolAllocator[joinerCount]); for (uint j = 0; j < joinerCount; ++j) - storedKeyAllocators[j].setUseLock(true); + { + // storedKeyAllocators[j].setUseLock(true); + // WIP use one copy of the allocator + auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); + storedKeyAllocators.emplace_back(PoolAllocator(&allocator, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true)); + } joinNullValues.reset(new uint64_t[joinerCount]); doMatchNulls.reset(new bool[joinerCount]); @@ -360,16 +366,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) if (!typelessJoin[i]) { - auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); + auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); bs >> joinNullValues[i]; bs >> largeSideKeyColumns[i]; for (uint j = 0; j < processorThreads; ++j) - tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), alloc)); + tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator)); } else { - auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); + auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); deserializeVector(bs, tlLargeSideKeyColumns[i]); bs >> tlSmallSideKeyLengths[i]; @@ -392,7 +398,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) mSmallSideKeyColumnsPtr, mSmallSideRGPtr); auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i], mSmallSideKeyColumnsPtr, mSmallSideRGPtr); - tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, alloc)); + tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator)); } } } @@ -1375,7 +1381,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup& } #ifdef PRIMPROC_STOPWATCH -void BatchPrimitiveProcessor::execute(StopWatch* stopwatch) +void BatchPrimitiveProcessor::execute(StopWatch* stopwatch, messageqcpp::SBS& bs) #else void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs) #endif @@ -1599,7 +1605,6 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs) { for (j = 0; j < projectCount; ++j) { - // cout << "projectionMap[" << j << "] = " << projectionMap[j] << endl; if (projectionMap[j] != -1) { #ifdef PRIMPROC_STOPWATCH @@ -1610,11 +1615,6 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs) projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); #endif } - - // else - // cout << " no target found for OID " << - // projectSteps[j]->getOID() - //<< endl; } if (fe2) { @@ -2216,8 +2216,8 @@ int BatchPrimitiveProcessor::operator()() validCPData = false; cpDataFromDictScan = false; - auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); - messageqcpp::SBS bs(new ByteStream(&alloc)); + auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); + messageqcpp::SBS bs(new ByteStream(&allocator)); #ifdef PRIMPROC_STOPWATCH stopwatch->start("BPP() execute"); @@ -2278,17 +2278,19 @@ int BatchPrimitiveProcessor::operator()() void BatchPrimitiveProcessor::allocLargeBuffers() { + auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); + if (ot == ROW_GROUP && !outRowGroupData) { // outputRG.setUseStringTable(true); - outRowGroupData.reset(new RGData(outputRG)); + outRowGroupData.reset(new RGData(outputRG, &allocator)); outputRG.setData(outRowGroupData.get()); } if (fe1 && !fe1Data) { // fe1Input.setUseStringTable(true); - fe1Data.reset(new RGData(fe1Input)); + fe1Data.reset(new RGData(fe1Input, &allocator)); // fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]); fe1Input.setData(fe1Data.get()); } @@ -2296,14 +2298,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers() if (fe2 && !fe2Data) { // fe2Output.setUseStringTable(true); - fe2Data.reset(new RGData(fe2Output)); + fe2Data.reset(new RGData(fe2Output, &allocator)); fe2Output.setData(fe2Data.get()); } if (getTupleJoinRowGroupData && !joinedRGMem) { // joinedRG.setUseStringTable(true); - joinedRGMem.reset(new RGData(joinedRG)); + joinedRGMem.reset(new RGData(joinedRG, &allocator)); joinedRG.setData(joinedRGMem.get()); } } @@ -2471,73 +2473,6 @@ SBPP BatchPrimitiveProcessor::duplicate() return bpp; } -#if 0 -bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) const -{ - uint32_t i; - - if (ot != bpp.ot) - return false; - - if (versionInfo != bpp.versionInfo) - return false; - - if (txnID != bpp.txnID) - return false; - - if (sessionID != bpp.sessionID) - return false; - - if (stepID != bpp.stepID) - return false; - - if (uniqueID != bpp.uniqueID) - return false; - - if (gotValues != bpp.gotValues) - return false; - - if (gotAbsRids != bpp.gotAbsRids) - return false; - - if (needStrValues != bpp.needStrValues) - return false; - - if (filterCount != bpp.filterCount) - return false; - - if (projectCount != bpp.projectCount) - return false; - - if (sendRidsAtDelivery != bpp.sendRidsAtDelivery) - return false; - - if (hasScan != bpp.hasScan) - return false; - - if (hasFilterStep != bpp.hasFilterStep) - return false; - - if (filtOnString != bpp.filtOnString) - return false; - - if (doJoin != bpp.doJoin) - return false; - - if (doJoin) - - /* Join equality test is a bit out of date */ - if (joiner != bpp.joiner || joinerSize != bpp.joinerSize) - return false; - - for (i = 0; i < filterCount; i++) - if (*filterSteps[i] != *bpp.filterSteps[i]) - return false; - - return true; -} -#endif - void BatchPrimitiveProcessor::asyncLoadProjectColumns() { // relLBID is the LBID related to the primMsg->LBID, diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index effa28839..5935232da 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -188,7 +188,7 @@ class BatchPrimitiveProcessor void initProcessor(); #ifdef PRIMPROC_STOPWATCH - void execute(logging::StopWatch* stopwatch); + void execute(logging::StopWatch* stopwatch, messageqcpp::SBS& bs); #else void execute(messageqcpp::SBS& bs); #endif @@ -379,14 +379,15 @@ class BatchPrimitiveProcessor inline void getJoinResults(const rowgroup::Row& r, uint32_t jIndex, std::vector& v); // these allocators hold the memory for the keys stored in tlJoiners - std::shared_ptr storedKeyAllocators; + // WIP This was a shared vec of allocators originally but it might not be necessary. + // This might give far memory allocations for keys used by JOIN hashmap. + std::vector storedKeyAllocators; /* PM Aggregation */ rowgroup::RowGroup joinedRG; // if there's a join, the rows are formatted with this rowgroup::SP_ROWAGG_PM_t fAggregator; rowgroup::RowGroup fAggregateRG; rowgroup::RGData fAggRowGroupData; - // boost::scoped_array fAggRowGroupData; /* OR hacks */ uint8_t bop; // BOP_AND or BOP_OR diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4a786e4b9..211365663 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -73,6 +73,7 @@ if (WITH_UNITTESTS) target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:) + set_source_files_properties(counting_allocator.cpp PROPERTIES COMPILE_FLAGS "-Wno-sign-compare") add_executable(counting_allocator counting_allocator.cpp) add_dependencies(counting_allocator googletest) target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) @@ -88,7 +89,7 @@ if (WITH_UNITTESTS) add_test(NAME columnstore:brm_em_standalone COMMAND brm_em_standalone) set_tests_properties(columnstore:brm_em_standalone PROPERTIES DISABLED True) endif() - +# -Werror=sign-compare if (WITH_MICROBENCHMARKS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")) find_package(benchmark REQUIRED) add_executable(primitives_scan_bench primitives_scan_bench.cpp) diff --git a/tests/rowgroup-tests.cpp b/tests/rowgroup-tests.cpp index 0c2a2e789..5b3899e30 100644 --- a/tests/rowgroup-tests.cpp +++ b/tests/rowgroup-tests.cpp @@ -16,8 +16,11 @@ MA 02110-1301, USA. */ #include // googletest header file +#include #include +#include +#include "countingallocator.h" #include "rowgroup.h" #include "columnwidth.h" #include "joblisttypes.h" @@ -28,13 +31,44 @@ using CSCDataType = execplan::CalpontSystemCatalog::ColDataType; using datatypes::TSInt128; +using RGFieldsType = std::vector; + +rowgroup::RowGroup setupRG(const std::vector& cts, + const RGFieldsType& widths, const RGFieldsType& charsets) +{ + std::vector types = cts; + RGFieldsType offsets{2}; + for (auto w : widths) + { + offsets.push_back(offsets.back() + w); + } + RGFieldsType roids(widths.size()); + std::iota(roids.begin(), roids.end(), 3000); + RGFieldsType tkeys(widths.size()); + std::fill(tkeys.begin(), tkeys.end(), 1); + RGFieldsType cscale(widths.size()); + std::fill(cscale.begin(), cscale.end(), 0); + RGFieldsType precision(widths.size()); + std::fill(precision.begin(), precision.end(), 20); + return rowgroup::RowGroup(roids.size(), // column count + offsets, // oldOffset + roids, // column oids + tkeys, // keys + types, // types + charsets, // charset numbers + cscale, // scale + precision, // precision + 20, // sTableThreshold + true // useStringTable + ); +} class RowDecimalTest : public ::testing::Test { protected: void SetUp() override { - uint32_t precision = WIDE_DEC_PRECISION; + uint32_t precision = WIDE_DEC_PRECISION; uint32_t oid = 3001; std::vector types; @@ -81,7 +115,6 @@ class RowDecimalTest : public ::testing::Test 20, // sTableThreshold false // useStringTable ); - rg = rgOut = inRG; rgD.reinit(rg); rgDOut.reinit(rgOut); @@ -317,3 +350,90 @@ TEST_F(RowDecimalTest, RowEqualsCheck) } } } + +static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024; + +class RGDataTest : public ::testing::Test +{ + protected: + RGDataTest() + : allocatedMemory(MemoryAllowance), alloc(allocatedMemory, MemoryAllowance / 100) {} + void SetUp() override + { + rg = setupRG({execplan::CalpontSystemCatalog::VARCHAR, execplan::CalpontSystemCatalog::UDECIMAL, + execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL, + execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL}, + {65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8}); + + // rgD = rowgroup::RGData(rg, &alloc); + // rg.setData(&rgD); + // rg.initRow(&r); + // rg.getRow(0, &r); + + // for (size_t i = 0; i < sValueVector.size(); i++) + // { + // // setStringField + // r.setBinaryField_offset(&sValueVector[i], sizeof(sValueVector[0]), offsets[0]); + // r.setBinaryField_offset(&anotherValueVector[i], sizeof(anotherValueVector[0]), offsets[1]); + // r.setIntField(s64ValueVector[i], 2); + // r.setIntField(s32ValueVector[i], 3); + // r.setIntField(s16ValueVector[i], 4); + // r.setIntField(s8ValueVector[i], 5); + // r.nextRow(rowSize); + // } + + // rowCount = sValueVector.size(); + } + + // void TearDown() override {} + + rowgroup::Row r; + rowgroup::RowGroup rg; + rowgroup::RGData rgD; + std::atomic allocatedMemory{MemoryAllowance}; + allocators::CountingAllocator alloc; +}; + // bool useStringTable = true; +TEST_F(RGDataTest, AllocData) +{ + std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl; + rgD = rowgroup::RGData(rg, &alloc); + rg.setData(&rgD); + rg.initRow(&r); + rg.getRow(0, &r); + std::cout << " test inStringTable(colIndex) " << r.inStringTable(0) << std::endl; + + std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; + auto currentAllocation = allocatedMemory.load(); + EXPECT_LE(currentAllocation, MemoryAllowance - rg.getMaxDataSize()); + + r.setStringField(utils::ConstString{"testaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, 0); + std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; + std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl; + EXPECT_LE(allocatedMemory.load(), currentAllocation); + + currentAllocation = allocatedMemory.load(); + r.nextRow(); + r.setStringField(utils::ConstString{"testaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, 0); + std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; + std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl; + EXPECT_EQ(allocatedMemory.load(), currentAllocation); + + currentAllocation = allocatedMemory.load(); + r.nextRow(); + std::string longString(64 * 1024 + 1000, 'a'); + auto cs = utils::ConstString(longString); + std::cout << "test longString " << longString.size() << " cs len " << cs.length()<< std::endl; + + r.setStringField(cs, 0); + std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; + std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl; + EXPECT_LE(allocatedMemory.load(), currentAllocation); + + rgD = rowgroup::RGData(rg); + std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; + + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); + + // reinit +} diff --git a/utils/common/countingallocator.h b/utils/common/countingallocator.h index 76f7c9a58..82e8f9105 100644 --- a/utils/common/countingallocator.h +++ b/utils/common/countingallocator.h @@ -46,8 +46,12 @@ public: CountingAllocator(const CountingAllocator& other) noexcept : memoryLimitRef_(other.memoryLimitRef_) {} + // Allocate memory for n objects of type T - T* allocate(std::size_t n) { + template + typename std::enable_if::value, U*>::type + allocate(std::size_t n) + { auto memCounted = memoryLimitRef_.fetch_sub(n * sizeof(T), std::memory_order_relaxed); if (memCounted < memoryLimitLowerBound) { memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); @@ -60,8 +64,25 @@ public: return ptr; } + template + typename std::enable_if::value, typename std::remove_extent::type*>::type + allocate(std::size_t n) + { + auto memCounted = memoryLimitRef_.fetch_sub(n * sizeof(T), std::memory_order_relaxed); + if (memCounted < memoryLimitLowerBound) { + memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); + throw std::bad_alloc(); + } + + T ptr = static_cast(::operator new[](n)); + // std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast(ptr) + // << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n"; + return ptr; + } + // Deallocate memory for n objects of type T - void deallocate(T* ptr, std::size_t n) noexcept { + void deallocate(T* ptr, std::size_t n) noexcept + { ::operator delete(ptr); memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); // std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast(ptr) @@ -70,12 +91,14 @@ public: // Equality operators (allocators are equal if they share the same counter) template - bool operator==(const CountingAllocator& other) const noexcept { + bool operator==(const CountingAllocator& other) const noexcept + { return &memoryLimitRef_ == &other.memoryLimitRef_; } template - bool operator!=(const CountingAllocator& other) const noexcept { + bool operator!=(const CountingAllocator& other) const noexcept + { return !(*this == other); } diff --git a/utils/common/poolallocator.cpp b/utils/common/poolallocator.cpp index 3e3966a1b..7341844c9 100644 --- a/utils/common/poolallocator.cpp +++ b/utils/common/poolallocator.cpp @@ -52,13 +52,13 @@ void PoolAllocator::deallocateAll() void PoolAllocator::newBlock() { - std::shared_ptr next; + std::shared_ptr next; capacityRemaining = allocSize; if (!tmpSpace || mem.size() == 0) { - next.reset(new uint8_t[allocSize]); + next.reset(new PoolAllocatorBufType[allocSize]); mem.push_back(next); nextAlloc = next.get(); } @@ -71,7 +71,7 @@ void* PoolAllocator::allocOOB(uint64_t size) OOBMemInfo memInfo; memUsage += size; - memInfo.mem.reset(new uint8_t[size]); + memInfo.mem.reset(new PoolAllocatorBufType[size]); memInfo.size = size; void* ret = (void*)memInfo.mem.get(); oob[ret] = memInfo; diff --git a/utils/common/poolallocator.h b/utils/common/poolallocator.h index 9f5dff6e7..64042e927 100644 --- a/utils/common/poolallocator.h +++ b/utils/common/poolallocator.h @@ -33,8 +33,11 @@ #include +#include "countingallocator.h" + namespace utils { +using PoolAllocatorBufType = uint8_t; class PoolAllocator { public: @@ -51,6 +54,18 @@ class PoolAllocator , lock(false) { } + PoolAllocator(allocators::CountingAllocator* allocator, unsigned windowSize = DEFAULT_WINDOW_SIZE, + bool isTmpSpace = false, bool _useLock = false) + : allocSize(windowSize) + , tmpSpace(isTmpSpace) + , capacityRemaining(0) + , memUsage(0) + , nextAlloc(0) + , useLock(_useLock) + , lock(false) + , allocator(allocator) + { + } PoolAllocator(const PoolAllocator& p) : allocSize(p.allocSize) , tmpSpace(p.tmpSpace) @@ -59,6 +74,7 @@ class PoolAllocator , nextAlloc(0) , useLock(p.useLock) , lock(false) + , allocator(p.allocator) { } virtual ~PoolAllocator() @@ -90,21 +106,23 @@ class PoolAllocator void* allocOOB(uint64_t size); unsigned allocSize; - std::vector> mem; + std::vector> mem; bool tmpSpace; unsigned capacityRemaining; uint64_t memUsage; - uint8_t* nextAlloc; + PoolAllocatorBufType* nextAlloc; bool useLock; std::atomic lock; struct OOBMemInfo { - std::shared_ptr mem; + std::shared_ptr mem; uint64_t size; }; typedef std::map OutOfBandMap; OutOfBandMap oob; // for mem chunks bigger than the window size; these can be dealloc'd + // WIP rename to allocator + allocators::CountingAllocator* allocator = nullptr; }; inline void* PoolAllocator::allocate(uint64_t size) @@ -136,4 +154,4 @@ inline void* PoolAllocator::allocate(uint64_t size) return ret; } -} // namespace utils +} // namespace allocators diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 42ba63351..f111f2006 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -31,7 +31,6 @@ #include using namespace std; - #include #include "bytestream.h" @@ -49,7 +48,10 @@ namespace rowgroup { using cscType = execplan::CalpontSystemCatalog::ColDataType; - +StringStore::StringStore(allocators::CountingAllocator* alloc) : StringStore() +{ + this->alloc = alloc; +} StringStore::~StringStore() { @@ -94,11 +96,22 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len) if (mem.size() > 0) lastMC = (MemChunk*)mem.back().get(); + std::cout << "StringStore::storeString len " << len << std::endl; if ((len + 4) >= CHUNK_SIZE) { - std::shared_ptr newOne(new uint8_t[len + sizeof(MemChunk) + 4]); - longStrings.push_back(newOne); - lastMC = (MemChunk*)longStrings.back().get(); + auto allocSize = len + sizeof(MemChunk) + 4; + if (alloc) + { + cout << "StringStore::storeString longStrings with alloc " << std::endl; + longStrings.emplace_back(std::allocate_shared(*alloc, allocSize)); + } + else + { + cout << "StringStore::storeString longStrings no alloc " << std::endl; + longStrings.emplace_back(std::make_shared(allocSize)); + } + // std::shared_ptr newOne(new uint8_t[len + sizeof(MemChunk) + 4]); + lastMC = reinterpret_cast(longStrings.back().get()); lastMC->capacity = lastMC->currentSize = len + 4; memcpy(lastMC->data, &len, 4); memcpy(lastMC->data + 4, data, len); @@ -112,10 +125,21 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len) { // mem usage debugging // if (lastMC) - // cout << "Memchunk efficiency = " << lastMC->currentSize << "/" << lastMC->capacity << endl; - std::shared_ptr newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); - mem.push_back(newOne); - lastMC = (MemChunk*)mem.back().get(); + if (alloc) + { + cout << "StringStore::storeString with alloc " << std::endl; + mem.emplace_back(std::allocate_shared(*alloc, CHUNK_SIZE + sizeof(MemChunk))); + // std::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); + } + else + { + cout << "StringStore::storeString no alloc " << std::endl; + mem.emplace_back(std::make_shared(CHUNK_SIZE + sizeof(MemChunk))); + // mem.emplace_back(std::allocate_shared(*alloc, CHUNK_SIZE + sizeof(MemChunk))); + // std::shared_ptr newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); + } + // mem.push_back(newOne); + lastMC = reinterpret_cast(mem.back().get()); lastMC->currentSize = 0; lastMC->capacity = CHUNK_SIZE; memset(lastMC->data, 0, CHUNK_SIZE); @@ -172,7 +196,7 @@ void StringStore::deserialize(ByteStream& bs) // mem.clear(); bs >> count; - mem.resize(count); + // mem.resize(count); bs >> tmp8; empty = (bool)tmp8; @@ -181,7 +205,18 @@ void StringStore::deserialize(ByteStream& bs) bs >> size; // cout << "deserializing " << size << " bytes\n"; buf = bs.buf(); - mem[i].reset(new uint8_t[size + sizeof(MemChunk)]); + + if (alloc) + { + cout << "StringStore::deserialize with alloc " << std::endl; + mem.emplace_back(std::allocate_shared(*alloc, size + sizeof(MemChunk))); + } + else + { + cout << "StringStore::deserialize no alloc " << std::endl; + mem.emplace_back(std::make_shared(size + sizeof(MemChunk))); + } + // mem[i].reset(new uint8_t[size + sizeof(MemChunk)]); mc = (MemChunk*)mem[i].get(); mc->currentSize = size; mc->capacity = size; @@ -302,7 +337,6 @@ void UserDataStore::deserialize(ByteStream& bs) return; } - RGData::RGData(const RowGroup& rg, uint32_t rowCount) { // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; @@ -313,7 +347,6 @@ RGData::RGData(const RowGroup& rg, uint32_t rowCount) userDataStore.reset(); - #ifdef VALGRIND /* In a PM-join, we can serialize entire tables; not every value has been * filled in yet. Need to look into that. Valgrind complains that @@ -343,13 +376,44 @@ RGData::RGData(const RowGroup& rg) #endif } +RGData::RGData(const RowGroup& rg, allocators::CountingAllocator* alloc) : alloc(alloc) +{ + // rowData = shared_ptr(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize); + // }); + rowData = std::allocate_shared(*alloc, rg.getMaxDataSize()); + // rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]); + + if (rg.usesStringTable()) + strings.reset(new StringStore(alloc)); + + userDataStore.reset(); + +#ifdef VALGRIND + /* In a PM-join, we can serialize entire tables; not every value has been + * filled in yet. Need to look into that. Valgrind complains that + * those bytes are uninitialized, this suppresses that error. + */ + memset(rowData.get(), 0, rg.getMaxDataSize()); +#endif +} + void RGData::reinit(const RowGroup& rg, uint32_t rowCount) { - rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); + if (alloc) + { + cout << "RGData::reinit with alloc " << std::endl; + rowData = std::allocate_shared(*alloc, rg.getDataSize(rowCount)); + } + else + { + cout << "RGData::reinit no alloc " << std::endl; + rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); + } + userDataStore.reset(); if (rg.usesStringTable()) - strings.reset(new StringStore()); + strings.reset(new StringStore(alloc)); else strings.reset(); @@ -1314,9 +1378,8 @@ string RowGroup::toString(const std::vector& used) const os << "rowcount = " << getRowCount() << endl; if (!used.empty()) { - uint64_t cnt = - std::accumulate(used.begin(), used.end(), 0ULL, - [](uint64_t a, uint64_t bits) { return a + __builtin_popcountll(bits); }); + uint64_t cnt = std::accumulate(used.begin(), used.end(), 0ULL, [](uint64_t a, uint64_t bits) + { return a + __builtin_popcountll(bits); }); os << "sparse row count = " << cnt << endl; } os << "base rid = " << getBaseRid() << endl; diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index f72517fd8..5e6137009 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -40,6 +40,7 @@ #include +#include "countingallocator.h" #include "hasher.h" #include "joblisttypes.h" @@ -126,10 +127,15 @@ inline T derefFromTwoVectorPtrs(const std::vector* outer, const std::vectoroperator[](outerIdx); } +using RGDataBufType = uint8_t[]; +// using RGDataBufType = std::vector; +using StringStoreBufType = uint8_t[]; + class StringStore { public: StringStore() = default; + StringStore(allocators::CountingAllocator* alloc); StringStore(const StringStore&) = delete; StringStore(StringStore&&) = delete; StringStore& operator=(const StringStore&) = delete; @@ -184,6 +190,7 @@ class StringStore bool empty = true; bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe boost::mutex fMutex; + allocators::CountingAllocator* alloc = nullptr; }; // Where we store user data for UDA(n)F @@ -248,6 +255,7 @@ class UserDataStore class RowGroup; class Row; + /* TODO: OO the rowgroup data to the extent there's no measurable performance hit. */ class RGData { @@ -255,6 +263,7 @@ class RGData RGData() = default; // useless unless followed by an = or a deserialize operation RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData explicit RGData(const RowGroup& rg); + explicit RGData(const RowGroup& rg, allocators::CountingAllocator* alloc); RGData& operator=(const RGData&) = default; RGData& operator=(RGData&&) = default; RGData(const RGData&) = default; @@ -314,9 +323,10 @@ class RGData } private: - std::shared_ptr rowData; + std::shared_ptr rowData; std::shared_ptr strings; std::shared_ptr userDataStore; + allocators::CountingAllocator* alloc = nullptr; // Need sig to support backward compat. RGData can deserialize both forms. static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data @@ -584,9 +594,9 @@ class Row } const CHARSET_INFO* getCharset(uint32_t col) const; + inline bool inStringTable(uint32_t col) const; private: - inline bool inStringTable(uint32_t col) const; private: uint32_t columnCount = 0; @@ -987,6 +997,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex if (inStringTable(colIndex)) { + std::cout << "setStringField storeString len " << length << std::endl; offset = strings->storeString((const uint8_t*)str.str(), length); *((uint64_t*)&data[offsets[colIndex]]) = offset; // cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]]) @@ -995,6 +1006,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex } else { + std::cout << "setStringField memcpy " << std::endl; memcpy(&data[offsets[colIndex]], str.str(), length); memset(&data[offsets[colIndex] + length], 0, offsets[colIndex + 1] - (offsets[colIndex] + length)); }