You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
fix(allocator,perf): performance degradation caused by lack of STLPoolAllocator replaced by CountingAllocator
This commit is contained in:
@ -83,7 +83,10 @@ struct TAEq
|
|||||||
bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const;
|
bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const;
|
||||||
};
|
};
|
||||||
// TODO: Generalize these and put them back in utils/common/hasher.h
|
// TODO: Generalize these and put them back in utils/common/hasher.h
|
||||||
using TNSDistinctMap_t = std::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq, allocators::CountingAllocator<rowgroup::Row::Pointer> >;
|
// using TNSDistinctMap_t = std::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq,
|
||||||
|
// allocators::CountingAllocator<rowgroup::Row::Pointer> >;
|
||||||
|
using TNSDistinctMap_t =
|
||||||
|
std::unordered_set<rowgroup::Row::Pointer, TAHasher, TAEq, STLPoolAllocator<rowgroup::Row::Pointer> >;
|
||||||
}; // namespace
|
}; // namespace
|
||||||
|
|
||||||
inline uint64_t TAHasher::operator()(const Row::Pointer& p) const
|
inline uint64_t TAHasher::operator()(const Row::Pointer& p) const
|
||||||
@ -164,7 +167,7 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
|
|||||||
|
|
||||||
void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
|
void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
|
||||||
{
|
{
|
||||||
// Initialize ResourceManager to acount memory usage.
|
// Initialize ResourceManager to acount memory usage.
|
||||||
fRm = jobInfo.rm;
|
fRm = jobInfo.rm;
|
||||||
// Initialize structures used by separate workers
|
// Initialize structures used by separate workers
|
||||||
uint64_t id = 1;
|
uint64_t id = 1;
|
||||||
@ -456,8 +459,11 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
|
|||||||
Row rowSkip;
|
Row rowSkip;
|
||||||
bool more = false;
|
bool more = false;
|
||||||
|
|
||||||
auto alloc = fRm->getAllocator<rowgroup::Row::Pointer>();
|
// auto alloc = fRm->getAllocator<rowgroup::Row::Pointer>();
|
||||||
std::unique_ptr<TNSDistinctMap_t> distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), alloc));
|
// std::unique_ptr<TNSDistinctMap_t> distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this),
|
||||||
|
// alloc));
|
||||||
|
std::unique_ptr<TNSDistinctMap_t> distinctMap(
|
||||||
|
new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), STLPoolAllocator<rowgroup::Row::Pointer>(fRm)));
|
||||||
|
|
||||||
rgDataOut.reinit(fRowGroupOut);
|
rgDataOut.reinit(fRowGroupOut);
|
||||||
fRowGroupOut.setData(&rgDataOut);
|
fRowGroupOut.setData(&rgDataOut);
|
||||||
@ -588,12 +594,13 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
|
|||||||
|
|
||||||
void TupleAnnexStep::checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup)
|
void TupleAnnexStep::checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup)
|
||||||
{
|
{
|
||||||
uint64_t size = rowGroup.getSizeWithStrings() - rowGroup.getHeaderSize();
|
uint64_t size = rowGroup.getSizeWithStrings() - rowGroup.getHeaderSize();
|
||||||
if (!fRm->getMemory(size, false))
|
if (!fRm->getMemory(size, false))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(ERR_TNS_DISTINCT_IS_TOO_BIG) << " @" << __FILE__ << ":" << __LINE__;
|
cerr << IDBErrorInfo::instance()->errorMsg(ERR_TNS_DISTINCT_IS_TOO_BIG) << " @" << __FILE__ << ":"
|
||||||
throw IDBExcept(ERR_TNS_DISTINCT_IS_TOO_BIG);
|
<< __LINE__;
|
||||||
}
|
throw IDBExcept(ERR_TNS_DISTINCT_IS_TOO_BIG);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TupleAnnexStep::executeWithOrderBy()
|
void TupleAnnexStep::executeWithOrderBy()
|
||||||
@ -713,10 +720,12 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
|
|||||||
// Calculate offset here
|
// Calculate offset here
|
||||||
fRowGroupOut.getRow(0, &fRowOut);
|
fRowGroupOut.getRow(0, &fRowOut);
|
||||||
|
|
||||||
auto allocSorting = fRm->getAllocator<ordering::OrderByRow>();
|
// auto allocSorting = fRm->getAllocator<ordering::OrderByRow>();
|
||||||
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, allocSorting);
|
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, fRm->getAllocator<ordering::OrderByRow>());
|
||||||
auto allocDistinct = fRm->getAllocator<rowgroup::Row::Pointer>();
|
// ordering::SortingPQ finalPQ(rowgroup::rgCommonSize);
|
||||||
std::unique_ptr<TNSDistinctMap_t> distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), allocDistinct));
|
// auto allocDistinct = fRm->getAllocator<rowgroup::Row::Pointer>();
|
||||||
|
std::unique_ptr<TNSDistinctMap_t> distinctMap(
|
||||||
|
new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), STLPoolAllocator<rowgroup::Row::Pointer>(fRm)));
|
||||||
fRowGroupIn.initRow(&row1);
|
fRowGroupIn.initRow(&row1);
|
||||||
fRowGroupIn.initRow(&row2);
|
fRowGroupIn.initRow(&row2);
|
||||||
|
|
||||||
@ -910,8 +919,9 @@ void TupleAnnexStep::finalizeParallelOrderBy()
|
|||||||
uint32_t rowSize = 0;
|
uint32_t rowSize = 0;
|
||||||
|
|
||||||
rowgroup::RGData rgDataOut;
|
rowgroup::RGData rgDataOut;
|
||||||
auto alloc = fRm->getAllocator<ordering::OrderByRow>();
|
// auto alloc = fRm->getAllocator<ordering::OrderByRow>();
|
||||||
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc);
|
ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, fRm->getAllocator<ordering::OrderByRow>());
|
||||||
|
// ordering::SortingPQ finalPQ(rowgroup::rgCommonSize);
|
||||||
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
|
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
|
||||||
fRowGroupOut.setData(&rgDataOut);
|
fRowGroupOut.setData(&rgDataOut);
|
||||||
fRowGroupOut.resetRowGroup(0);
|
fRowGroupOut.resetRowGroup(0);
|
||||||
|
@ -329,7 +329,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
|||||||
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<utils::PoolAllocatorBufType>();
|
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<utils::PoolAllocatorBufType>();
|
||||||
for (uint j = 0; j < joinerCount; ++j)
|
for (uint j = 0; j < joinerCount; ++j)
|
||||||
{
|
{
|
||||||
storedKeyAllocators.emplace_back(PoolAllocator(alloc, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true));
|
storedKeyAllocators.emplace_back(PoolAllocator(alloc, PoolAllocator::DEFAULT_WINDOW_SIZE, false,
|
||||||
|
true));
|
||||||
|
// storedKeyAllocators.emplace_back(PoolAllocator(PoolAllocator::DEFAULT_WINDOW_SIZE, false, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
joinNullValues.reset(new uint64_t[joinerCount]);
|
joinNullValues.reset(new uint64_t[joinerCount]);
|
||||||
@ -338,6 +340,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
|||||||
hasJoinFEFilters = false;
|
hasJoinFEFilters = false;
|
||||||
hasSmallOuterJoin = false;
|
hasSmallOuterJoin = false;
|
||||||
bool smallSideRGRecvd = false;
|
bool smallSideRGRecvd = false;
|
||||||
|
auto* resourceManager = &exemgr::globServiceExeMgr->getRm();
|
||||||
|
|
||||||
for (i = 0; i < joinerCount; i++)
|
for (i = 0; i < joinerCount; i++)
|
||||||
{
|
{
|
||||||
@ -361,16 +364,17 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
|||||||
|
|
||||||
if (!typelessJoin[i])
|
if (!typelessJoin[i])
|
||||||
{
|
{
|
||||||
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TJoiner::value_type>();
|
// auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TJoiner::value_type>();
|
||||||
|
|
||||||
bs >> joinNullValues[i];
|
bs >> joinNullValues[i];
|
||||||
bs >> largeSideKeyColumns[i];
|
bs >> largeSideKeyColumns[i];
|
||||||
for (uint j = 0; j < processorThreads; ++j)
|
for (uint j = 0; j < processorThreads; ++j)
|
||||||
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator));
|
// tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator));
|
||||||
|
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), std::equal_to<uint64_t>(),
|
||||||
|
utils::STLPoolAllocator<TJoiner::value_type>(resourceManager)));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TLJoiner::value_type>();
|
// auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TLJoiner::value_type>();
|
||||||
|
|
||||||
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
|
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
|
||||||
bs >> tlSmallSideKeyLengths[i];
|
bs >> tlSmallSideKeyLengths[i];
|
||||||
@ -393,7 +397,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
|||||||
mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
|
mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
|
||||||
auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i],
|
auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i],
|
||||||
mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
|
mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
|
||||||
tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator));
|
// tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator));
|
||||||
|
tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator,
|
||||||
|
utils::STLPoolAllocator<TLJoiner::value_type>(resourceManager)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2287,34 +2293,34 @@ int BatchPrimitiveProcessor::operator()()
|
|||||||
|
|
||||||
void BatchPrimitiveProcessor::allocLargeBuffers()
|
void BatchPrimitiveProcessor::allocLargeBuffers()
|
||||||
{
|
{
|
||||||
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<rowgroup::RGDataBufType>();
|
// auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<rowgroup::RGDataBufType>();
|
||||||
|
|
||||||
if (ot == ROW_GROUP && !outRowGroupData)
|
if (ot == ROW_GROUP && !outRowGroupData)
|
||||||
{
|
{
|
||||||
// outputRG.setUseStringTable(true);
|
// outputRG.setUseStringTable(true);
|
||||||
outRowGroupData.reset(new RGData(outputRG, allocator));
|
// outRowGroupData.reset(new RGData(outputRG, allocator));
|
||||||
|
outRowGroupData.reset(new RGData(outputRG));
|
||||||
outputRG.setData(outRowGroupData.get());
|
outputRG.setData(outRowGroupData.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fe1 && !fe1Data)
|
if (fe1 && !fe1Data)
|
||||||
{
|
{
|
||||||
// fe1Input.setUseStringTable(true);
|
// fe1Data.reset(new RGData(fe1Input, allocator));
|
||||||
fe1Data.reset(new RGData(fe1Input, allocator));
|
fe1Data.reset(new RGData(fe1Input));
|
||||||
// fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
|
|
||||||
fe1Input.setData(fe1Data.get());
|
fe1Input.setData(fe1Data.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fe2 && !fe2Data)
|
if (fe2 && !fe2Data)
|
||||||
{
|
{
|
||||||
// fe2Output.setUseStringTable(true);
|
// fe2Data.reset(new RGData(fe2Output, allocator));
|
||||||
fe2Data.reset(new RGData(fe2Output, allocator));
|
fe2Data.reset(new RGData(fe2Output));
|
||||||
fe2Output.setData(fe2Data.get());
|
fe2Output.setData(fe2Data.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (getTupleJoinRowGroupData && !joinedRGMem)
|
if (getTupleJoinRowGroupData && !joinedRGMem)
|
||||||
{
|
{
|
||||||
// joinedRG.setUseStringTable(true);
|
// joinedRGMem.reset(new RGData(joinedRG, allocator));
|
||||||
joinedRGMem.reset(new RGData(joinedRG, allocator));
|
joinedRGMem.reset(new RGData(joinedRG));
|
||||||
joinedRG.setData(joinedRGMem.get());
|
joinedRG.setData(joinedRGMem.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -308,14 +308,23 @@ class BatchPrimitiveProcessor
|
|||||||
bool hasRowGroup;
|
bool hasRowGroup;
|
||||||
|
|
||||||
/* Rowgroups + join */
|
/* Rowgroups + join */
|
||||||
using TJoiner =
|
// using TJoiner =
|
||||||
std::unordered_multimap<uint64_t, uint32_t, joiner::TupleJoiner::hasher, std::equal_to<uint64_t>,
|
// std::unordered_multimap<uint64_t, uint32_t, joiner::TupleJoiner::hasher, std::equal_to<uint64_t>,
|
||||||
allocators::CountingAllocator<std::pair<const uint64_t, uint32_t>>>;
|
// allocators::CountingAllocator<std::pair<const uint64_t, uint32_t>>>;
|
||||||
|
|
||||||
using TLJoiner =
|
using TJoiner = std::tr1::unordered_multimap<uint64_t, uint32_t, joiner::TupleJoiner::hasher,
|
||||||
std::unordered_multimap<joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher,
|
std::equal_to<uint64_t>,
|
||||||
joiner::TupleJoiner::TypelessDataComparator,
|
utils::STLPoolAllocator<std::pair<const uint64_t, uint32_t>>>;
|
||||||
allocators::CountingAllocator<std::pair<const joiner::TypelessData, uint32_t>>>;
|
|
||||||
|
// using TLJoiner =
|
||||||
|
// std::unordered_multimap<joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher,
|
||||||
|
// joiner::TupleJoiner::TypelessDataComparator,
|
||||||
|
// allocators::CountingAllocator<std::pair<const joiner::TypelessData, uint32_t>>>;
|
||||||
|
|
||||||
|
using TLJoiner = std::tr1::unordered_multimap<
|
||||||
|
joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher,
|
||||||
|
joiner::TupleJoiner::TypelessDataComparator,
|
||||||
|
utils::STLPoolAllocator<std::pair<const joiner::TypelessData, uint32_t>>>;
|
||||||
|
|
||||||
bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0);
|
bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0);
|
||||||
/* generateJoinedRowGroup helper fcns & vars */
|
/* generateJoinedRowGroup helper fcns & vars */
|
||||||
|
@ -40,42 +40,63 @@ namespace joiner
|
|||||||
{
|
{
|
||||||
constexpr const size_t DEFAULT_BUCKET_COUNT = 10;
|
constexpr const size_t DEFAULT_BUCKET_COUNT = 10;
|
||||||
|
|
||||||
|
template <typename HashTable>
|
||||||
|
std::unique_ptr<HashTable> makeHashMap(size_t bucketCount, ResourceManager* resourceManager)
|
||||||
|
{
|
||||||
|
// auto alloc = resourceManager->getAllocator<T>();
|
||||||
|
// return std::unique_ptr<T>(new T(bucketCount, TupleJoiner::hasher(), typename T::key_equal(), alloc));
|
||||||
|
return std::unique_ptr<HashTable>(new HashTable(bucketCount, TupleJoiner::hasher(),
|
||||||
|
typename HashTable::key_equal(),
|
||||||
|
utils::STLPoolAllocator<typename HashTable::value_type>(resourceManager)));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TupleJoiner::initRowsVector()
|
||||||
|
{
|
||||||
|
// auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
||||||
|
// rows.reset(new RowPointersVec(alloc));
|
||||||
|
rows.reset(new RowPointersVec(resourceManager_->getAllocator<rowgroup::Row::Pointer>()));
|
||||||
|
}
|
||||||
|
|
||||||
void TupleJoiner::initHashMaps(uint32_t& smallJoinColumn)
|
void TupleJoiner::initHashMaps(uint32_t& smallJoinColumn)
|
||||||
{
|
{
|
||||||
if (typelessJoin)
|
if (typelessJoin)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < bucketCount; i++)
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
{
|
{
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
|
// auto alloc = resourceManager_->getAllocator<pair<const TypelessData, Row::Pointer>>();
|
||||||
ht.emplace_back(std::unique_ptr<typelesshash_t>(
|
// ht.emplace_back(std::unique_ptr<typelesshash_t>(
|
||||||
new typelesshash_t(DEFAULT_BUCKET_COUNT, hasher(), typelesshash_t::key_equal(), alloc)));
|
// new typelesshash_t(DEFAULT_BUCKET_COUNT, hasher(), typelesshash_t::key_equal(), alloc)));
|
||||||
|
ht.emplace_back(makeHashMap<typelesshash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
|
else if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < bucketCount; i++)
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
{
|
{
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
|
// auto alloc = resourceManager_->getAllocator<pair<const long double, Row::Pointer>>();
|
||||||
ld.emplace_back(std::unique_ptr<ldhash_t>(
|
// ld.emplace_back(std::unique_ptr<ldhash_t>(
|
||||||
new ldhash_t(DEFAULT_BUCKET_COUNT, hasher(), ldhash_t::key_equal(), alloc)));
|
// new ldhash_t(DEFAULT_BUCKET_COUNT, hasher(), ldhash_t::key_equal(), alloc)));
|
||||||
|
ld.emplace_back(makeHashMap<ldhash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (smallRG.usesStringTable())
|
else if (smallRG.usesStringTable())
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < bucketCount; i++)
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
{
|
{
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
|
// auto alloc = resourceManager_->getAllocator<pair<const int64_t, Row::Pointer>>();
|
||||||
sth.emplace_back(std::unique_ptr<sthash_t>(
|
// sth.emplace_back(std::unique_ptr<sthash_t>(
|
||||||
new sthash_t(DEFAULT_BUCKET_COUNT, hasher(), sthash_t::key_equal(), alloc)));
|
// new sthash_t(DEFAULT_BUCKET_COUNT, hasher(), sthash_t::key_equal(), alloc)));
|
||||||
|
sth.emplace_back(makeHashMap<sthash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < bucketCount; i++)
|
for (size_t i = 0; i < bucketCount; i++)
|
||||||
{
|
{
|
||||||
auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
|
// auto alloc = resourceManager_->getAllocator<pair<const int64_t, uint8_t*>>();
|
||||||
h.emplace_back(
|
// h.emplace_back(
|
||||||
std::unique_ptr<hash_t>(new hash_t(DEFAULT_BUCKET_COUNT, hasher(), hash_t::key_equal(), alloc)));
|
// std::unique_ptr<hash_t>(new hash_t(DEFAULT_BUCKET_COUNT, hasher(), hash_t::key_equal(), alloc)));
|
||||||
|
h.emplace_back(makeHashMap<hash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,10 +120,9 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
|
|||||||
, _convertToDiskJoin(false)
|
, _convertToDiskJoin(false)
|
||||||
, resourceManager_(rm)
|
, resourceManager_(rm)
|
||||||
{
|
{
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
initRowsVector();
|
||||||
rows.reset(new RowPointersVec(alloc));
|
|
||||||
|
|
||||||
getBucketCount();
|
getBucketCount();
|
||||||
|
|
||||||
m_bucketLocks.reset(new boost::mutex[bucketCount]);
|
m_bucketLocks.reset(new boost::mutex[bucketCount]);
|
||||||
|
|
||||||
initHashMaps(smallJoinColumn);
|
initHashMaps(smallJoinColumn);
|
||||||
@ -161,7 +181,8 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
|
|||||||
// Typeless joiner ctor
|
// Typeless joiner ctor
|
||||||
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
|
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
|
||||||
const vector<uint32_t>& smallJoinColumns, const vector<uint32_t>& largeJoinColumns,
|
const vector<uint32_t>& smallJoinColumns, const vector<uint32_t>& largeJoinColumns,
|
||||||
JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, const uint64_t numCores)
|
JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm,
|
||||||
|
const uint64_t numCores)
|
||||||
: smallRG(smallInput)
|
: smallRG(smallInput)
|
||||||
, largeRG(largeInput)
|
, largeRG(largeInput)
|
||||||
, joinAlg(INSERTING)
|
, joinAlg(INSERTING)
|
||||||
@ -180,9 +201,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R
|
|||||||
{
|
{
|
||||||
uint i;
|
uint i;
|
||||||
|
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
initRowsVector();
|
||||||
rows.reset(new RowPointersVec(alloc));
|
|
||||||
|
|
||||||
getBucketCount();
|
getBucketCount();
|
||||||
|
|
||||||
uint32_t unused = 0;
|
uint32_t unused = 0;
|
||||||
@ -870,8 +889,7 @@ void TupleJoiner::setInUM()
|
|||||||
#ifdef TJ_DEBUG
|
#ifdef TJ_DEBUG
|
||||||
cout << "done\n";
|
cout << "done\n";
|
||||||
#endif
|
#endif
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
initRowsVector();
|
||||||
rows.reset(new RowPointersVec(alloc));
|
|
||||||
|
|
||||||
if (typelessJoin)
|
if (typelessJoin)
|
||||||
{
|
{
|
||||||
@ -904,10 +922,7 @@ void TupleJoiner::setInUM(vector<RGData>& rgs)
|
|||||||
if (joinAlg == UM)
|
if (joinAlg == UM)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
{ // don't need rows anymore, free the mem
|
initRowsVector();
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
|
||||||
rows.reset(new RowPointersVec(alloc));
|
|
||||||
}
|
|
||||||
|
|
||||||
joinAlg = UM;
|
joinAlg = UM;
|
||||||
size = rgs.size();
|
size = rgs.size();
|
||||||
@ -918,7 +933,8 @@ void TupleJoiner::setInUM(vector<RGData>& rgs)
|
|||||||
i = 0;
|
i = 0;
|
||||||
for (size_t firstRow = 0; i < (uint)numCores && firstRow < size; i++, firstRow += chunkSize)
|
for (size_t firstRow = 0; i < (uint)numCores && firstRow < size; i++, firstRow += chunkSize)
|
||||||
jobs[i] = jobstepThreadPool->invoke(
|
jobs[i] = jobstepThreadPool->invoke(
|
||||||
[this, firstRow, chunkSize, size, i, &rgs] {
|
[this, firstRow, chunkSize, size, i, &rgs]
|
||||||
|
{
|
||||||
this->umJoinConvert(i, rgs, firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size));
|
this->umJoinConvert(i, rgs, firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size));
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -1291,7 +1307,7 @@ class WideDecimalKeyConverter
|
|||||||
if (value > AT(std::numeric_limits<T>::max()) || value < AT(std::numeric_limits<T>::min()))
|
if (value > AT(std::numeric_limits<T>::max()) || value < AT(std::numeric_limits<T>::min()))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
convertedValue = (uint64_t) static_cast<T>(value);
|
convertedValue = (uint64_t)static_cast<T>(value);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// As of MCS 6.x there is an asumption MCS can't join having
|
// As of MCS 6.x there is an asumption MCS can't join having
|
||||||
@ -1801,9 +1817,7 @@ void TupleJoiner::clearData()
|
|||||||
// This loop calls dtors and deallocates mem.
|
// This loop calls dtors and deallocates mem.
|
||||||
clearHashMaps();
|
clearHashMaps();
|
||||||
initHashMaps(smallKeyColumns[0]);
|
initHashMaps(smallKeyColumns[0]);
|
||||||
|
initRowsVector();
|
||||||
auto alloc = resourceManager_->getAllocator<rowgroup::Row::Pointer>();
|
|
||||||
rows.reset(new RowPointersVec(alloc));
|
|
||||||
finished = false;
|
finished = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,8 +204,11 @@ class TypelessDataStructure
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
using RowPointersVec =
|
using RowPointersVec =
|
||||||
std::vector<rowgroup::Row::Pointer, allocators::CountingAllocator<rowgroup::Row::Pointer>>;
|
std::vector<rowgroup::Row::Pointer, allocators::CountingAllocator<rowgroup::Row::Pointer>>;
|
||||||
|
// using RowPointersVec =
|
||||||
|
// std::vector<rowgroup::Row::Pointer>;
|
||||||
using RowPointersVecUP = std::unique_ptr<RowPointersVec>;
|
using RowPointersVecUP = std::unique_ptr<RowPointersVec>;
|
||||||
class TupleJoiner
|
class TupleJoiner
|
||||||
{
|
{
|
||||||
@ -473,22 +476,47 @@ class TupleJoiner
|
|||||||
void initHashMaps(uint32_t& smallJoinColumn);
|
void initHashMaps(uint32_t& smallJoinColumn);
|
||||||
void clearHashMaps();
|
void clearHashMaps();
|
||||||
private:
|
private:
|
||||||
typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
|
// typedef std::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
|
||||||
allocators::CountingAllocator<std::pair<const int64_t, uint8_t*>>>
|
// allocators::CountingAllocator<std::pair<const int64_t, uint8_t*>>>
|
||||||
hash_t;
|
// hash_t;
|
||||||
typedef std::unordered_multimap<
|
// typedef std::unordered_multimap<
|
||||||
int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
|
// int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
|
||||||
allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer>>>
|
// allocators::CountingAllocator<std::pair<const int64_t, rowgroup::Row::Pointer>>>
|
||||||
sthash_t;
|
// sthash_t;
|
||||||
typedef std::unordered_multimap<
|
// typedef std::unordered_multimap<
|
||||||
TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
|
// TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
|
||||||
allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer>>>
|
// allocators::CountingAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer>>>
|
||||||
typelesshash_t;
|
// typelesshash_t;
|
||||||
|
// // MCOL-1822 Add support for Long Double AVG/SUM small side
|
||||||
|
// typedef std::unordered_multimap<
|
||||||
|
// long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
|
||||||
|
// allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer>>>
|
||||||
|
// ldhash_t;
|
||||||
|
|
||||||
|
|
||||||
|
template<typename K, typename V>
|
||||||
|
using HashMapTemplate = std::unordered_multimap<K, V, hasher, std::equal_to<K>,
|
||||||
|
utils::STLPoolAllocator<std::pair<const K, V>>>;
|
||||||
|
using hash_t = HashMapTemplate<int64_t, uint8_t*>;
|
||||||
|
using sthash_t = HashMapTemplate<int64_t, rowgroup::Row::Pointer>;
|
||||||
|
using typelesshash_t = HashMapTemplate<TypelessData, rowgroup::Row::Pointer>;
|
||||||
|
using ldhash_t = HashMapTemplate<long double, rowgroup::Row::Pointer>;
|
||||||
|
// typedef std::unordered_multimap<int64_t, uint8_t*, hasher,td::equal_to<int64_t>,
|
||||||
|
// utils::STLPoolAllocator<std::pair<const int64_t, uint8_t*>>>
|
||||||
|
// hash_t;
|
||||||
|
// typedef std::unordered_multimap<
|
||||||
|
// int64_t, rowgroup::Row::Pointer, hasher, std::equal_to<int64_t>,
|
||||||
|
// utils::STLPoolAllocator<std::pair<const int64_t, rowgroup::Row::Pointer>>>
|
||||||
|
// sthash_t;
|
||||||
|
// typedef std::unordered_multimap<
|
||||||
|
// TypelessData, rowgroup::Row::Pointer, hasher, std::equal_to<TypelessData>,
|
||||||
|
// utils::STLPoolAllocator<std::pair<const TypelessData, rowgroup::Row::Pointer>>>
|
||||||
|
// typelesshash_t;
|
||||||
// MCOL-1822 Add support for Long Double AVG/SUM small side
|
// MCOL-1822 Add support for Long Double AVG/SUM small side
|
||||||
typedef std::unordered_multimap<
|
// typedef std::unordered_multimap<
|
||||||
long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
|
// long double, rowgroup::Row::Pointer, hasher, LongDoubleEq,
|
||||||
allocators::CountingAllocator<std::pair<const long double, rowgroup::Row::Pointer>>>
|
// utils::STLPoolAllocator<std::pair<const long double, rowgroup::Row::Pointer>>>
|
||||||
ldhash_t;
|
// ldhash_t;
|
||||||
|
|
||||||
typedef hash_t::iterator iterator;
|
typedef hash_t::iterator iterator;
|
||||||
typedef typelesshash_t::iterator thIterator;
|
typedef typelesshash_t::iterator thIterator;
|
||||||
@ -573,6 +601,7 @@ class TupleJoiner
|
|||||||
bool _convertToDiskJoin;
|
bool _convertToDiskJoin;
|
||||||
joblist::ResourceManager* resourceManager_ = nullptr;
|
joblist::ResourceManager* resourceManager_ = nullptr;
|
||||||
bool wasAborted_ = false;
|
bool wasAborted_ = false;
|
||||||
|
void initRowsVector();
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace joiner
|
} // namespace joiner
|
||||||
|
@ -774,15 +774,12 @@ void IdbOrderBy::initialize(const RowGroup& rg)
|
|||||||
fRowGroup.initRow(&row2);
|
fRowGroup.initRow(&row2);
|
||||||
|
|
||||||
// These two blocks contain structs with memory accounting.
|
// These two blocks contain structs with memory accounting.
|
||||||
{
|
fOrderByQueue.reset(new SortingPQ(rowgroup::rgCommonSize, fRm->getAllocator<OrderByRow>()));
|
||||||
auto alloc = fRm->getAllocator<OrderByRow>();
|
|
||||||
fOrderByQueue.reset(new SortingPQ(rowgroup::rgCommonSize, alloc));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fDistinct)
|
if (fDistinct)
|
||||||
{
|
{
|
||||||
auto alloc = fRm->getAllocator<rowgroup::Row::Pointer>();
|
fDistinctMap.reset(new DistinctMap_t(10, Hasher(this, getKeyLength()), Eq(this, getKeyLength()),
|
||||||
fDistinctMap.reset(new DistinctMap_t(10, Hasher(this, getKeyLength()), Eq(this, getKeyLength()), alloc));
|
utils::STLPoolAllocator<rowgroup::Row::Pointer>(fRm)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@
|
|||||||
#include "resourcemanager.h"
|
#include "resourcemanager.h"
|
||||||
#include "rowgroup.h"
|
#include "rowgroup.h"
|
||||||
#include "hasher.h"
|
#include "hasher.h"
|
||||||
// #include "stlpoolallocator.h"
|
#include "stlpoolallocator.h"
|
||||||
|
|
||||||
// forward reference
|
// forward reference
|
||||||
namespace joblist
|
namespace joblist
|
||||||
@ -81,6 +81,31 @@ class ReservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare>
|
|||||||
using std::priority_queue<_Tp, _Sequence, _Compare>::empty;
|
using std::priority_queue<_Tp, _Sequence, _Compare>::empty;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// template <typename _Tp, typename _Sequence = std::vector<_Tp>,
|
||||||
|
// typename _Compare = std::less<typename _Sequence::value_type> >
|
||||||
|
// class ReservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare>
|
||||||
|
// {
|
||||||
|
// public:
|
||||||
|
// typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type;
|
||||||
|
// explicit ReservablePQ(size_type capacity = 0)
|
||||||
|
// {
|
||||||
|
// reserve(capacity);
|
||||||
|
// };
|
||||||
|
// void reserve(size_type capacity)
|
||||||
|
// {
|
||||||
|
// this->c.reserve(capacity);
|
||||||
|
// }
|
||||||
|
// size_type capacity() const
|
||||||
|
// {
|
||||||
|
// return this->c.capacity();
|
||||||
|
// }
|
||||||
|
// using std::priority_queue<_Tp, _Sequence, _Compare>::size;
|
||||||
|
// using std::priority_queue<_Tp, _Sequence, _Compare>::top;
|
||||||
|
// using std::priority_queue<_Tp, _Sequence, _Compare>::pop;
|
||||||
|
// using std::priority_queue<_Tp, _Sequence, _Compare>::push;
|
||||||
|
// using std::priority_queue<_Tp, _Sequence, _Compare>::empty;
|
||||||
|
// };
|
||||||
|
|
||||||
// forward reference
|
// forward reference
|
||||||
class IdbCompare;
|
class IdbCompare;
|
||||||
class OrderByRow;
|
class OrderByRow;
|
||||||
@ -484,7 +509,7 @@ class IdbOrderBy : public IdbCompare
|
|||||||
};
|
};
|
||||||
|
|
||||||
using DistinctMap_t = std::unordered_set<rowgroup::Row::Pointer, Hasher, Eq,
|
using DistinctMap_t = std::unordered_set<rowgroup::Row::Pointer, Hasher, Eq,
|
||||||
allocators::CountingAllocator<rowgroup::Row::Pointer>>;
|
utils::STLPoolAllocator<rowgroup::Row::Pointer>>;
|
||||||
boost::scoped_ptr<DistinctMap_t> fDistinctMap;
|
boost::scoped_ptr<DistinctMap_t> fDistinctMap;
|
||||||
rowgroup::Row row1, row2; // scratch space for Hasher & Eq
|
rowgroup::Row row1, row2; // scratch space for Hasher & Eq
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user