You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
fix(disk-based-join): this fixes multiple SEGV for disk-based join algo
This commit is contained in:
@ -122,6 +122,10 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo)
|
|||||||
|
|
||||||
if (numCores <= 0)
|
if (numCores <= 0)
|
||||||
numCores = 8;
|
numCores = 8;
|
||||||
|
|
||||||
|
// At least one to enable DJ.
|
||||||
|
joinerRunnerInputRecordsStats.resize(1, 0);
|
||||||
|
joinerRunnerInputMatchedStats.resize(1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
TupleHashJoinStep::~TupleHashJoinStep()
|
TupleHashJoinStep::~TupleHashJoinStep()
|
||||||
|
@ -38,10 +38,53 @@ using namespace joblist;
|
|||||||
|
|
||||||
namespace joiner
|
namespace joiner
|
||||||
{
|
{
|
||||||
|
constexpr const size_t DEFAULT_BUCKET_COUNT = 10;
|
||||||
|
|
||||||
|
void TupleJoiner::initHashMaps(uint32_t& smallJoinColumn)
|
||||||
|
{
|
||||||
|
if (typelessJoin)
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
|
{
|
||||||
|
auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
|
||||||
|
ht.emplace_back(std::unique_ptr<typelesshash_t>(
|
||||||
|
new typelesshash_t(DEFAULT_BUCKET_COUNT, hasher(), typelesshash_t::key_equal(), alloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
|
{
|
||||||
|
auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
|
||||||
|
ld.emplace_back(std::unique_ptr<ldhash_t>(
|
||||||
|
new ldhash_t(DEFAULT_BUCKET_COUNT, hasher(), ldhash_t::key_equal(), alloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (smallRG.usesStringTable())
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
|
{
|
||||||
|
auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
|
||||||
|
sth.emplace_back(std::unique_ptr<sthash_t>(
|
||||||
|
new sthash_t(DEFAULT_BUCKET_COUNT, hasher(), sthash_t::key_equal(), alloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
|
{
|
||||||
|
auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
|
||||||
|
h.emplace_back(
|
||||||
|
std::unique_ptr<hash_t>(new hash_t(DEFAULT_BUCKET_COUNT, hasher(), hash_t::key_equal(), alloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Typed joiner ctor
|
// Typed joiner ctor
|
||||||
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
|
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
|
||||||
uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt,
|
uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt,
|
||||||
threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores)
|
threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm,
|
||||||
|
const uint64_t numCores)
|
||||||
: smallRG(smallInput)
|
: smallRG(smallInput)
|
||||||
, largeRG(largeInput)
|
, largeRG(largeInput)
|
||||||
, joinAlg(INSERTING)
|
, joinAlg(INSERTING)
|
||||||
@ -56,38 +99,13 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
|
|||||||
, _convertToDiskJoin(false)
|
, _convertToDiskJoin(false)
|
||||||
, resourceManager_(rm)
|
, resourceManager_(rm)
|
||||||
{
|
{
|
||||||
uint i;
|
|
||||||
|
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
||||||
rows.reset(new RowPointersVec(alloc));
|
rows.reset(new RowPointersVec(alloc));
|
||||||
|
|
||||||
getBucketCount();
|
getBucketCount();
|
||||||
m_bucketLocks.reset(new boost::mutex[bucketCount]);
|
m_bucketLocks.reset(new boost::mutex[bucketCount]);
|
||||||
|
|
||||||
if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
|
initHashMaps(smallJoinColumn);
|
||||||
{
|
|
||||||
for (i = 0; i < bucketCount; i++)
|
|
||||||
{
|
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
|
|
||||||
ld.emplace_back(std::unique_ptr<ldhash_t>(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (smallRG.usesStringTable())
|
|
||||||
{
|
|
||||||
for (i = 0; i < bucketCount; i++)
|
|
||||||
{
|
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
|
|
||||||
sth.emplace_back(std::unique_ptr<sthash_t>(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (i = 0; i < bucketCount; i++)
|
|
||||||
{
|
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
|
|
||||||
h.emplace_back(std::unique_ptr<hash_t>(new hash_t(10, hasher(), hash_t::key_equal(), alloc)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
smallRG.initRow(&smallNullRow);
|
smallRG.initRow(&smallNullRow);
|
||||||
|
|
||||||
@ -167,12 +185,10 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
|
|||||||
|
|
||||||
getBucketCount();
|
getBucketCount();
|
||||||
|
|
||||||
for (i = 0; i < bucketCount; i++)
|
uint32_t unused = 0;
|
||||||
{
|
// Unused b/c this is a typeless joiner
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
|
initHashMaps(unused);
|
||||||
ht.emplace_back(std::unique_ptr<typelesshash_t>(
|
|
||||||
new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)));
|
|
||||||
}
|
|
||||||
m_bucketLocks.reset(new boost::mutex[bucketCount]);
|
m_bucketLocks.reset(new boost::mutex[bucketCount]);
|
||||||
|
|
||||||
smallRG.initRow(&smallNullRow);
|
smallRG.initRow(&smallNullRow);
|
||||||
@ -1771,25 +1787,20 @@ void TupleJoiner::setTableName(const string& tname)
|
|||||||
tableName = tname;
|
tableName = tname;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Disk based join support */
|
void TupleJoiner::clearHashMaps()
|
||||||
|
{
|
||||||
|
ht.clear();
|
||||||
|
ld.clear();
|
||||||
|
sth.clear();
|
||||||
|
h.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Disk based join support */
|
||||||
void TupleJoiner::clearData()
|
void TupleJoiner::clearData()
|
||||||
{
|
{
|
||||||
// This loop calls dtors and deallocates mem.
|
// This loop calls dtors and deallocates mem.
|
||||||
for (uint i = 0; i < bucketCount; i++)
|
clearHashMaps();
|
||||||
{
|
initHashMaps(smallKeyColumns[0]);
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
|
|
||||||
if (typelessJoin)
|
|
||||||
ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
|
|
||||||
else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)
|
|
||||||
ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
|
|
||||||
else if (smallRG.usesStringTable())
|
|
||||||
sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
|
|
||||||
else
|
|
||||||
{
|
|
||||||
h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
||||||
rows.reset(new RowPointersVec(alloc));
|
rows.reset(new RowPointersVec(alloc));
|
||||||
|
@ -470,6 +470,8 @@ class TupleJoiner
|
|||||||
wasAborted_ = true;
|
wasAborted_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void initHashMaps(uint32_t& smallJoinColumn);
|
||||||
|
void clearHashMaps();
|
||||||
private:
|
private:
|
||||||
typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
|
typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
|
||||||
allocators::CountingAllocator<std::pair<const int64_t, uint8_t*>>>
|
allocators::CountingAllocator<std::pair<const int64_t, uint8_t*>>>
|
||||||
|
Reference in New Issue
Block a user