1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

feat(): accounts hash tables RAM allocations/removes STLPoolAllocator

This commit is contained in:
drrtuy
2025-01-17 20:47:27 +00:00
parent aeebbff0cd
commit f594d27685
3 changed files with 68 additions and 38 deletions

View File

@ -302,7 +302,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
stopMemTracking = false; stopMemTracking = false;
utils::VLArray<uint64_t> jobs(numCores); utils::VLArray<uint64_t> jobs(numCores);
uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); // uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
// starting 1 thread when in PM mode, since it's only inserting into a // starting 1 thread when in PM mode, since it's only inserting into a
// vector of rows. The rest will be started when converted to UM mode. // vector of rows. The rest will be started when converted to UM mode.
if (joiners[index]->inUM()) if (joiners[index]->inUM())
@ -331,7 +331,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
stopMemTracking = true; stopMemTracking = true;
memTrackDone.notify_one(); memTrackDone.notify_one();
memTrackMutex.unlock(); memTrackMutex.unlock();
jobstepThreadPool.join(memMonitor); // jobstepThreadPool.join(memMonitor);
/* If there was an error or an abort, drain the input DL, /* If there was an error or an abort, drain the input DL,
do endOfInput on the output */ do endOfInput on the output */
@ -481,6 +481,22 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t*
dlMutex.unlock(); dlMutex.unlock();
} }
} }
catch (std::bad_alloc& exc)
{
if (!joinIsTooBig &&
(isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)))
{
joinIsTooBig = true;
ostringstream oss;
oss << "(" << __LINE__ << ") "
<< logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG);
fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str());
errorMessage(oss.str());
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now" << endl;
abort();
}
}
catch (...) catch (...)
{ {
handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_JOIN_TOO_BIG, handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_JOIN_TOO_BIG,
@ -2036,6 +2052,9 @@ void TupleHashJoinStep::abort()
JobStep::abort(); JobStep::abort();
boost::mutex::scoped_lock sl(djsLock); boost::mutex::scoped_lock sl(djsLock);
for (auto& joiner : joiners)
joiner->abort();
if (djs.size()) if (djs.size())
{ {
for (uint32_t i = 0, e = djs.size(); e < i; i++) for (uint32_t i = 0, e = djs.size(); e < i; i++)

View File

@ -18,6 +18,7 @@
#include "tuplejoiner.h" #include "tuplejoiner.h"
#include <algorithm> #include <algorithm>
#include <boost/thread/lock_types.hpp>
#include <vector> #include <vector>
#include <limits> #include <limits>
#include <unordered_set> #include <unordered_set>
@ -37,13 +38,6 @@ using namespace joblist;
namespace joiner namespace joiner
{ {
// TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
// uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt,
// threadpool::ThreadPool* jsThreadPool)
// : TupleJoiner(smallInput, largeInput, smallJoinColumn, largeJoinColumn, jt, jsThreadPool, nullptr)
// {
// }
// Typed joiner ctor // Typed joiner ctor
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt, uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt,
@ -60,6 +54,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
, numCores(numCores) , numCores(numCores)
, jobstepThreadPool(jsThreadPool) , jobstepThreadPool(jsThreadPool)
, _convertToDiskJoin(false) , _convertToDiskJoin(false)
, resourceManager_(rm)
{ {
uint i; uint i;
@ -69,11 +64,12 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE) if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
{ {
ld.reset(new boost::scoped_ptr<ldhash_t>[bucketCount]); ld.reset(new boost::scoped_ptr<ldhash_t>[bucketCount]);
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]); // _pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
for (i = 0; i < bucketCount; i++) for (i = 0; i < bucketCount; i++)
{ {
STLPoolAllocator<pair<const long double, Row::Pointer>> alloc(resourceManager_); // STLPoolAllocator<pair<const long double, Row::Pointer>> alloc(resourceManager_);
_pool[i] = alloc.getPoolAllocator(); // _pool[i] = alloc.getPoolAllocator();
auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
} }
} }
@ -83,8 +79,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]); _pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
for (i = 0; i < bucketCount; i++) for (i = 0; i < bucketCount; i++)
{ {
STLPoolAllocator<pair<const int64_t, Row::Pointer>> alloc(resourceManager_); // STLPoolAllocator<pair<const int64_t, Row::Pointer>> alloc(resourceManager_);
_pool[i] = alloc.getPoolAllocator(); // _pool[i] = alloc.getPoolAllocator();
auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
} }
} }
@ -94,8 +91,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]); _pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
for (i = 0; i < bucketCount; i++) for (i = 0; i < bucketCount; i++)
{ {
STLPoolAllocator<pair<const int64_t, uint8_t*>> alloc(resourceManager_); // STLPoolAllocator<pair<const int64_t, uint8_t*>> alloc(resourceManager_);
_pool[i] = alloc.getPoolAllocator(); // _pool[i] = alloc.getPoolAllocator();
auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
} }
} }
@ -176,6 +174,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
, numCores(numCores) , numCores(numCores)
, jobstepThreadPool(jsThreadPool) , jobstepThreadPool(jsThreadPool)
, _convertToDiskJoin(false) , _convertToDiskJoin(false)
, resourceManager_(rm)
{ {
uint i; uint i;
@ -185,8 +184,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]); ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]);
for (i = 0; i < bucketCount; i++) for (i = 0; i < bucketCount; i++)
{ {
STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_); // STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc(resourceManager_);
_pool[i] = alloc.getPoolAllocator(); // _pool[i] = alloc.getPoolAllocator();
auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
} }
m_bucketLocks.reset(new boost::mutex[bucketCount]); m_bucketLocks.reset(new boost::mutex[bucketCount]);
@ -284,7 +284,7 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
uint i; uint i;
bool done = false, wasProductive; bool done = false, wasProductive;
while (!done) while (!done && !wasAborted_)
{ {
done = true; done = true;
wasProductive = false; wasProductive = false;
@ -292,14 +292,16 @@ void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t* tables)
{ {
if (buckets[i].empty()) if (buckets[i].empty())
continue; continue;
bool gotIt = m_bucketLocks[i].try_lock();
if (!gotIt)
{ {
done = false; boost::unique_lock<boost::mutex> lock(m_bucketLocks[i], boost::try_to_lock);
continue; if (!lock.owns_lock())
{
done = false;
continue;
}
tables[i]->insert(buckets[i].begin(), buckets[i].end());
} }
tables[i]->insert(buckets[i].begin(), buckets[i].end());
m_bucketLocks[i].unlock();
wasProductive = true; wasProductive = true;
buckets[i].clear(); buckets[i].clear();
} }
@ -398,13 +400,15 @@ void TupleJoiner::insertRGData(RowGroup& rg, uint threadID)
rowCount = rg.getRowCount(); rowCount = rg.getRowCount();
rg.getRow(0, &r); rg.getRow(0, &r);
m_cpValuesLock.lock();
for (i = 0; i < rowCount; i++, r.nextRow())
{ {
updateCPData(r); boost::unique_lock<boost::mutex> lock(m_cpValuesLock);
r.zeroRid(); for (i = 0; i < rowCount; i++, r.nextRow())
{
updateCPData(r);
r.zeroRid();
}
} }
m_cpValuesLock.unlock();
rg.getRow(0, &r); rg.getRow(0, &r);
if (joinAlg == UM) if (joinAlg == UM)
@ -1828,8 +1832,9 @@ void TupleJoiner::clearData()
for (uint i = 0; i < bucketCount; i++) for (uint i = 0; i < bucketCount; i++)
{ {
STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc; // STLPoolAllocator<pair<const TypelessData, Row::Pointer>> alloc;
_pool[i] = alloc.getPoolAllocator(); // _pool[i] = alloc.getPoolAllocator();
auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
if (typelessJoin) if (typelessJoin)
ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE) else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)

View File

@ -26,6 +26,7 @@
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
#include <unordered_map> #include <unordered_map>
#include "countingallocator.h"
#include "resourcemanager.h" #include "resourcemanager.h"
#include "rowgroup.h" #include "rowgroup.h"
#include "joiner.h" #include "joiner.h"
@ -471,22 +472,26 @@ class TupleJoiner
return finished; return finished;
} }
void setConvertToDiskJoin(); void setConvertToDiskJoin();
void abort()
{
wasAborted_ = true;
}
private: private:
typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>, typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*> > > allocators::CountingAllocator<std::pair<const int64_t, uint8_t*> > >
hash_t; hash_t;
typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>, typedef std::unordered_multimap<int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > > allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer> > >
sthash_t; sthash_t;
typedef std::unordered_multimap< typedef std::unordered_multimap<
TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>, TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > > allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer> > >
typelesshash_t; typelesshash_t;
// MCOL-1822 Add support for Long Double AVG/SUM small side // MCOL-1822 Add support for Long Double AVG/SUM small side
typedef std::unordered_multimap< typedef std::unordered_multimap<
long double, rowgroup::Row::Pointer, hasher, LongDoubleEq, long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer> > > allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer> > >
ldhash_t; ldhash_t;
typedef hash_t::iterator iterator; typedef hash_t::iterator iterator;
@ -525,6 +530,7 @@ class TupleJoiner
}; };
JoinAlg joinAlg; JoinAlg joinAlg;
joblist::JoinType joinType; joblist::JoinType joinType;
// WIP
std::shared_ptr<boost::shared_ptr<utils::PoolAllocator>[]> _pool; // pools for the table and nodes std::shared_ptr<boost::shared_ptr<utils::PoolAllocator>[]> _pool; // pools for the table and nodes
uint32_t threadCount; uint32_t threadCount;
std::string tableName; std::string tableName;
@ -558,7 +564,7 @@ class TupleJoiner
uint bucketCount; uint bucketCount;
uint bucketMask; uint bucketMask;
boost::scoped_array<boost::mutex> m_bucketLocks; boost::scoped_array<boost::mutex> m_bucketLocks;
boost::mutex m_typelessLock, m_cpValuesLock; boost::mutex m_cpValuesLock;
utils::Hasher_r bucketPicker; utils::Hasher_r bucketPicker;
const uint32_t bpSeed = 0x4545e1d7; // an arbitrary random # const uint32_t bpSeed = 0x4545e1d7; // an arbitrary random #
threadpool::ThreadPool* jobstepThreadPool; threadpool::ThreadPool* jobstepThreadPool;
@ -572,7 +578,7 @@ class TupleJoiner
bool _convertToDiskJoin; bool _convertToDiskJoin;
joblist::ResourceManager* resourceManager_ = nullptr; joblist::ResourceManager* resourceManager_ = nullptr;
bool wasAborted_ = false;
}; };
} // namespace joiner } // namespace joiner