1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

fix(join, UM, perf): UM join is multi-threaded now (#3286)

* chore: UM join is multi-threaded now

* fix(UMjoin):  replace TR1 maps with stdlib versions
This commit is contained in:
drrtuy
2024-08-28 18:18:57 +01:00
committed by Leonid Fedorov
parent 1d40b4bb45
commit 47d01b2d2f
4 changed files with 75 additions and 74 deletions

View File

@ -119,12 +119,9 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo)
allowDJS = false; allowDJS = false;
numCores = resourceManager->numCores(); numCores = resourceManager->numCores();
if (numCores <= 0) if (numCores <= 0)
numCores = 8; numCores = 8;
/* Debugging, rand() is used to simulate failures
time_t t = time(NULL);
srand(t);
*/
} }
TupleHashJoinStep::~TupleHashJoinStep() TupleHashJoinStep::~TupleHashJoinStep()
@ -207,7 +204,6 @@ void TupleHashJoinStep::join()
jobstepThreadPool.join(djsReader); jobstepThreadPool.join(djsReader);
jobstepThreadPool.join(djsRelay); jobstepThreadPool.join(djsRelay);
// cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl;
} }
} }
@ -305,28 +301,32 @@ void TupleHashJoinStep::startSmallRunners(uint index)
handle abort, out of memory, etc handle abort, out of memory, etc
*/ */
/* To measure wall-time spent constructing the small-side tables...
boost::posix_time::ptime end_time, start_time =
boost::posix_time::microsec_clock::universal_time();
*/
stopMemTracking = false; stopMemTracking = false;
utils::VLArray<uint64_t> jobs(numCores); utils::VLArray<uint64_t> jobs(numCores);
uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
// starting 1 thread when in PM mode, since it's only inserting into a // starting 1 thread when in PM mode, since it's only inserting into a
// vector of rows. The rest will be started when converted to UM mode. // vector of rows. The rest will be started when converted to UM mode.
if (joiner->inUM()) if (joiner->inUM())
{
for (int i = 0; i < numCores; i++) for (int i = 0; i < numCores; i++)
{
jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); }); jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); });
}
}
else else
{
jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); }); jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); });
}
// wait for the first thread to join, then decide whether the others exist and need joining // wait for the first thread to join, then decide whether the others exist and need joining
jobstepThreadPool.join(jobs[0]); jobstepThreadPool.join(jobs[0]);
if (joiner->inUM()) if (joiner->inUM())
{
for (int i = 1; i < numCores; i++) for (int i = 1; i < numCores; i++)
{
jobstepThreadPool.join(jobs[i]); jobstepThreadPool.join(jobs[i]);
}
}
// stop the monitor thread // stop the monitor thread
memTrackMutex.lock(); memTrackMutex.lock();
stopMemTracking = true; stopMemTracking = true;
@ -468,10 +468,13 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit)) if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
{ {
joiner->setInUM(rgData[index]); joiner->setInUM(rgData[index]);
for (int i = 1; i < numCores; i++) for (int i = 1; i < numCores; i++)
{
jobs[i] = jobs[i] =
jobstepThreadPool.invoke([this, i, index, jobs] { this->smallRunnerFcn(index, i, jobs); }); jobstepThreadPool.invoke([this, i, index, jobs] { this->smallRunnerFcn(index, i, jobs); });
} }
}
next: next:
dlMutex.lock(); dlMutex.lock();
more = smallDL->next(smallIt, &oneRG); more = smallDL->next(smallIt, &oneRG);
@ -1765,15 +1768,7 @@ void TupleHashJoinStep::joinOneRG(
for (j = 0; j < smallSideCount; j++) for (j = 0; j < smallSideCount; j++)
{ {
(*tjoiners)[j]->match(largeSideRow, k, threadID, &joinMatches[j]); (*tjoiners)[j]->match(largeSideRow, k, threadID, &joinMatches[j]);
/* Debugging code to print the matches
Row r;
smallRGs[j].initRow(&r);
cout << joinMatches[j].size() << " matches: \n";
for (uint32_t z = 0; z < joinMatches[j].size(); z++) {
r.setData(joinMatches[j][z]);
cout << " " << r.toString() << endl;
}
*/
matchCount = joinMatches[j].size(); matchCount = joinMatches[j].size();
if ((*tjoiners)[j]->hasFEFilter() && matchCount > 0) if ((*tjoiners)[j]->hasFEFilter() && matchCount > 0)
@ -1861,8 +1856,9 @@ void TupleHashJoinStep::joinOneRG(
void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput, Row& baseRow, void TupleHashJoinStep::generateJoinResultSet(const vector<vector<Row::Pointer> >& joinerOutput, Row& baseRow,
const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings, const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings,
const uint32_t depth, RowGroup& l_outputRG, RGData& rgData, const uint32_t depth, RowGroup& l_outputRG, RGData& rgData,
vector<RGData>& outputData, const std::shared_ptr<Row[]>& smallRows, vector<RGData>& outputData,
Row& joinedRow, RowGroupDL* dlp) const std::shared_ptr<Row[]>& smallRows, Row& joinedRow,
RowGroupDL* dlp)
{ {
uint32_t i; uint32_t i;
Row& smallRow = smallRows[depth]; Row& smallRow = smallRows[depth];

View File

@ -74,9 +74,11 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1) void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1)
{ {
fTableOID1 = tableOid1; fTableOID1 = tableOid1;
if (fTableOID1 < 3000) if (fTableOID1 >= 1000 && fTableOID1 < 3000)
{
numCores = 1; // syscat query, no need for more than 1 thread numCores = 1; // syscat query, no need for more than 1 thread
} }
}
void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2) void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2)
{ {
fTableOID2 = tableOid2; fTableOID2 = tableOid2;
@ -536,8 +538,8 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
void startJoinThreads(); void startJoinThreads();
void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer>>& joinerOutput, void generateJoinResultSet(const std::vector<std::vector<rowgroup::Row::Pointer>>& joinerOutput,
rowgroup::Row& baseRow, rowgroup::Row& baseRow,
const std::shared_ptr<std::shared_ptr<int[]>[] >& mappings, const std::shared_ptr<std::shared_ptr<int[]>[]>& mappings, const uint32_t depth,
const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData,
std::vector<rowgroup::RGData>& outputData, std::vector<rowgroup::RGData>& outputData,
const std::shared_ptr<rowgroup::Row[]>& smallRows, rowgroup::Row& joinedRow, const std::shared_ptr<rowgroup::Row[]>& smallRows, rowgroup::Row& joinedRow,
RowGroupDL* outputDL); RowGroupDL* outputDL);
@ -653,4 +655,3 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
}; };
} // namespace joblist } // namespace joblist

