From 90b4322470c45f2d89ee77044ea4ce4eca1acaa1 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Fri, 10 Jan 2025 18:53:49 +0000 Subject: [PATCH] feat(): propagated changes into SLTPoolAllocator and friends --- dbcon/joblist/batchprimitiveprocessor-jl.cpp | 6 +- dbcon/joblist/batchprimitiveprocessor-jl.h | 4 +- dbcon/joblist/tuplehashjoin.cpp | 29 +- .../columnstore/basic/r/mcol-5195.result | 58 ++-- .../columnstore/basic/r/mcol_4617.result | 14 +- mysql-test/columnstore/basic/t/mcol-5195.test | 18 +- mysql-test/columnstore/basic/t/mcol_4617.test | 18 +- .../primproc/batchprimitiveprocessor.cpp | 4 +- tests/CMakeLists.txt | 5 + tests/poolallocator.cpp | 287 ++++++++++++++++++ tests/rowgroup-tests.cpp | 11 - utils/common/fixedallocator.cpp | 33 +- utils/common/fixedallocator.h | 21 +- utils/common/poolallocator.cpp | 29 +- utils/common/poolallocator.h | 21 +- utils/common/stlpoolallocator.h | 16 + utils/joiner/tuplejoiner.cpp | 54 +++- utils/joiner/tuplejoiner.h | 17 +- 18 files changed, 516 insertions(+), 129 deletions(-) create mode 100644 tests/poolallocator.cpp diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 8b88a498a..6471a2d58 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -54,7 +54,7 @@ using namespace joiner; namespace joblist { -BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm) +BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(ResourceManager* rm) : ot(BPS_ELEMENT_TYPE) , needToSetLBID(true) , count(1) @@ -80,6 +80,7 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm) , fJoinerChunkSize(rm->getJlJoinerChunkSize()) , hasSmallOuterJoin(false) , _priority(1) + , rm_(rm) { PMJoinerCount = 0; uuid = bu::nil_generator()(); @@ -1481,7 +1482,8 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs) if (tJoiners[joinerNum]->isTypelessJoin()) { - utils::FixedAllocator fa(tlKeyLens[joinerNum], true); + auto alloc = rm_->getAllocator(); + utils::FixedAllocator fa(alloc, tlKeyLens[joinerNum], true); for (i = pos; i < pos + toSend; i++) { diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 2b9e400b0..e3a5e6f31 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -59,7 +59,7 @@ class BatchPrimitiveProcessorJL { public: /* Constructor used by the JobStep */ - explicit BatchPrimitiveProcessorJL(const ResourceManager* rm); + explicit BatchPrimitiveProcessorJL(ResourceManager* rm); ~BatchPrimitiveProcessorJL(); /* Interface used by the JobStep */ @@ -384,6 +384,8 @@ class BatchPrimitiveProcessorJL boost::uuids::uuid uuid; + joblist::ResourceManager* rm_ = nullptr; + friend class CommandJL; friend class ColumnCommandJL; friend class PassThruCommandJL; diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index d935a67dc..e05010a58 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -277,22 +277,21 @@ void TupleHashJoinStep::startSmallRunners(uint index) if (typelessJoin[index]) { - joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index], jt, - &jobstepThreadPool, numCores)); + joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index], + jt, &jobstepThreadPool, resourceManager, numCores)); } else { - joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0], - jt, &jobstepThreadPool, numCores)); + joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], largeSideKeys[index][0], + jt, &jobstepThreadPool, resourceManager, numCores)); } - joiner->setUniqueLimit(uniqueLimit); - joiner->setTableName(smallTableNames[index]); - joiners[index] = joiner; + joiners[index]->setUniqueLimit(uniqueLimit); + joiners[index]->setTableName(smallTableNames[index]); /* check for join types unsupported on the PM. */ if (!largeBPS || !isExeMgr) - joiner->setInUM(rgData[index]); + joiners[index]->setInUM(rgData[index]); /* start the small runners @@ -306,7 +305,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); // starting 1 thread when in PM mode, since it's only inserting into a // vector of rows. The rest will be started when converted to UM mode. - if (joiner->inUM()) + if (joiners[index]->inUM()) { for (int i = 0; i < numCores; i++) { @@ -320,7 +319,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) // wait for the first thread to join, then decide whether the others exist and need joining jobstepThreadPool.join(jobs[0]); - if (joiner->inUM()) + if (joiners[index]->inUM()) { for (int i = 1; i < numCores; i++) { @@ -352,7 +351,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) end_time = boost::posix_time::microsec_clock::universal_time(); if (!(fSessionId & 0x80000000)) cout << "hash table construction time = " << end_time - start_time << - " size = " << joiner->size() << endl; + " size = " << joiners[index]->size() << endl; */ if (traceOn()) @@ -361,13 +360,13 @@ void TupleHashJoinStep::startSmallRunners(uint index) } ostringstream oss; - if (!joiner->onDisk()) + if (!joiners[index]->onDisk()) { // add extended info, and if not aborted then tell joiner // we're done reading the small side. if (traceOn()) { - if (joiner->inPM()) + if (joiners[index]->inPM()) { { oss << "PM join (" << index << ")" << endl; @@ -377,7 +376,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) extendedInfo += oss.str(); } } - else if (joiner->inUM()) + else if (joiners[index]->inUM()) { oss << "UM join (" << index << ")" << endl; #ifdef JLF_DEBUG @@ -387,7 +386,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) } } if (!cancelled()) - joiner->doneInserting(); + joiners[index]->doneInserting(); } if (traceOn()) diff --git a/mysql-test/columnstore/basic/r/mcol-5195.result b/mysql-test/columnstore/basic/r/mcol-5195.result index dceb428e0..2b5c618da 100644 --- a/mysql-test/columnstore/basic/r/mcol-5195.result +++ b/mysql-test/columnstore/basic/r/mcol-5195.result @@ -5,54 +5,54 @@ create table t1 (a int, b int) engine=columnstore; create table t2 (a int, b int) engine=columnstore; insert into t1 values (1, 2), (1, 3), (1, 4), (2, 5), (2, 6), (2, 7); insert into t2 values (1, 2), (1, 2), (1, 4), (2, 5), (2, 6), (2, 8); -select * from t1, t2 where t1.a = t2.a and t2.b = (select max(b) from t2 where t1.a = t2.a) order by t2.b; +select * from t1, t2 where t1.a = t2.a and t2.b = (select max(b) from t2 where t1.a = t2.a); a b a b -1 4 1 4 1 2 1 4 1 3 1 4 -2 7 2 8 +1 4 1 4 2 5 2 8 2 6 2 8 -select * from t1, t2 where t1.a = t2.a and t2.b < (select max(b) from t2 where t1.a = t2.a) order by t2.b; +2 7 2 8 +select * from t1, t2 where t1.a = t2.a and t2.b < (select max(b) from t2 where t1.a = t2.a); a b a b -1 3 1 2 -1 4 1 2 1 2 1 2 -1 4 1 2 1 2 1 2 1 3 1 2 -2 6 2 5 +1 3 1 2 +1 4 1 2 +1 4 1 2 2 5 2 5 -2 7 2 5 2 5 2 6 -2 6 2 6 -2 7 2 6 -select * from t1, t2 where t1.a = t2.a and t2.b > (select max(b) from t2 where t1.a = t2.a) order by t2.b; -a b a b -select * from t1, t2 where t1.a = t2.a and t1.b = (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a) order by t2.b; -a b a b -select * from t1, t2 where t1.a = t2.a and t2.b < (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a) order by t2.b; -a b a b -1 3 1 2 -1 4 1 2 -1 2 1 2 -1 4 1 2 -1 2 1 2 -1 3 1 2 2 6 2 5 -2 5 2 5 -2 7 2 5 -2 5 2 6 2 6 2 6 +2 7 2 5 2 7 2 6 -select * from t1, t2 where t1.a = t2.a and t2.b > (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a) order by t2.b; +select * from t1, t2 where t1.a = t2.a and t2.b > (select max(b) from t2 where t1.a = t2.a); +a b a b +select * from t1, t2 where t1.a = t2.a and t1.b = (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a); +a b a b +select * from t1, t2 where t1.a = t2.a and t2.b < (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a); +a b a b +1 2 1 2 +1 2 1 2 +1 3 1 2 +1 3 1 2 +1 4 1 2 +1 4 1 2 +2 5 2 5 +2 5 2 6 +2 6 2 5 +2 6 2 6 +2 7 2 5 +2 7 2 6 +select * from t1, t2 where t1.a = t2.a and t2.b > (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a); a b a b -1 4 1 4 1 2 1 4 1 3 1 4 -2 7 2 8 +1 4 1 4 2 5 2 8 2 6 2 8 +2 7 2 8 drop table t1; drop table t2; DROP DATABASE mcol5195; diff --git a/mysql-test/columnstore/basic/r/mcol_4617.result b/mysql-test/columnstore/basic/r/mcol_4617.result index fdbb3c9e3..b63a58c8d 100644 --- a/mysql-test/columnstore/basic/r/mcol_4617.result +++ b/mysql-test/columnstore/basic/r/mcol_4617.result @@ -112,7 +112,7 @@ a 1 2 3 -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select b from cs2) order by 1,2,3; +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select b from cs2); a b c 1 1 100 1 1 101 @@ -124,7 +124,7 @@ select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select b from cs2) and c a b c 1 1 100 1 1 101 -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1) order by 1,2,3; +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1); a b c 1 1 100 1 1 101 @@ -193,7 +193,7 @@ select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2); a b c select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2) and cs1.a=1; a b c -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1) order by 1,2,3; +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1); a b c 2 2 200 3 3 300 @@ -249,7 +249,7 @@ select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2 wh a b c select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2 where b is not null) and cs1.a=1; a b c -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from (select b from cs2 where b is not null) t1 join cs2 t2 on t1.b=t2.b and t1.b=1) order by 1,2,3; +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from (select b from cs2 where b is not null) t1 join cs2 t2 on t1.b=t2.b and t1.b=1); a b c 2 2 200 3 3 300 @@ -348,21 +348,21 @@ select * from cs1 where (a,d) in (select t1.b,t1.c from cs2 t1 join cs2 t2 on t1 a d 1 100 3 302 -select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2) order by 1,2,3,4; +select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2); a d b c 1 100 1 100 1 100 1 101 3 302 3 300 3 302 3 301 3 302 3 302 -select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2) and cs1.a=1 order by 1,2,3,4; +select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2) and cs1.a=1; a d b c 1 100 1 100 1 100 1 101 select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select t1.b,t1.c from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1); a d b c -1 100 1 100 1 100 1 101 +1 100 1 100 drop table cs1; create table cs1 (a int); insert into cs1 values (1), (2), (3), (4), (null); diff --git a/mysql-test/columnstore/basic/t/mcol-5195.test b/mysql-test/columnstore/basic/t/mcol-5195.test index 2012a6101..d32d5914f 100644 --- a/mysql-test/columnstore/basic/t/mcol-5195.test +++ b/mysql-test/columnstore/basic/t/mcol-5195.test @@ -15,13 +15,19 @@ create table t2 (a int, b int) engine=columnstore; insert into t1 values (1, 2), (1, 3), (1, 4), (2, 5), (2, 6), (2, 7); insert into t2 values (1, 2), (1, 2), (1, 4), (2, 5), (2, 6), (2, 8); -select * from t1, t2 where t1.a = t2.a and t2.b = (select max(b) from t2 where t1.a = t2.a) order by t2.b; -select * from t1, t2 where t1.a = t2.a and t2.b < (select max(b) from t2 where t1.a = t2.a) order by t2.b; -select * from t1, t2 where t1.a = t2.a and t2.b > (select max(b) from t2 where t1.a = t2.a) order by t2.b; +--sorted_result +select * from t1, t2 where t1.a = t2.a and t2.b = (select max(b) from t2 where t1.a = t2.a); +--sorted_result +select * from t1, t2 where t1.a = t2.a and t2.b < (select max(b) from t2 where t1.a = t2.a); +--sorted_result +select * from t1, t2 where t1.a = t2.a and t2.b > (select max(b) from t2 where t1.a = t2.a); -select * from t1, t2 where t1.a = t2.a and t1.b = (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a) order by t2.b; -select * from t1, t2 where t1.a = t2.a and t2.b < (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a) order by t2.b; -select * from t1, t2 where t1.a = t2.a and t2.b > (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a) order by t2.b; +--sorted_result +select * from t1, t2 where t1.a = t2.a and t1.b = (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a); +--sorted_result +select * from t1, t2 where t1.a = t2.a and t2.b < (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a); +--sorted_result +select * from t1, t2 where t1.a = t2.a and t2.b > (select avg(t2.b) from t2 where t1.a = t2.a group by t2.a); drop table t1; drop table t2; diff --git a/mysql-test/columnstore/basic/t/mcol_4617.test b/mysql-test/columnstore/basic/t/mcol_4617.test index 9f43151b8..39a102125 100644 --- a/mysql-test/columnstore/basic/t/mcol_4617.test +++ b/mysql-test/columnstore/basic/t/mcol_4617.test @@ -69,11 +69,13 @@ select * from cs1 where a in (select t1.b from cs2 t1, cs2 t2 where t1.b=t2.b an select * from cs1 where a in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.c=t2.c); ### Outer query containing joins -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select b from cs2) order by 1,2,3; +--sorted_result +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select b from cs2); select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select b from cs2) and cs1.a=1; ### Both IN subquery and outer queries containing joins -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1) order by 1,2,3; +--sorted_result +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1); ## NOT IN subquery ### Basic tests @@ -120,7 +122,8 @@ select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2); select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2) and cs1.a=1; ### Both IN subquery and outer queries containing joins -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1) order by 1,2,3; +--sorted_result +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1); ## NOT IN subquery without NULLs ### Basic tests @@ -158,7 +161,8 @@ select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2 wh select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select b from cs2 where b is not null) and cs1.a=1; ### Both IN subquery and outer queries containing joins -select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from (select b from cs2 where b is not null) t1 join cs2 t2 on t1.b=t2.b and t1.b=1) order by 1,2,3; +--sorted_result +select * from cs1 join cs2 on cs1.a=cs2.b and cs1.a not in (select t1.b from (select b from cs2 where b is not null) t1 join cs2 t2 on t1.b=t2.b and t1.b=1); # Special cases involving NULLs select * from cs1 where a in (select b from cs2 where b is null); @@ -213,8 +217,10 @@ select * from cs1 where (a,d) in (select t1.b,t1.c from cs2 t1, cs2 t2 where t1. select * from cs1 where (a,d) in (select t1.b,t1.c from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.c=t2.c); ### Outer query containing joins -select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2) order by 1,2,3,4; -select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2) and cs1.a=1 order by 1,2,3,4; +--sorted_result +select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2); +--sorted_result +select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select b,c from cs2) and cs1.a=1; ### Both IN subquery and outer queries containing joins select * from cs1 join cs2 on cs1.a=cs2.b and (cs1.a,cs1.d) in (select t1.b,t1.c from cs2 t1 join cs2 t2 on t1.b=t2.b and t1.b=1); diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 5ebac1850..a39459e95 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -331,8 +331,8 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) { // storedKeyAllocators[j].setUseLock(true); // WIP use one copy of the allocator - auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); - storedKeyAllocators.emplace_back(PoolAllocator(&allocator, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true)); + auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); + storedKeyAllocators.emplace_back(PoolAllocator(alloc, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true)); } joinNullValues.reset(new uint64_t[joinerCount]); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d3504af74..67578a98d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -81,6 +81,11 @@ if (WITH_UNITTESTS) target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) gtest_add_tests(TARGET counting_allocator TEST_PREFIX columnstore:) + add_executable(poolallocator poolallocator.cpp) + add_dependencies(poolallocator googletest) + target_link_libraries(poolallocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) + gtest_add_tests(TARGET poolallocator TEST_PREFIX columnstore:) + add_executable(comparators_tests comparators-tests.cpp) target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) add_test(NAME columnstore:comparators_tests COMMAND comparators_tests) diff --git a/tests/poolallocator.cpp b/tests/poolallocator.cpp new file mode 100644 index 000000000..8a9dc9d64 --- /dev/null +++ b/tests/poolallocator.cpp @@ -0,0 +1,287 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ +#include +#include +#include +#include +#include +#include + +#include "countingallocator.h" +#include "poolallocator.h" + +using namespace allocators; +using namespace utils; + +/** + * Тест явного задания windowSize при создании: + */ +TEST(PoolAllocatorTest, CustomWindowSize) +{ + const unsigned CUSTOM_SIZE = 1024; + PoolAllocator pa(CUSTOM_SIZE); + EXPECT_EQ(pa.getWindowSize(), CUSTOM_SIZE); + EXPECT_EQ(pa.getMemUsage(), 0ULL); +} + +/** + * Тест базового выделения небольшого блока памяти: + * - Выделяем блок меньше, чем windowSize. + * - Проверяем, что memUsage увеличился на размер выделенного блока. + * - Указатель не должен быть равен nullptr. + */ +TEST(PoolAllocatorTest, AllocateSmallBlock) +{ + PoolAllocator pa; + uint64_t initialUsage = pa.getMemUsage(); + const uint64_t ALLOC_SIZE = 128; + + void* ptr = pa.allocate(ALLOC_SIZE); + + ASSERT_NE(ptr, nullptr); + EXPECT_EQ(pa.getMemUsage(), initialUsage + ALLOC_SIZE); +} + +/** + * Тест выделения блока памяти больше, чем windowSize (Out-Of-Band - OOB): + * - Проверяем, что memUsage увеличился на нужное количество байт. + * - Указатель не nullptr. + */ +TEST(PoolAllocatorTest, AllocateOOBBlock) +{ + // Выбираем размер гарантированно больше, чем окно по умолчанию + const uint64_t BIG_BLOCK_SIZE = PoolAllocator::DEFAULT_WINDOW_SIZE + 1024; + PoolAllocator pa; + uint64_t initialUsage = pa.getMemUsage(); + + void* ptr = pa.allocate(BIG_BLOCK_SIZE); + ASSERT_NE(ptr, nullptr); + EXPECT_EQ(pa.getMemUsage(), initialUsage + BIG_BLOCK_SIZE); +} + +/** + * Тест деаллокации (deallocate) Out-Of-Band блока: + * - Убеждаемся, что после deallocate memUsage возвращается к исходному значению. + */ +TEST(PoolAllocatorTest, DeallocateOOBBlock) +{ + PoolAllocator pa; + // Блок больше windowSize + const uint64_t BIG_BLOCK_SIZE = pa.getWindowSize() + 1024; + + uint64_t initialUsage = pa.getMemUsage(); + void* ptr = pa.allocate(BIG_BLOCK_SIZE); + ASSERT_NE(ptr, nullptr); + EXPECT_EQ(pa.getMemUsage(), initialUsage + BIG_BLOCK_SIZE); + + pa.deallocate(ptr); + EXPECT_EQ(pa.getMemUsage(), initialUsage); +} + +/** + * Тест деаллокации блока, который был выделен внутри "windowSize". + * По текущей логике PoolAllocator::deallocate для "маленьких" блоков ничего не делает. + * Основная проверка – что код не падает и не меняет memUsage. + */ +TEST(PoolAllocatorTest, DeallocateSmallBlock) +{ + PoolAllocator pa; + const uint64_t ALLOC_SIZE = 128; + + uint64_t initialUsage = pa.getMemUsage(); + void* ptr = pa.allocate(ALLOC_SIZE); + ASSERT_NE(ptr, nullptr); + EXPECT_EQ(pa.getMemUsage(), initialUsage + ALLOC_SIZE); + + // Попытка деаллокации "маленького" блока – в текущей реализации + // код его не возвращает в пул, следовательно memUsage не уменьшится. + pa.deallocate(ptr); + EXPECT_EQ(pa.getMemUsage(), initialUsage + ALLOC_SIZE); +} + +/** + * Тест полного освобождения памяти (deallocateAll): + * - Выделяем несколько блоков: и маленький, и большой. + * - После вызова deallocateAll всё должно освободиться, memUsage вернётся к 0. + */ +TEST(PoolAllocatorTest, DeallocateAll) +{ + PoolAllocator pa; + // Блок в пределах windowSize + const uint64_t SMALL_BLOCK = 256; + // Блок Out-Of-Band + const uint64_t LARGE_BLOCK = pa.getWindowSize() + 1024; + + pa.allocate(SMALL_BLOCK); + pa.allocate(LARGE_BLOCK); + // Убедимся, что memUsage > 0 + EXPECT_GT(pa.getMemUsage(), 0ULL); + + // Освобождаем всё + pa.deallocateAll(); + EXPECT_EQ(pa.getMemUsage(), 0ULL); +} + +/** + * Тест копирующего оператора присваивания: + * - Проверяем, что параметры (allocSize, tmpSpace, useLock) копируются. + * - Однако выделенная память не копируется (т.к. после operator= вызывается deallocateAll). + */ +TEST(PoolAllocatorTest, AssignmentOperator) +{ + PoolAllocator pa1(2048, true, true); // windowSize=2048, tmpSpace=true, useLock=true + // Выделяем немного памяти + pa1.allocate(100); + pa1.allocate(200); + + EXPECT_EQ(pa1.getWindowSize(), 2048U); + EXPECT_TRUE(pa1.getMemUsage() > 0); + + // С помощью оператора присваивания: pa2 = pa1 + PoolAllocator pa2; + pa2 = pa1; // После этого deallocateAll() вызывается внутри operator= (в нашем коде) + + // Проверяем скопированные поля: + EXPECT_EQ(pa2.getWindowSize(), 2048U); + // tmpSpace и useLock также должны совпасть + // (В данном коде напрямую нет геттеров для них, + // но, если нужно, можете добавить соответствующие методы или рефлексировать код.) + + // Проверяем, что у pa2 memUsage == 0 после deallocateAll + EXPECT_EQ(pa2.getMemUsage(), 0ULL); + // А у pa1 осталась прежняя статистика использования памяти, + // т.к. operator= сделал deallocateAll только внутри pa2. + EXPECT_TRUE(pa1.getMemUsage() > 0); +} + +TEST(PoolAllocatorTest, MultithreadedAllocationWithLock) +{ + PoolAllocator pa(PoolAllocator::DEFAULT_WINDOW_SIZE, false, true); + // useLock = true + + const int THREAD_COUNT = 4; + const uint64_t ALLOC_PER_THREAD = 1024; + std::vector threads; + + // Стартовое значение + uint64_t initialUsage = pa.getMemUsage(); + + // Запускаем несколько потоков, каждый сделает небольшое кол-во аллокаций + for (int i = 0; i < THREAD_COUNT; i++) + { + threads.emplace_back( + [&pa]() + { + for (int j = 0; j < 10; j++) + { + pa.allocate(ALLOC_PER_THREAD); + } + }); + } + + for (auto& th : threads) + th.join(); + + uint64_t expected = initialUsage + THREAD_COUNT * 10ULL * ALLOC_PER_THREAD; + EXPECT_GE(pa.getMemUsage(), expected); +} + +static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024; + +// Test Fixture for AtomicCounterAllocator +class PoolallocatorTest : public ::testing::Test +{ + protected: + // Atomic counter to track allocated memory + std::atomic allocatedMemory{MemoryAllowance}; + + // Custom allocator instance + CountingAllocator allocator; + + // Constructor + PoolallocatorTest() : allocatedMemory(MemoryAllowance), allocator(&allocatedMemory, MemoryAllowance / 100) + { + } + + // Destructor + ~PoolallocatorTest() override = default; +}; + +// Тест для проверки учёта потребления памяти в PoolAllocator. +TEST_F(PoolallocatorTest, AllocationWithAccounting) +{ + int bufSize = 512; + const unsigned CUSTOM_SIZE = 1024; + PoolAllocator pa(allocator, CUSTOM_SIZE, false, true); + EXPECT_EQ(pa.getWindowSize(), CUSTOM_SIZE); + EXPECT_EQ(pa.getMemUsage(), 0ULL); + auto* ptr = pa.allocate(bufSize); + + EXPECT_NE(ptr, nullptr); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - bufSize); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - CUSTOM_SIZE); + pa.deallocate(ptr); + // B/c this PoolAllocator frees memory only when it's destroyed. + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - bufSize); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - CUSTOM_SIZE); + + bufSize = 64536; + auto* ptr1 = pa.allocate(bufSize); + + EXPECT_NE(ptr1, nullptr); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - bufSize); + + pa.deallocate(ptr1); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - CUSTOM_SIZE); + EXPECT_GE(allocatedMemory.load(), MemoryAllowance - bufSize); +} + +TEST_F(PoolallocatorTest, MultithreadedAccountedAllocationWithLock) +{ + const unsigned CUSTOM_SIZE = 1024; + PoolAllocator pa(allocator, CUSTOM_SIZE, false, true); + + const int THREAD_COUNT = 4; + const uint64_t ALLOC_PER_THREAD = 1024; + const uint64_t NUM_ALLOCS_PER_THREAD = 10; + std::vector threads; + + // Стартовое значение + uint64_t initialUsage = pa.getMemUsage(); + + // Запускаем несколько потоков, каждый сделает небольшое кол-во аллокаций + for (int i = 0; i < THREAD_COUNT; i++) + { + threads.emplace_back( + [&pa]() + { + for (uint64_t j = 0; j < NUM_ALLOCS_PER_THREAD; j++) + { + pa.allocate(ALLOC_PER_THREAD); + } + }); + } + + for (auto& th : threads) + th.join(); + + uint64_t expected = initialUsage + THREAD_COUNT * 10ULL * ALLOC_PER_THREAD; + EXPECT_GE(pa.getMemUsage(), expected); + // 2 * CUSTOM_SIZE semantics is structs allocation overhead. + EXPECT_GE(allocatedMemory.load(), + MemoryAllowance - (THREAD_COUNT * ALLOC_PER_THREAD * NUM_ALLOCS_PER_THREAD) - 2 * CUSTOM_SIZE); +} \ No newline at end of file diff --git a/tests/rowgroup-tests.cpp b/tests/rowgroup-tests.cpp index dffe3a36a..ca2be5c6d 100644 --- a/tests/rowgroup-tests.cpp +++ b/tests/rowgroup-tests.cpp @@ -380,42 +380,31 @@ class RGDataTest : public ::testing::Test // bool useStringTable = true; TEST_F(RGDataTest, AllocData) { - std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl; rgD = rowgroup::RGData(rg, alloc); rg.setData(&rgD); rg.initRow(&r); rg.getRow(0, &r); - std::cout << " test inStringTable(colIndex) " << r.inStringTable(0) << std::endl; - std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; auto currentAllocation = allocatedMemory.load(); EXPECT_LE(currentAllocation, MemoryAllowance - rg.getMaxDataSize()); r.setStringField(utils::ConstString{"testaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, 0); - std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; - std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl; EXPECT_LE(allocatedMemory.load(), currentAllocation); currentAllocation = allocatedMemory.load(); r.nextRow(); r.setStringField(utils::ConstString{"testaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, 0); - std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; - std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl; EXPECT_EQ(allocatedMemory.load(), currentAllocation); currentAllocation = allocatedMemory.load(); r.nextRow(); std::string longString(64 * 1024 + 1000, 'a'); auto cs = utils::ConstString(longString); - std::cout << "test longString " << longString.size() << " cs len " << cs.length()<< std::endl; r.setStringField(cs, 0); - std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; - std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl; EXPECT_LE(allocatedMemory.load(), currentAllocation); rgD = rowgroup::RGData(rg); - std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl; EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); diff --git a/utils/common/fixedallocator.cpp b/utils/common/fixedallocator.cpp index be830d747..8d55ac7f8 100644 --- a/utils/common/fixedallocator.cpp +++ b/utils/common/fixedallocator.cpp @@ -21,21 +21,13 @@ ******************************************************************************************/ // This is one of the first files we compile, check the compiler... -#if defined(__GNUC__) -#if __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 1) -#error "This is a very old GCC, and it's probably not going to work." -#endif -#else -#error "This compiler is not known and it's probably not going to work." -#endif - #include -#include -#include -#define FIXEDALLOCATOR_DLLEXPORT #include "fixedallocator.h" -#undef FIXEDALLOCATOR_DLLEXPORT + +#include +#include + using namespace std; @@ -75,15 +67,24 @@ void FixedAllocator::setAllocSize(uint allocSize) void FixedAllocator::newBlock() { - std::shared_ptr next; + // boost::shared_ptr next; capacityRemaining = elementCount * elementSize; if (!tmpSpace || mem.size() == 0) { - next.reset(new uint8_t[elementCount * elementSize]); - mem.push_back(next); - nextAlloc = next.get(); + if (alloc) + { + mem.emplace_back(boost::allocate_shared(*alloc, elementCount * elementSize)); + } + else + { + mem.emplace_back(boost::make_shared(elementCount * elementSize)); + } + // next.reset(new uint8_t[elementCount * elementSize]); + // mem.push_back(next); + // nextAlloc = next.get(); + nextAlloc = mem.back().get(); } else { diff --git a/utils/common/fixedallocator.h b/utils/common/fixedallocator.h index e311dacd2..64f4b684c 100644 --- a/utils/common/fixedallocator.h +++ b/utils/common/fixedallocator.h @@ -34,16 +34,23 @@ #include +#include #include -#include #include #include + +#include + +#include "countingallocator.h" #include "spinlock.h" #define EXPORT namespace utils { +using FixedAllocatorBufIntegralType = uint8_t; +using FixedAllocatorBufType = FixedAllocatorBufIntegralType[]; + class FixedAllocator { public: @@ -60,7 +67,8 @@ class FixedAllocator , lock(false) { } - EXPORT explicit FixedAllocator(unsigned long allocSize, bool isTmpSpace = false, + + EXPORT explicit FixedAllocator(allocators::CountingAllocator alloc, unsigned long allocSize, bool isTmpSpace = false, unsigned long numElements = DEFAULT_NUM_ELEMENTS) : capacityRemaining(0) , elementCount(numElements) @@ -70,8 +78,10 @@ class FixedAllocator , nextAlloc(0) , useLock(false) , lock(false) + , alloc(alloc) { } + EXPORT FixedAllocator(const FixedAllocator&); EXPORT FixedAllocator& operator=(const FixedAllocator&); virtual ~FixedAllocator() @@ -88,20 +98,21 @@ class FixedAllocator EXPORT void deallocateAll(); // drops all memory in use EXPORT uint64_t getMemUsage() const; void setUseLock(bool); - void setAllocSize(uint); + void setAllocSize(uint32_t); private: void newBlock(); - std::vector> mem; + std::vector> mem; unsigned long capacityRemaining; uint64_t elementCount; unsigned long elementSize; uint64_t currentlyStored; bool tmpSpace; - uint8_t* nextAlloc; + FixedAllocatorBufIntegralType* nextAlloc; bool useLock; std::atomic lock; + std::optional> alloc {}; }; inline void* FixedAllocator::allocate() diff --git a/utils/common/poolallocator.cpp b/utils/common/poolallocator.cpp index 7341844c9..d0a284d71 100644 --- a/utils/common/poolallocator.cpp +++ b/utils/common/poolallocator.cpp @@ -23,7 +23,8 @@ #include //#define NDEBUG #include - +#include +#include #include "poolallocator.h" @@ -37,6 +38,7 @@ PoolAllocator& PoolAllocator::operator=(const PoolAllocator& v) allocSize = v.allocSize; tmpSpace = v.tmpSpace; useLock = v.useLock; + alloc = v.alloc; deallocateAll(); return *this; } @@ -46,21 +48,29 @@ void PoolAllocator::deallocateAll() capacityRemaining = 0; nextAlloc = NULL; memUsage = 0; + // WIP double check the space is cleaned up. mem.clear(); oob.clear(); } void PoolAllocator::newBlock() { - std::shared_ptr next; + // boost::shared_ptr next; capacityRemaining = allocSize; if (!tmpSpace || mem.size() == 0) { - next.reset(new PoolAllocatorBufType[allocSize]); - mem.push_back(next); - nextAlloc = next.get(); + if (alloc) + { + mem.emplace_back(boost::allocate_shared(*alloc, allocSize)); + } + else + { + mem.emplace_back(boost::make_shared(allocSize)); + } + // mem.push_back(next); + nextAlloc = mem.back().get(); } else nextAlloc = mem.front().get(); @@ -71,7 +81,14 @@ void* PoolAllocator::allocOOB(uint64_t size) OOBMemInfo memInfo; memUsage += size; - memInfo.mem.reset(new PoolAllocatorBufType[size]); + if (alloc) + { + memInfo.mem = boost::allocate_shared(*alloc, size); + } + else + { + memInfo.mem = boost::make_shared(size); + } memInfo.size = size; void* ret = (void*)memInfo.mem.get(); oob[ret] = memInfo; diff --git a/utils/common/poolallocator.h b/utils/common/poolallocator.h index 64042e927..0de13eaca 100644 --- a/utils/common/poolallocator.h +++ b/utils/common/poolallocator.h @@ -27,17 +27,21 @@ #include #include +#include #include #include #include +#include + #include #include "countingallocator.h" namespace utils { -using PoolAllocatorBufType = uint8_t; +using PoolAllocatorBufIntegralType = uint8_t; +using PoolAllocatorBufType = PoolAllocatorBufIntegralType[]; class PoolAllocator { public: @@ -54,7 +58,7 @@ class PoolAllocator , lock(false) { } - PoolAllocator(allocators::CountingAllocator* allocator, unsigned windowSize = DEFAULT_WINDOW_SIZE, + PoolAllocator(allocators::CountingAllocator alloc, unsigned windowSize = DEFAULT_WINDOW_SIZE, bool isTmpSpace = false, bool _useLock = false) : allocSize(windowSize) , tmpSpace(isTmpSpace) @@ -63,7 +67,7 @@ class PoolAllocator , nextAlloc(0) , useLock(_useLock) , lock(false) - , allocator(allocator) + , alloc(alloc) { } PoolAllocator(const PoolAllocator& p) @@ -74,7 +78,7 @@ class PoolAllocator , nextAlloc(0) , useLock(p.useLock) , lock(false) - , allocator(p.allocator) + , alloc(p.alloc) { } virtual ~PoolAllocator() @@ -106,23 +110,22 @@ class PoolAllocator void* allocOOB(uint64_t size); unsigned allocSize; - std::vector> mem; + std::vector> mem; bool tmpSpace; unsigned capacityRemaining; uint64_t memUsage; - PoolAllocatorBufType* nextAlloc; + PoolAllocatorBufIntegralType* nextAlloc; bool useLock; std::atomic lock; struct OOBMemInfo { - std::shared_ptr mem; + boost::shared_ptr mem; uint64_t size; }; typedef std::map OutOfBandMap; OutOfBandMap oob; // for mem chunks bigger than the window size; these can be dealloc'd - // WIP rename to allocator - allocators::CountingAllocator* allocator = nullptr; + std::optional> alloc {}; }; inline void* PoolAllocator::allocate(uint64_t size) diff --git a/utils/common/stlpoolallocator.h b/utils/common/stlpoolallocator.h index 1ca6f0880..0864deb08 100644 --- a/utils/common/stlpoolallocator.h +++ b/utils/common/stlpoolallocator.h @@ -25,6 +25,7 @@ #include #include #include "poolallocator.h" +#include "resourcemanager.h" #undef min #undef max @@ -61,6 +62,7 @@ class STLPoolAllocator }; STLPoolAllocator() throw(); + STLPoolAllocator(joblist::ResourceManager* rm); STLPoolAllocator(const STLPoolAllocator&) throw(); STLPoolAllocator(uint32_t capacity) throw(); template @@ -94,6 +96,20 @@ STLPoolAllocator::STLPoolAllocator() throw() pa.reset(new PoolAllocator(DEFAULT_SIZE)); } +template +STLPoolAllocator::STLPoolAllocator(joblist::ResourceManager* rm) +{ + if (rm) + { + auto alloc = rm->getAllocator(); + pa.reset(new PoolAllocator(alloc)); + } + else + { + pa.reset(new PoolAllocator(DEFAULT_SIZE)); + } +} + template STLPoolAllocator::STLPoolAllocator(const STLPoolAllocator& s) throw() { diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index d7cea726f..01b7cf1c3 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -24,6 +24,7 @@ #include "hasher.h" #include "lbidlist.h" +#include "resourcemanager.h" #include "spinlock.h" #include "vlarray.h" #include "threadnaming.h" @@ -36,10 +37,17 @@ using namespace joblist; namespace joiner { +// TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, +// uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, +// threadpool::ThreadPool* jsThreadPool) +// : TupleJoiner(smallInput, largeInput, smallJoinColumn, largeJoinColumn, jt, jsThreadPool, nullptr) +// { +// } + // Typed joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, - threadpool::ThreadPool* jsThreadPool, const uint64_t numCores) + threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -64,7 +72,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R _pool.reset(new boost::shared_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc; + STLPoolAllocator> alloc(resourceManager_); _pool[i] = alloc.getPoolAllocator(); ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); } @@ -75,7 +83,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R _pool.reset(new boost::shared_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc; + STLPoolAllocator> alloc(resourceManager_); _pool[i] = alloc.getPoolAllocator(); sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); } @@ -86,7 +94,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R _pool.reset(new boost::shared_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc; + STLPoolAllocator> alloc(resourceManager_); _pool[i] = alloc.getPoolAllocator(); h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); } @@ -143,10 +151,17 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R nullValueForJoinColumn = smallNullRow.getSignedNullValue(smallJoinColumn); } +// TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, +// const vector& smallJoinColumns, const vector& largeJoinColumns, +// JoinType jt, threadpool::ThreadPool* jsThreadPool) +// : TupleJoiner(smallInput, largeInput, smallJoinColumns, largeJoinColumns, jt, jsThreadPool, nullptr) +// { +// } + // Typeless joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const vector& smallJoinColumns, const vector& largeJoinColumns, - JoinType jt, threadpool::ThreadPool* jsThreadPool, const uint64_t numCores) + JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -170,7 +185,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R ht.reset(new boost::scoped_ptr[bucketCount]); for (i = 0; i < bucketCount; i++) { - STLPoolAllocator> alloc; + STLPoolAllocator> alloc(resourceManager_); _pool[i] = alloc.getPoolAllocator(); ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); } @@ -226,7 +241,10 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R // TODO: make it explicit to avoid future confusion. storedKeyAlloc.reset(new FixedAllocator[numCores]); for (i = 0; i < (uint)numCores; i++) - storedKeyAlloc[i].setAllocSize(keyLength); + { + auto alloc = resourceManager_->getAllocator(); + storedKeyAlloc[i] = FixedAllocator(alloc, keyLength); + } } TupleJoiner::TupleJoiner() @@ -856,7 +874,11 @@ void TupleJoiner::setInUM() tmpKeyAlloc.reset(new FixedAllocator[threadCount]); for (i = 0; i < threadCount; i++) - tmpKeyAlloc[i] = FixedAllocator(keyLength, true); + { + auto alloc = resourceManager_->getAllocator(); + tmpKeyAlloc[i] = FixedAllocator(alloc, keyLength, true); + + } } } @@ -911,7 +933,10 @@ void TupleJoiner::setInUM(vector& rgs) tmpKeyAlloc.reset(new FixedAllocator[threadCount]); for (i = 0; i < threadCount; i++) - tmpKeyAlloc[i] = FixedAllocator(keyLength, true); + { + auto alloc = resourceManager_->getAllocator(); + tmpKeyAlloc[i] = FixedAllocator(alloc, keyLength, true); + } } } @@ -967,7 +992,10 @@ void TupleJoiner::setThreadCount(uint32_t cnt) tmpKeyAlloc.reset(new FixedAllocator[threadCount]); for (uint32_t i = 0; i < threadCount; i++) - tmpKeyAlloc[i] = FixedAllocator(keyLength, true); + { + auto alloc = resourceManager_->getAllocator(); + tmpKeyAlloc[i] = FixedAllocator(alloc, keyLength, true); + } } if (fe) @@ -1839,6 +1867,7 @@ std::shared_ptr TupleJoiner::copyForDiskJoin() ret->discreteValues.reset(new bool[smallKeyColumns.size()]); ret->cpValues.reset(new vector[smallKeyColumns.size()]); + ret->resourceManager_ = resourceManager_; for (uint32_t i = 0; i < smallKeyColumns.size(); i++) { @@ -1877,7 +1906,10 @@ std::shared_ptr TupleJoiner::copyForDiskJoin() { ret->storedKeyAlloc.reset(new FixedAllocator[numCores]); for (int i = 0; i < numCores; i++) - ret->storedKeyAlloc[i].setAllocSize(keyLength); + { + auto alloc = resourceManager_->getAllocator(); + storedKeyAlloc[i] = FixedAllocator(alloc, keyLength); + } } ret->numCores = numCores; diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index 98af11b31..82e5c9fb3 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -26,6 +26,7 @@ #include #include +#include "resourcemanager.h" #include "rowgroup.h" #include "joiner.h" #include "fixedallocator.h" @@ -266,14 +267,22 @@ class TupleJoiner }; /* ctor to use for numeric join */ + // TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, + // uint32_t smallJoinColumn, uint32_t largeJoinColumn, joblist::JoinType jt, + // threadpool::ThreadPool* jsThreadPool); + TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, joblist::JoinType jt, - threadpool::ThreadPool* jsThreadPool, const uint64_t numCores); + threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores); /* ctor to use for string & compound join */ - TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, + // TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, + // const std::vector& smallJoinColumns, const std::vector& largeJoinColumns, + // joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool); + + TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const std::vector& smallJoinColumns, const std::vector& largeJoinColumns, - joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool, const uint64_t numCores); + joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores); ~TupleJoiner(); @@ -562,6 +571,8 @@ class TupleJoiner void bucketsToTables(buckets_t*, hash_table_t*); bool _convertToDiskJoin; + joblist::ResourceManager* resourceManager_ = nullptr; + }; } // namespace joiner