diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 513f6e0cf..dffc17ec9 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -122,6 +122,10 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) if (numCores <= 0) numCores = 8; + + // At least one to enable DJ. + joinerRunnerInputRecordsStats.resize(1, 0); + joinerRunnerInputMatchedStats.resize(1, 0); } TupleHashJoinStep::~TupleHashJoinStep() diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 772715ab1..8b2c71164 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -38,10 +38,53 @@ using namespace joblist; namespace joiner { +constexpr const size_t DEFAULT_BUCKET_COUNT = 10; + +void TupleJoiner::initHashMaps(uint32_t& smallJoinColumn) +{ + if (typelessJoin) + { + for (size_t i = 0; i < bucketCount; i++) + { + auto alloc = resourceManager_->getAllocator>(); + ht.emplace_back(std::unique_ptr( + new typelesshash_t(DEFAULT_BUCKET_COUNT, hasher(), typelesshash_t::key_equal(), alloc))); + } + } + else if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE) + { + for (size_t i = 0; i < bucketCount; i++) + { + auto alloc = resourceManager_->getAllocator>(); + ld.emplace_back(std::unique_ptr( + new ldhash_t(DEFAULT_BUCKET_COUNT, hasher(), ldhash_t::key_equal(), alloc))); + } + } + else if (smallRG.usesStringTable()) + { + for (size_t i = 0; i < bucketCount; i++) + { + auto alloc = resourceManager_->getAllocator>(); + sth.emplace_back(std::unique_ptr( + new sthash_t(DEFAULT_BUCKET_COUNT, hasher(), sthash_t::key_equal(), alloc))); + } + } + else + { + for (size_t i = 0; i < bucketCount; i++) + { + auto alloc = resourceManager_->getAllocator>(); + h.emplace_back( + std::unique_ptr(new hash_t(DEFAULT_BUCKET_COUNT, hasher(), hash_t::key_equal(), alloc))); + } + } +} + // Typed joiner ctor TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, - threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores) + threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, + const uint64_t numCores) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -56,38 +99,13 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , _convertToDiskJoin(false) , resourceManager_(rm) { - uint i; - auto alloc = resourceManager_->getAllocator(); rows.reset(new RowPointersVec(alloc)); getBucketCount(); m_bucketLocks.reset(new boost::mutex[bucketCount]); - if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE) - { - for (i = 0; i < bucketCount; i++) - { - auto alloc = resourceManager_->getAllocator>(); - ld.emplace_back(std::unique_ptr(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc))); - } - } - else if (smallRG.usesStringTable()) - { - for (i = 0; i < bucketCount; i++) - { - auto alloc = resourceManager_->getAllocator>(); - sth.emplace_back(std::unique_ptr(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc))); - } - } - else - { - for (i = 0; i < bucketCount; i++) - { - auto alloc = resourceManager_->getAllocator>(); - h.emplace_back(std::unique_ptr(new hash_t(10, hasher(), hash_t::key_equal(), alloc))); - } - } + initHashMaps(smallJoinColumn); smallRG.initRow(&smallNullRow); @@ -167,12 +185,10 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R getBucketCount(); - for (i = 0; i < bucketCount; i++) - { - auto alloc = resourceManager_->getAllocator>(); - ht.emplace_back(std::unique_ptr( - new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc))); - } + uint32_t unused = 0; + // Unused b/c this is a typeless joiner + initHashMaps(unused); + m_bucketLocks.reset(new boost::mutex[bucketCount]); smallRG.initRow(&smallNullRow); @@ -1771,25 +1787,20 @@ void TupleJoiner::setTableName(const string& tname) tableName = tname; } -/* Disk based join support */ +void TupleJoiner::clearHashMaps() +{ + ht.clear(); + ld.clear(); + sth.clear(); + h.clear(); +} +/* Disk based join support */ void TupleJoiner::clearData() { // This loop calls dtors and deallocates mem. - for (uint i = 0; i < bucketCount; i++) - { - auto alloc = resourceManager_->getAllocator>(); - if (typelessJoin) - ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); - else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE) - ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); - else if (smallRG.usesStringTable()) - sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); - else - { - h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); - } - } + clearHashMaps(); + initHashMaps(smallKeyColumns[0]); auto alloc = resourceManager_->getAllocator(); rows.reset(new RowPointersVec(alloc)); diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index d2ab743db..47c99390a 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -470,6 +470,8 @@ class TupleJoiner wasAborted_ = true; } + void initHashMaps(uint32_t& smallJoinColumn); + void clearHashMaps(); private: typedef std::unordered_multimap, allocators::CountingAllocator>>