diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index c3a667b36..2ad3c495b 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -54,6 +54,7 @@ using namespace funcexp; using namespace querytele; #include "atomicops.h" +#include "spinlock.h" namespace joblist { @@ -73,8 +74,6 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) : fTupleId2(-1), fCorrelatedSide(0), resourceManager(jobInfo.rm), - totalUMMemoryUsage(0), - rgDataSize(0), runRan(false), joinRan(false), largeSideIndex(1), @@ -84,7 +83,8 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) : fTokenJoin(-1), fStatsMutexPtr(new boost::mutex()), fFunctionJoinKeys(jobInfo.keyInfo->functionJoinKeys), - sessionMemLimit(jobInfo.umMemLimit) + sessionMemLimit(jobInfo.umMemLimit), + rgdLock(false) { /* Need to figure out how much memory these use... Overhead storing 16 byte elements is about 32 bytes. That @@ -116,11 +116,13 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) : else allowDJS = false; + numCores = sysconf(_SC_NPROCESSORS_ONLN); + if (numCores <= 0) + numCores = 8; /* Debugging, rand() is used to simulate failures time_t t = time(NULL); srand(t); */ - } TupleHashJoinStep::~TupleHashJoinStep() @@ -130,8 +132,8 @@ TupleHashJoinStep::~TupleHashJoinStep() if (ownsOutputDL) delete outputDL; - if (totalUMMemoryUsage != 0) - resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); + for (uint i = 0 ; i < smallDLs.size(); i++) + resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); //cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl; } @@ -200,140 +202,237 @@ void TupleHashJoinStep::join() } } -/* Index is which small input to read. */ -void TupleHashJoinStep::smallRunnerFcn(uint32_t index) +// simple sol'n. Poll mem usage of Joiner once per second. Request mem +// increase after the fact. Failure to get mem will be detected and handled by +// the threads inserting into Joiner. +void TupleHashJoinStep::trackMem(uint index) { - uint64_t i; - bool more, flippedUMSwitch = false, gotMem; - RGData oneRG; - //shared_array oneRG; - Row r; + boost::shared_ptr joiner = joiners[index]; + ssize_t memBefore = 0, memAfter = 0; + bool gotMem; + + boost::unique_lock scoped(memTrackMutex); + while (!stopMemTracking) + { + memAfter = joiner->getMemUsage(); + if (memAfter != memBefore) + { + gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false); + atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); + memBefore = memAfter; + if (!gotMem) + return; + } + memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1)); + } + + // one more iteration to capture mem usage since last poll, for this one + // raise an error if mem went over the limit + memAfter = joiner->getMemUsage(); + if (memAfter == memBefore) + return; + gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false); + atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); + if (!gotMem) + { + if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) || + (tableOid() < 3000 && tableOid() >= 1000))) + { + joinIsTooBig = true; + fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG); + errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG)); + status(logging::ERR_JOIN_TOO_BIG); + cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl; + abort(); + } + } +} + +void TupleHashJoinStep::startSmallRunners(uint index) +{ + utils::setThreadName("HJSStartSmall"); + string extendedInfo; JoinType jt; - RowGroupDL* smallDL; - uint32_t smallIt; - RowGroup smallRG; boost::shared_ptr joiner; - string extendedInfo; - extendedInfo += toString(); // is each small side supposed to have the whole THJS info? - - smallDL = smallDLs[index]; - smallIt = smallIts[index]; - smallRG = smallRGs[index]; jt = joinTypes[index]; + extendedInfo += toString(); - //cout << " smallRunner " << index << " sees jointype " << jt << " joinTypes has " << joinTypes.size() - // << " elements" << endl; if (typelessJoin[index]) { - joiner.reset(new TupleJoiner(smallRG, largeRG, smallSideKeys[index], - largeSideKeys[index], jt)); + joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], + largeSideKeys[index], jt, &jobstepThreadPool)); } else { - joiner.reset(new TupleJoiner(smallRG, largeRG, smallSideKeys[index][0], - largeSideKeys[index][0], jt)); + joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0], + largeSideKeys[index][0], jt, &jobstepThreadPool)); } joiner->setUniqueLimit(uniqueLimit); joiner->setTableName(smallTableNames[index]); joiners[index] = joiner; + /* check for join types unsupported on the PM. */ + if (!largeBPS || !isExeMgr) + joiner->setInUM(rgData[index]); + /* - read the small side into a TupleJoiner - send the TupleJoiner to the large side TBPS - start the large TBPS - read the large side, write to the output + start the small runners + join them + check status + handle abort, out of memory, etc */ - smallRG.initRow(&r); -// cout << "reading smallDL" << endl; - more = smallDL->next(smallIt, &oneRG); + boost::posix_time::ptime end_time, start_time = + boost::posix_time::microsec_clock::universal_time(); + + stopMemTracking = false; + uint64_t jobs[numCores]; + uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); }); + // starting 1 thread when in PM mode, since it's only inserting into a + // vector of rows. The rest will be started when converted to UM mode. + if (joiner->inUM()) + for (int i = 0; i < numCores; i++) + jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); }); + else + jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); }); + + // wait for the first thread to join, then decide whether the others exist and need joining + jobstepThreadPool.join(jobs[0]); + if (joiner->inUM()) + for (int i = 1; i < numCores; i++) + jobstepThreadPool.join(jobs[i]); + + // stop the monitor thread + memTrackMutex.lock(); + stopMemTracking = true; + memTrackDone.notify_one(); + memTrackMutex.unlock(); + jobstepThreadPool.join(memMonitor); + + /* If there was an error or an abort, drain the input DL, + do endOfInput on the output */ + if (cancelled()) + { +// cout << "HJ stopping... status is " << status() << endl; + if (largeBPS) + largeBPS->abort(); + + bool more = true; + RGData oneRG; + while (more) + more = smallDLs[index]->next(smallIts[index], &oneRG); + } + //joiner->doneInserting(); + + end_time = boost::posix_time::microsec_clock::universal_time(); + if (!(fSessionId & 0x80000000)) + cout << "hash table construction time = " << end_time - start_time << + " size = " << joiner->size() << endl; + + extendedInfo += "\n"; + ostringstream oss; + if (joiner->inPM()) + { + oss << "PM join (" << index << ")" << endl; + #ifdef JLF_DEBUG + cout << oss.str(); + #endif + extendedInfo += oss.str(); + } + else if (joiner->inUM() && !joiner->onDisk()) + { + oss << "UM join (" << index << ")" << endl; + #ifdef JLF_DEBUG + cout << oss.str(); + #endif + extendedInfo += oss.str(); + } + + /* Trying to get the extended info to match the original version + It's kind of kludgey at the moment, need to clean it up at some point */ + if (!joiner->onDisk()) + { + joiner->doneInserting(); + boost::mutex::scoped_lock lk(*fStatsMutexPtr); + fExtendedInfo += extendedInfo; + formatMiniStats(index); + } +} + +/* Index is which small input to read. */ +void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *jobs) +{ + utils::setThreadName("HJSmallRunner"); + bool more = true; + RGData oneRG; + Row r; + RowGroupDL* smallDL; + uint32_t smallIt; + RowGroup smallRG; + boost::shared_ptr joiner = joiners[index]; + + smallDL = smallDLs[index]; + smallIt = smallIts[index]; + smallRG = smallRGs[index]; + + smallRG.initRow(&r); try { - /* check for join types unsupported on the PM. */ - if (!largeBPS || !isExeMgr) - { - flippedUMSwitch = true; - oss << "UM join (" << index << ")"; -#ifdef JLF_DEBUG - cout << oss.str() << endl; -#endif - extendedInfo += oss.str(); - joiner->setInUM(); - } - - resourceManager->getMemory(joiner->getMemUsage(), sessionMemLimit, false); - (void)atomicops::atomicAdd(&totalUMMemoryUsage, joiner->getMemUsage()); - memUsedByEachJoin[index] += joiner->getMemUsage(); - + ssize_t rgSize; + bool gotMem; + goto next; while (more && !cancelled()) { - uint64_t memUseBefore, memUseAfter; - smallRG.setData(&oneRG); - if (smallRG.getRowCount() == 0) goto next; - smallRG.getRow(0, &r); - - memUseBefore = joiner->getMemUsage() + rgDataSize; - // TupleHJ owns the row memory + utils::getSpinlock(rgdLock); rgData[index].push_back(oneRG); - rgDataSize += smallRG.getSizeWithStrings(); + utils::releaseSpinlock(rgdLock); - for (i = 0; i < smallRG.getRowCount(); i++, r.nextRow()) + rgSize = smallRG.getSizeWithStrings(); + atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize); + gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false); + if (!gotMem) { - //cout << "inserting " << r.toString() << endl; - joiner->insert(r); - } - - memUseAfter = joiner->getMemUsage() + rgDataSize; - - if (UNLIKELY(!flippedUMSwitch && (memUseAfter >= pmMemLimit))) - { - flippedUMSwitch = true; - oss << "UM join (" << index << ") "; -#ifdef JLF_DEBUG - cout << oss.str() << endl; -#endif - extendedInfo += oss.str(); - joiner->setInUM(); - memUseAfter = joiner->getMemUsage() + rgDataSize; - } - - gotMem = resourceManager->getMemory(memUseAfter - memUseBefore, sessionMemLimit, false); - atomicops::atomicAdd(&totalUMMemoryUsage, memUseAfter - memUseBefore); - memUsedByEachJoin[index] += memUseAfter - memUseBefore; - - /* This is kind of kludgy and overlaps with segreateJoiners() atm. - If this join won't be converted to disk-based, this fcn needs to abort the same as - it did before. If it will be converted, it should just return. */ - if (UNLIKELY(!gotMem)) - { - if (isDML || !allowDJS || (fSessionId & 0x80000000) || - (tableOid() < 3000 && tableOid() >= 1000)) + boost::unique_lock sl(saneErrMsg); + if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) || + (tableOid() < 3000 && tableOid() >= 1000))) { joinIsTooBig = true; fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG); errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG)); status(logging::ERR_JOIN_TOO_BIG); - cout << "Join is too big, raise the UM join limit for now" << endl; + cout << "Join is too big, raise the UM join limit for now (small runner)" << endl; abort(); break; } else + { + joiner->setConvertToDiskJoin(); return; + } } + joiner->insertRGData(smallRG, threadID); + + if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit)) + { + joiner->setInUM(rgData[index]); + for (int i = 1; i < numCores; i++) + jobs[i] = jobstepThreadPool.invoke([this, i, index, jobs] + { this->smallRunnerFcn(index, i, jobs); }); + } next: -// cout << "inserted one rg into the joiner, rowcount = " << -// smallRG.getRowCount() << endl; + dlMutex.lock(); more = smallDL->next(smallIt, &oneRG); + dlMutex.unlock(); } } catch (boost::exception& e) @@ -356,34 +455,8 @@ next: status(logging::ERR_EXEMGR_MALFUNCTION); } - if (!flippedUMSwitch && !cancelled()) - { - oss << "PM join (" << index << ")"; -#ifdef JLF_DEBUG - cout << oss.str() << endl; -#endif - extendedInfo += oss.str(); + if (!joiner->inUM()) joiner->setInPM(); - } - - /* If there was an error or an abort drain the input DL, - do endOfInput on the output */ - if (cancelled()) - { -// cout << "HJ stopping... status is " << status() << endl; - if (largeBPS) - largeBPS->abort(); - - while (more) - more = smallDL->next(smallIt, &oneRG); - } - - joiner->doneInserting(); - extendedInfo += "\n"; - - boost::mutex::scoped_lock lk(*fStatsMutexPtr); - fExtendedInfo += extendedInfo; - formatMiniStats(index); } void TupleHashJoinStep::forwardCPData() @@ -517,24 +590,6 @@ void TupleHashJoinStep::djsReaderFcn(int index) processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &v_rgData, &l_fe); processDupList(0, (fe2 ? l_fe2RG : l_outputRG), &v_rgData); -#if 0 - - if (!v_rgData.empty() && fSessionId < 0x80000000) - { - if (fe2) - { - l_fe2RG.setData(&v_rgData[0]); - cout << "fully processed rowgroup: " << l_fe2RG.toString() << endl; - } - else - { - l_outputRG.setData(&v_rgData[0]); - cout << "fully processed rowgroup: " << l_outputRG.toString() << endl; - } - } - - cout << "ownsOutputDL = " << (int) ownsOutputDL << " fDelivery = " << (int) fDelivery << endl; -#endif sendResult(v_rgData); } @@ -553,6 +608,7 @@ void TupleHashJoinStep::djsReaderFcn(int index) void TupleHashJoinStep::hjRunner() { uint32_t i; + std::vector smallRunners; // thread handles from thread pool if (cancelled()) { @@ -581,7 +637,7 @@ void TupleHashJoinStep::hjRunner() /* Start the small-side runners */ rgData.reset(new vector[smallDLs.size()]); - memUsedByEachJoin.reset(new uint64_t[smallDLs.size()]); + memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]); for (i = 0; i < smallDLs.size(); i++) memUsedByEachJoin[i] = 0; @@ -680,7 +736,7 @@ void TupleHashJoinStep::hjRunner() { vector empty; resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit); - atomicops::atomicSub(&totalUMMemoryUsage, memUsedByEachJoin[djsJoinerMap[i]]); + memUsedByEachJoin[djsJoinerMap[i]] = 0; djs[i].loadExistingData(rgData[djsJoinerMap[i]]); rgData[djsJoinerMap[i]].swap(empty); } @@ -792,8 +848,11 @@ void TupleHashJoinStep::hjRunner() joiners.clear(); tbpsJoiners.clear(); rgData.reset(); - resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); - totalUMMemoryUsage = 0; + for (uint i = 0; i < smallDLs.size(); i++) + { + resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); + memUsedByEachJoin[i] = 0; + } } } } @@ -840,7 +899,7 @@ void TupleHashJoinStep::hjRunner() #ifdef JLF_DEBUG cout << "moving join " << i << " to UM (PM join can't follow a UM join)\n"; #endif - tbpsJoiners[i]->setInUM(); + tbpsJoiners[i]->setInUM(rgData[i]); } } @@ -880,12 +939,10 @@ void TupleHashJoinStep::hjRunner() } #ifdef JLF_DEBUG - if (runFE2onPM) cout << "PM runs FE2\n"; else cout << "UM runs FE2\n"; - #endif largeBPS->setFcnExpGroup2(fe2, fe2Output, runFE2onPM); } @@ -971,8 +1028,11 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs) joiners.clear(); rgData.reset(); - resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); - totalUMMemoryUsage = 0; + for (uint i = 0; i < smallDLs.size(); i++) + { + resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); + memUsedByEachJoin[i] = 0; + } return 0; } @@ -992,8 +1052,11 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs) cout << " -- returning error status " << deliveredRG->getStatus() << endl; deliveredRG->serializeRGData(bs); - resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit); - totalUMMemoryUsage = 0; + for (uint i = 0; i < smallDLs.size(); i++) + { + resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); + memUsedByEachJoin[i] = 0; + } return 0; } @@ -1850,7 +1913,8 @@ void TupleHashJoinStep::segregateJoiners() return; } - /* Debugging code, this makes all eligible joins disk-based. + #if 0 + // Debugging code, this makes all eligible joins disk-based. else { cout << "making all joins disk-based" << endl; for (i = 0; i < smallSideCount; i++) { @@ -1860,7 +1924,7 @@ void TupleHashJoinStep::segregateJoiners() } return; } - */ + #endif /* For now if there is no largeBPS all joins need to either be DJS or not, not mixed */ if (!largeBPS) diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index 27154946f..164f1f541 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -75,6 +75,8 @@ public: void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1) { fTableOID1 = tableOid1; + if (fTableOID1 < 3000) + numCores = 1; // syscat query, no need for more than 1 thread } void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2) { @@ -425,7 +427,6 @@ private: std::vector > smallSideKeys; ResourceManager* resourceManager; - volatile uint64_t totalUMMemoryUsage; struct JoinerSorter { @@ -436,16 +437,14 @@ private: } }; std::vector > joiners; - boost::scoped_array > rgData; TupleBPS* largeBPS; rowgroup::RowGroup largeRG, outputRG; std::vector smallRGs; - uint64_t pmMemLimit; - uint64_t rgDataSize; + ssize_t pmMemLimit; void hjRunner(); - void smallRunnerFcn(uint32_t index); + void smallRunnerFcn(uint32_t index, uint threadID, uint64_t *threads); struct HJRunner { @@ -462,15 +461,13 @@ private: SmallRunner(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i) { } void operator()() { - utils::setThreadName("HJSSmallSide"); - HJ->smallRunnerFcn(index); + HJ->startSmallRunners(index); } TupleHashJoinStep* HJ; uint32_t index; }; int64_t mainRunner; // thread handle from thread pool - std::vector smallRunners; // thread handles from thread pool // for notify TupleAggregateStep PM hashjoin // Ideally, hashjoin and delivery communicate with RowGroupDL, @@ -614,10 +611,19 @@ private: std::vector > tbpsJoiners; std::vector > djsJoiners; std::vector djsJoinerMap; - boost::scoped_array memUsedByEachJoin; + boost::scoped_array memUsedByEachJoin; boost::mutex djsLock; boost::shared_ptr sessionMemLimit; + /* Threaded UM join support */ + int numCores; + boost::mutex dlMutex, memTrackMutex, saneErrMsg; + boost::condition memTrackDone; + std::atomic rgdLock; + bool stopMemTracking; + void trackMem(uint index); + void startSmallRunners(uint index); + friend class DiskJoinStep; }; diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 4b43f71bc..288d2bada 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1305,7 +1305,7 @@ int8_t setupCwd(joblist::ResourceManager* rm) if (rc < 0 || access(".", W_OK) != 0) rc = chdir("/tmp"); - + return (rc < 0) ? -5 : rc; } @@ -1359,7 +1359,7 @@ void cleanTempDir() return; if (tmpPrefix.empty()) - tmpPrefix = "/tmp/infinidb"; + tmpPrefix = "/tmp/cs-diskjoin"; tmpPrefix += "/"; @@ -1636,4 +1636,3 @@ int main(int argc, char* argv[]) return 0; } // vim:ts=4 sw=4: - diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index 78ecc633e..ea4cf6857 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -495,7 +495,7 @@ + /tmp/cs-diskjoin --> Y diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver index de91668f0..4c3286c40 100644 --- a/oam/etc/Columnstore.xml.singleserver +++ b/oam/etc/Columnstore.xml.singleserver @@ -241,6 +241,7 @@ 128M 10 /tmp/columnstore_tmp_files + $INSTALLDIR 10 3 10 @@ -486,7 +487,7 @@ - /var/lib/columnstore/tmp + /var/lib/columnstore/tmp/cs-diskjoin Y diff --git a/utils/common/fixedallocator.cpp b/utils/common/fixedallocator.cpp index d2f705f25..5bb8bc7c4 100644 --- a/utils/common/fixedallocator.cpp +++ b/utils/common/fixedallocator.cpp @@ -50,6 +50,9 @@ FixedAllocator::FixedAllocator(const FixedAllocator& f) tmpSpace = f.tmpSpace; capacityRemaining = 0; currentlyStored = 0; + useLock = f.useLock; + lock = false; + } FixedAllocator& FixedAllocator::operator=(const FixedAllocator& f) @@ -57,10 +60,22 @@ FixedAllocator& FixedAllocator::operator=(const FixedAllocator& f) elementCount = f.elementCount; elementSize = f.elementSize; tmpSpace = f.tmpSpace; + useLock = f.useLock; + lock = false; deallocateAll(); return *this; } +void FixedAllocator::setUseLock(bool useIt) +{ + useLock = useIt; +} + +void FixedAllocator::setAllocSize(uint allocSize) +{ + elementSize = allocSize; +} + void FixedAllocator::newBlock() { shared_array next; @@ -80,39 +95,15 @@ void FixedAllocator::newBlock() } } -void* FixedAllocator::allocate() -{ - void* ret; - - if (capacityRemaining < elementSize) - newBlock(); - - ret = nextAlloc; - nextAlloc += elementSize; - capacityRemaining -= elementSize; - currentlyStored += elementSize; - return ret; -} - -void* FixedAllocator::allocate(uint32_t len) -{ - void* ret; - - if (capacityRemaining < len) - newBlock(); - - ret = nextAlloc; - nextAlloc += len; - capacityRemaining -= len; - currentlyStored += len; - return ret; -} - void FixedAllocator::truncateBy(uint32_t amt) { + if (useLock) + getSpinlock(lock); nextAlloc -= amt; capacityRemaining += amt; currentlyStored -= amt; + if (useLock) + releaseSpinlock(lock); } void FixedAllocator::deallocateAll() diff --git a/utils/common/fixedallocator.h b/utils/common/fixedallocator.h index 31e7b4d75..9e716a133 100644 --- a/utils/common/fixedallocator.h +++ b/utils/common/fixedallocator.h @@ -38,6 +38,8 @@ #include #include #include +#include +#include "spinlock.h" #if defined(_MSC_VER) && defined(xxxFIXEDALLOCATOR_DLLEXPORT) #define EXPORT __declspec(dllexport) @@ -55,11 +57,13 @@ public: EXPORT FixedAllocator() : capacityRemaining(0), - elementCount(std::numeric_limits::max()), + elementCount(DEFAULT_NUM_ELEMENTS), elementSize(0), currentlyStored(0), tmpSpace(false), - nextAlloc(0) {} + nextAlloc(0), + useLock(false), + lock(false) {} EXPORT explicit FixedAllocator(unsigned long allocSize, bool isTmpSpace = false, unsigned long numElements = DEFAULT_NUM_ELEMENTS) : capacityRemaining(0), @@ -67,7 +71,9 @@ public: elementSize(allocSize), currentlyStored(0), tmpSpace(isTmpSpace), - nextAlloc(0) {} + nextAlloc(0), + useLock(false), + lock(false) {} EXPORT FixedAllocator(const FixedAllocator&); EXPORT FixedAllocator& operator=(const FixedAllocator&); virtual ~FixedAllocator() {} @@ -78,6 +84,8 @@ public: void deallocate() { } // does nothing EXPORT void deallocateAll(); // drops all memory in use EXPORT uint64_t getMemUsage() const; + void setUseLock(bool); + void setAllocSize(uint); private: void newBlock(); @@ -89,10 +97,46 @@ private: uint64_t currentlyStored; bool tmpSpace; uint8_t* nextAlloc; + bool useLock; + std::atomic lock; }; +inline void* FixedAllocator::allocate() +{ + void* ret; + + if (useLock) + getSpinlock(lock); + if (capacityRemaining < elementSize) + newBlock(); + + ret = nextAlloc; + nextAlloc += elementSize; + capacityRemaining -= elementSize; + currentlyStored += elementSize; + if (useLock) + releaseSpinlock(lock); + return ret; +} + +inline void* FixedAllocator::allocate(uint32_t len) +{ + void* ret; + + if (useLock) + getSpinlock(lock); + if (capacityRemaining < len) + newBlock(); + + ret = nextAlloc; + nextAlloc += len; + capacityRemaining -= len; + currentlyStored += len; + if (useLock) + releaseSpinlock(lock); + return ret; } #undef EXPORT - +} // namespace #endif diff --git a/utils/common/spinlock.h b/utils/common/spinlock.h new file mode 100644 index 000000000..1dc84391f --- /dev/null +++ b/utils/common/spinlock.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +namespace utils +{ + +inline void getSpinlock(std::atomic &lock) +{ + bool _false = false; + while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire)) + _false = false; +} + +inline bool trySpinlock(std::atomic &lock) +{ + bool _false = false; + bool ret = lock.compare_exchange_weak(_false, true, std::memory_order_acquire); + return ret; +} + +inline void releaseSpinlock(std::atomic &lock) +{ + lock.store(false, std::memory_order_release); +} + +} diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 88bab4bb0..34d3702e6 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -27,6 +27,7 @@ #endif #include "hasher.h" #include "lbidlist.h" +#include "spinlock.h" using namespace std; using namespace rowgroup; @@ -42,30 +43,49 @@ TupleJoiner::TupleJoiner( const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, - JoinType jt) : + JoinType jt, + threadpool::ThreadPool *jsThreadPool) : smallRG(smallInput), largeRG(largeInput), joinAlg(INSERTING), joinType(jt), - threadCount(1), typelessJoin(false), bSignedUnsignedJoin(false), uniqueLimit(100), finished(false) + threadCount(1), typelessJoin(false), bSignedUnsignedJoin(false), uniqueLimit(100), finished(false), + jobstepThreadPool(jsThreadPool), _convertToDiskJoin(false) { + uint i; + + getBucketCount(); + m_bucketLocks.reset(new boost::mutex[bucketCount]); + if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE) { - STLPoolAllocator > alloc; - _pool = alloc.getPoolAllocator(); - - ld.reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); + ld.reset(new boost::scoped_ptr[bucketCount]); + _pool.reset(new boost::shared_ptr[bucketCount]); + for (i = 0; i < bucketCount; i++) + { + STLPoolAllocator > alloc; + _pool[i] = alloc.getPoolAllocator(); + ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); + } } else if (smallRG.usesStringTable()) { - STLPoolAllocator > alloc; - _pool = alloc.getPoolAllocator(); - - sth.reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); + sth.reset(new boost::scoped_ptr[bucketCount]); + _pool.reset(new boost::shared_ptr[bucketCount]); + for (i = 0; i < bucketCount; i++) + { + STLPoolAllocator > alloc; + _pool[i] = alloc.getPoolAllocator(); + sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); + } } else { - STLPoolAllocator > alloc; - _pool = alloc.getPoolAllocator(); - - h.reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); + h.reset(new boost::scoped_ptr[bucketCount]); + _pool.reset(new boost::shared_ptr[bucketCount]); + for (i = 0; i < bucketCount; i++) + { + STLPoolAllocator > alloc; + _pool[i] = alloc.getPoolAllocator(); + h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); + } } smallRG.initRow(&smallNullRow); @@ -106,16 +126,28 @@ TupleJoiner::TupleJoiner( const rowgroup::RowGroup& largeInput, const vector& smallJoinColumns, const vector& largeJoinColumns, - JoinType jt) : + JoinType jt, + threadpool::ThreadPool *jsThreadPool) : smallRG(smallInput), largeRG(largeInput), joinAlg(INSERTING), joinType(jt), threadCount(1), typelessJoin(true), smallKeyColumns(smallJoinColumns), largeKeyColumns(largeJoinColumns), - bSignedUnsignedJoin(false), uniqueLimit(100), finished(false) + bSignedUnsignedJoin(false), uniqueLimit(100), finished(false), + jobstepThreadPool(jsThreadPool), _convertToDiskJoin(false) { - STLPoolAllocator > alloc; - _pool = alloc.getPoolAllocator(); + uint i; + + getBucketCount(); + + _pool.reset(new boost::shared_ptr[bucketCount]); + ht.reset(new boost::scoped_ptr[bucketCount]); + for (i = 0; i < bucketCount; i++) + { + STLPoolAllocator > alloc; + _pool[i] = alloc.getPoolAllocator(); + ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); + } + m_bucketLocks.reset(new boost::mutex[bucketCount]); - ht.reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); smallRG.initRow(&smallNullRow); if (smallOuterJoin() || largeOuterJoin() || semiJoin() || antiJoin()) @@ -126,7 +158,7 @@ TupleJoiner::TupleJoiner( smallNullRow.initToNull(); } - for (uint32_t i = keyLength = 0; i < smallKeyColumns.size(); i++) + for (i = keyLength = 0; i < smallKeyColumns.size(); i++) { if (smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::CHAR || smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::VARCHAR @@ -155,12 +187,16 @@ TupleJoiner::TupleJoiner( } } - storedKeyAlloc = FixedAllocator(keyLength); + // note, 'numcores' is implied by tuplehashjoin on calls to insertRGData(). + // TODO: make it explicit to avoid future confusion. + storedKeyAlloc.reset(new FixedAllocator[numCores]); + for (i = 0; i < (uint) numCores; i++) + storedKeyAlloc[i].setAllocSize(keyLength); discreteValues.reset(new bool[smallKeyColumns.size()]); cpValues.reset(new vector[smallKeyColumns.size()]); - for (uint32_t i = 0; i < smallKeyColumns.size(); i++) + for (i = 0; i < smallKeyColumns.size(); i++) { discreteValues[i] = false; if (isUnsigned(smallRG.getColTypes()[smallKeyColumns[i]])) @@ -199,6 +235,166 @@ bool TupleJoiner::operator<(const TupleJoiner& tj) const return size() < tj.size(); } +void TupleJoiner::getBucketCount() +{ + // get the # of cores, round up to nearest power of 2 + // make the bucket mask + numCores = sysconf(_SC_NPROCESSORS_ONLN); + if (numCores <= 0) + numCores = 8; + bucketCount = (numCores == 1 ? 1 : (1 << (32 - __builtin_clz(numCores - 1)))); + bucketMask = bucketCount - 1; +} + +template +void TupleJoiner::bucketsToTables(buckets_t *buckets, hash_table_t *tables) +{ + uint i; + + bool done = false, wasProductive; + while (!done) + { + done = true; + wasProductive = false; + for (i = 0; i < bucketCount; i++) + { + if (buckets[i].empty()) + continue; + bool gotIt = m_bucketLocks[i].try_lock(); + if (!gotIt) + { + done = false; + continue; + } + for (auto &element : buckets[i]) + tables[i]->insert(element); + m_bucketLocks[i].unlock(); + wasProductive = true; + buckets[i].clear(); + } + if (!done && !wasProductive) + ::usleep(1000 * numCores); + } +} + +void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row &r) +{ + TypelessData td[rowCount]; + vector > v[bucketCount]; + uint i; + FixedAllocator *alloc = &storedKeyAlloc[threadID]; + + for (i = 0; i < rowCount; i++, r.nextRow()) + { + td[i] = makeTypelessKey(r, smallKeyColumns, keyLength, alloc, + largeRG, largeKeyColumns); + if (td[i].len == 0) + continue; + uint bucket = bucketPicker((char *) td[i].data, td[i].len, bpSeed) & bucketMask; + v[bucket].push_back(pair(td[i], r.getPointer())); + } + bucketsToTables(&v[0], ht.get()); +} + +void TupleJoiner::um_insertLongDouble(uint rowCount, Row &r) +{ + vector > v[bucketCount]; + uint i; + uint smallKeyColumn = smallKeyColumns[0]; + + for (i = 0; i < rowCount; i++, r.nextRow()) + { + long double smallKey = r.getLongDoubleField(smallKeyColumn); + uint bucket = bucketPicker((char *) &smallKey, 10, bpSeed) & bucketMask; // change if we decide to support windows again + if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL)) + v[bucket].push_back(pair(joblist::LONGDOUBLENULL, r.getPointer())); + else + v[bucket].push_back(pair(smallKey, r.getPointer())); + } + bucketsToTables(&v[0], ld.get()); +} + +void TupleJoiner::um_insertInlineRows(uint rowCount, Row &r) +{ + uint i; + int64_t smallKey; + vector > v[bucketCount]; + uint smallKeyColumn = smallKeyColumns[0]; + + for (i = 0; i < rowCount; i++, r.nextRow()) + { + if (!r.isUnsigned(smallKeyColumn)) + smallKey = r.getIntField(smallKeyColumn); + else + smallKey = (int64_t) r.getUintField(smallKeyColumn); + uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask; + if (UNLIKELY(smallKey == nullValueForJoinColumn)) + v[bucket].push_back(pair(getJoinNullValue(), r.getData())); + else + v[bucket].push_back(pair(smallKey, r.getData())); + } + bucketsToTables(&v[0], h.get()); +} + +void TupleJoiner::um_insertStringTable(uint rowCount, Row &r) +{ + int64_t smallKey; + uint i; + vector > v[bucketCount]; + uint smallKeyColumn = smallKeyColumns[0]; + + for (i = 0; i < rowCount; i++, r.nextRow()) + { + if (!r.isUnsigned(smallKeyColumn)) + smallKey = r.getIntField(smallKeyColumn); + else + smallKey = (int64_t) r.getUintField(smallKeyColumn); + uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask; + if (UNLIKELY(smallKey == nullValueForJoinColumn)) + v[bucket].push_back(pair(getJoinNullValue(), r.getPointer())); + else + v[bucket].push_back(pair(smallKey, r.getPointer())); + } + bucketsToTables(&v[0], sth.get()); +} + +void TupleJoiner::insertRGData(RowGroup &rg, uint threadID) +{ + uint i, rowCount; + Row r; + + rg.initRow(&r); + rowCount = rg.getRowCount(); + + rg.getRow(0, &r); + m_cpValuesLock.lock(); + for (i = 0; i < rowCount; i++, r.nextRow()) + { + updateCPData(r); + r.zeroRid(); + } + m_cpValuesLock.unlock(); + rg.getRow(0, &r); + + if (joinAlg == UM) + { + if (typelessJoin) + um_insertTypeless(threadID, rowCount, r); + else if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE) + um_insertLongDouble(rowCount, r); + else if (!smallRG.usesStringTable()) + um_insertInlineRows(rowCount, r); + else + um_insertStringTable(rowCount, r); + } + else + { + // while in PM-join mode, inserting is single-threaded + for (i = 0; i < rowCount; i++, r.nextRow()) + rows.push_back(r.getPointer()); + } +} + void TupleJoiner::insert(Row& r, bool zeroTheRid) { /* when doing a disk-based join, only the first iteration on the large side @@ -212,49 +408,54 @@ void TupleJoiner::insert(Row& r, bool zeroTheRid) { if (typelessJoin) { - TypelessData td = makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc, + TypelessData td = makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc[0], largeRG, largeKeyColumns); if (td.len > 0) { - ht->insert(pair(td, r.getPointer())); + uint bucket = bucketPicker((char *) td.data, td.len, bpSeed) & bucketMask; + ht[bucket]->insert(pair(td, r.getPointer())); } } else if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE) { long double smallKey = r.getLongDoubleField(smallKeyColumns[0]); + uint bucket = bucketPicker((char *) &smallKey, 10, bpSeed) & bucketMask; // change if we decide to support windows again if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL)) - ld->insert(pair(joblist::LONGDOUBLENULL, r.getPointer())); + ld[bucket]->insert(pair(joblist::LONGDOUBLENULL, r.getPointer())); else - ld->insert(pair(smallKey, r.getPointer())); + ld[bucket]->insert(pair(smallKey, r.getPointer())); } else if (!smallRG.usesStringTable()) { int64_t smallKey; - if (r.isUnsigned(smallKeyColumns[0])) - smallKey = (int64_t)(r.getUintField(smallKeyColumns[0])); - else + if (!r.isUnsigned(smallKeyColumns[0])) smallKey = r.getIntField(smallKeyColumns[0]); - - if (UNLIKELY(smallKey == nullValueForJoinColumn)) - h->insert(pair(getJoinNullValue(), r.getData())); else - h->insert(pair(smallKey, r.getData())); // Normal path for integers + smallKey = (int64_t) r.getUintField(smallKeyColumns[0]); + uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask; + if (UNLIKELY(smallKey == nullValueForJoinColumn)) + h[bucket]->insert(pair(getJoinNullValue(), r.getData())); + else + h[bucket]->insert(pair(smallKey, r.getData())); // Normal path for integers } else { - int64_t smallKey = r.getIntField(smallKeyColumns[0]); + int64_t smallKey; - if (UNLIKELY(smallKey == nullValueForJoinColumn)) - sth->insert(pair(getJoinNullValue(), r.getPointer())); + if (!r.isUnsigned(smallKeyColumns[0])) + smallKey = r.getIntField(smallKeyColumns[0]); else - sth->insert(pair(smallKey, r.getPointer())); + smallKey = (int64_t) r.getUintField(smallKeyColumns[0]); + uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask; + if (UNLIKELY(smallKey == nullValueForJoinColumn)) + sth[bucket]->insert(pair(getJoinNullValue(), r.getPointer())); + else + sth[bucket]->insert(pair(smallKey, r.getPointer())); } } else - { rows.push_back(r.getPointer()); - } } void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uint32_t threadID, @@ -262,7 +463,6 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin { uint32_t i; bool isNull = hasNullJoinColumn(largeSideRow); - matches->clear(); if (inPM()) @@ -286,19 +486,14 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin pair range; largeKey = makeTypelessKey(largeSideRow, largeKeyColumns, keyLength, &tmpKeyAlloc[threadID], smallRG, smallKeyColumns); - if (largeKey.len > 0) - { - it = ht->find(largeKey); - } - else - { - return; - } - - if (it == ht->end() && !(joinType & (LARGEOUTER | MATCHNULLS))) + if (largeKey.len == 0) return; - range = ht->equal_range(largeKey); + uint bucket = bucketPicker((char *) largeKey.data, largeKey.len, bpSeed) & bucketMask; + range = ht[bucket]->equal_range(largeKey); + + if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS))) + return; for (; range.first != range.second; ++range.first) matches->push_back(range.first->second); @@ -313,13 +508,11 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin Row r; largeKey = largeSideRow.getLongDoubleField(largeKeyColumns[0]); - it = ld->find(largeKey); + uint bucket = bucketPicker((char *) &largeKey, 10, bpSeed) & bucketMask; + range = ld[bucket]->equal_range(largeKey); - if (it == ld->end() && !(joinType & (LARGEOUTER | MATCHNULLS))) + if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS))) return; - - range = ld->equal_range(largeKey); - for (; range.first != range.second; ++range.first) { matches->push_back(range.first->second); @@ -328,9 +521,6 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin else if (!smallRG.usesStringTable()) { int64_t largeKey; - iterator it; - pair range; - Row r; if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE) { @@ -348,62 +538,39 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin if (ld) { // Compare against long double - ldIterator it; - pair range; long double ldKey = largeKey; -// ldKey = 6920; - it = ld->find(ldKey); + uint bucket = bucketPicker((char *) &ldKey, 10, bpSeed) & bucketMask; + auto range = ld[bucket]->equal_range(ldKey); - if (it == ld->end() && !(joinType & (LARGEOUTER | MATCHNULLS))) + if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS))) return; - range = ld->equal_range(ldKey); - for (; range.first != range.second; ++range.first) - { matches->push_back(range.first->second); - } } else { - it = h->find(largeKey); + uint bucket = bucketPicker((char *) &largeKey, sizeof(largeKey), bpSeed) & bucketMask; + auto range = h[bucket]->equal_range(largeKey); - if (it == h->end() && !(joinType & (LARGEOUTER | MATCHNULLS))) + if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS))) return; - range = h->equal_range(largeKey); - - //smallRG.initRow(&r); for (; range.first != range.second; ++range.first) - { - //r.setData(range.first->second); - //cerr << "matched small side row: " << r.toString() << endl; matches->push_back(range.first->second); - } } } else { - int64_t largeKey; - sthash_t::iterator it; - pair range; - Row r; + int64_t largeKey = largeSideRow.getIntField(largeKeyColumns[0]); + uint bucket = bucketPicker((char *) &largeKey, sizeof(largeKey), bpSeed) & bucketMask; + auto range = sth[bucket]->equal_range(largeKey); - largeKey = largeSideRow.getIntField(largeKeyColumns[0]); - it = sth->find(largeKey); - - if (it == sth->end() && !(joinType & (LARGEOUTER | MATCHNULLS))) + if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS))) return; - range = sth->equal_range(largeKey); - - //smallRG.initRow(&r); for (; range.first != range.second; ++range.first) - { - //r.setPointer(range.first->second); - //cerr << "matched small side row: " << r.toString() << endl; matches->push_back(range.first->second); - } } } @@ -417,21 +584,27 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin { if (smallRG.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE) { - pair range = ld->equal_range(joblist::LONGDOUBLENULL); + uint bucket = bucketPicker((char *) &(joblist::LONGDOUBLENULL), + sizeof(joblist::LONGDOUBLENULL), bpSeed) & bucketMask; + pair range = ld[bucket]->equal_range(joblist::LONGDOUBLENULL); for (; range.first != range.second; ++range.first) matches->push_back(range.first->second); } else if (!smallRG.usesStringTable()) { - pair range = h->equal_range(getJoinNullValue()); + auto nullVal = getJoinNullValue(); + uint bucket = bucketPicker((char *) &nullVal, sizeof(nullVal), bpSeed) & bucketMask; + pair range = h[bucket]->equal_range(nullVal); for (; range.first != range.second; ++range.first) matches->push_back(range.first->second); } else { - pair range = sth->equal_range(getJoinNullValue()); + auto nullVal = getJoinNullValue(); + uint bucket = bucketPicker((char *) &nullVal, sizeof(nullVal), bpSeed) & bucketMask; + pair range = sth[bucket]->equal_range(nullVal); for (; range.first != range.second; ++range.first) matches->push_back(range.first->second); @@ -448,30 +621,34 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin { ldIterator it; - for (it = ld->begin(); it != ld->end(); ++it) - matches->push_back(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = ld[i]->begin(); it != ld[i]->end(); ++it) + matches->push_back(it->second); } else if (!smallRG.usesStringTable()) { iterator it; - for (it = h->begin(); it != h->end(); ++it) - matches->push_back(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = h[i]->begin(); it != h[i]->end(); ++it) + matches->push_back(it->second); } else { sthash_t::iterator it; - for (it = sth->begin(); it != sth->end(); ++it) - matches->push_back(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = sth[i]->begin(); it != sth[i]->end(); ++it) + matches->push_back(it->second); } } else { thIterator it; - for (it = ht->begin(); it != ht->end(); ++it) - matches->push_back(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = ht[i]->begin(); it != ht[i]->end(); ++it) + matches->push_back(it->second); } } } @@ -516,16 +693,17 @@ void TupleJoiner::doneInserting() rowCount = size(); + uint bucket = 0; if (joinAlg == PM) pmpos = 0; else if (typelessJoin) - thit = ht->begin(); + thit = ht[bucket]->begin(); else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE) - ldit = ld->begin(); + ldit = ld[bucket]->begin(); else if (!smallRG.usesStringTable()) - hit = h->begin(); + hit = h[bucket]->begin(); else - sthit = sth->begin(); + sthit = sth[bucket]->begin(); for (i = 0; i < rowCount; i++) { @@ -533,21 +711,29 @@ void TupleJoiner::doneInserting() smallRow.setPointer(rows[pmpos++]); else if (typelessJoin) { + while (thit == ht[bucket]->end()) + thit = ht[++bucket]->begin(); smallRow.setPointer(thit->second); ++thit; } else if (smallRG.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE) { + while (ldit == ld[bucket]->end()) + ldit = ld[++bucket]->begin(); smallRow.setPointer(ldit->second); ++ldit; } else if (!smallRG.usesStringTable()) { + while (hit == h[bucket]->end()) + hit = h[++bucket]->begin(); smallRow.setPointer(hit->second); ++hit; } else { + while (sthit == sth[bucket]->end()) + sthit = sth[++bucket]->begin(); smallRow.setPointer(sthit->second); ++sthit; } @@ -565,13 +751,12 @@ void TupleJoiner::doneInserting() uniquer.insert(*(int64_t*)&dval); } default: - { + { uniquer.insert((int64_t)dval); } } } - else - if (smallRow.isUnsigned(smallKeyColumns[col])) + else if (smallRow.isUnsigned(smallKeyColumns[col])) { uniquer.insert((int64_t)smallRow.getUintField(smallKeyColumns[col])); } @@ -599,6 +784,18 @@ void TupleJoiner::setInPM() joinAlg = PM; } +void TupleJoiner::umJoinConvert(size_t begin, size_t end) +{ + Row smallRow; + smallRG.initRow(&smallRow); + + while (begin < end) + { + smallRow.setPointer(rows[begin++]); + insert(smallRow); + } +} + void TupleJoiner::setInUM() { vector empty; @@ -610,16 +807,17 @@ void TupleJoiner::setInUM() joinAlg = UM; size = rows.size(); - smallRG.initRow(&smallRow); -#ifdef TJ_DEBUG - cout << "converting array to hash, size = " << size << "\n"; -#endif + size_t chunkSize = ((size / numCores) + 1 < 50000 ? 50000 : (size / numCores) + 1); // don't start a thread to process < 50k rows - for (i = 0; i < size; i++) - { - smallRow.setPointer(rows[i]); - insert(smallRow); - } + uint64_t jobs[numCores]; + i = 0; + for (size_t firstRow = 0; i < (uint) numCores && firstRow < size; i++, firstRow += chunkSize) + jobs[i] = jobstepThreadPool->invoke([this, firstRow, chunkSize, size] { + this->umJoinConvert(firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size)); + } ); + + for (uint j = 0; j < i; j++) + jobstepThreadPool->join(jobs[j]); #ifdef TJ_DEBUG cout << "done\n"; @@ -635,6 +833,57 @@ void TupleJoiner::setInUM() } } +void TupleJoiner::umJoinConvert(uint threadID, vector &rgs, size_t begin, size_t end) +{ + RowGroup l_smallRG(smallRG); + + while (begin < end) + { + l_smallRG.setData(&(rgs[begin++])); + insertRGData(l_smallRG, threadID); + } +} + +void TupleJoiner::setInUM(vector &rgs) +{ + Row smallRow; + uint32_t i, size; + + if (joinAlg == UM) + return; + + { // don't need rows anymore, free the mem + vector empty; + rows.swap(empty); + } + + joinAlg = UM; + size = rgs.size(); + size_t chunkSize = ((size / numCores) + 1 < 10 ? 10 : (size / numCores) + 1); // don't issue jobs for < 10 rowgroups + + uint64_t jobs[numCores]; + i = 0; + for (size_t firstRow = 0; i < (uint) numCores && firstRow < size; i++, firstRow += chunkSize) + jobs[i] = jobstepThreadPool->invoke([this, firstRow, chunkSize, size, i, &rgs] { + this->umJoinConvert(i, rgs, firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size)); + } ); + + for (uint j = 0; j < i; j++) + jobstepThreadPool->join(jobs[j]); + +#ifdef TJ_DEBUG + cout << "done\n"; +#endif + + if (typelessJoin) + { + tmpKeyAlloc.reset(new FixedAllocator[threadCount]); + + for (i = 0; i < threadCount; i++) + tmpKeyAlloc[i] = FixedAllocator(keyLength, true); + } +} + void TupleJoiner::setPMJoinResults(boost::shared_array > jr, uint32_t threadID) { @@ -727,49 +976,53 @@ void TupleJoiner::getUnmarkedRows(vector* out) { typelesshash_t::iterator it; - for (it = ht->begin(); it != ht->end(); ++it) - { - smallR.setPointer(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = ht[i]->begin(); it != ht[i]->end(); ++it) + { + smallR.setPointer(it->second); - if (!smallR.isMarked()) - out->push_back(it->second); - } + if (!smallR.isMarked()) + out->push_back(it->second); + } } else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE) { ldIterator it; - for (it = ld->begin(); it != ld->end(); ++it) - { - smallR.setPointer(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = ld[i]->begin(); it != ld[i]->end(); ++it) + { + smallR.setPointer(it->second); - if (!smallR.isMarked()) - out->push_back(it->second); - } + if (!smallR.isMarked()) + out->push_back(it->second); + } } else if (!smallRG.usesStringTable()) { iterator it; - for (it = h->begin(); it != h->end(); ++it) - { - smallR.setPointer(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = h[i]->begin(); it != h[i]->end(); ++it) + { + smallR.setPointer(it->second); - if (!smallR.isMarked()) - out->push_back(it->second); - } + if (!smallR.isMarked()) + out->push_back(it->second); + } } else { sthash_t::iterator it; - for (it = sth->begin(); it != sth->end(); ++it) - { - smallR.setPointer(it->second); + for (uint i = 0; i < bucketCount; i++) + for (it = sth[i]->begin(); it != sth[i]->end(); ++it) + { + smallR.setPointer(it->second); - if (!smallR.isMarked()) - out->push_back(it->second); - } + if (!smallR.isMarked()) + out->push_back(it->second); + } } } } @@ -777,9 +1030,21 @@ void TupleJoiner::getUnmarkedRows(vector* out) uint64_t TupleJoiner::getMemUsage() const { if (inUM() && typelessJoin) - return _pool->getMemUsage() + storedKeyAlloc.getMemUsage(); + { + size_t ret = 0; + for (uint i = 0; i < bucketCount; i++) + ret += _pool[i]->getMemUsage(); + for (int i = 0; i < numCores; i++) + ret += storedKeyAlloc[i].getMemUsage(); + return ret; + } else if (inUM()) - return _pool->getMemUsage(); + { + size_t ret = 0; + for (uint i = 0; i < bucketCount; i++) + ret += _pool[i]->getMemUsage(); + return ret; + } else return (rows.size() * sizeof(Row::Pointer)); } @@ -841,7 +1106,7 @@ void TupleJoiner::updateCPData(const Row& r) uval = *(uint64_t*)&dval; } default: - { + { uval = (uint64_t)dval; } } @@ -873,7 +1138,7 @@ void TupleJoiner::updateCPData(const Row& r) val = *(int64_t*)&dval; } default: - { + { val = (int64_t)dval; } } @@ -896,14 +1161,17 @@ size_t TupleJoiner::size() const { if (joinAlg == UM || joinAlg == INSERTING) { - if (UNLIKELY(typelessJoin)) - return ht->size(); - else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE) - return ld->size(); - else if (!smallRG.usesStringTable()) - return h->size(); - else - return sth->size(); + size_t ret = 0; + for (uint i = 0; i < bucketCount; i++) + if (UNLIKELY(typelessJoin)) + ret += ht[i]->size(); + else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE) + ret += ld[i]->size(); + else if (!smallRG.usesStringTable()) + ret += h[i]->size(); + else + ret += sth[i]->size(); + return ret; } return rows.size(); @@ -1050,7 +1318,7 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, break; } default: - { + { if (off + 8 > keylen) goto toolong; if (r.isUnsigned(keyCols[i]) && keyld > MAX_UBIGINT) @@ -1178,7 +1446,7 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, Pool break; } default: - { + { if (r.isUnsigned(keyCols[i]) && keyld > MAX_UBIGINT) { ret.len = 0; @@ -1352,15 +1620,29 @@ void TupleJoiner::setTableName(const string& tname) void TupleJoiner::clearData() { - STLPoolAllocator > alloc; - _pool = alloc.getPoolAllocator(); - + _pool.reset(new boost::shared_ptr[bucketCount]); if (typelessJoin) - ht.reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); + ht.reset(new boost::scoped_ptr[bucketCount]); + else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE) + ld.reset(new boost::scoped_ptr[bucketCount]); else if (smallRG.usesStringTable()) - sth.reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); + sth.reset(new boost::scoped_ptr[bucketCount]); else - h.reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); + h.reset(new boost::scoped_ptr[bucketCount]); + + for (uint i = 0; i < bucketCount; i++) + { + STLPoolAllocator > alloc; + _pool[i] = alloc.getPoolAllocator(); + if (typelessJoin) + ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc)); + else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE) + ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc)); + else if (smallRG.usesStringTable()) + sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc)); + else + h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc)); + } std::vector empty; rows.swap(empty); @@ -1406,7 +1688,16 @@ boost::shared_ptr TupleJoiner::copyForDiskJoin() } if (typelessJoin) - ret->storedKeyAlloc = FixedAllocator(keyLength); + { + ret->storedKeyAlloc.reset(new FixedAllocator[numCores]); + for (int i = 0; i < numCores; i++) + ret->storedKeyAlloc[i].setAllocSize(keyLength); + } + + ret->numCores = numCores; + ret->bucketCount = bucketCount; + ret->bucketMask = bucketMask; + ret->jobstepThreadPool = jobstepThreadPool; ret->setThreadCount(1); ret->clearData(); @@ -1414,9 +1705,9 @@ boost::shared_ptr TupleJoiner::copyForDiskJoin() return ret; } - - - - +void TupleJoiner::setConvertToDiskJoin() +{ + _convertToDiskJoin = true; +} }; diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index 83d68dfca..b0d38364d 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -38,6 +38,7 @@ #include "../funcexp/funcexpwrapper.h" #include "stlpoolallocator.h" #include "hasher.h" +#include "threadpool.h" namespace joiner { @@ -147,7 +148,8 @@ public: const rowgroup::RowGroup& largeInput, uint32_t smallJoinColumn, uint32_t largeJoinColumn, - joblist::JoinType jt); + joblist::JoinType jt, + threadpool::ThreadPool *jsThreadPool); /* ctor to use for string & compound join */ TupleJoiner( @@ -155,12 +157,14 @@ public: const rowgroup::RowGroup& largeInput, const std::vector& smallJoinColumns, const std::vector& largeJoinColumns, - joblist::JoinType jt); + joblist::JoinType jt, + threadpool::ThreadPool *jsThreadPool); ~TupleJoiner(); size_t size() const; - void insert(rowgroup::Row& r, bool zeroTheRid = true); + void insert(rowgroup::Row& r, bool zeroTheRid = true); // not thread-safe + void insertRGData(rowgroup::RowGroup &rg, uint threadID); void doneInserting(); /* match() returns the small-side rows that match the large-side row. @@ -187,8 +191,20 @@ public: { return joinAlg == UM; } + inline bool onDisk() const + { + return _convertToDiskJoin; + } void setInPM(); + void setInUM(std::vector &rgs); + void umJoinConvert(uint threadID, std::vector &rgs, size_t begin, size_t end); + + // TODO: these are currently in use by edge cases, ex, converting to disk + // join. Would be nice to make those cases use the rgdata variants + // above. void setInUM(); + void umJoinConvert(size_t begin, size_t end); + void setThreadCount(uint32_t cnt); void setPMJoinResults(boost::shared_array >, uint32_t threadID); @@ -325,6 +341,7 @@ public: { return finished; } + void setConvertToDiskJoin(); private: typedef std::tr1::unordered_multimap, @@ -344,13 +361,13 @@ private: TupleJoiner(); TupleJoiner(const TupleJoiner&); TupleJoiner& operator=(const TupleJoiner&); + void getBucketCount(); rowgroup::RGData smallNullMemory; - - boost::scoped_ptr h; // used for UM joins on ints - boost::scoped_ptr sth; // used for UM join on ints where the backing table uses a string table - boost::scoped_ptr ld; // used for UM join on long double + boost::scoped_array > h; // used for UM joins on ints + boost::scoped_array > sth; // used for UM join on ints where the backing table uses a string table + boost::scoped_array > ld; // used for UM join on long double std::vector rows; // used for PM join /* This struct is rough. The BPP-JL stores the parsed results for @@ -372,16 +389,16 @@ private: }; JoinAlg joinAlg; joblist::JoinType joinType; - boost::shared_ptr _pool; // pool for the table and nodes + boost::shared_array > _pool; // pools for the table and nodes uint32_t threadCount; std::string tableName; /* vars, & fcns for typeless join */ bool typelessJoin; std::vector smallKeyColumns, largeKeyColumns; - boost::scoped_ptr ht; // used for UM join on strings + boost::scoped_array > ht; // used for UM join on strings uint32_t keyLength; - utils::FixedAllocator storedKeyAlloc; + boost::scoped_array storedKeyAlloc; boost::scoped_array tmpKeyAlloc; bool bSignedUnsignedJoin; // Set if we have a signed vs unsigned compare in a join. When not set, we can save checking for the signed bit. @@ -398,9 +415,27 @@ private: boost::scoped_array > cpValues; // if !discreteValues, [0] has min, [1] has max uint32_t uniqueLimit; bool finished; + + // multithreaded UM hash table construction + int numCores; + uint bucketCount; + uint bucketMask; + boost::scoped_array m_bucketLocks; + boost::mutex m_typelessLock, m_cpValuesLock; + utils::Hasher_r bucketPicker; + const uint32_t bpSeed = 0x4545e1d7; // an arbitrary random # + threadpool::ThreadPool *jobstepThreadPool; + void um_insertTypeless(uint threadID, uint rowcount, rowgroup::Row &r); + void um_insertLongDouble(uint rowcount, rowgroup::Row &r); + void um_insertInlineRows(uint rowcount, rowgroup::Row &r); + void um_insertStringTable(uint rowcount, rowgroup::Row &r); + + template + void bucketsToTables(buckets_t *, hash_table_t *); + + bool _convertToDiskJoin; }; } #endif - diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index e62de7769..da2b5f24f 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -29,6 +29,7 @@ using namespace std; using namespace logging; #include "threadpool.h" +#include "threadnaming.h" #include #include #include "boost/date_time/posix_time/posix_time_types.hpp" @@ -321,6 +322,7 @@ uint64_t ThreadPool::invoke(const Functor_T& threadfunc) void ThreadPool::beginThread() throw() { + utils::setThreadName("Idle"); try { boost::mutex::scoped_lock lock1(fMutex); @@ -386,6 +388,7 @@ void ThreadPool::beginThread() throw() lock1.unlock(); + utils::setThreadName("Unspecified"); try { todo->functor(); @@ -405,6 +408,7 @@ void ThreadPool::beginThread() throw() #endif } + utils::setThreadName("Idle"); lock1.lock(); --fIssued; --waitingFunctorsSize;