diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp index c10f78376..87be21660 100644 --- a/dbcon/joblist/diskjoinstep.cpp +++ b/dbcon/joblist/diskjoinstep.cpp @@ -419,8 +419,9 @@ void DiskJoinStep::joinFcn() while (largeData) { l_largeRG.setData(largeData.get()); - thjs->joinOneRG(0, &joinResults, l_largeRG, l_outputRG, l_largeRow, l_joinFERow, l_outputRow, baseRow, - joinMatches, smallRowTemplates, &joiners, &colMappings, &fergMappings, &smallNullMem); + thjs->joinOneRG(0, joinResults, l_largeRG, l_outputRG, l_largeRow, l_joinFERow, l_outputRow, baseRow, + joinMatches, smallRowTemplates, outputDL.get(), &joiners, &colMappings, &fergMappings, + &smallNullMem); for (j = 0; j < (int)joinResults.size(); j++) { @@ -428,7 +429,7 @@ void DiskJoinStep::joinFcn() // cout << "got joined output " << l_outputRG.toString() << endl; outputDL->insert(joinResults[j]); } - + thjs->returnMemory(); joinResults.clear(); largeData = in->jp->getNextLargeRGData(); } @@ -477,6 +478,22 @@ void DiskJoinStep::joinFcn() { outputDL->insert(rgData); // cout << "inserting a full RG" << endl; + if (thjs) + { + if (!thjs->getMemory(l_outputRG.getMaxDataSize())) + { + // calculate guess of size required for error message + uint64_t memReqd = (unmatched.size() * outputRG.getDataSize(1)) / 1048576; + Message::Args args; + args.add(memReqd); + args.add(thjs->resourceManager->getConfiguredUMMemLimit() / 1048576); + std::cerr << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_RESULT_TOO_BIG, + args) + << " @" << __FILE__ << ":" << __LINE__; + throw logging::IDBExcept(logging::ERR_JOIN_RESULT_TOO_BIG, args); + } + } + rgData.reinit(l_outputRG); l_outputRG.setData(&rgData); l_outputRG.resetRowGroup(0); @@ -491,6 +508,10 @@ void DiskJoinStep::joinFcn() // cout << "inserting an rg with " << l_outputRG.getRowCount() << endl; outputDL->insert(rgData); } + if (thjs) + { + thjs->returnMemory(); + } } } } diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 5d0bcb694..026cbf516 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -297,7 +297,11 @@ void DistributedEngineComm::Setup() writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), LOG_TYPE_ERROR); - cerr << "Could not connect to PMS" << std::to_string(connectionId) << ": " << ex.what() << endl; + if (newPmCount == 0) + { + writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR); + break; + } } catch (...) { @@ -306,6 +310,11 @@ void DistributedEngineComm::Setup() writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), LOG_TYPE_ERROR); + if (newPmCount == 0) + { + writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR); + break; + } } } diff --git a/dbcon/joblist/groupconcat.cpp b/dbcon/joblist/groupconcat.cpp index 7b47799f0..5eb0e1b10 100644 --- a/dbcon/joblist/groupconcat.cpp +++ b/dbcon/joblist/groupconcat.cpp @@ -763,13 +763,13 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row) fDataQueue.push(fData); uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); - fMemSize += newSize; if (!fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += newSize; fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); @@ -945,13 +945,13 @@ void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc) fConcatColumns.push_back((*(i++)).second); uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); - fMemSize += newSize; if (!fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += newSize; fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); @@ -978,13 +978,12 @@ void GroupConcatNoOrder::processRow(const rowgroup::Row& row) { uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); - fMemSize += newSize; - if (!fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += newSize; fDataQueue.push(fData); fData.reinit(fRowGroup, fRowsPerRG); diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index 09bf8d5de..3aee6fc02 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -123,12 +123,12 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row) fUncommitedMemory += memSizeInc; if (fUncommitedMemory >= fMaxUncommited) { - fMemSize += fUncommitedMemory; if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += fUncommitedMemory; fUncommitedMemory = 0; } @@ -143,13 +143,13 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row) { fDataQueue.push(fData); uint64_t newSize = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize(); - fMemSize += newSize; if (!fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += newSize; fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); @@ -184,12 +184,12 @@ void LimitedOrderBy::finalize() { if (fUncommitedMemory > 0) { - fMemSize += fUncommitedMemory; if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += fUncommitedMemory; fUncommitedMemory = 0; } @@ -202,13 +202,13 @@ void LimitedOrderBy::finalize() // *DRRTUY Very memory intensive. CS needs to account active // memory only and release memory if needed. uint64_t memSizeInc = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize(); - fMemSize += memSizeInc; if (!fRm->getMemory(memSizeInc, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += memSizeInc; uint64_t offset = 0; uint64_t i = 0; @@ -256,13 +256,13 @@ void LimitedOrderBy::finalize() if (offset == (uint64_t)-1) { tempRGDataList.push_front(fData); - fMemSize += memSizeInc; if (!fRm->getMemory(memSizeInc, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } + fMemSize += memSizeInc; fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 4683abd69..97372ff9c 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -1349,6 +1349,14 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep { return true; } + ResourceManager* resourceManager() const + { + return fRm; + } + bool runFEonPM() const + { + return bRunFEonPM; + } protected: void sendError(uint16_t status); @@ -1438,12 +1446,6 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep void serializeJoiner(); void serializeJoiner(uint32_t connectionNumber); - void generateJoinResultSet(const std::vector>& joinerOutput, - rowgroup::Row& baseRow, const std::vector>& mappings, - const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, - std::vector* outputData, - const boost::scoped_array& smallRows, rowgroup::Row& joinedRow); - std::vector> tjoiners; bool doJoin, hasPMJoin, hasUMJoin; std::vector joinerMatchesRGs; // parses the small-side matches from joiner @@ -1475,18 +1477,12 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep boost::shared_ptr fe1, fe2; rowgroup::RowGroup fe1Input, fe2Output; boost::shared_array fe2Mapping; - bool runFEonPM; + bool bRunFEonPM; /* for UM F & E 2 processing */ rowgroup::RGData fe2Data; rowgroup::Row fe2InRow, fe2OutRow; - void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow, - rowgroup::Row& outRow, std::vector* rgData, - funcexp::FuncExpWrapper* localFE2); - void processFE2_oneRG(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow, - rowgroup::Row& outRow, funcexp::FuncExpWrapper* localFE2); - /* Runtime Casual Partitioning adjustments. The CP code is needlessly complicated; * to avoid making it worse, decided to designate 'scanFlags' as the static * component and this new array as the runtime component. The final CP decision @@ -1500,8 +1496,9 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep boost::shared_ptr deliveryDL; uint32_t deliveryIt; - struct JoinLocalData + class JoinLocalData { + public: JoinLocalData() = delete; JoinLocalData(const JoinLocalData&) = delete; JoinLocalData(JoinLocalData&&) = delete; @@ -1509,12 +1506,20 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep JoinLocalData& operator=(JoinLocalData&&) = delete; ~JoinLocalData() = default; - JoinLocalData(rowgroup::RowGroup& primRowGroup, rowgroup::RowGroup& outputRowGroup, + JoinLocalData(TupleBPS* pTupleBPS, rowgroup::RowGroup& primRowGroup, rowgroup::RowGroup& outputRowGroup, boost::shared_ptr& fe2, rowgroup::RowGroup& fe2Output, std::vector& joinerMatchesRGs, rowgroup::RowGroup& joinFERG, std::vector>& tjoiners, uint32_t smallSideCount, bool doJoin); + friend class TupleBPS; + + private: + uint64_t generateJoinResultSet(const uint32_t depth, std::vector& outputData, + RowGroupDL* dlp); + void processFE2(vector& rgData); + + TupleBPS* tbps; // Parent rowgroup::RowGroup local_primRG; rowgroup::RowGroup local_outputRG; @@ -1576,7 +1581,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep for (uint32_t i = 0; i < numThreads; ++i) { joinLocalDataPool.push_back(std::shared_ptr( - new JoinLocalData(primRowGroup, outputRowGroup, fe2, fe2Output, joinerMatchesRGs, joinFERG, + new JoinLocalData(this, primRowGroup, outputRowGroup, fe2, fe2Output, joinerMatchesRGs, joinFERG, tjoiners, smallSideCount, doJoin))); } diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index 92740a4c0..94d5193e0 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -261,6 +261,9 @@ ResourceManager::ResourceManager(bool runningInExeMgr) fAllowedDiskAggregation = getBoolVal(fRowAggregationStr, "AllowDiskBasedAggregation", defaultAllowDiskAggregation); + + fMaxBPPSendQueue = getUintVal(fPrimitiveServersStr, "MaxBPPSendQueue", defaultMaxBPPSendQueue); + if (!load_encryption_keys()) { Logger log; @@ -463,23 +466,51 @@ bool ResourceManager::userPriorityEnabled() const return "Y" == val; } -bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr sessionLimit, bool patience) +// Counts memory. This funtion doesn't actually malloc, just counts against two limits +// totalUmMemLimit for overall UM counting and (optional) sessionLimit for a single session. +// If both have space, return true. +bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr& sessionLimit, bool patience) { bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); - bool ret2 = (atomicops::atomicSub(sessionLimit.get(), amount) >= 0); + bool ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; uint32_t retryCounter = 0, maxRetries = 20; // 10s delay while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries) { atomicops::atomicAdd(&totalUmMemLimit, amount); - atomicops::atomicAdd(sessionLimit.get(), amount); + sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; usleep(500000); ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); - ret2 = (atomicops::atomicSub(sessionLimit.get(), amount) >= 0); + ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; + } + if (!(ret1 && ret2)) + { + // If we didn't get any memory, restore the counters. + atomicops::atomicAdd(&totalUmMemLimit, amount); + sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; } - return (ret1 && ret2); } +// Don't care about session memory +bool ResourceManager::getMemory(int64_t amount, bool patience) +{ + bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); + + uint32_t retryCounter = 0, maxRetries = 20; // 10s delay + + while (patience && !ret1 && retryCounter++ < maxRetries) + { + atomicops::atomicAdd(&totalUmMemLimit, amount); + usleep(500000); + ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); + } + if (!ret1) + { + // If we didn't get any memory, restore the counters. + atomicops::atomicAdd(&totalUmMemLimit, amount); + } + return ret1; +} } // namespace joblist diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 64ff17a55..ff712419b 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -82,6 +82,17 @@ const uint32_t defaultMaxOutstandingRequests = 20; const uint32_t defaultProcessorThreadsPerScan = 16; const uint32_t defaultJoinerChunkSize = 16 * 1024 * 1024; +// I estimate that the average non-cloud columnstore node has 64GB. I've seen from 16GB to 256GB. Cloud can be +// as low as 4GB However, ExeMgr has a targetRecvQueueSize hardcoded to 50,000,000, so some number greater +// than this makes sense. Seriously greater doesn't make sense, so I went with 5x. If there are a number of +// simultaneous queries that return giant result sets, then 0.25 GB each seems reasonable. This is only for +// the return queue. We still need room for all the processing, and if a single node system, for ExeMgr as +// well. On small systems, I recommend we use a smaller value. I believe a larger value will not improve +// anything since at this point, we're just filling a queue much faster than it can be emptied. Even if we +// make this default larger, giant results will still eventually block. Just with less memory available for +// other processing. +const uint64_t defaultMaxBPPSendQueue = 250000000; // ~250MB + // bucketreuse const std::string defaultTempDiskPath = "/tmp"; const std::string defaultWorkingDir = "."; //"/tmp"; @@ -382,6 +393,11 @@ class ResourceManager return getUintVal(fJobListStr, "DECThrottleThreshold", defaultDECThrottleThreshold); } + uint64_t getMaxBPPSendQueue() const + { + return fMaxBPPSendQueue; + } + EXPORT void emServerThreads(); EXPORT void emServerQueueSize(); EXPORT void emSecondsBetweenMemChecks(); @@ -399,11 +415,16 @@ class ResourceManager /* sessionLimit is a pointer to the var holding the session-scope limit, should be JobInfo.umMemLimit for the query. */ /* Temporary parameter 'patience', will wait for up to 10s to get the memory. */ - EXPORT bool getMemory(int64_t amount, boost::shared_ptr sessionLimit, bool patience = true); - inline void returnMemory(int64_t amount, boost::shared_ptr sessionLimit) + EXPORT bool getMemory(int64_t amount, boost::shared_ptr& sessionLimit, bool patience = true); + EXPORT bool getMemory(int64_t amount, bool patience = true); + inline void returnMemory(int64_t amount) { atomicops::atomicAdd(&totalUmMemLimit, amount); - atomicops::atomicAdd(sessionLimit.get(), amount); + } + inline void returnMemory(int64_t amount, boost::shared_ptr& sessionLimit) + { + atomicops::atomicAdd(&totalUmMemLimit, amount); + sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; } inline int64_t availableMemory() const { @@ -607,7 +628,7 @@ class ResourceManager /* new HJ/Union/Aggregation support */ volatile int64_t totalUmMemLimit; // mem limit for join, union, and aggregation on the UM - uint64_t configuredUmMemLimit; + int64_t configuredUmMemLimit; uint64_t pmJoinMemLimit; // mem limit on individual PM joins /* multi-thread aggregate */ @@ -622,6 +643,7 @@ class ResourceManager bool fUseHdfs; bool fAllowedDiskAggregation{false}; uint64_t fDECConnectionsPerQuery; + uint64_t fMaxBPPSendQueue = 250000000; }; inline std::string ResourceManager::getStringVal(const std::string& section, const std::string& name, diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index e722f92af..f9a6d7361 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -155,14 +155,15 @@ struct TupleBPSAggregators } }; -TupleBPS::JoinLocalData::JoinLocalData(RowGroup& primRowGroup, RowGroup& outputRowGroup, +TupleBPS::JoinLocalData::JoinLocalData(TupleBPS* pTupleBPS, RowGroup& primRowGroup, RowGroup& outputRowGroup, boost::shared_ptr& fe2, rowgroup::RowGroup& fe2Output, std::vector& joinerMatchesRGs, rowgroup::RowGroup& joinFERG, std::vector>& tjoiners, uint32_t smallSideCount, bool doJoin) - : local_primRG(primRowGroup) + : tbps(pTupleBPS) + , local_primRG(primRowGroup) , local_outputRG(outputRowGroup) , fe2(fe2) , fe2Output(fe2Output) @@ -239,6 +240,130 @@ TupleBPS::JoinLocalData::JoinLocalData(RowGroup& primRowGroup, RowGroup& outputR } } +uint64_t TupleBPS::JoinLocalData::generateJoinResultSet(const uint32_t depth, + std::vector& outputData, + RowGroupDL* dlp) +{ + uint32_t i; + Row& smallRow = smallSideRows[depth]; + uint64_t memSizeForOutputRG = 0; + + if (depth < smallSideCount - 1) + { + for (i = 0; i < joinerOutput[depth].size() && !tbps->cancelled(); i++) + { + smallRow.setPointer(joinerOutput[depth][i]); + applyMapping(smallMappings[depth], smallRow, &joinedBaseRow); + memSizeForOutputRG += generateJoinResultSet(depth + 1, outputData, dlp); + } + } + else + { + local_outputRG.getRow(local_outputRG.getRowCount(), &postJoinRow); + + for (i = 0; i < joinerOutput[depth].size() && !tbps->cancelled(); + i++, postJoinRow.nextRow(), local_outputRG.incRowCount()) + { + smallRow.setPointer(joinerOutput[depth][i]); + + if (UNLIKELY(local_outputRG.getRowCount() == 8192)) + { + uint32_t dbRoot = local_outputRG.getDBRoot(); + uint64_t baseRid = local_outputRG.getBaseRid(); + outputData.push_back(joinedData); + // Don't let the join results buffer get out of control. + if (tbps->resourceManager()->getMemory(local_outputRG.getMaxDataSize(), false)) + { + memSizeForOutputRG += local_outputRG.getMaxDataSize(); + } + else + { + // Don't wait for memory, just send the data on to DL. + RowGroup out(local_outputRG); + if (fe2 && tbps->runFEonPM()) + { + processFE2(outputData); + tbps->rgDataVecToDl(outputData, local_fe2Output, dlp); + } + else + { + tbps->rgDataVecToDl(outputData, out, dlp); + } + tbps->resourceManager()->returnMemory(memSizeForOutputRG); + memSizeForOutputRG = 0; + } + joinedData.reinit(local_outputRG); + local_outputRG.setData(&joinedData); + local_outputRG.resetRowGroup(baseRid); + local_outputRG.setDBRoot(dbRoot); + local_outputRG.getRow(0, &postJoinRow); + } + + applyMapping(smallMappings[depth], smallRow, &joinedBaseRow); + copyRow(joinedBaseRow, &postJoinRow); + } + } + return memSizeForOutputRG; +} + +void TupleBPS::JoinLocalData::processFE2(vector& rgData) +{ + vector results; + RGData result; + uint32_t i, j; + bool ret; + + result = RGData(local_fe2Output); + local_fe2Output.setData(&result); + local_fe2Output.resetRowGroup(-1); + local_fe2Output.getRow(0, &local_fe2OutRow); + + for (i = 0; i < rgData.size(); i++) + { + local_outputRG.setData(&(rgData)[i]); + + if (local_fe2Output.getRowCount() == 0) + { + local_fe2Output.resetRowGroup(local_outputRG.getBaseRid()); + local_fe2Output.setDBRoot(local_outputRG.getDBRoot()); + } + + local_outputRG.getRow(0, &postJoinRow); + + for (j = 0; j < local_outputRG.getRowCount(); j++, postJoinRow.nextRow()) + { + ret = local_fe2.evaluate(&postJoinRow); + + if (ret) + { + applyMapping(tbps->fe2Mapping, postJoinRow, &local_fe2OutRow); + local_fe2OutRow.setRid(postJoinRow.getRelRid()); + local_fe2Output.incRowCount(); + local_fe2OutRow.nextRow(); + + if (local_fe2Output.getRowCount() == 8192 || + local_fe2Output.getDBRoot() != local_outputRG.getDBRoot() || + local_fe2Output.getBaseRid() != local_outputRG.getBaseRid()) + { + results.push_back(result); + result = RGData(local_fe2Output); + local_fe2Output.setData(&result); + local_fe2Output.resetRowGroup(local_outputRG.getBaseRid()); + local_fe2Output.setDBRoot(local_outputRG.getDBRoot()); + local_fe2Output.getRow(0, &local_fe2OutRow); + } + } + } + } + + if (local_fe2Output.getRowCount() > 0) + { + results.push_back(result); + } + + rgData.swap(results); +} + struct ByteStreamProcessor { ByteStreamProcessor(TupleBPS* tbps, vector>& bsv, @@ -1265,7 +1390,7 @@ void TupleBPS::run() if (fe1) fBPP->setFEGroup1(fe1, fe1Input); - if (fe2 && runFEonPM) + if (fe2 && bRunFEonPM) fBPP->setFEGroup2(fe2, fe2Output); if (fe2) @@ -1970,6 +2095,7 @@ void TupleBPS::processByteStreamVector(vectorlargeMapping, data->largeSideRow, &data->joinedBaseRow); data->joinedBaseRow.setRid(data->largeSideRow.getRelRid()); - generateJoinResultSet(data->joinerOutput, data->joinedBaseRow, data->smallMappings, 0, - data->local_outputRG, data->joinedData, &rgDatav, data->smallSideRows, - data->postJoinRow); - - // Bug 3510: Don't let the join results buffer get out of control. Need - // to refactor this. All post-join processing needs to go here AND below - // for now. - if (rgDatav.size() * data->local_outputRG.getMaxDataSize() > 50000000) - { - RowGroup out(data->local_outputRG); - - if (fe2 && !runFEonPM) - { - processFE2(out, data->local_fe2Output, data->postJoinRow, data->local_fe2OutRow, &rgDatav, - &data->local_fe2); - rgDataVecToDl(rgDatav, data->local_fe2Output, dlp); - } - else - rgDataVecToDl(rgDatav, out, dlp); - } + memAmount += data->generateJoinResultSet(0, rgDatav, dlp); } } // end of the for-loop in the join code @@ -2163,12 +2270,16 @@ void TupleBPS::processByteStreamVector(vectorreturnMemory(memAmount); + memAmount = 0; + } // Execute UM F & E group 2 on rgDatav - if (fe2 && !runFEonPM && rgDatav.size() > 0 && !cancelled()) + if (fe2 && !bRunFEonPM && rgDatav.size() > 0 && !cancelled()) { - processFE2(data->local_outputRG, data->local_fe2Output, data->postJoinRow, data->local_fe2OutRow, - &rgDatav, &data->local_fe2); + data->processFE2(rgDatav); rgDataVecToDl(rgDatav, data->local_fe2Output, dlp); } @@ -2192,7 +2303,7 @@ void TupleBPS::processByteStreamVector(vector 0) { - if (fe2 && runFEonPM) + if (fe2 && bRunFEonPM) rgDataVecToDl(rgDatav, data->local_fe2Output, dlp); else rgDataVecToDl(rgDatav, data->local_outputRG, dlp); @@ -2339,10 +2450,7 @@ void TupleBPS::receiveMultiPrimitiveMessages() start = end; } - // Join threads. - for (uint32_t i = 0, e = fProcessorThreads.size(); i < e; ++i) - jobstepThreadPool.join(fProcessorThreads[i]); - + jobstepThreadPool.join(fProcessorThreads); // Clear all. fProcessorThreads.clear(); bsv.clear(); @@ -2399,7 +2507,7 @@ void TupleBPS::receiveMultiPrimitiveMessages() abort_nolock(); } - // We have on thread here and do not need to notify any waiting producer threads, because we are done of + // We have one thread here and do not need to notify any waiting producer threads, because we are done with // consuming messages from queue. tplLock.unlock(); @@ -2447,8 +2555,7 @@ void TupleBPS::receiveMultiPrimitiveMessages() if (fe2) { rgDatav.push_back(data->joinedData); - processFE2(data->local_outputRG, data->local_fe2Output, data->postJoinRow, data->local_fe2OutRow, - &rgDatav, &data->local_fe2); + data->processFE2(rgDatav); if (rgDatav.size() > 0) rgDataToDl(rgDatav[0], data->local_fe2Output, dlp); @@ -2470,8 +2577,7 @@ void TupleBPS::receiveMultiPrimitiveMessages() if (fe2) { rgDatav.push_back(data->joinedData); - processFE2(data->local_outputRG, data->local_fe2Output, data->postJoinRow, data->local_fe2OutRow, - &rgDatav, &data->local_fe2); + data->processFE2(rgDatav); if (rgDatav.size() > 0) rgDataToDl(rgDatav[0], data->local_fe2Output, dlp); @@ -2811,51 +2917,6 @@ void TupleBPS::setJoinedResultRG(const rowgroup::RowGroup& rg) fe2Mapping = makeMapping(outputRowGroup, fe2Output); } -/* probably worthwhile to make some of these class vars */ -void TupleBPS::generateJoinResultSet(const vector>& joinerOutput, Row& baseRow, - const vector>& mappings, const uint32_t depth, - RowGroup& outputRG, RGData& rgData, vector* outputData, - const scoped_array& smallRows, Row& joinedRow) -{ - uint32_t i; - Row& smallRow = smallRows[depth]; - - if (depth < smallSideCount - 1) - { - for (i = 0; i < joinerOutput[depth].size(); i++) - { - smallRow.setPointer(joinerOutput[depth][i]); - applyMapping(mappings[depth], smallRow, &baseRow); - generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1, outputRG, rgData, outputData, - smallRows, joinedRow); - } - } - else - { - outputRG.getRow(outputRG.getRowCount(), &joinedRow); - - for (i = 0; i < joinerOutput[depth].size(); i++, joinedRow.nextRow(), outputRG.incRowCount()) - { - smallRow.setPointer(joinerOutput[depth][i]); - - if (UNLIKELY(outputRG.getRowCount() == 8192)) - { - uint32_t dbRoot = outputRG.getDBRoot(); - uint64_t baseRid = outputRG.getBaseRid(); - outputData->push_back(rgData); - rgData = RGData(outputRG); - outputRG.setData(&rgData); - outputRG.resetRowGroup(baseRid); - outputRG.setDBRoot(dbRoot); - outputRG.getRow(0, &joinedRow); - } - - applyMapping(mappings[depth], smallRow, &baseRow); - copyRow(baseRow, &joinedRow); - } - } -} - const rowgroup::RowGroup& TupleBPS::getOutputRowGroup() const { return outputRowGroup; @@ -2970,9 +3031,9 @@ void TupleBPS::setFcnExpGroup2(const boost::shared_ptr& fe2Output = rg; checkDupOutputColumns(rg); fe2Mapping = makeMapping(outputRowGroup, fe2Output); - runFEonPM = runFE2onPM; + bRunFEonPM = runFE2onPM; - if (runFEonPM) + if (bRunFEonPM) fBPP->setFEGroup2(fe2, fe2Output); } @@ -2985,7 +3046,7 @@ void TupleBPS::setFcnExpGroup3(const vector& fe) fe2->addReturnedColumn(fe[i]); // if this is called, there's no join, so it can always run on the PM - runFEonPM = true; + bRunFEonPM = true; fBPP->setFEGroup2(fe2, fe2Output); } @@ -2995,93 +3056,10 @@ void TupleBPS::setFE23Output(const rowgroup::RowGroup& feOutput) checkDupOutputColumns(feOutput); fe2Mapping = makeMapping(outputRowGroup, fe2Output); - if (fe2 && runFEonPM) + if (fe2 && bRunFEonPM) fBPP->setFEGroup2(fe2, fe2Output); } -void TupleBPS::processFE2_oneRG(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow, - funcexp::FuncExpWrapper* local_fe) -{ - bool ret; - uint32_t i; - - output.resetRowGroup(input.getBaseRid()); - output.setDBRoot(input.getDBRoot()); - output.getRow(0, &outRow); - input.getRow(0, &inRow); - - for (i = 0; i < input.getRowCount(); i++, inRow.nextRow()) - { - ret = local_fe->evaluate(&inRow); - - if (ret) - { - applyMapping(fe2Mapping, inRow, &outRow); - outRow.setRid(inRow.getRelRid()); - output.incRowCount(); - outRow.nextRow(); - } - } -} - -void TupleBPS::processFE2(RowGroup& input, RowGroup& output, Row& inRow, Row& outRow, vector* rgData, - funcexp::FuncExpWrapper* local_fe) -{ - vector results; - RGData result; - uint32_t i, j; - bool ret; - - result = RGData(output); - output.setData(&result); - output.resetRowGroup(-1); - output.getRow(0, &outRow); - - for (i = 0; i < rgData->size(); i++) - { - input.setData(&(*rgData)[i]); - - if (output.getRowCount() == 0) - { - output.resetRowGroup(input.getBaseRid()); - output.setDBRoot(input.getDBRoot()); - } - - input.getRow(0, &inRow); - - for (j = 0; j < input.getRowCount(); j++, inRow.nextRow()) - { - ret = local_fe->evaluate(&inRow); - - if (ret) - { - applyMapping(fe2Mapping, inRow, &outRow); - outRow.setRid(inRow.getRelRid()); - output.incRowCount(); - outRow.nextRow(); - - if (output.getRowCount() == 8192 || output.getDBRoot() != input.getDBRoot() || - output.getBaseRid() != input.getBaseRid()) - { - results.push_back(result); - result = RGData(output); - output.setData(&result); - output.resetRowGroup(input.getBaseRid()); - output.setDBRoot(input.getDBRoot()); - output.getRow(0, &outRow); - } - } - } - } - - if (output.getRowCount() > 0) - { - results.push_back(result); - } - - rgData->swap(results); -} - const rowgroup::RowGroup& TupleBPS::getDeliveredRowGroup() const { if (fe2) diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 3b64e6ca1..dfb8cc4f3 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -5432,11 +5432,10 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) if (more) { fRowGroupIns[threadID].setData(&rgData); - fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings(); bool diskAggAllowed = fRm->getAllowDiskAggregation(); - if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit, - !diskAggAllowed)) + int64_t memSize = fRowGroupIns[threadID].getSizeWithStrings(); + if (!fRm->getMemory(memSize, fSessionMemLimit, !diskAggAllowed)) { if (!diskAggAllowed) { @@ -5456,6 +5455,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) } break; } + fMemUsage[threadID] += memSize; rgDatas.push_back(rgData); } else diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 930d1c82f..191cd9c12 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -39,7 +39,6 @@ using namespace std; #include "tuplehashjoin.h" #include "calpontsystemcatalog.h" #include "elementcompression.h" -#include "resourcemanager.h" #include "tupleaggregatestep.h" #include "errorids.h" #include "diskjoinstep.h" @@ -74,6 +73,7 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) , fTupleId2(-1) , fCorrelatedSide(0) , resourceManager(jobInfo.rm) + , fMemSizeForOutputRG(0) , runRan(false) , joinRan(false) , largeSideIndex(1) @@ -135,9 +135,12 @@ TupleHashJoinStep::~TupleHashJoinStep() if (memUsedByEachJoin) { for (uint i = 0; i < smallDLs.size(); i++) + { + if (memUsedByEachJoin[i]) resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); } - + } + returnMemory(); // cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl; } @@ -221,11 +224,13 @@ void TupleHashJoinStep::trackMem(uint index) memAfter = joiner->getMemUsage(); if (memAfter != memBefore) { - gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false); + gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true); + if (gotMem) atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); - memBefore = memAfter; - if (!gotMem) + else return; + + memBefore = memAfter; } memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1)); } @@ -235,16 +240,22 @@ void TupleHashJoinStep::trackMem(uint index) memAfter = joiner->getMemUsage(); if (memAfter == memBefore) return; - gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false); + gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, true); + if (gotMem) + { atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore); - if (!gotMem) + } + else { if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000))) { joinIsTooBig = true; - fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG); - errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG)); + ostringstream oss; + oss << "(" << __LINE__ << ") " + << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG); + fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str()); + errorMessage(oss.str()); status(logging::ERR_JOIN_TOO_BIG); cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl; abort(); @@ -387,7 +398,6 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* smallRG = smallRGs[index]; smallRG.initRow(&r); - try { ssize_t rgSize; @@ -405,9 +415,12 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* utils::releaseSpinlock(rgdLock); rgSize = smallRG.getSizeWithStrings(); + gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, true); + if (gotMem) + { atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize); - gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false); - if (!gotMem) + } + else { /* Mem went over the limit. If DML or a syscat query, abort. @@ -420,19 +433,21 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* if (!allowDJS || isDML || (fSessionId & 0x80000000) || (tableOid() < 3000 && tableOid() >= 1000)) { joinIsTooBig = true; - fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG); - errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG)); + ostringstream oss; + oss << "(" << __LINE__ << ") " + << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG); + fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str()); + errorMessage(oss.str()); status(logging::ERR_JOIN_TOO_BIG); cout << "Join is too big, raise the UM join limit for now (small runner)" << endl; abort(); } else if (allowDJS) joiner->setConvertToDiskJoin(); + return; } - joiner->insertRGData(smallRG, threadID); - if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit)) { joiner->setInUM(rgData[index]); @@ -452,7 +467,6 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* "TupleHashJoinStep::smallRunnerFcn()"); status(logging::ERR_EXEMGR_MALFUNCTION); } - if (!joiner->inUM()) joiner->setInPM(); } @@ -644,7 +658,7 @@ void TupleHashJoinStep::hjRunner() memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]); for (i = 0; i < smallDLs.size(); i++) - memUsedByEachJoin[i] = 0; + atomicops::atomicZero(&memUsedByEachJoin[i]); try { @@ -742,7 +756,7 @@ void TupleHashJoinStep::hjRunner() { vector empty; resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit); - memUsedByEachJoin[djsJoinerMap[i]] = 0; + atomicops::atomicZero(&memUsedByEachJoin[i]); djs[i].loadExistingData(rgData[djsJoinerMap[i]]); rgData[djsJoinerMap[i]].swap(empty); } @@ -828,8 +842,11 @@ void TupleHashJoinStep::hjRunner() { if (joinIsTooBig && !status()) { - fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG); - errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG)); + ostringstream oss; + oss << "(" << __LINE__ << ") " + << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG); + fLogger->logMessage(logging::LOG_TYPE_INFO, oss.str()); + errorMessage(oss.str()); status(logging::ERR_JOIN_TOO_BIG); cout << "Join is too big, raise the UM join limit for now" << endl; @@ -842,7 +859,7 @@ void TupleHashJoinStep::hjRunner() for (uint i = 0; i < smallDLs.size(); i++) { resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); - memUsedByEachJoin[i] = 0; + atomicops::atomicZero(&memUsedByEachJoin[i]); } } } @@ -1022,7 +1039,7 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs) for (uint i = 0; i < smallDLs.size(); i++) { resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); - memUsedByEachJoin[i] = 0; + atomicops::atomicZero(&memUsedByEachJoin[i]); } return 0; } @@ -1046,7 +1063,7 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs) for (uint i = 0; i < smallDLs.size(); i++) { resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit); - memUsedByEachJoin[i] = 0; + atomicops::atomicZero(&memUsedByEachJoin[i]); } return 0; } @@ -1544,8 +1561,8 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID) if (local_inputRG.getRowCount() == 0) continue; - joinOneRG(threadID, &joinedRowData, local_inputRG, local_outputRG, largeRow, joinFERow, joinedRow, - baseRow, joinMatches, smallRowTemplates); + joinOneRG(threadID, joinedRowData, local_inputRG, local_outputRG, largeRow, joinFERow, joinedRow, + baseRow, joinMatches, smallRowTemplates, outputDL); } if (fe2) @@ -1553,6 +1570,7 @@ void TupleHashJoinStep::joinRunnerFcn(uint32_t threadID) processDupList(threadID, (fe2 ? local_fe2RG : local_outputRG), &joinedRowData); sendResult(joinedRowData); + returnMemory(); joinedRowData.clear(); grabSomeWork(&inputData); } @@ -1684,9 +1702,9 @@ void TupleHashJoinStep::grabSomeWork(vector* work) /* This function is a port of the main join loop in TupleBPS::receiveMultiPrimitiveMessages(). Any * changes made here should also be made there and vice versa. */ void TupleHashJoinStep::joinOneRG( - uint32_t threadID, vector* out, RowGroup& inputRG, RowGroup& joinOutput, Row& largeSideRow, + uint32_t threadID, vector& out, RowGroup& inputRG, RowGroup& joinOutput, Row& largeSideRow, Row& joinFERow, Row& joinedRow, Row& baseRow, vector >& joinMatches, - shared_array& smallRowTemplates, + shared_array& smallRowTemplates, RowGroupDL* outputDL, // disk-join support vars. This param list is insane; refactor attempt would be nice at some point. vector >* tjoiners, boost::shared_array >* rgMappings, @@ -1812,19 +1830,19 @@ void TupleHashJoinStep::joinOneRG( applyMapping((*rgMappings)[smallSideCount], largeSideRow, &baseRow); baseRow.setRid(largeSideRow.getRelRid()); generateJoinResultSet(joinMatches, baseRow, *rgMappings, 0, joinOutput, joinedData, out, - smallRowTemplates, joinedRow); + smallRowTemplates, joinedRow, outputDL); } } if (joinOutput.getRowCount() > 0) - out->push_back(joinedData); + out.push_back(joinedData); } void TupleHashJoinStep::generateJoinResultSet(const vector >& joinerOutput, Row& baseRow, const shared_array >& mappings, const uint32_t depth, RowGroup& l_outputRG, RGData& rgData, - vector* outputData, const shared_array& smallRows, - Row& joinedRow) + vector& outputData, const shared_array& smallRows, + Row& joinedRow, RowGroupDL* dlp) { uint32_t i; Row& smallRow = smallRows[depth]; @@ -1836,10 +1854,8 @@ void TupleHashJoinStep::generateJoinResultSet(const vector { smallRow.setPointer(joinerOutput[depth][i]); applyMapping(mappings[depth], smallRow, &baseRow); - // cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " - // << i << ": " << smallRow.toString() << endl; generateJoinResultSet(joinerOutput, baseRow, mappings, depth + 1, l_outputRG, rgData, outputData, - smallRows, joinedRow); + smallRows, joinedRow, dlp); } } else @@ -1854,7 +1870,15 @@ void TupleHashJoinStep::generateJoinResultSet(const vector { uint32_t dbRoot = l_outputRG.getDBRoot(); uint64_t baseRid = l_outputRG.getBaseRid(); - outputData->push_back(rgData); + outputData.push_back(rgData); + // Count the memory + if (UNLIKELY(!getMemory(l_outputRG.getMaxDataSize()))) + { + // Don't let the join results buffer get out of control. + sendResult(outputData); + outputData.clear(); + returnMemory(); + } rgData.reinit(l_outputRG); l_outputRG.setData(&rgData); l_outputRG.resetRowGroup(baseRid); @@ -1862,12 +1886,8 @@ void TupleHashJoinStep::generateJoinResultSet(const vector l_outputRG.getRow(0, &joinedRow); } - // cout << "depth " << depth << ", size " << joinerOutput[depth].size() << ", row " - // << i << ": " << smallRow.toString() << endl; applyMapping(mappings[depth], smallRow, &baseRow); copyRow(baseRow, &joinedRow); - // memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize()); - // cout << "(step " << stepID << ") fully joined row is: " << joinedRow.toString() << endl; } } } diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index 9191de337..3bdffb3d9 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -30,11 +30,12 @@ #include #include #include +#include "resourcemanager.h" +#include "exceptclasses.h" namespace joblist { class BatchPrimitive; -class ResourceManager; class TupleBPS; struct FunctionJoinInfo; class DiskJoinStep; @@ -365,6 +366,21 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep } void abort(); + void returnMemory() + { + if (fMemSizeForOutputRG > 0) + { + resourceManager->returnMemory(fMemSizeForOutputRG); + fMemSizeForOutputRG = 0; + } + } + bool getMemory(uint64_t memSize) + { + bool gotMem = resourceManager->getMemory(memSize); + if (gotMem) + fMemSizeForOutputRG += memSize; + return gotMem; + } private: TupleHashJoinStep(); @@ -422,6 +438,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep std::vector > smallSideKeys; ResourceManager* resourceManager; + uint64_t fMemSizeForOutputRG; struct JoinerSorter { @@ -521,18 +538,19 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep rowgroup::Row& baseRow, const boost::shared_array >& mappings, const uint32_t depth, rowgroup::RowGroup& outputRG, rowgroup::RGData& rgData, - std::vector* outputData, - const boost::shared_array& smallRows, rowgroup::Row& joinedRow); + std::vector& outputData, + const boost::shared_array& smallRows, rowgroup::Row& joinedRow, + RowGroupDL* outputDL); void grabSomeWork(std::vector* work); void sendResult(const std::vector& res); void processFE2(rowgroup::RowGroup& input, rowgroup::RowGroup& output, rowgroup::Row& inRow, rowgroup::Row& outRow, std::vector* rgData, funcexp::FuncExpWrapper* local_fe); - void joinOneRG(uint32_t threadID, std::vector* out, rowgroup::RowGroup& inputRG, + void joinOneRG(uint32_t threadID, std::vector& out, rowgroup::RowGroup& inputRG, rowgroup::RowGroup& joinOutput, rowgroup::Row& largeSideRow, rowgroup::Row& joinFERow, rowgroup::Row& joinedRow, rowgroup::Row& baseRow, std::vector >& joinMatches, - boost::shared_array& smallRowTemplates, + boost::shared_array& smallRowTemplates, RowGroupDL* outputDL, std::vector >* joiners = NULL, boost::shared_array >* rgMappings = NULL, boost::shared_array >* feMappings = NULL, @@ -577,6 +595,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep } void operator()() { + utils::setThreadName("DJSReader"); HJ->djsReaderFcn(index); } TupleHashJoinStep* HJ; diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index ac3e9a9f0..291dd325a 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -244,10 +244,13 @@ void TupleUnion::readInput(uint32_t which) memUsageAfter = allocator.getMemUsage(); memDiff += (memUsageAfter - memUsageBefore); - memUsage += memDiff; } - if (!rm->getMemory(memDiff, sessionMemLimit)) + if (rm->getMemory(memDiff, sessionMemLimit)) + { + memUsage += memDiff; + } + else { fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_UNION_TOO_BIG); diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp index e4c56ecd9..1ab8d3984 100644 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -884,11 +884,12 @@ void WindowFunctionStep::execute() { fInRowGroupData.push_back(rgData); uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition); - fMemUsage += memAdd; if (fRm->getMemory(memAdd, fSessionMemLimit) == false) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); + fMemUsage += memAdd; + for (uint64_t j = 0; j < rowCnt; ++j) { if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL) @@ -1012,11 +1013,12 @@ void WindowFunctionStep::doFunction() while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled()) { uint64_t memAdd = fRows.size() * sizeof(RowPosition); - fMemUsage += memAdd; if (fRm->getMemory(memAdd, fSessionMemLimit) == false) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); + fMemUsage += memAdd; + fFunctions[i]->setCallback(this, i); (*fFunctions[i].get())(); } diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index 05cbaad07..3c7bd5573 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -754,7 +754,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca } } - rc = fprintf(ci.filePtr, "\n"); //@bug 6077 check whether thhe pipe is still open + rc = fprintf(ci.filePtr, "\n"); //@bug 6077 check whether the pipe is still open if (rc < 0) rc = -1; diff --git a/exemgr/femsghandler.cpp b/exemgr/femsghandler.cpp index 0ce22519f..cdb0cd81d 100644 --- a/exemgr/femsghandler.cpp +++ b/exemgr/femsghandler.cpp @@ -19,6 +19,7 @@ #include "iosocket.h" #include "femsghandler.h" +#include "threadnaming.h" using namespace std; using namespace joblist; @@ -36,6 +37,7 @@ class Runner } void operator()() { + utils::setThreadName("FEMsgHandler"); target->threadFcn(); } FEMsgHandler* target; diff --git a/exemgr/main.cpp b/exemgr/main.cpp index b1b31b9c6..513cc075e 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -79,6 +79,7 @@ #include "mariadb_my_sys.h" #include "statistics.h" +#include "threadnaming.h" class Opt { @@ -617,8 +618,6 @@ class SessionThread if (jl->status() == 0) { - std::string emsg; - if (jl->putEngineComm(fEc) != 0) throw std::runtime_error(jl->errMsg()); } @@ -711,6 +710,7 @@ class SessionThread public: void operator()() { + utils::setThreadName("SessionThread"); messageqcpp::ByteStream bs, inbs; execplan::CalpontSelectExecutionPlan csep; csep.sessionID(0); diff --git a/primitives/blockcache/iomanager.cpp b/primitives/blockcache/iomanager.cpp index dbfa22a84..cd0952ede 100644 --- a/primitives/blockcache/iomanager.cpp +++ b/primitives/blockcache/iomanager.cpp @@ -104,6 +104,7 @@ using namespace compress; using namespace idbdatafile; #include "mcsconfig.h" +#include "threadnaming.h" typedef tr1::unordered_set USOID; @@ -384,6 +385,7 @@ static int updateptrs(char* ptr, FdCacheType_t::iterator fdit) void* thr_popper(ioManager* arg) { + utils::setThreadName("thr_popper"); ioManager* iom = arg; FileBufferMgr* fbm; int totalRqst = 0; diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index ca52daf22..afc5e6daa 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1148,17 +1148,24 @@ void BatchPrimitiveProcessor::initProcessor() } /* This version does a join on projected rows */ -void BatchPrimitiveProcessor::executeTupleJoin() +// In order to prevent super size result sets in the case of near cartesian joins on three or more joins, +// the startRid start at 0) is used to begin the rid loop and if we cut off processing early because of +// the size of the result set, we return the next rid to start with. If we finish ridCount rids, return 0- +uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) { uint32_t newRowCount = 0, i, j; vector matches; uint64_t largeKey; - + uint64_t resultCount = 0; + uint32_t newStartRid = startRid; outputRG.getRow(0, &oldRow); outputRG.getRow(0, &newRow); // cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl; - for (i = 0; i < ridCount && !sendThread->aborted(); i++, oldRow.nextRow()) + // ridCount gets modified based on the number of Rids actually processed during this call. + // origRidCount is the number of rids for this thread after filter, which are the total + // number of rids to be processed from all calls to this function during this thread. + for (i = startRid; i < origRidCount && !sendThread->aborted(); i++, oldRow.nextRow()) { /* Decide whether this large-side row belongs in the output. The breaks * in the loop mean that it doesn't. @@ -1265,10 +1272,9 @@ void BatchPrimitiveProcessor::executeTupleJoin() if (j == joinerCount) { + uint32_t matchCount; for (j = 0; j < joinerCount; j++) { - uint32_t matchCount; - /* The result is already known if... * -- anti-join with no fcnexp * -- semi-join with no fcnexp and not scalar @@ -1356,6 +1362,8 @@ void BatchPrimitiveProcessor::executeTupleJoin() tSmallSideMatches[j][newRowCount].push_back(-1); matchCount = 1; } + + resultCount += matchCount; } /* Finally, copy the row into the output */ @@ -1379,8 +1387,18 @@ void BatchPrimitiveProcessor::executeTupleJoin() // else // cout << "j != joinerCount\n"; } + // If we've accumulated more than maxResultCount -- 1048576 (2^20)_ of resultCounts, cut off processing. + // The caller will restart to continue where we left off. + if (resultCount >= maxResultCount) + { + newStartRid += newRowCount; + break; + } } + if (resultCount < maxResultCount) + newStartRid = 0; + ridCount = newRowCount; outputRG.setRowCount(ridCount); @@ -1397,6 +1415,7 @@ void BatchPrimitiveProcessor::executeTupleJoin() } } */ + return newStartRid; } #ifdef PRIMPROC_STOPWATCH @@ -1405,6 +1424,9 @@ void BatchPrimitiveProcessor::execute(StopWatch* stopwatch) void BatchPrimitiveProcessor::execute() #endif { + uint8_t sendCount = 0; + // bool smoreRGs = false; + // uint32_t sStartRid = 0; uint32_t i, j; try @@ -1443,6 +1465,7 @@ void BatchPrimitiveProcessor::execute() // filters use relrids and values for intermediate results. if (bop == BOP_AND) + { for (j = 0; j < filterCount; ++j) { #ifdef PRIMPROC_STOPWATCH @@ -1453,6 +1476,7 @@ void BatchPrimitiveProcessor::execute() filterSteps[j]->execute(); #endif } + } else // BOP_OR { /* XXXPAT: This is a hacky impl of OR logic. Each filter is configured to @@ -1542,10 +1566,12 @@ void BatchPrimitiveProcessor::execute() // projection commands read relrids and write output directly to a rowgroup // or the serialized bytestream if (ot != ROW_GROUP) + { for (j = 0; j < projectCount; ++j) { projectSteps[j]->project(); } + } else { /* Function & Expression group 1 processing @@ -1621,15 +1647,93 @@ void BatchPrimitiveProcessor::execute() // cout << " no target found for OID " << projectSteps[j]->getOID() << //endl; } + if (fe2) + { + /* functionize this -> processFE2() */ + fe2Output.resetRowGroup(baseRid); + fe2Output.getRow(0, &fe2Out); + fe2Input->getRow(0, &fe2In); + + // cerr << "input row: " << fe2In.toString() << endl; + for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow()) + { + if (fe2->evaluate(&fe2In)) + { + applyMapping(fe2Mapping, fe2In, &fe2Out); + // cerr << " passed. output row: " << fe2Out.toString() << endl; + fe2Out.setRid(fe2In.getRelRid()); + fe2Output.incRowCount(); + fe2Out.nextRow(); + } + } + + if (!fAggregator) + { + *serialized << (uint8_t)1; // the "count this msg" var + fe2Output.setDBRoot(dbRoot); + fe2Output.serializeRGData(*serialized); + //*serialized << fe2Output.getDataSize(); + // serialized->append(fe2Output.getData(), fe2Output.getDataSize()); + } + } + + if (fAggregator) + { + *serialized << (uint8_t)1; // the "count this msg" var + + RowGroup& toAggregate = (fe2 ? fe2Output : outputRG); + // toAggregate.convertToInlineDataInPlace(); + + if (fe2) + fe2Output.setDBRoot(dbRoot); + else + outputRG.setDBRoot(dbRoot); + + fAggregator->addRowGroup(&toAggregate); + + if ((currentBlockOffset + 1) == count) // @bug4507, 8k + { + fAggregator->loadResult(*serialized); // @bug4507, 8k + } // @bug4507, 8k + else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k + { + fAggregator->loadEmptySet(*serialized); // @bug4507, 8k + } // @bug4507, 8k + else // @bug4507, 8k + { + fAggregator->loadResult(*serialized); // @bug4507, 8k + fAggregator->aggReset(); // @bug4507, 8k + } // @bug4507, 8k + } + + if (!fAggregator && !fe2) + { + *serialized << (uint8_t)1; // the "count this msg" var + outputRG.setDBRoot(dbRoot); + // cerr << "serializing " << outputRG.toString() << endl; + outputRG.serializeRGData(*serialized); + + //*serialized << outputRG.getDataSize(); + // serialized->append(outputRG.getData(), outputRG.getDataSize()); + } + +#ifdef PRIMPROC_STOPWATCH + stopwatch->stop("- if(ot != ROW_GROUP) else"); +#endif } - else + else // Is doJoin { + uint32_t startRid = 0; + ByteStream preamble = *serialized; + origRidCount = ridCount; // ridCount can get modified by executeTupleJoin(). We need to keep track of + // the original val. /* project the key columns. If there's the filter IN the join, project everything. Also need to project 'long' strings b/c executeTupleJoin may copy entire rows using copyRow(), which will try to interpret the uninit'd string ptr. Valgrind will legitimately complain about copying uninit'd values for the other types but that is technically safe. */ for (j = 0; j < projectCount; j++) + { if (keyColumnProj[j] || (projectionMap[j] != -1 && (hasJoinFEFilters || oldRow.isLongString(projectionMap[j])))) { @@ -1639,215 +1743,176 @@ void BatchPrimitiveProcessor::execute() stopwatch->stop("-- projectIntoRowGroup"); #else projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); -#endif - } - -#ifdef PRIMPROC_STOPWATCH - stopwatch->start("-- executeTupleJoin()"); - executeTupleJoin(); - stopwatch->stop("-- executeTupleJoin()"); -#else - executeTupleJoin(); -#endif - - /* project the non-key columns */ - for (j = 0; j < projectCount; ++j) - { - if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters && - !oldRow.isLongString(projectionMap[j])) - { -#ifdef PRIMPROC_STOPWATCH - stopwatch->start("-- projectIntoRowGroup"); - projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); - stopwatch->stop("-- projectIntoRowGroup"); -#else - projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); #endif } } - } - /* The RowGroup is fully joined at this point. - Add additional RowGroup processing here. - TODO: Try to clean up all of the switching */ - - if (doJoin && (fe2 || fAggregator)) - { - bool moreRGs = true; - ByteStream preamble = *serialized; - initGJRG(); - - while (moreRGs && !sendThread->aborted()) + do // while (startRid > 0) { - /* - generate 1 rowgroup (8192 rows max) of joined rows - if there's an FE2, run it - -pack results into a new rowgroup - -if there are < 8192 rows in the new RG, continue - if there's an agg, run it - send the result - */ - resetGJRG(); - moreRGs = generateJoinedRowGroup(baseJRow); - *serialized << (uint8_t)!moreRGs; - - if (fe2) +#ifdef PRIMPROC_STOPWATCH + stopwatch->start("-- executeTupleJoin()"); + startRid = executeTupleJoin(startRid); + stopwatch->stop("-- executeTupleJoin()"); +#else + startRid = executeTupleJoin(startRid); +// sStartRid = startRid; +#endif + /* project the non-key columns */ + for (j = 0; j < projectCount; ++j) { - /* functionize this -> processFE2()*/ - fe2Output.resetRowGroup(baseRid); - fe2Output.setDBRoot(dbRoot); - fe2Output.getRow(0, &fe2Out); - fe2Input->getRow(0, &fe2In); - - for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow()) - if (fe2->evaluate(&fe2In)) - { - applyMapping(fe2Mapping, fe2In, &fe2Out); - fe2Out.setRid(fe2In.getRelRid()); - fe2Output.incRowCount(); - fe2Out.nextRow(); - } + if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters && + !oldRow.isLongString(projectionMap[j])) + { +#ifdef PRIMPROC_STOPWATCH + stopwatch->start("-- projectIntoRowGroup"); + projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); + stopwatch->stop("-- projectIntoRowGroup"); +#else + projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); +#endif + } } + /* The RowGroup is fully joined at this point. + * Add additional RowGroup processing here. + * TODO: Try to clean up all of the switching */ - RowGroup& nextRG = (fe2 ? fe2Output : joinedRG); - nextRG.setDBRoot(dbRoot); - - if (fAggregator) + if (fe2 || fAggregator) { - fAggregator->addRowGroup(&nextRG); + bool moreRGs = true; + initGJRG(); - if ((currentBlockOffset + 1) == count && moreRGs == false) // @bug4507, 8k + while (moreRGs && !sendThread->aborted()) { - fAggregator->loadResult(*serialized); // @bug4507, 8k - } // @bug4507, 8k - else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k + /* + * generate 1 rowgroup (8192 rows max) of joined rows + * if there's an FE2, run it + * -pack results into a new rowgroup + * -if there are < 8192 rows in the new RG, continue + * if there's an agg, run it + * send the result + */ + resetGJRG(); + moreRGs = generateJoinedRowGroup(baseJRow); + // smoreRGs = moreRGs; + sendCount = (uint8_t)(!moreRGs && !startRid); + // *serialized << (uint8_t)(!moreRGs && !startRid); // the "count + // this msg" var + *serialized << sendCount; + if (fe2) + { + /* functionize this -> processFE2()*/ + fe2Output.resetRowGroup(baseRid); + fe2Output.setDBRoot(dbRoot); + fe2Output.getRow(0, &fe2Out); + fe2Input->getRow(0, &fe2In); + + for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow()) + { + if (fe2->evaluate(&fe2In)) + { + applyMapping(fe2Mapping, fe2In, &fe2Out); + fe2Out.setRid(fe2In.getRelRid()); + fe2Output.incRowCount(); + fe2Out.nextRow(); + } + } + } + + RowGroup& nextRG = (fe2 ? fe2Output : joinedRG); + nextRG.setDBRoot(dbRoot); + + if (fAggregator) + { + fAggregator->addRowGroup(&nextRG); + + if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0) // @bug4507, 8k + { + fAggregator->loadResult(*serialized); // @bug4507, 8k + } // @bug4507, 8k + else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k + { + fAggregator->loadEmptySet(*serialized); // @bug4507, 8k + } // @bug4507, 8k + else // @bug4507, 8k + { + fAggregator->loadResult(*serialized); // @bug4507, 8k + fAggregator->aggReset(); // @bug4507, 8k + } // @bug4507, 8k + } + else + { + // cerr <<" * serialzing " << nextRG.toString() << endl; + nextRG.serializeRGData(*serialized); + } + + /* send the msg & reinit the BS */ + if (moreRGs) + { + sendResponse(); + serialized.reset(new ByteStream()); + *serialized = preamble; + } + } + + if (hasSmallOuterJoin) { - fAggregator->loadEmptySet(*serialized); // @bug4507, 8k - } // @bug4507, 8k - else // @bug4507, 8k + // Should we happen to finish sending data rows right on the boundary of when moreRGs flips off, + // then we need to start a new buffer. I.e., it needs the count this message byte pushed. + if (serialized->length() == preamble.length()) + *serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var + + *serialized << ridCount; + + for (i = 0; i < joinerCount; i++) + { + for (j = 0; j < ridCount; ++j) + { + serializeInlineVector(*serialized, tSmallSideMatches[i][j]); + tSmallSideMatches[i][j].clear(); + } + } + } + else { - fAggregator->loadResult(*serialized); // @bug4507, 8k - fAggregator->aggReset(); // @bug4507, 8k - } // @bug4507, 8k + // We hae no more use for this allocation + for (i = 0; i < joinerCount; i++) + for (j = 0; j < ridCount; ++j) + tSmallSideMatches[i][j].clear(); + } } else { - // cerr <<" * serialzing " << nextRG.toString() << endl; - nextRG.serializeRGData(*serialized); - } + *serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var + outputRG.setDBRoot(dbRoot); + // cerr << "serializing " << outputRG.toString() << endl; + outputRG.serializeRGData(*serialized); - /* send the msg & reinit the BS */ - if (moreRGs) + //*serialized << outputRG.getDataSize(); + // serialized->append(outputRG.getData(), outputRG.getDataSize()); + for (i = 0; i < joinerCount; i++) + { + for (j = 0; j < ridCount; ++j) + { + serializeInlineVector(*serialized, tSmallSideMatches[i][j]); + tSmallSideMatches[i][j].clear(); + } + } + } + if (startRid > 0) { sendResponse(); serialized.reset(new ByteStream()); *serialized = preamble; } - } - - if (hasSmallOuterJoin) - { - *serialized << ridCount; - - for (i = 0; i < joinerCount; i++) - for (j = 0; j < ridCount; ++j) - serializeInlineVector(*serialized, tSmallSideMatches[i][j]); - } + } while (startRid > 0); } - - if (!doJoin && fe2) - { - /* functionize this -> processFE2() */ - fe2Output.resetRowGroup(baseRid); - fe2Output.getRow(0, &fe2Out); - fe2Input->getRow(0, &fe2In); - - // cerr << "input row: " << fe2In.toString() << endl; - for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow()) - { - if (fe2->evaluate(&fe2In)) - { - applyMapping(fe2Mapping, fe2In, &fe2Out); - // cerr << " passed. output row: " << fe2Out.toString() << endl; - fe2Out.setRid(fe2In.getRelRid()); - fe2Output.incRowCount(); - fe2Out.nextRow(); - } - } - - if (!fAggregator) - { - *serialized << (uint8_t)1; // the "count this msg" var - fe2Output.setDBRoot(dbRoot); - fe2Output.serializeRGData(*serialized); - //*serialized << fe2Output.getDataSize(); - // serialized->append(fe2Output.getData(), fe2Output.getDataSize()); - } - } - - if (!doJoin && fAggregator) - { - *serialized << (uint8_t)1; // the "count this msg" var - - RowGroup& toAggregate = (fe2 ? fe2Output : outputRG); - // toAggregate.convertToInlineDataInPlace(); - - if (fe2) - fe2Output.setDBRoot(dbRoot); - else - outputRG.setDBRoot(dbRoot); - - fAggregator->addRowGroup(&toAggregate); - - if ((currentBlockOffset + 1) == count) // @bug4507, 8k - { - fAggregator->loadResult(*serialized); // @bug4507, 8k - } // @bug4507, 8k - else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k - { - fAggregator->loadEmptySet(*serialized); // @bug4507, 8k - } // @bug4507, 8k - else // @bug4507, 8k - { - fAggregator->loadResult(*serialized); // @bug4507, 8k - fAggregator->aggReset(); // @bug4507, 8k - } // @bug4507, 8k - } - - if (!fAggregator && !fe2) - { - *serialized << (uint8_t)1; // the "count this msg" var - outputRG.setDBRoot(dbRoot); - // cerr << "serializing " << outputRG.toString() << endl; - outputRG.serializeRGData(*serialized); - - //*serialized << outputRG.getDataSize(); - // serialized->append(outputRG.getData(), outputRG.getDataSize()); - if (doJoin) - { - for (i = 0; i < joinerCount; i++) - { - for (j = 0; j < ridCount; ++j) - { - serializeInlineVector(*serialized, tSmallSideMatches[i][j]); - } - } - } - } - - // clear small side match vector - if (doJoin) - { - for (i = 0; i < joinerCount; i++) - for (j = 0; j < ridCount; ++j) - tSmallSideMatches[i][j].clear(); - } - #ifdef PRIMPROC_STOPWATCH stopwatch->stop("- if(ot != ROW_GROUP) else"); #endif } - + ridCount = origRidCount; // May not be needed, but just to be safe. + // std::cout << "end of send. startRid=" << sStartRid << " moreRG=" << smoreRGs << " sendCount=" << + // sendCount << std::endl; if (projectCount > 0 || ot == ROW_GROUP) { *serialized << cachedIO; @@ -2187,8 +2252,9 @@ int BatchPrimitiveProcessor::operator()() if (sendThread->aborted()) break; - if (!sendThread->okToProceed()) + if (sendThread->sizeTooBig()) { + // The send buffer is full of messages yet to be sent, so this thread would block anyway. freeLargeBuffers(); return -1; // the reschedule error code } diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 42dbe6e46..5e53ae8f5 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -224,6 +224,7 @@ class BatchPrimitiveProcessor int128_t wide128Values[LOGICAL_BLOCK_RIDS]; boost::scoped_array absRids; boost::scoped_array strValues; + uint16_t origRidCount; uint16_t ridCount; bool needStrValues; uint16_t wideColumnsWidths; @@ -333,7 +334,7 @@ class BatchPrimitiveProcessor boost::shared_array>> tJoiners; typedef std::vector MatchedData[LOGICAL_BLOCK_RIDS]; boost::shared_array tSmallSideMatches; - void executeTupleJoin(); + uint32_t executeTupleJoin(uint32_t startRid); bool getTupleJoinRowGroupData; std::vector smallSideRGs; rowgroup::RowGroup largeSideRG; @@ -432,6 +433,8 @@ class BatchPrimitiveProcessor uint ptMask; bool firstInstance; + static const uint64_t maxResultCount = 1048576; // 2^20 + friend class Command; friend class ColumnCommand; friend class DictStep; diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index 3f98a8846..ba134a479 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -23,16 +23,14 @@ #include #include +#include #include "bppsendthread.h" - -using namespace std; -using namespace boost; - -#include "atomicops.h" +#include "resourcemanager.h" namespace primitiveprocessor { extern uint32_t connectionsPerUM; +extern uint32_t BPPCount; BPPSendThread::BPPSendThread() : die(false) @@ -44,8 +42,8 @@ BPPSendThread::BPPSendThread() , sawAllConnections(false) , fcEnabled(false) , currentByteSize(0) - , maxByteSize(25000000) { + maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue(); runner = boost::thread(Runner_t(this)); } @@ -59,36 +57,36 @@ BPPSendThread::BPPSendThread(uint32_t initMsgsLeft) , sawAllConnections(false) , fcEnabled(false) , currentByteSize(0) - , maxByteSize(25000000) { + maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue(); runner = boost::thread(Runner_t(this)); } BPPSendThread::~BPPSendThread() { - boost::mutex::scoped_lock sl(msgQueueLock); - boost::mutex::scoped_lock sl2(ackLock); - die = true; - queueNotEmpty.notify_one(); - okToSend.notify_one(); - sl.unlock(); - sl2.unlock(); + abort(); runner.join(); } -bool BPPSendThread::okToProceed() -{ - // keep the queue size below the 100 msg threshold & below the 25MB mark, - // but at least 2 msgs so there is always 1 ready to be sent. - return ((msgQueue.size() < sizeThreshold && currentByteSize < maxByteSize) || msgQueue.size() < 3) && !die; -} - void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) { + // Wait for the queue to empty out a bit if it's stuffed full + if (sizeTooBig()) + { + std::unique_lock sl1(respondLock); + while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) + { + respondWait = true; + fProcessorPool->incBlockedThreads(); + okToRespond.wait(sl1); + fProcessorPool->decBlockedThreads(); + respondWait = false; + } + } if (die) return; - boost::mutex::scoped_lock sl(msgQueueLock); + std::unique_lock sl(msgQueueLock); if (gotException) throw runtime_error(exceptionString); @@ -119,10 +117,23 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) void BPPSendThread::sendResults(const vector& msgs, bool newConnection) { + // Wait for the queue to empty out a bit if it's stuffed full + if (sizeTooBig()) + { + std::unique_lock sl1(respondLock); + while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) + { + respondWait = true; + fProcessorPool->incBlockedThreads(); + okToRespond.wait(sl1); + fProcessorPool->decBlockedThreads(); + respondWait = false; + } + } if (die) return; - boost::mutex::scoped_lock sl(msgQueueLock); + std::unique_lock sl(msgQueueLock); if (gotException) throw runtime_error(exceptionString); @@ -157,7 +168,7 @@ void BPPSendThread::sendResults(const vector& msgs, bool newConnection) void BPPSendThread::sendMore(int num) { - boost::mutex::scoped_lock sl(ackLock); + std::unique_lock sl(ackLock); // cout << "got an ACK for " << num << " msgsLeft=" << msgsLeft << endl; if (num == -1) @@ -170,6 +181,7 @@ void BPPSendThread::sendMore(int num) else (void)atomicops::atomicAdd(&msgsLeft, num); + sl.unlock(); if (waiting) okToSend.notify_one(); } @@ -192,7 +204,7 @@ void BPPSendThread::mainLoop() while (!die) { - boost::mutex::scoped_lock sl(msgQueueLock); + std::unique_lock sl(msgQueueLock); if (msgQueue.empty() && !die) { @@ -223,8 +235,7 @@ void BPPSendThread::mainLoop() if (msgsLeft <= 0 && fcEnabled && !die) { - boost::mutex::scoped_lock sl2(ackLock); - + std::unique_lock sl2(ackLock); while (msgsLeft <= 0 && fcEnabled && !die) { waiting = true; @@ -267,19 +278,26 @@ void BPPSendThread::mainLoop() (void)atomicops::atomicSub(¤tByteSize, bsSize); msg[msgsSent].msg.reset(); } + + if (respondWait && currentByteSize < maxByteSize) + { + okToRespond.notify_one(); + } } } } void BPPSendThread::abort() { - boost::mutex::scoped_lock sl(msgQueueLock); - boost::mutex::scoped_lock sl2(ackLock); + std::lock_guard sl(msgQueueLock); + std::lock_guard sl2(ackLock); + std::lock_guard sl3(respondLock); + die = true; - queueNotEmpty.notify_one(); - okToSend.notify_one(); - sl.unlock(); - sl2.unlock(); + + queueNotEmpty.notify_all(); + okToSend.notify_all(); + okToRespond.notify_all(); } } // namespace primitiveprocessor diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index f99fcf2fd..8c3a3e9a1 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -27,8 +27,9 @@ #include "umsocketselector.h" #include #include -#include -#include +#include +#include "threadnaming.h" +#include "prioritythreadpool.h" namespace primitiveprocessor { @@ -65,7 +66,14 @@ class BPPSendThread } }; - bool okToProceed(); + bool sizeTooBig() + { + // keep the queue size below the 100 msg threshold & below the 250MB mark, + // but at least 3 msgs so there is always 1 ready to be sent. + return ((msgQueue.size() > sizeThreshold) || (currentByteSize >= maxByteSize && msgQueue.size() > 3)) && + !die; + } + void sendMore(int num); void sendResults(const std::vector& msgs, bool newConnection); void sendResult(const Msg_t& msg, bool newConnection); @@ -76,6 +84,10 @@ class BPPSendThread { return die; } + void setProcessorPool(threadpool::PriorityThreadPool* processorPool) + { + fProcessorPool = processorPool; + } private: BPPSendThread(const BPPSendThread&); @@ -89,21 +101,26 @@ class BPPSendThread } void operator()() { + utils::setThreadName("BPPSendThread"); bppst->mainLoop(); } }; boost::thread runner; std::queue msgQueue; - boost::mutex msgQueueLock; - boost::condition queueNotEmpty; + std::mutex msgQueueLock; + std::condition_variable queueNotEmpty; volatile bool die, gotException, mainThreadWaiting; std::string exceptionString; uint32_t sizeThreshold; volatile int32_t msgsLeft; bool waiting; - boost::mutex ackLock; - boost::condition okToSend; + std::mutex ackLock; + std::condition_variable okToSend; + // Condition to prevent run away queue + bool respondWait; + std::mutex respondLock; + std::condition_variable okToRespond; /* Load balancing structures */ struct Connection_t @@ -130,6 +147,9 @@ class BPPSendThread /* secondary queue size restriction based on byte size */ volatile uint64_t currentByteSize; uint64_t maxByteSize; + // Used to tell the PriorityThreadPool It should consider additional threads because a + // queue full event has happened and a thread has been blocked. + threadpool::PriorityThreadPool* fProcessorPool; }; } // namespace primitiveprocessor diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 5581ceee9..99aa7a315 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1395,7 +1395,7 @@ struct BPPHandler SBPPV bppv; // make the new BPP object - bppv.reset(new BPPV()); + bppv.reset(new BPPV(fPrimitiveServerPtr)); bpp.reset(new BatchPrimitiveProcessor(bs, fPrimitiveServerPtr->prefetchThreshold(), bppv->getSendThread(), fPrimitiveServerPtr->ProcessorThreads())); @@ -1857,7 +1857,7 @@ struct ReadThread /* Message format: * ISMPacketHeader * Partition count - 32 bits - * Partition set - sizeof(LogicalPartition) * count + * Partition set - sizeof(LogicalPartition) boost::shared_ptr* count * OID count - 32 bits * OID array - 32 bits * count */ @@ -1948,8 +1948,7 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = - fPrimitiveServerPtr->getProcessorThreadPool(); + threadpool::PriorityThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2407,8 +2406,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0)); + fProcessorPool = new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0); // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue @@ -2460,9 +2459,10 @@ void PrimitiveServer::start(Service* service) cerr << "PrimitiveServer::start() exiting!" << endl; } -BPPV::BPPV() +BPPV::BPPV(PrimitiveServer* ps) { sendThread.reset(new BPPSendThread()); + sendThread->setProcessorPool(ps->getProcessorThreadPool()); v.reserve(BPPCount); pos = 0; joinDataReceived = false; @@ -2503,7 +2503,7 @@ const vector >& BPPV::get() boost::shared_ptr BPPV::next() { uint32_t size = v.size(); - uint32_t i; + uint32_t i = 0; #if 0 diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 5f35911e4..d3b37fc0b 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -56,10 +56,12 @@ extern uint32_t highPriorityThreads, medPriorityThreads, lowPriorityThreads; class BPPSendThread; +class PrimitiveServer; + class BPPV { public: - BPPV(); + BPPV(PrimitiveServer* ps); ~BPPV(); boost::shared_ptr next(); void add(boost::shared_ptr a); @@ -128,7 +130,7 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline boost::shared_ptr getProcessorThreadPool() const + inline threadpool::PriorityThreadPool* getProcessorThreadPool() const { return fProcessorPool; } @@ -165,7 +167,7 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - boost::shared_ptr fProcessorPool; + threadpool::PriorityThreadPool* fProcessorPool; int fServerThreads; int fServerQueueSize; diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 277065e91..bc901803d 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -75,6 +75,7 @@ using namespace idbdatafile; #include "mariadb_my_sys.h" #include "service.h" +#include "threadnaming.h" class Opt { @@ -246,6 +247,7 @@ class QszMonThd void operator()() { + utils::setThreadName("QszMonThd"); for (;;) { uint32_t qd = fPsp->getProcessorThreadPool()->getWaiting(); @@ -287,6 +289,7 @@ class QszMonThd #ifdef DUMP_CACHE_CONTENTS void* waitForSIGUSR1(void* p) { + utils::setThreadName("waitForSIGUSR1"); #if defined(__LP64__) || defined(_MSC_VER) ptrdiff_t tmp = reinterpret_cast(p); int cacheCount = static_cast(tmp); diff --git a/utils/common/MonitorProcMem.cpp b/utils/common/MonitorProcMem.cpp index 11666df29..1341dabbe 100644 --- a/utils/common/MonitorProcMem.cpp +++ b/utils/common/MonitorProcMem.cpp @@ -42,6 +42,7 @@ using namespace std; using namespace logging; #include "MonitorProcMem.h" +#include "threadnaming.h" namespace utils { @@ -56,6 +57,7 @@ int MonitorProcMem::fMemPctCheck = 0; //------------------------------------------------------------------------------ void MonitorProcMem::operator()() const { + utils::setThreadName("MonitorProcMem"); while (1) { if (fMaxPct > 0) diff --git a/utils/common/atomicops.h b/utils/common/atomicops.h index 09f4bb1be..ee096cd0c 100644 --- a/utils/common/atomicops.h +++ b/utils/common/atomicops.h @@ -154,6 +154,24 @@ inline bool atomicCAS(volatile T* mem, T comp, T swap) #endif } +// implements a zero out of a variable +template +inline void atomicZero(volatile T* mem) +{ +#ifdef _MSC_VER + + switch (sizeof(T)) + { + case 4: + default: InterlockedXor(reinterpret_cast(mem), (static_cast(*mem))); break; + + case 8: InterlockedXor64(reinterpret_cast(mem), (static_cast(*mem))); break; + } +#else + __sync_xor_and_fetch(mem, *mem); +#endif +} + // Implements a scheduler yield inline void atomicYield() { diff --git a/utils/joiner/joinpartition.cpp b/utils/joiner/joinpartition.cpp index c31932046..5fa2ec706 100644 --- a/utils/joiner/joinpartition.cpp +++ b/utils/joiner/joinpartition.cpp @@ -108,9 +108,6 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec buckets.reserve(bucketCount); - for (int i = 0; i < (int)bucketCount; i++) - buckets.push_back(boost::shared_ptr(new JoinPartition(*this, false))); - string compressionType; try { @@ -128,6 +125,9 @@ JoinPartition::JoinPartition(const RowGroup& lRG, const RowGroup& sRG, const vec { compressor.reset(new compress::CompressInterfaceSnappy()); } + + for (uint32_t i = 0; i < bucketCount; i++) + buckets.push_back(boost::shared_ptr(new JoinPartition(*this, false))); } /* Ctor used by JoinPartition on expansion, creates JP's in filemode */ diff --git a/utils/loggingcpp/ErrorMessage.txt b/utils/loggingcpp/ErrorMessage.txt index 4a899fc4c..d918e2ff7 100755 --- a/utils/loggingcpp/ErrorMessage.txt +++ b/utils/loggingcpp/ErrorMessage.txt @@ -103,6 +103,7 @@ 2054 ERR_DISKAGG_ERROR Unknown error while aggregation. 2055 ERR_DISKAGG_TOO_BIG Not enough memory to make disk-based aggregation. Raise TotalUmMemory if possible. 2056 ERR_DISKAGG_FILEIO_ERROR There was an IO error during a disk-based aggregation: %1% +2057 ERR_JOIN_RESULT_TOO_BIG Not enough memory to consolidate join results. Estimated %1% MB needed. TotalUmMemory is %2% MB. # Sub-query errors 3001 ERR_NON_SUPPORT_SUB_QUERY_TYPE This subquery type is not supported yet. diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 7c64358ea..b27cd91a8 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -4705,12 +4705,12 @@ RowAggregationMultiDistinct::RowAggregationMultiDistinct(const RowAggregationMul for (uint32_t i = 0; i < rhs.fSubAggregators.size(); i++) { #if 0 - fTotalMemUsage += fSubRowGroups[i].getDataSize(AGG_ROWGROUP_SIZE); - if (!fRm->getMemory(fSubRowGroups[i].getDataSize(AGG_ROWGROUP_SIZE, fSessionMemLimit))) throw logging::IDBExcept(logging::IDBErrorInfo::instance()-> errorMsg(logging::ERR_AGGREGATION_TOO_BIG), logging::ERR_AGGREGATION_TOO_BIG); + fTotalMemUsage += fSubRowGroups[i].getDataSize(AGG_ROWGROUP_SIZE); + #endif data.reset(new RGData(fSubRowGroups[i], RowAggStorage::getMaxRows(fRm ? fRm->getAllowDiskAggregation() : false))); @@ -4748,12 +4748,11 @@ void RowAggregationMultiDistinct::addSubAggregator(const boost::shared_ptr data; #if 0 - fTotalMemUsage += rg.getDataSize(AGG_ROWGROUP_SIZE); - if (!fRm->getMemory(rg.getDataSize(AGG_ROWGROUP_SIZE), fSessionMemLimit)) throw logging::IDBExcept(logging::IDBErrorInfo::instance()-> errorMsg(logging::ERR_AGGREGATION_TOO_BIG), logging::ERR_AGGREGATION_TOO_BIG); + fTotalMemUsage += rg.getDataSize(AGG_ROWGROUP_SIZE); #endif data.reset(new RGData(rg, RowAggStorage::getMaxRows(fRm ? fRm->getAllowDiskAggregation() : false))); fSubRowData.push_back(data); diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index cd0e56e30..d0b5d699e 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -412,20 +412,25 @@ class RMMemManager : public MemManager protected: bool acquireImpl(size_t amount) final { - MemManager::acquireImpl(amount); + if (amount) + { if (!fRm->getMemory(amount, fSessLimit, fWait) && fStrict) { return false; } - + MemManager::acquireImpl(amount); + } return true; } void releaseImpl(size_t amount) override { + if (amount) + { MemManager::releaseImpl(amount); fRm->returnMemory(amount, fSessLimit); } + } private: joblist::ResourceManager* fRm = nullptr; diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index e71c3bbb4..b26144b62 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -28,6 +28,7 @@ using namespace std; #include "messageobj.h" #include "messagelog.h" +#include "threadnaming.h" using namespace logging; #include "prioritythreadpool.h" @@ -39,7 +40,7 @@ namespace threadpool { PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint ID) - : _stop(false), weightPerRun(targetWeightPerRun), id(ID) + : _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true) { boost::thread* newThread; for (uint32_t i = 0; i < highThreads; i++) @@ -98,6 +99,21 @@ void PriorityThreadPool::addJob(const Job& job, bool useLock) threadCounts[LOW]++; } + // If some threads have blocked (because of output queue full) + // Temporarily add some extra worker threads to make up for the blocked threads. + if (blockedThreads > extraThreads) + { + stopExtra = false; + newThread = threads.create_thread(ThreadHelper(this, EXTRA)); + newThread->detach(); + extraThreads++; + } + else if (blockedThreads == 0) + { + // Release the temporary threads -- some threads have become unblocked. + stopExtra = true; + } + if (job.priority > 66) jobQueues[HIGH].push_back(job); else if (job.priority > 33) @@ -125,7 +141,7 @@ void PriorityThreadPool::removeJobs(uint32_t id) PriorityThreadPool::Priority PriorityThreadPool::pickAQueue(Priority preference) { - if (!jobQueues[preference].empty()) + if (preference != EXTRA && !jobQueues[preference].empty()) return preference; else if (!jobQueues[HIGH].empty()) return HIGH; @@ -137,6 +153,10 @@ PriorityThreadPool::Priority PriorityThreadPool::pickAQueue(Priority preference) void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() { + if (preferredQueue == EXTRA) + utils::setThreadName("Extra"); + else + utils::setThreadName("Idle"); Priority queue = LOW; uint32_t weight, i = 0; vector runList; @@ -155,6 +175,14 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() if (jobQueues[queue].empty()) { + // If this is an EXTRA thread due toother threads blocking, and all blockers are unblocked, + // we don't want this one any more. + if (preferredQueue == EXTRA && stopExtra) + { + extraThreads--; + return; + } + newJob.wait(lk); continue; } @@ -190,6 +218,10 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() if (reschedule[i]) rescheduleCount++; } + if (preferredQueue == EXTRA) + utils::setThreadName("Extra (used)"); + else + utils::setThreadName("Idle"); // no real work was done, prevent intensive busy waiting if (rescheduleCount == runList.size()) diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index a8daea343..842d8f5c0 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -34,8 +34,10 @@ #include #include #include +#include #include "../winport/winport.h" #include "primitives/primproc/umsocketselector.h" +#include "atomicops.h" namespace threadpool { @@ -72,7 +74,9 @@ class PriorityThreadPool LOW, MEDIUM, HIGH, - _COUNT + _COUNT, + EXTRA // After _COUNT because _COUNT is for jobQueue size and EXTRA isn't a jobQueue. But we need EXTRA + // in places where Priority is used. }; /********************************************* @@ -95,6 +99,20 @@ class PriorityThreadPool */ void dump(); + // If a job is blocked, we want to temporarily increase the number of threads managed by the pool + // A problem can occur if all threads are running long or blocked for a single query. Other + // queries won't get serviced, even though there are cpu cycles available. + // These calls are currently protected by respondLock in sendThread(). If you call from other + // places, you need to consider atomicity. + void incBlockedThreads() + { + blockedThreads++; + } + void decBlockedThreads() + { + blockedThreads--; + } + protected: private: struct ThreadHelper @@ -127,6 +145,10 @@ class PriorityThreadPool bool _stop; uint32_t weightPerRun; volatile uint id; // prevent it from being optimized out + + std::atomic blockedThreads; + std::atomic extraThreads; + bool stopExtra; }; } // namespace threadpool diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 0ad24346f..fb9af113f 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -83,6 +83,7 @@ void ThreadPool::setQueueSize(size_t queueSize) void ThreadPool::pruneThread() { + utils::setThreadName("pruneThread"); boost::unique_lock lock2(fPruneMutex); while (true) diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index 073fc4459..30d442e74 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -754,14 +754,12 @@ void IdbOrderBy::initialize(const RowGroup& rg) IdbCompare::initialize(rg); uint64_t newSize = rg.getSizeWithStrings(fRowsPerRG); - fMemSize += newSize; - - if (!fRm->getMemory(newSize, fSessionMemLimit)) + if (fRm && !fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; throw IDBExcept(fErrorCode); } - + fMemSize += newSize; fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); fRowGroup.resetRowGroup(0);