1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Found and fixed a race on a counter in the new HJ hash table

construction code in PrimProc.
This commit is contained in:
Patrick LeBlanc
2019-12-17 18:17:04 -05:00
parent d8e763655c
commit 98d546c2c2
3 changed files with 26 additions and 8 deletions

View File

@ -291,7 +291,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
addToJoinerLocks[j].reset(new boost::mutex[processorThreads]); addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);
smallSideDataLocks.reset(new boost::mutex[joinerCount]); smallSideDataLocks.reset(new boost::mutex[joinerCount]);
tJoinerSizes.reset(new uint32_t[joinerCount]); tJoinerSizes.reset(new atomic<uint32_t>[joinerCount]);
largeSideKeyColumns.reset(new uint32_t[joinerCount]); largeSideKeyColumns.reset(new uint32_t[joinerCount]);
tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]); tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
typelessJoin.reset(new bool[joinerCount]); typelessJoin.reset(new bool[joinerCount]);
@ -310,7 +310,10 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
for (i = 0; i < joinerCount; i++) for (i = 0; i < joinerCount; i++)
{ {
doMatchNulls[i] = false; doMatchNulls[i] = false;
bs >> tJoinerSizes[i]; uint32_t tmp32;
bs >> tmp32;
tJoinerSizes[i] = tmp32;
//bs >> tJoinerSizes[i];
//cout << "joiner size = " << tJoinerSizes[i] << endl; //cout << "joiner size = " << tJoinerSizes[i] << endl;
bs >> joinTypes[i]; bs >> joinTypes[i];
bs >> tmp8; bs >> tmp8;
@ -589,7 +592,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
idbassert(joinerNum < joinerCount); idbassert(joinerNum < joinerCount);
arr = (JoinerElements*) bs.buf(); arr = (JoinerElements*) bs.buf();
uint32_t &tJoinerSize = tJoinerSizes[joinerNum]; atomic<uint32_t> &tJoinerSize = tJoinerSizes[joinerNum];
// XXXPAT: enormous if stmts are evil. TODO: move each block into // XXXPAT: enormous if stmts are evil. TODO: move each block into
// properly-named functions for clarity. // properly-named functions for clarity.
@ -600,6 +603,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
uint8_t nullFlag; uint8_t nullFlag;
PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum]; PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
// this first loop hashes incoming values into vectors that parallel the hash tables. // this first loop hashes incoming values into vectors that parallel the hash tables.
uint nullCount = 0;
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
{ {
bs >> nullFlag; bs >> nullFlag;
@ -611,8 +615,9 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex)); tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
} }
else else
--tJoinerSize; ++nullCount;
} }
tJoinerSize -= nullCount;
bool done = false, didSomeWork; bool done = false, didSomeWork;
//uint loopCounter = 0, noWorkCounter = 0; //uint loopCounter = 0, noWorkCounter = 0;

View File

@ -284,7 +284,7 @@ private:
typedef std::tr1::unordered_multimap<uint64_t, uint32_t, typedef std::tr1::unordered_multimap<uint64_t, uint32_t,
joiner::TupleJoiner::hasher, std::equal_to<uint64_t>, joiner::TupleJoiner::hasher, std::equal_to<uint64_t>,
utils::STLPoolAllocator<std::pair<const uint64_t, uint32_t> > > TJoiner; utils::STLPoolAllocator<std::pair<const uint64_t, uint32_t> > > TJoiner;
typedef std::tr1::unordered_multimap<joiner::TypelessData, typedef std::tr1::unordered_multimap<joiner::TypelessData,
uint32_t, joiner::TupleJoiner::hasher, std::equal_to<joiner::TypelessData>, uint32_t, joiner::TupleJoiner::hasher, std::equal_to<joiner::TypelessData>,
utils::STLPoolAllocator<std::pair<const joiner::TypelessData, uint32_t> > > TLJoiner; utils::STLPoolAllocator<std::pair<const joiner::TypelessData, uint32_t> > > TLJoiner;
@ -316,7 +316,7 @@ private:
boost::shared_array<uint32_t> smallSideRowLengths; boost::shared_array<uint32_t> smallSideRowLengths;
boost::shared_array<joblist::JoinType> joinTypes; boost::shared_array<joblist::JoinType> joinTypes;
uint32_t joinerCount; uint32_t joinerCount;
boost::shared_array<uint32_t> tJoinerSizes; boost::shared_array<std::atomic<uint32_t> > tJoinerSizes;
// LSKC[i] = the column in outputRG joiner i uses as its key column // LSKC[i] = the column in outputRG joiner i uses as its key column
boost::shared_array<uint32_t> largeSideKeyColumns; boost::shared_array<uint32_t> largeSideKeyColumns;
// KCPP[i] = true means a joiner uses projection step i as a key column // KCPP[i] = true means a joiner uses projection step i as a key column
@ -398,7 +398,7 @@ private:
uint processorThreads; uint processorThreads;
uint ptMask; uint ptMask;
bool firstInstance; bool firstInstance;
friend class Command; friend class Command;
friend class ColumnCommand; friend class ColumnCommand;
friend class DictStep; friend class DictStep;

View File

@ -36,6 +36,7 @@ PoolAllocator& PoolAllocator::operator=(const PoolAllocator& v)
{ {
allocSize = v.allocSize; allocSize = v.allocSize;
tmpSpace = v.tmpSpace; tmpSpace = v.tmpSpace;
useLock = v.useLock;
deallocateAll(); deallocateAll();
return *this; return *this;
} }
@ -67,18 +68,28 @@ void PoolAllocator::newBlock()
void * PoolAllocator::allocOOB(uint64_t size) void * PoolAllocator::allocOOB(uint64_t size)
{ {
bool _false = false;
OOBMemInfo memInfo; OOBMemInfo memInfo;
if (useLock)
while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire))
_false = false;
memUsage += size; memUsage += size;
memInfo.mem.reset(new uint8_t[size]); memInfo.mem.reset(new uint8_t[size]);
memInfo.size = size; memInfo.size = size;
void *ret = (void*) memInfo.mem.get(); void *ret = (void*) memInfo.mem.get();
oob[ret] = memInfo; oob[ret] = memInfo;
if (useLock)
lock.store(false, std::memory_order_release);
return ret; return ret;
} }
void PoolAllocator::deallocate(void* p) void PoolAllocator::deallocate(void* p)
{ {
bool _false = false;
if (useLock)
while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire))
_false = false;
OutOfBandMap::iterator it = oob.find(p); OutOfBandMap::iterator it = oob.find(p);
if (it == oob.end()) if (it == oob.end())
@ -86,6 +97,8 @@ void PoolAllocator::deallocate(void* p)
memUsage -= it->second.size; memUsage -= it->second.size;
oob.erase(it); oob.erase(it);
if (useLock)
lock.store(false, std::memory_order_release);
} }
} }