diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp
index 2fe026142..eb38f95fa 100644
--- a/primitives/primproc/batchprimitiveprocessor.cpp
+++ b/primitives/primproc/batchprimitiveprocessor.cpp
@@ -291,7 +291,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
             addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);

         smallSideDataLocks.reset(new boost::mutex[joinerCount]);
-        tJoinerSizes.reset(new uint32_t[joinerCount]);
+        tJoinerSizes.reset(new atomic<uint32_t>[joinerCount]);
         largeSideKeyColumns.reset(new uint32_t[joinerCount]);
         tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
         typelessJoin.reset(new bool[joinerCount]);
@@ -310,7 +310,10 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
         for (i = 0; i < joinerCount; i++)
         {
             doMatchNulls[i] = false;
-            bs >> tJoinerSizes[i];
+            uint32_t tmp32;
+            bs >> tmp32;
+            tJoinerSizes[i] = tmp32;
+            //bs >> tJoinerSizes[i];
             //cout << "joiner size = " << tJoinerSizes[i] << endl;
             bs >> joinTypes[i];
             bs >> tmp8;
@@ -589,7 +592,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
     idbassert(joinerNum < joinerCount);
     arr = (JoinerElements*) bs.buf();

-    uint32_t &tJoinerSize = tJoinerSizes[joinerNum];
+    atomic<uint32_t> &tJoinerSize = tJoinerSizes[joinerNum];

     // XXXPAT: enormous if stmts are evil. TODO: move each block into
     // properly-named functions for clarity.
@@ -600,6 +603,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
         uint8_t nullFlag;
         PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
         // this first loop hashes incoming values into vectors that parallel the hash tables.
+        uint nullCount = 0;
         for (i = 0; i < count; ++i)
         {
             bs >> nullFlag;
@@ -611,8 +615,9 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
                 tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
             }
             else
-                --tJoinerSize;
+                ++nullCount;
         }
+        tJoinerSize -= nullCount;

         bool done = false, didSomeWork;
         //uint loopCounter = 0, noWorkCounter = 0;
diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h
index 89402242c..e3863e429 100644
--- a/primitives/primproc/batchprimitiveprocessor.h
+++ b/primitives/primproc/batchprimitiveprocessor.h
@@ -284,7 +284,7 @@ private:
     typedef std::tr1::unordered_multimap, utils::STLPoolAllocator > > TJoiner;
-
+
    typedef std::tr1::unordered_multimap, utils::STLPoolAllocator > > TLJoiner;
@@ -316,7 +316,7 @@ private:
     boost::shared_array<uint32_t> smallSideRowLengths;
     boost::shared_array<joblist::JoinType> joinTypes;
     uint32_t joinerCount;
-    boost::shared_array<uint32_t> tJoinerSizes;
+    boost::shared_array<std::atomic<uint32_t> > tJoinerSizes;
     // LSKC[i] = the column in outputRG joiner i uses as its key column
     boost::shared_array<uint32_t> largeSideKeyColumns;
     // KCPP[i] = true means a joiner uses projection step i as a key column
@@ -398,7 +398,7 @@ private:
     uint processorThreads;
     uint ptMask;
     bool firstInstance;
-
+
     friend class Command;
     friend class ColumnCommand;
     friend class DictStep;
diff --git a/utils/common/poolallocator.cpp b/utils/common/poolallocator.cpp
index 5fdddce6d..da6f7d6c1 100644
--- a/utils/common/poolallocator.cpp
+++ b/utils/common/poolallocator.cpp
@@ -36,6 +36,7 @@ PoolAllocator& PoolAllocator::operator=(const PoolAllocator& v)
 {
     allocSize = v.allocSize;
     tmpSpace = v.tmpSpace;
+    useLock = v.useLock;
     deallocateAll();
     return *this;
 }
@@ -67,18 +68,28 @@ void PoolAllocator::newBlock()

 void * PoolAllocator::allocOOB(uint64_t size)
 {
+    bool _false = false;
     OOBMemInfo memInfo;

+    if (useLock)
+        while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire))
+            _false = false;
     memUsage += size;
     memInfo.mem.reset(new uint8_t[size]);
     memInfo.size = size;
     void *ret = (void*) memInfo.mem.get();
     oob[ret] = memInfo;
+    if (useLock)
+        lock.store(false, std::memory_order_release);
     return ret;
 }
-
+
 void PoolAllocator::deallocate(void* p)
 {
+    bool _false = false;
+    if (useLock)
+        while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire))
+            _false = false;
     OutOfBandMap::iterator it = oob.find(p);

     if (it == oob.end())
@@ -86,6 +97,8 @@ void PoolAllocator::deallocate(void* p)

     memUsage -= it->second.size;
     oob.erase(it);
+    if (useLock)
+        lock.store(false, std::memory_order_release);
 }

 }
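
The PoolAllocator part of this patch guards the out-of-band allocation map with a hand-rolled spinlock (an atomic flag driven by compare_exchange_weak with acquire/release ordering), taken only when useLock is set, while the BPP part turns tJoinerSizes into atomic counters and batches the per-null decrements into a single tJoinerSize -= nullCount. Below is a minimal, self-contained sketch of that same spin-lock idiom; SpinGuardedCounter and everything in it are hypothetical names used for illustration only, not part of the patch or of the ColumnStore API.

#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

// Stand-in for the pattern added to PoolAllocator: an optional spinlock
// built on std::atomic<bool>, protecting a plain (non-atomic) shared value.
class SpinGuardedCounter
{
public:
    explicit SpinGuardedCounter(bool useLock) : useLock(useLock) {}

    void add(uint32_t n)
    {
        bool _false = false;
        if (useLock)
            // Spin until we swap false -> true. On failure, compare_exchange_weak
            // stores the lock's current value in _false, so reset it to false
            // each pass to keep comparing against "unlocked".
            while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire))
                _false = false;

        value += n;    // critical section

        if (useLock)
            lock.store(false, std::memory_order_release);    // unlock
    }

    uint32_t get() const { return value; }

private:
    bool useLock;                     // mirrors PoolAllocator::useLock: skip locking in single-threaded use
    std::atomic<bool> lock{false};    // false = unlocked, true = locked
    uint32_t value = 0;
};

int main()
{
    SpinGuardedCounter counter(true);
    std::vector<std::thread> threads;

    // Four threads update the counter concurrently; the spinlock serializes them.
    for (int t = 0; t < 4; ++t)
        threads.emplace_back([&counter] {
            for (int i = 0; i < 100000; ++i)
                counter.add(1);
        });

    for (auto& th : threads)
        th.join();

    std::cout << counter.get() << std::endl;    // prints 400000 when the lock is enabled
    return 0;
}

Resetting _false after each failed CAS is what makes the loop correct: compare_exchange_weak overwrites its expected argument with the value it actually observed, so without the reset a later iteration would compare against true, "acquire" a lock another thread still holds, and break mutual exclusion.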