View File

@ -20,12 +20,13 @@
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include <limits> #include <limits>
#include <tr1/unordered_set> #include <unordered_set>
#include "hasher.h" #include "hasher.h"
#include "lbidlist.h" #include "lbidlist.h"
#include "spinlock.h" #include "spinlock.h"
#include "vlarray.h" #include "vlarray.h"
#include "threadnaming.h"
using namespace std; using namespace std;
using namespace rowgroup; using namespace rowgroup;
@ -282,8 +283,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
done = false; done = false;
continue; continue;
} }
for (auto& element : buckets[i]) tables[i]->insert(buckets[i].begin(), buckets[i].end());
tables[i]->insert(element);
m_bucketLocks[i].unlock(); m_bucketLocks[i].unlock();
wasProductive = true; wasProductive = true;
buckets[i].clear(); buckets[i].clear();
@ -306,7 +306,7 @@ void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row& r)
if (td[i].len == 0) if (td[i].len == 0)
continue; continue;
uint bucket = bucketPicker((char*)td[i].data, td[i].len, bpSeed) & bucketMask; uint bucket = bucketPicker((char*)td[i].data, td[i].len, bpSeed) & bucketMask;
v[bucket].push_back(pair<TypelessData, Row::Pointer>(td[i], r.getPointer())); v[bucket].emplace_back(pair<TypelessData, Row::Pointer>(td[i], r.getPointer()));
} }
bucketsToTables(&v[0], ht.get()); bucketsToTables(&v[0], ht.get());
} }
@ -323,9 +323,9 @@ void TupleJoiner::um_insertLongDouble(uint rowCount, Row& r)
uint bucket = bucketPicker((char*)&smallKey, 10, bpSeed) & uint bucket = bucketPicker((char*)&smallKey, 10, bpSeed) &
bucketMask; // change if we decide to support windows again bucketMask; // change if we decide to support windows again
if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL)) if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL))
v[bucket].push_back(pair<long double, Row::Pointer>(joblist::LONGDOUBLENULL, r.getPointer())); v[bucket].emplace_back(pair<long double, Row::Pointer>(joblist::LONGDOUBLENULL, r.getPointer()));
else else
v[bucket].push_back(pair<long double, Row::Pointer>(smallKey, r.getPointer())); v[bucket].emplace_back(pair<long double, Row::Pointer>(smallKey, r.getPointer()));
} }
bucketsToTables(&v[0], ld.get()); bucketsToTables(&v[0], ld.get());
} }
@ -345,9 +345,9 @@ void TupleJoiner::um_insertInlineRows(uint rowCount, Row& r)
smallKey = (int64_t)r.getUintField(smallKeyColumn); smallKey = (int64_t)r.getUintField(smallKeyColumn);
uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask; uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask;
if (UNLIKELY(smallKey == nullValueForJoinColumn)) if (UNLIKELY(smallKey == nullValueForJoinColumn))
v[bucket].push_back(pair<int64_t, uint8_t*>(getJoinNullValue(), r.getData())); v[bucket].emplace_back(pair<int64_t, uint8_t*>(getJoinNullValue(), r.getData()));
else else
v[bucket].push_back(pair<int64_t, uint8_t*>(smallKey, r.getData())); v[bucket].emplace_back(pair<int64_t, uint8_t*>(smallKey, r.getData()));
} }
bucketsToTables(&v[0], h.get()); bucketsToTables(&v[0], h.get());
} }
@ -367,9 +367,9 @@ void TupleJoiner::um_insertStringTable(uint rowCount, Row& r)
smallKey = (int64_t)r.getUintField(smallKeyColumn); smallKey = (int64_t)r.getUintField(smallKeyColumn);
uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask; uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask;
if (UNLIKELY(smallKey == nullValueForJoinColumn)) if (UNLIKELY(smallKey == nullValueForJoinColumn))
v[bucket].push_back(pair<int64_t, Row::Pointer>(getJoinNullValue(), r.getPointer())); v[bucket].emplace_back(pair<int64_t, Row::Pointer>(getJoinNullValue(), r.getPointer()));
else else
v[bucket].push_back(pair<int64_t, Row::Pointer>(smallKey, r.getPointer())); v[bucket].emplace_back(pair<int64_t, Row::Pointer>(smallKey, r.getPointer()));
} }
bucketsToTables(&v[0], sth.get()); bucketsToTables(&v[0], sth.get());
} }
@ -670,6 +670,8 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
} }
} }
using unordered_set_int128 = std::unordered_set<int128_t, utils::Hash128, utils::Equal128>;
void TupleJoiner::doneInserting() void TupleJoiner::doneInserting()
{ {
// a minor textual cleanup // a minor textual cleanup
@ -694,7 +696,6 @@ void TupleJoiner::doneInserting()
for (col = 0; col < smallKeyColumns.size(); col++) for (col = 0; col < smallKeyColumns.size(); col++)
{ {
typedef std::tr1::unordered_set<int128_t, utils::Hash128, utils::Equal128> unordered_set_int128;
unordered_set_int128 uniquer; unordered_set_int128 uniquer;
unordered_set_int128::iterator uit; unordered_set_int128::iterator uit;
sthash_t::iterator sthit; sthash_t::iterator sthit;
@ -811,6 +812,8 @@ void TupleJoiner::setInPM()
void TupleJoiner::umJoinConvert(size_t begin, size_t end) void TupleJoiner::umJoinConvert(size_t begin, size_t end)
{ {
utils::setThreadName("TJUMJoinConvert1");
Row smallRow; Row smallRow;
smallRG.initRow(&smallRow); smallRG.initRow(&smallRow);
@ -862,6 +865,8 @@ void TupleJoiner::setInUM()
void TupleJoiner::umJoinConvert(uint threadID, vector<RGData>& rgs, size_t begin, size_t end) void TupleJoiner::umJoinConvert(uint threadID, vector<RGData>& rgs, size_t begin, size_t end)
{ {
utils::setThreadName("TJUMJoinConvert2");
RowGroup l_smallRG(smallRG); RowGroup l_smallRG(smallRG);
while (begin < end) while (begin < end)

View File

@ -24,7 +24,7 @@
#include <boost/scoped_ptr.hpp> #include <boost/scoped_ptr.hpp>
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
#include <tr1/unordered_map> #include <unordered_map>
#include "rowgroup.h" #include "rowgroup.h"
#include "joiner.h" #include "joiner.h"
@ -464,19 +464,18 @@ class TupleJoiner
void setConvertToDiskJoin(); void setConvertToDiskJoin();
private: private:
typedef std::tr1::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>, typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*> > > utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*> > >
hash_t; hash_t;
typedef std::tr1::unordered_multimap< typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > > utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
sthash_t; sthash_t;
typedef std::tr1::unordered_multimap< typedef std::unordered_multimap<
TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>, TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > > utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
typelesshash_t; typelesshash_t;
// MCOL-1822 Add support for Long Double AVG/SUM small side // MCOL-1822 Add support for Long Double AVG/SUM small side
typedef std::tr1::unordered_multimap< typedef std::unordered_multimap<
long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer> > > utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
ldhash_t; ldhash_t;