diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 10f791918..69b5e4620 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -427,7 +427,7 @@ Error: // eventually let jobstep error out. std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok; - sbs.reset(new ByteStream(0)); + sbs.reset(new ByteStream(0U)); for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { @@ -1103,7 +1103,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ std::unique_lock lk(fMlock); // std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl; MessageQueueMap::iterator map_tok; - sbs.reset(new ByteStream(0)); + sbs.reset(new ByteStream(0U)); for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index e61da3c4d..db5cd15b3 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -22,6 +22,7 @@ ******************************************************************************************/ #include +#include #include #include #include @@ -346,23 +347,23 @@ bool ResourceManager::userPriorityEnabled() const // If both have space, return true. bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr& sessionLimit, bool patience) { - bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); + bool ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); bool ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; uint32_t retryCounter = 0, maxRetries = 20; // 10s delay while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries) { - atomicops::atomicAdd(&totalUmMemLimit, amount); + totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; usleep(500000); - ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); + ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); ret2 = sessionLimit ? (atomicops::atomicSub(sessionLimit.get(), amount) >= 0) : ret1; } if (!(ret1 && ret2)) { // If we didn't get any memory, restore the counters. - atomicops::atomicAdd(&totalUmMemLimit, amount); + totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; } return (ret1 && ret2); @@ -371,20 +372,20 @@ bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr& sess // The amount type is unsafe if amount close to max that is unrealistic in 2024. bool ResourceManager::getMemory(int64_t amount, bool patience) { - bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); + bool ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); uint32_t retryCounter = 0, maxRetries = 20; // 10s delay while (patience && !ret1 && retryCounter++ < maxRetries) { - atomicops::atomicAdd(&totalUmMemLimit, amount); + totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); usleep(500000); - ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); + ret1 = (totalUmMemLimit.fetch_sub(amount, std::memory_order_relaxed) >= 0); } if (!ret1) { // If we didn't get any memory, restore the counters. - atomicops::atomicAdd(&totalUmMemLimit, amount); + totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); } return ret1; } diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 0bf5a9c7f..809d44d3a 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -25,6 +25,7 @@ */ #pragma once +#include #include #include #include @@ -33,6 +34,7 @@ #include "configcpp.h" #include "calpontselectexecutionplan.h" +#include "countingallocator.h" #include "resourcedistributor.h" #include "installdir.h" #include "branchpred.h" @@ -325,16 +327,16 @@ class ResourceManager bool getMemory(int64_t amount, bool patience = true); inline void returnMemory(int64_t amount) { - atomicops::atomicAdd(&totalUmMemLimit, amount); + totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); } inline void returnMemory(int64_t amount, boost::shared_ptr& sessionLimit) { - atomicops::atomicAdd(&totalUmMemLimit, amount); + totalUmMemLimit.fetch_add(amount, std::memory_order_relaxed); sessionLimit ? atomicops::atomicAdd(sessionLimit.get(), amount) : 0; } inline int64_t availableMemory() const { - return totalUmMemLimit; + return totalUmMemLimit.load(std::memory_order_relaxed); } /* old HJ mem interface, used by HashJoin */ @@ -454,6 +456,12 @@ class ResourceManager return configuredUmMemLimit; } + template + allocators::CountingAllocator getAllocator() + { + return allocators::CountingAllocator(totalUmMemLimit); + } + private: void logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID, uint64_t newvalue, uint64_t value, const std::string& source, logging::Message::MessageID mid); @@ -504,7 +512,7 @@ class ResourceManager LockedSessionMap fHJPmMaxMemorySmallSideSessionMap; /* new HJ/Union/Aggregation support */ - volatile int64_t totalUmMemLimit; // mem limit for join, union, and aggregation on the UM + std::atomic totalUmMemLimit{0}; // mem limit for join, union, and aggregation on the UM int64_t configuredUmMemLimit; uint64_t pmJoinMemLimit; // mem limit on individual PM joins diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index dfe1527a7..d137cae91 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -213,13 +213,6 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, initBPP(b); } -#if 0 -BatchPrimitiveProcessor::BatchPrimitiveProcessor(const BatchPrimitiveProcessor& bpp) -{ - throw logic_error("copy BPP deprecated"); -} -#endif - BatchPrimitiveProcessor::~BatchPrimitiveProcessor() { // FIXME: just do a sync fetch @@ -247,6 +240,8 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) uint8_t tmp8; uint16_t tmp16; Command::CommandType type; + auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory(); + std::cout << "initBPP availableMemory: " << cnt << std::endl; bs.advance(sizeof(ISMPacketHeader)); // skip the header bs >> tmp8; @@ -365,13 +360,17 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) if (!typelessJoin[i]) { + auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); + bs >> joinNullValues[i]; bs >> largeSideKeyColumns[i]; for (uint j = 0; j < processorThreads; ++j) - tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher())); + tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), alloc)); } else { + auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); + deserializeVector(bs, tlLargeSideKeyColumns[i]); bs >> tlSmallSideKeyLengths[i]; bs >> tmp8; @@ -393,7 +392,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) mSmallSideKeyColumnsPtr, mSmallSideRGPtr); auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i], mSmallSideKeyColumnsPtr, mSmallSideRGPtr); - tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator)); + tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, alloc)); } } } @@ -497,7 +496,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) bs >> *(fAggregator.get()); // If there's UDAF involved, set up for PM processing - for (const auto & pcol : fAggregator->getAggFunctions()) + for (const auto& pcol : fAggregator->getAggFunctions()) { auto* rowUDAF = dynamic_cast(pcol.get()); @@ -843,6 +842,8 @@ int BatchPrimitiveProcessor::endOfJoiner() { endOfJoinerRan = true; pthread_mutex_unlock(&objLock); + auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory(); + std::cout << "endOfJoiner availableMemory: " << cnt << std::endl; return 0; } @@ -885,6 +886,8 @@ int BatchPrimitiveProcessor::endOfJoiner() endOfJoinerRan = true; pthread_mutex_unlock(&objLock); + auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory(); + std::cout << "endOfJoiner availableMemory: " << cnt << std::endl; return 0; } @@ -1218,7 +1221,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup& { bool hasNull = false; - for (unsigned int column: tlLargeSideKeyColumns[j]) + for (unsigned int column : tlLargeSideKeyColumns[j]) if (oldRow.isNullValue(column)) { hasNull = true; @@ -1374,7 +1377,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup& #ifdef PRIMPROC_STOPWATCH void BatchPrimitiveProcessor::execute(StopWatch* stopwatch) #else -void BatchPrimitiveProcessor::execute() +void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs) #endif { uint8_t sendCount = 0; @@ -1509,7 +1512,7 @@ void BatchPrimitiveProcessor::execute() writeProjectionPreamble(); stopwatch->stop("- writeProjectionPreamble"); #else - writeProjectionPreamble(); + writeProjectionPreamble(bs); #endif } @@ -1536,7 +1539,7 @@ void BatchPrimitiveProcessor::execute() { for (j = 0; j < projectCount; ++j) { - projectSteps[j]->project(); + projectSteps[j]->project(bs); } } else @@ -1641,9 +1644,9 @@ void BatchPrimitiveProcessor::execute() if (!fAggregator) { - *serialized << (uint8_t)1; // the "count this msg" var + *bs << (uint8_t)1; // the "count this msg" var fe2Output.setDBRoot(dbRoot); - fe2Output.serializeRGData(*serialized); + fe2Output.serializeRGData(*bs); //*serialized << fe2Output.getDataSize(); // serialized->append(fe2Output.getData(), fe2Output.getDataSize()); } @@ -1653,7 +1656,7 @@ void BatchPrimitiveProcessor::execute() { utils::setThreadName("BPPAgg_1"); - *serialized << (uint8_t)1; // the "count this msg" var + *bs << (uint8_t)1; // the "count this msg" var // see TupleBPS::setFcnExpGroup2() and where it gets called. // it sets fe2 there, on the other side of communication. @@ -1669,25 +1672,25 @@ void BatchPrimitiveProcessor::execute() if ((currentBlockOffset + 1) == count) // @bug4507, 8k { - fAggregator->loadResult(*serialized); // @bug4507, 8k + fAggregator->loadResult(*bs); // @bug4507, 8k } // @bug4507, 8k else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k { - fAggregator->loadEmptySet(*serialized); // @bug4507, 8k + fAggregator->loadEmptySet(*bs); // @bug4507, 8k } // @bug4507, 8k else // @bug4507, 8k { - fAggregator->loadResult(*serialized); // @bug4507, 8k - fAggregator->aggReset(); // @bug4507, 8k + fAggregator->loadResult(*bs); // @bug4507, 8k + fAggregator->aggReset(); // @bug4507, 8k } // @bug4507, 8k } if (!fAggregator && !fe2) { - *serialized << (uint8_t)1; // the "count this msg" var + *bs << (uint8_t)1; // the "count this msg" var outputRG.setDBRoot(dbRoot); // cerr << "serializing " << outputRG.toString() << endl; - outputRG.serializeRGData(*serialized); + outputRG.serializeRGData(*bs); //*serialized << outputRG.getDataSize(); // serialized->append(outputRG.getData(), outputRG.getDataSize()); @@ -1700,7 +1703,7 @@ void BatchPrimitiveProcessor::execute() else // Is doJoin { uint32_t startRid = 0; - ByteStream preamble = *serialized; + ByteStream preamble = *bs; origRidCount = ridCount; // ridCount can get modified by executeTupleJoin(). We need to keep track of // the original val. /* project the key columns. If there's the filter IN the join, project everything. @@ -1783,7 +1786,7 @@ void BatchPrimitiveProcessor::execute() sendCount = (uint8_t)(!moreRGs && !startRid); // *serialized << (uint8_t)(!moreRGs && !startRid); // the "count // this msg" var - *serialized << sendCount; + *bs << sendCount; if (fe2) { utils::setThreadName("BPPFE2_2"); @@ -1817,30 +1820,30 @@ void BatchPrimitiveProcessor::execute() if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0) // @bug4507, 8k { - fAggregator->loadResult(*serialized); // @bug4507, 8k + fAggregator->loadResult(*bs); // @bug4507, 8k } // @bug4507, 8k else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k { - fAggregator->loadEmptySet(*serialized); // @bug4507, 8k + fAggregator->loadEmptySet(*bs); // @bug4507, 8k } // @bug4507, 8k else // @bug4507, 8k { - fAggregator->loadResult(*serialized); // @bug4507, 8k - fAggregator->aggReset(); // @bug4507, 8k + fAggregator->loadResult(*bs); // @bug4507, 8k + fAggregator->aggReset(); // @bug4507, 8k } // @bug4507, 8k } else { // cerr <<" * serialzing " << nextRG.toString() << endl; - nextRG.serializeRGData(*serialized); + nextRG.serializeRGData(*bs); } /* send the msg & reinit the BS */ if (moreRGs) { - sendResponse(); - serialized.reset(new ByteStream()); - *serialized = preamble; + sendResponse(bs); + bs.reset(new ByteStream()); + *bs = preamble; } } @@ -1848,16 +1851,16 @@ void BatchPrimitiveProcessor::execute() { // Should we happen to finish sending data rows right on the boundary of when moreRGs flips off, // then we need to start a new buffer. I.e., it needs the count this message byte pushed. - if (serialized->length() == preamble.length()) - *serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var + if (bs->length() == preamble.length()) + *bs << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var - *serialized << ridCount; + *bs << ridCount; for (i = 0; i < joinerCount; i++) { for (j = 0; j < ridCount; ++j) { - serializeInlineVector(*serialized, tSmallSideMatches[i][j]); + serializeInlineVector(*bs, tSmallSideMatches[i][j]); tSmallSideMatches[i][j].clear(); } } @@ -1872,10 +1875,10 @@ void BatchPrimitiveProcessor::execute() } else { - *serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var + *bs << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var outputRG.setDBRoot(dbRoot); // cerr << "serializing " << outputRG.toString() << endl; - outputRG.serializeRGData(*serialized); + outputRG.serializeRGData(*bs); //*serialized << outputRG.getDataSize(); // serialized->append(outputRG.getData(), outputRG.getDataSize()); @@ -1883,16 +1886,16 @@ void BatchPrimitiveProcessor::execute() { for (j = 0; j < ridCount; ++j) { - serializeInlineVector(*serialized, tSmallSideMatches[i][j]); + serializeInlineVector(*bs, tSmallSideMatches[i][j]); tSmallSideMatches[i][j].clear(); } } } if (startRid > 0) { - sendResponse(); - serialized.reset(new ByteStream()); - *serialized = preamble; + sendResponse(bs); + bs.reset(new ByteStream()); + *bs = preamble; } } while (startRid > 0); } @@ -1905,11 +1908,11 @@ void BatchPrimitiveProcessor::execute() // sendCount << std::endl; if (projectCount > 0 || ot == ROW_GROUP) { - *serialized << cachedIO; + *bs << cachedIO; cachedIO = 0; - *serialized << physIO; + *bs << physIO; physIO = 0; - *serialized << touchedBlocks; + *bs << touchedBlocks; touchedBlocks = 0; // cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO << // " touchedBlocks=" << touchedBlocks << endl; @@ -1922,15 +1925,15 @@ void BatchPrimitiveProcessor::execute() } catch (logging::QueryDataExcept& qex) { - writeErrorMsg(qex.what(), qex.errorCode()); + writeErrorMsg(bs, qex.what(), qex.errorCode()); } catch (logging::DictionaryBufferOverflow& db) { - writeErrorMsg(db.what(), db.errorCode()); + writeErrorMsg(bs, db.what(), db.errorCode()); } catch (scalar_exception& se) { - writeErrorMsg(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW), ERR_MORE_THAN_1_ROW, false); + writeErrorMsg(bs, IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW), ERR_MORE_THAN_1_ROW, false); } catch (NeedToRestartJob& n) { @@ -1941,20 +1944,21 @@ void BatchPrimitiveProcessor::execute() } catch (IDBExcept& iex) { - writeErrorMsg(iex.what(), iex.errorCode(), true, false); + writeErrorMsg(bs, iex.what(), iex.errorCode(), true, false); } catch (const std::exception& ex) { - writeErrorMsg(ex.what(), logging::batchPrimitiveProcessorErr); + writeErrorMsg(bs, ex.what(), logging::batchPrimitiveProcessorErr); } catch (...) { string msg("BatchPrimitiveProcessor caught an unknown exception"); - writeErrorMsg(msg, logging::batchPrimitiveProcessorErr); + writeErrorMsg(bs, msg, logging::batchPrimitiveProcessorErr); } } -void BatchPrimitiveProcessor::writeErrorMsg(const string& error, uint16_t errCode, bool logIt, bool critical) +void BatchPrimitiveProcessor::writeErrorMsg(messageqcpp::SBS& bs, const string& error, uint16_t errCode, + bool logIt, bool critical) { ISMPacketHeader ism; PrimitiveHeader ph; @@ -1970,10 +1974,10 @@ void BatchPrimitiveProcessor::writeErrorMsg(const string& error, uint16_t errCod ph.UniqueID = uniqueID; ism.Status = errCode; - serialized.reset(new ByteStream()); - serialized->append((uint8_t*)&ism, sizeof(ism)); - serialized->append((uint8_t*)&ph, sizeof(ph)); - *serialized << error; + bs.reset(new ByteStream()); + bs->append((uint8_t*)&ism, sizeof(ism)); + bs->append((uint8_t*)&ph, sizeof(ph)); + *bs << error; if (logIt) { @@ -1982,7 +1986,7 @@ void BatchPrimitiveProcessor::writeErrorMsg(const string& error, uint16_t errCod } } -void BatchPrimitiveProcessor::writeProjectionPreamble() +void BatchPrimitiveProcessor::writeProjectionPreamble(SBS& bs) { ISMPacketHeader ism; PrimitiveHeader ph; @@ -1997,36 +2001,36 @@ void BatchPrimitiveProcessor::writeProjectionPreamble() ph.StepID = stepID; ph.UniqueID = uniqueID; - serialized.reset(new ByteStream()); - serialized->append((uint8_t*)&ism, sizeof(ism)); - serialized->append((uint8_t*)&ph, sizeof(ph)); + bs.reset(new ByteStream()); + bs->append((uint8_t*)&ism, sizeof(ism)); + bs->append((uint8_t*)&ph, sizeof(ph)); /* add-ons */ if (hasScan) { if (validCPData) { - *serialized << (uint8_t)1; - *serialized << lbidForCP; - *serialized << ((uint8_t)cpDataFromDictScan); + *bs << (uint8_t)1; + *bs << lbidForCP; + *bs << ((uint8_t)cpDataFromDictScan); if (UNLIKELY(hasWideColumnOut)) { // PSA width - *serialized << (uint8_t)wideColumnWidthOut; - *serialized << min128Val; - *serialized << max128Val; + *bs << (uint8_t)wideColumnWidthOut; + *bs << min128Val; + *bs << max128Val; } else { - *serialized << (uint8_t)utils::MAXLEGACYWIDTH; // width of min/max value - *serialized << (uint64_t)minVal; - *serialized << (uint64_t)maxVal; + *bs << (uint8_t)utils::MAXLEGACYWIDTH; // width of min/max value + *bs << (uint64_t)minVal; + *bs << (uint64_t)maxVal; } } else { - *serialized << (uint8_t)0; - *serialized << lbidForCP; + *bs << (uint8_t)0; + *bs << lbidForCP; } } @@ -2035,34 +2039,34 @@ void BatchPrimitiveProcessor::writeProjectionPreamble() if (ot != ROW_GROUP) { - *serialized << ridCount; + *bs << ridCount; if (sendRidsAtDelivery) { - *serialized << baseRid; - serialized->append((uint8_t*)relRids, ridCount << 1); + *bs << baseRid; + bs->append((uint8_t*)relRids, ridCount << 1); } } } -void BatchPrimitiveProcessor::serializeElementTypes() +void BatchPrimitiveProcessor::serializeElementTypes(messageqcpp::SBS& bs) { - *serialized << baseRid; - *serialized << ridCount; - serialized->append((uint8_t*)relRids, ridCount << 1); - serialized->append((uint8_t*)values, ridCount << 3); + *bs << baseRid; + *bs << ridCount; + bs->append((uint8_t*)relRids, ridCount << 1); + bs->append((uint8_t*)values, ridCount << 3); } -void BatchPrimitiveProcessor::serializeStrings() +void BatchPrimitiveProcessor::serializeStrings(messageqcpp::SBS& bs) { - *serialized << ridCount; - serialized->append((uint8_t*)absRids.get(), ridCount << 3); + *bs << ridCount; + bs->append((uint8_t*)absRids.get(), ridCount << 3); for (uint32_t i = 0; i < ridCount; ++i) - *serialized << strValues[i]; + *bs << strValues[i]; } -void BatchPrimitiveProcessor::sendResponse() +void BatchPrimitiveProcessor::sendResponse(messageqcpp::SBS& bs) { // Here is the fast path for local EM to PM interaction. PM puts into the // input EM DEC queue directly. @@ -2073,12 +2077,12 @@ void BatchPrimitiveProcessor::sendResponse() // is limited. if (sendThread->flowControlEnabled()) { - sendThread->sendResult({serialized, sock, writelock, 0}, false); + sendThread->sendResult({bs, sock, writelock, 0}, false); } else { - sock->write(serialized); - serialized.reset(); + sock->write(bs); + bs.reset(); } return; @@ -2088,20 +2092,20 @@ void BatchPrimitiveProcessor::sendResponse() { // newConnection should be set only for the first result of a batch job // it tells sendthread it should consider it for the connection array - sendThread->sendResult(BPPSendThread::Msg_t(serialized, sock, writelock, sockIndex), newConnection); + sendThread->sendResult(BPPSendThread::Msg_t(bs, sock, writelock, sockIndex), newConnection); newConnection = false; } else { boost::mutex::scoped_lock lk(*writelock); - sock->write(*serialized); + sock->write(*bs); } - serialized.reset(); + bs.reset(); } /* The output of a filter chain is either ELEMENT_TYPE or STRING_ELEMENT_TYPE */ -void BatchPrimitiveProcessor::makeResponse() +void BatchPrimitiveProcessor::makeResponse(messageqcpp::SBS& bs) { ISMPacketHeader ism; PrimitiveHeader ph; @@ -2116,39 +2120,39 @@ void BatchPrimitiveProcessor::makeResponse() ph.StepID = stepID; ph.UniqueID = uniqueID; - serialized.reset(new ByteStream()); - serialized->append((uint8_t*)&ism, sizeof(ism)); - serialized->append((uint8_t*)&ph, sizeof(ph)); + bs.reset(new ByteStream()); + bs->append((uint8_t*)&ism, sizeof(ism)); + bs->append((uint8_t*)&ph, sizeof(ph)); /* add-ons */ if (hasScan) { if (validCPData) { - *serialized << (uint8_t)1; - *serialized << lbidForCP; - *serialized << ((uint8_t)cpDataFromDictScan); + *bs << (uint8_t)1; + *bs << lbidForCP; + *bs << ((uint8_t)cpDataFromDictScan); if (UNLIKELY(hasWideColumnOut)) { // PSA width // Remove the assert for >16 bytes DTs. assert(wideColumnWidthOut == datatypes::MAXDECIMALWIDTH); - *serialized << (uint8_t)wideColumnWidthOut; - *serialized << min128Val; - *serialized << max128Val; + *bs << (uint8_t)wideColumnWidthOut; + *bs << min128Val; + *bs << max128Val; } else { - *serialized << (uint8_t)utils::MAXLEGACYWIDTH; // width of min/max value - *serialized << (uint64_t)minVal; - *serialized << (uint64_t)maxVal; + *bs << (uint8_t)utils::MAXLEGACYWIDTH; // width of min/max value + *bs << (uint64_t)minVal; + *bs << (uint64_t)maxVal; } } else { - *serialized << (uint8_t)0; - *serialized << lbidForCP; + *bs << (uint8_t)0; + *bs << lbidForCP; } } @@ -2156,9 +2160,9 @@ void BatchPrimitiveProcessor::makeResponse() /* Take the rid and value arrays, munge into OutputType ot */ switch (ot) { - case BPS_ELEMENT_TYPE: serializeElementTypes(); break; + case BPS_ELEMENT_TYPE: serializeElementTypes(bs); break; - case STRING_ELEMENT_TYPE: serializeStrings(); break; + case STRING_ELEMENT_TYPE: serializeStrings(bs); break; default: { @@ -2166,15 +2170,13 @@ void BatchPrimitiveProcessor::makeResponse() oss << "BPP: makeResponse(): Bad output type: " << ot; throw logic_error(oss.str()); } - - // throw logic_error("BPP: makeResponse(): Bad output type"); } - *serialized << cachedIO; + *bs << cachedIO; cachedIO = 0; - *serialized << physIO; + *bs << physIO; physIO = 0; - *serialized << touchedBlocks; + *bs << touchedBlocks; touchedBlocks = 0; // cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO << @@ -2230,20 +2232,24 @@ int BatchPrimitiveProcessor::operator()() validCPData = false; cpDataFromDictScan = false; + + auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator(); + messageqcpp::SBS bs(new ByteStream(&alloc)); + #ifdef PRIMPROC_STOPWATCH stopwatch->start("BPP() execute"); execute(stopwatch); stopwatch->stop("BPP() execute"); #else - execute(); + execute(bs); #endif if (projectCount == 0 && ot != ROW_GROUP) - makeResponse(); + makeResponse(bs); try { - sendResponse(); + sendResponse(bs); } catch (std::exception& e) { @@ -2717,7 +2723,7 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde { bool hasNullValue = false; - for (unsigned int column: tlLargeSideKeyColumns[jIndex]) + for (unsigned int column : tlLargeSideKeyColumns[jIndex]) { if (r.isNullValue(column)) { diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index f686ecc15..55320fd46 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -33,9 +33,10 @@ #include #include #include -#include +#include #include +#include "countingallocator.h" #include "errorcodes.h" #include "serializeable.h" #include "messagequeue.h" @@ -189,20 +190,20 @@ class BatchPrimitiveProcessor #ifdef PRIMPROC_STOPWATCH void execute(logging::StopWatch* stopwatch); #else - void execute(); + void execute(messageqcpp::SBS& bs); #endif - void writeProjectionPreamble(); - void makeResponse(); - void sendResponse(); + void writeProjectionPreamble(messageqcpp::SBS& bs); + void makeResponse(messageqcpp::SBS& bs); + void sendResponse(messageqcpp::SBS& bs); /* Used by scan operations to increment the LBIDs in successive steps */ void nextLBID(); /* these send relative rids, should this be abs rids? */ - void serializeElementTypes(); - void serializeStrings(); + void serializeElementTypes(messageqcpp::SBS& bs); + void serializeStrings(messageqcpp::SBS& bs); void asyncLoadProjectColumns(); - void writeErrorMsg(const std::string& error, uint16_t errCode, bool logIt = true, bool critical = true); + void writeErrorMsg(messageqcpp::SBS& bs, const std::string& error, uint16_t errCode, bool logIt = true, bool critical = true); BPSOutputType ot; @@ -269,7 +270,7 @@ class BatchPrimitiveProcessor uint32_t physIO, cachedIO, touchedBlocks; SP_UM_IOSOCK sock; - messageqcpp::SBS serialized; + // messageqcpp::SBS serialized; SP_UM_MUTEX writelock; // MCOL-744 using pthread mutex instead of Boost mutex because @@ -308,16 +309,23 @@ class BatchPrimitiveProcessor bool hasRowGroup; /* Rowgroups + join */ - typedef std::tr1::unordered_multimap, - utils::STLPoolAllocator>> - TJoiner; + // typedef std::unordered_multimap, + // utils::STLPoolAllocator>> + // TJoiner; + using TJoiner = + std::unordered_multimap, + allocators::CountingAllocator>>; - typedef std::tr1::unordered_multimap< - joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher, - joiner::TupleJoiner::TypelessDataComparator, - utils::STLPoolAllocator>> - TLJoiner; + // typedef std::unordered_multimap< + // joiner::TypelessData, uint32_t, joiner::TupleJoiner::TypelessDataHasher, + // joiner::TupleJoiner::TypelessDataComparator, + // utils::STLPoolAllocator>> + // TLJoiner; + using TLJoiner = + std::unordered_multimap>>; bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0); /* generateJoinedRowGroup helper fcns & vars */ diff --git a/primitives/primproc/columncommand.cpp b/primitives/primproc/columncommand.cpp index 6f7480074..a3bd8ba5b 100644 --- a/primitives/primproc/columncommand.cpp +++ b/primitives/primproc/columncommand.cpp @@ -654,7 +654,7 @@ void ColumnCommand::fillInPrimitiveMessageHeader(const int8_t outputType, const } /* Assumes OT_DATAVALUE */ -void ColumnCommand::projectResult() +void ColumnCommand::projectResult(messageqcpp::SBS& bs) { auto nvals = outMsg->NVALS; if (primMsg->NVALS != nvals || nvals != bpp->ridCount) @@ -687,8 +687,8 @@ void ColumnCommand::projectResult() idbassert(primMsg->NVALS == nvals); idbassert(bpp->ridCount == nvals); uint32_t valuesByteSize = nvals * colType.colWidth; - *bpp->serialized << valuesByteSize; - bpp->serialized->append(primitives::getFirstValueArrayPosition(outMsg), valuesByteSize); + *bs << valuesByteSize; + bs->append(primitives::getFirstValueArrayPosition(outMsg), valuesByteSize); } void ColumnCommand::removeRowsFromRowGroup(RowGroup& rg) @@ -815,19 +815,19 @@ void ColumnCommand::projectResultRG(RowGroup& rg, uint32_t pos) } } -void ColumnCommand::project() +void ColumnCommand::project(messageqcpp::SBS& bs) { /* bpp->ridCount == 0 would signify a scan operation */ if (bpp->ridCount == 0) { - *bpp->serialized << (uint32_t)0; + *bs << (uint32_t)0; blockCount += colType.colWidth; return; } makeStepMsg(); issuePrimitive(); - projectResult(); + projectResult(bs); } void ColumnCommand::projectIntoRowGroup(RowGroup& rg, uint32_t pos) diff --git a/primitives/primproc/columncommand.h b/primitives/primproc/columncommand.h index 3914debe9..fd5ae30df 100644 --- a/primitives/primproc/columncommand.h +++ b/primitives/primproc/columncommand.h @@ -31,6 +31,7 @@ #pragma once #include +#include "bytestream.h" #include "columnwidth.h" #include "command.h" #include "calpontsystemcatalog.h" @@ -71,7 +72,7 @@ class ColumnCommand : public Command void execute() override; void execute(int64_t* vals); // used by RTSCommand to redirect values void prep(int8_t outputType, bool absRids) override; - void project() override; + void project(messageqcpp::SBS& bs) override; void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t pos) override; void nextLBID() override; bool isScan() @@ -151,7 +152,7 @@ class ColumnCommand : public Command template void _process_OT_DATAVALUE(); void process_OT_ROWGROUP(); - void projectResult(); + void projectResult(messageqcpp::SBS& bs); template void _projectResultRGLoop(rowgroup::Row& r, const T* valuesArray, const uint32_t offset); template diff --git a/primitives/primproc/command.h b/primitives/primproc/command.h index bd0afc9b8..81f228ac7 100644 --- a/primitives/primproc/command.h +++ b/primitives/primproc/command.h @@ -54,7 +54,7 @@ class Command virtual ~Command(); virtual void execute() = 0; - virtual void project() = 0; + virtual void project(messageqcpp::SBS& bs) = 0; virtual void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t columnPosition) = 0; virtual uint64_t getLBID() = 0; virtual void getLBIDList(uint32_t loopCount, std::vector* out) diff --git a/primitives/primproc/dictstep.cpp b/primitives/primproc/dictstep.cpp index db6d68be7..1a4753201 100644 --- a/primitives/primproc/dictstep.cpp +++ b/primitives/primproc/dictstep.cpp @@ -414,7 +414,7 @@ void DictStep::_execute() } /* This will do the same thing as execute() but put the result in bpp->serialized */ -void DictStep::_project() +void DictStep::_project(messageqcpp::SBS& bs) { /* Need to loop over bpp->values, issuing a primitive for each LBID */ uint32_t i; @@ -466,13 +466,13 @@ void DictStep::_project() } idbassert(tmpResultCounter == bpp->ridCount); - *bpp->serialized << totalResultLength; + *bs << totalResultLength; // cout << "_project() total length = " << totalResultLength << endl; for (i = 0; i < tmpResultCounter; i++) { // cout << "serializing " << tmpStrings[i] << endl; - *bpp->serialized << tmpStrings[i]; + *bs << tmpStrings[i]; } // cout << "DS: /_project() l: " << l_lbid << endl; @@ -645,16 +645,16 @@ void DictStep::_projectToRG(RowGroup& rg, uint32_t col) // << endl; } -void DictStep::project() +void DictStep::project(messageqcpp::SBS& bs) { values = bpp->values; - _project(); + _project(bs); } -void DictStep::project(int64_t* vals) +void DictStep::project(messageqcpp::SBS& bs, int64_t* vals) { values = vals; - _project(); + _project(bs); } void DictStep::projectIntoRowGroup(RowGroup& rg, uint32_t col) diff --git a/primitives/primproc/dictstep.h b/primitives/primproc/dictstep.h index cb07717bb..573634731 100644 --- a/primitives/primproc/dictstep.h +++ b/primitives/primproc/dictstep.h @@ -42,8 +42,8 @@ class DictStep : public Command ~DictStep() override; void execute() override; - void project() override; - void project(int64_t* vals); // used by RTSCommand to redirect input + void project(messageqcpp::SBS& bs) override; + void project(messageqcpp::SBS& bs, int64_t* vals); // used by RTSCommand to redirect input void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t row) override; void projectIntoRowGroup(rowgroup::RowGroup& rg, int64_t* vals, uint32_t col); uint64_t getLBID() override; @@ -95,7 +95,7 @@ class DictStep : public Command void processResult(); void projectResult(std::string* tmpStrings); void projectResult(StringPtr* tmpStrings); - void _project(); + void _project(messageqcpp::SBS& bs); void _projectToRG(rowgroup::RowGroup& rg, uint32_t col); // struct used for scratch space diff --git a/primitives/primproc/filtercommand.cpp b/primitives/primproc/filtercommand.cpp index f5c0e6646..5587a2c7d 100644 --- a/primitives/primproc/filtercommand.cpp +++ b/primitives/primproc/filtercommand.cpp @@ -201,7 +201,7 @@ void FilterCommand::prep(int8_t outputType, bool absRids) { } -void FilterCommand::project() +void FilterCommand::project(messageqcpp::SBS& bs) { } diff --git a/primitives/primproc/filtercommand.h b/primitives/primproc/filtercommand.h index 3515c7c02..08ce44577 100644 --- a/primitives/primproc/filtercommand.h +++ b/primitives/primproc/filtercommand.h @@ -45,7 +45,7 @@ class FilterCommand : public Command // virtuals from base class -- Command void execute() override; - void project() override; + void project(messageqcpp::SBS& bs) override; void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t col) override; uint64_t getLBID() override; void nextLBID() override; diff --git a/primitives/primproc/passthrucommand.cpp b/primitives/primproc/passthrucommand.cpp index 936d3466a..fd9d36f1a 100644 --- a/primitives/primproc/passthrucommand.cpp +++ b/primitives/primproc/passthrucommand.cpp @@ -60,11 +60,11 @@ void PassThruCommand::execute() // throw logic_error("PassThruCommand isn't a filter step"); } -void PassThruCommand::project() +void PassThruCommand::project(messageqcpp::SBS& bs) { uint32_t i; - *bpp->serialized << (uint32_t)(bpp->ridCount * colWidth); + *bs << (uint32_t)(bpp->ridCount * colWidth); #if 0 cout << "pass thru serializing " << (uint32_t) (bpp->ridCount * colWidth) << " bytes:\n"; cout << "at relative position " << bpp->serialized->length() - sizeof(ISMPacketHeader) - sizeof(PrimitiveHeader) - 4 << endl; @@ -76,25 +76,25 @@ void PassThruCommand::project() switch (colWidth) { - case 16: bpp->serialized->append((uint8_t*)bpp->wide128Values, bpp->ridCount << 4); break; + case 16: bs->append((uint8_t*)bpp->wide128Values, bpp->ridCount << 4); break; - case 8: bpp->serialized->append((uint8_t*)bpp->values, bpp->ridCount << 3); break; + case 8: bs->append((uint8_t*)bpp->values, bpp->ridCount << 3); break; case 4: for (i = 0; i < bpp->ridCount; i++) - *bpp->serialized << (uint32_t)bpp->values[i]; + *bs << (uint32_t)bpp->values[i]; break; case 2: for (i = 0; i < bpp->ridCount; i++) - *bpp->serialized << (uint16_t)bpp->values[i]; + *bs << (uint16_t)bpp->values[i]; break; case 1: for (i = 0; i < bpp->ridCount; i++) - *bpp->serialized << (uint8_t)bpp->values[i]; + *bs << (uint8_t)bpp->values[i]; break; diff --git a/primitives/primproc/passthrucommand.h b/primitives/primproc/passthrucommand.h index 87e577872..e9944825e 100644 --- a/primitives/primproc/passthrucommand.h +++ b/primitives/primproc/passthrucommand.h @@ -30,6 +30,7 @@ #pragma once +#include "bytestream.h" #include "command.h" namespace primitiveprocessor @@ -42,7 +43,7 @@ class PassThruCommand : public Command void prep(int8_t outputType, bool makeAbsRids) override; void execute() override; - void project() override; + void project(messageqcpp::SBS& bs) override; void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t col) override; uint64_t getLBID() override; void nextLBID() override; diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 87ef46af2..672659d1d 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -121,7 +121,6 @@ void setupSignalHandlers() sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGPIPE); - sigaddset(&sigset, SIGUSR1); sigaddset(&sigset, SIGUSR2); sigprocmask(SIG_BLOCK, &sigset, 0); @@ -337,12 +336,15 @@ int ServicePrimProc::Child() return 2; } + bool runningWithExeMgr = true; + auto* rm = joblist::ResourceManager::instance(runningWithExeMgr, cf); + utils::USpaceSpinLock startupRaceLock(getStartupRaceFlag()); std::thread exeMgrThread( - [this, cf]() + [this, rm]() { exemgr::Opt opt; - exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt, cf); + exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt, rm); // primitive delay to avoid 'not connected to PM' log error messages // from EM. PrimitiveServer::start() releases SpinLock after sockets // are available. diff --git a/primitives/primproc/rtscommand.cpp b/primitives/primproc/rtscommand.cpp index 2cbdb8b7b..9e783418b 100644 --- a/primitives/primproc/rtscommand.cpp +++ b/primitives/primproc/rtscommand.cpp @@ -31,6 +31,7 @@ #include #include "bpp.h" +#include "bytestream.h" #include "exceptclasses.h" using namespace std; @@ -53,7 +54,7 @@ void RTSCommand::execute() throw logic_error("RTSCommand shouldn't be used for filter steps"); } -void RTSCommand::project() +void RTSCommand::project(messageqcpp::SBS& bs) { uint32_t i; @@ -70,7 +71,7 @@ void RTSCommand::project() // need something in values - dict.project(); + dict.project(bs); } else { @@ -99,7 +100,7 @@ void RTSCommand::project() } } - dict.project(tmpValues); + dict.project(bs, tmpValues); } } diff --git a/primitives/primproc/rtscommand.h b/primitives/primproc/rtscommand.h index e5c594a27..637367fc1 100644 --- a/primitives/primproc/rtscommand.h +++ b/primitives/primproc/rtscommand.h @@ -30,6 +30,7 @@ #pragma once +#include "bytestream.h" #include "command.h" #include #include @@ -43,7 +44,7 @@ class RTSCommand : public Command ~RTSCommand() override; void execute() override; - void project() override; + void project(messageqcpp::SBS& bs) override; void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t col) override; uint64_t getLBID() override; void nextLBID() override; diff --git a/primitives/primproc/serviceexemgr.cpp b/primitives/primproc/serviceexemgr.cpp index 8746b68f7..692911244 100644 --- a/primitives/primproc/serviceexemgr.cpp +++ b/primitives/primproc/serviceexemgr.cpp @@ -89,22 +89,8 @@ void startRssMon(size_t maxPct, int pauseSeconds); void added_a_pm(int) { - logging::LoggingID logid(21, 0, 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("exeMgr caught SIGHUP. Resetting connections"); - msg.format(args1); - std::cout << msg.msg().c_str() << std::endl; - logging::Logger logger(logid.fSubsysID); - logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid); - - auto* dec = exemgr::globServiceExeMgr->getDec(); - if (dec) - { - oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - oamCache->forceReload(); - dec->Setup(); - } + int64_t num = globServiceExeMgr->getRm().availableMemory(); + std::cout << "Total UM memory available: " << num << std::endl; } void printTotalUmMemory(int sig) diff --git a/primitives/primproc/serviceexemgr.h b/primitives/primproc/serviceexemgr.h index 5be609593..9f4ff73a6 100644 --- a/primitives/primproc/serviceexemgr.h +++ b/primitives/primproc/serviceexemgr.h @@ -94,6 +94,8 @@ class Opt } }; + void printTotalUmMemory(int sig); + class ServiceExeMgr : public Service, public Opt { using SessionMemMap_t = std::map; @@ -111,50 +113,40 @@ class ServiceExeMgr : public Service, public Opt logger.logMessage(type, message, logid); } - public: - ServiceExeMgr(const Opt& opt) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) - { - bool runningWithExeMgr = true; - rm_ = joblist::ResourceManager::instance(runningWithExeMgr); - } - ServiceExeMgr(const Opt& opt, config::Config* aConfig) - : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)) - { - bool runningWithExeMgr = true; - rm_ = joblist::ResourceManager::instance(runningWithExeMgr, aConfig); - } - void LogErrno() override - { - log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno))); - } - void ParentLogChildMessage(const std::string& str) override - { - log(logging::LOG_TYPE_INFO, str); - } - int Child() override; - int Run() - { - return m_fg ? Child() : RunForking(); - } - static const constexpr unsigned logDefaultMsg = logging::M0000; - static const constexpr unsigned logDbProfStartStatement = logging::M0028; - static const constexpr unsigned logDbProfEndStatement = logging::M0029; - static const constexpr unsigned logStartSql = logging::M0041; - static const constexpr unsigned logEndSql = logging::M0042; - static const constexpr unsigned logRssTooBig = logging::M0044; - static const constexpr unsigned logDbProfQueryStats = logging::M0047; - static const constexpr unsigned logExeMgrExcpt = logging::M0055; - // If any flags other than the table mode flags are set, produce output to screeen - static const constexpr uint32_t flagsWantOutput = - (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH & - ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); - logging::Logger& getLogger() - { - return msgLog_; - } - void updateSessionMap(const size_t pct) - { - std::lock_guard lk(sessionMemMapMutex_); + public: + ServiceExeMgr(const Opt& opt, joblist::ResourceManager* rm) : Service("ExeMgr"), Opt(opt), msgLog_(logging::Logger(16)), rm_(rm) + { } + void LogErrno() override + { + log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno))); + } + void ParentLogChildMessage(const std::string& str) override + { + log(logging::LOG_TYPE_INFO, str); + } + int Child() override; + int Run() + { + return m_fg ? Child() : RunForking(); + } + static const constexpr unsigned logDefaultMsg = logging::M0000; + static const constexpr unsigned logDbProfStartStatement = logging::M0028; + static const constexpr unsigned logDbProfEndStatement = logging::M0029; + static const constexpr unsigned logStartSql = logging::M0041; + static const constexpr unsigned logEndSql = logging::M0042; + static const constexpr unsigned logRssTooBig = logging::M0044; + static const constexpr unsigned logDbProfQueryStats = logging::M0047; + static const constexpr unsigned logExeMgrExcpt = logging::M0055; + // If any flags other than the table mode flags are set, produce output to screeen + static const constexpr uint32_t flagsWantOutput = (0xffffffff & ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH & + ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); + logging::Logger& getLogger() + { + return msgLog_; + } + void updateSessionMap(const size_t pct) + { + std::lock_guard lk(sessionMemMapMutex_); for (auto mapIter = sessionMemMap_.begin(); mapIter != sessionMemMap_.end(); ++mapIter) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 549f92a7c..c90ba775e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -75,6 +75,11 @@ if (WITH_UNITTESTS) target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:) + add_executable(counting_allocator counting_allocator.cpp) + add_dependencies(counting_allocator googletest) + target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) + gtest_add_tests(TARGET counting_allocator TEST_PREFIX columnstore:) + add_executable(comparators_tests comparators-tests.cpp) target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) add_test(NAME columnstore:comparators_tests COMMAND comparators_tests) diff --git a/tests/counting_allocator.cpp b/tests/counting_allocator.cpp new file mode 100644 index 000000000..1ca5ccda6 --- /dev/null +++ b/tests/counting_allocator.cpp @@ -0,0 +1,139 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ +#include +#include +#include +#include +#include "countingallocator.h" + +using namespace allocators; + +// Example class to be managed by the allocator +struct TestClass { + int value; + + TestClass(int val) : value(val) {} +}; + +static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024; + +// Test Fixture for AtomicCounterAllocator +class CountingAllocatorTest : public ::testing::Test { +protected: + // Atomic counter to track allocated memory + std::atomic allocatedMemory{MemoryAllowance}; + + // Custom allocator instance + CountingAllocator allocator; + + // Constructor + CountingAllocatorTest() + : allocatedMemory(MemoryAllowance), allocator(allocatedMemory, MemoryAllowance / 100) {} + + // Destructor + ~CountingAllocatorTest() override = default; +}; + +// Test 1: Allocation increases the counter correctly +TEST_F(CountingAllocatorTest, Allocation) { + const std::size_t numObjects = 5; + TestClass* ptr = allocator.allocate(numObjects); + EXPECT_NE(ptr, nullptr); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast(sizeof(TestClass))); + allocator.deallocate(ptr, numObjects); +} + +// Test 2: Deallocation decreases the counter correctly +TEST_F(CountingAllocatorTest, Deallocation) { + const std::size_t numObjects = 3; + TestClass* ptr = allocator.allocate(numObjects); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance - numObjects * static_cast(sizeof(TestClass))); + + allocator.deallocate(ptr, numObjects); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); +} + +// Test 3: Allocator equality based on shared counter +TEST_F(CountingAllocatorTest, AllocatorEquality) { + CountingAllocator allocator1(allocatedMemory); + CountingAllocator allocator2(allocatedMemory); + EXPECT_TRUE(allocator1 == allocator2); + + std::atomic anotherCounter(0); + CountingAllocator allocator3(anotherCounter); + EXPECT_FALSE(allocator1 == allocator3); +} + +// Test 4: Using allocator with std::allocate_shared +TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator) { + // Create a shared_ptr using allocate_shared with the custom allocator + std::shared_ptr ptr = std::allocate_shared(allocator, 100); + + // Check that the counter has increased by the size of TestClass plus control block + // Exact size depends on the implementation, so we verify it's at least sizeof(TestClass) + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast(sizeof(TestClass))); + + // Reset the shared_ptr and check that the counter decreases appropriately + ptr.reset(); + // After deallocation, the counter should return to zero + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); + + auto deleter = [this](TestClass* ptr) { + this->allocator.deallocate(ptr, 1); + }; + ptr.reset(allocator.allocate(1), deleter); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - static_cast(sizeof(TestClass))); + + ptr.reset(); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); +} + +// Test 5: Thread Safety - Concurrent Allocations and Deallocations +TEST_F(CountingAllocatorTest, ThreadSafety) { + const std::size_t numThreads = 100; + const std::size_t allocationsPerThread = 3; + + auto worker = [this]() { + for (std::size_t i = 0; i < allocationsPerThread; ++i) { + TestClass* ptr = allocator.allocate(1); + allocator.deallocate(ptr, 1); + } + }; + + std::vector threads; + // Launch multiple threads performing allocations and deallocations + for (std::size_t i = 0; i < numThreads; ++i) { + threads.emplace_back(worker); + } + + // Wait for all threads to finish + for (auto& th : threads) { + th.join(); + } + + // After all allocations and deallocations, the counter should be zero + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); +} + +// Test 6: Allocating zero objects should not change the counter +TEST_F(CountingAllocatorTest, AllocateZeroObjects) { + TestClass* ptr = allocator.allocate(0); + EXPECT_NE(ptr, nullptr); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); + allocator.deallocate(ptr, 0); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); +} \ No newline at end of file diff --git a/utils/common/countingallocator.h b/utils/common/countingallocator.h new file mode 100644 index 000000000..76f7c9a58 --- /dev/null +++ b/utils/common/countingallocator.h @@ -0,0 +1,91 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace allocators +{ + +// const constexpr std::uint64_t CounterUpdateUnitSize = 4 * 1024 * 1024; +const constexpr std::int64_t MemoryLimitLowerBound = 100 * 1024 * 1024; // WIP + +// Custom Allocator that tracks allocated memory using an atomic counter +template +class CountingAllocator { +public: + using value_type = T; + + // Constructor accepting a reference to an atomic counter + explicit CountingAllocator(std::atomic& memoryLimit, const uint64_t lowerBound = MemoryLimitLowerBound) noexcept + : memoryLimitRef_(memoryLimit), memoryLimitLowerBound(lowerBound) {} + + // Copy constructor (template to allow conversion between different types) + template + CountingAllocator(const CountingAllocator& other) noexcept + : memoryLimitRef_(other.memoryLimitRef_) {} + + // Allocate memory for n objects of type T + T* allocate(std::size_t n) { + auto memCounted = memoryLimitRef_.fetch_sub(n * sizeof(T), std::memory_order_relaxed); + if (memCounted < memoryLimitLowerBound) { + memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); + throw std::bad_alloc(); + } + + T* ptr = static_cast(::operator new(n * sizeof(T))); + // std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast(ptr) + // << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n"; + return ptr; + } + + // Deallocate memory for n objects of type T + void deallocate(T* ptr, std::size_t n) noexcept { + ::operator delete(ptr); + memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); + // std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast(ptr) + // << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n"; + } + + // Equality operators (allocators are equal if they share the same counter) + template + bool operator==(const CountingAllocator& other) const noexcept { + return &memoryLimitRef_ == &other.memoryLimitRef_; + } + + template + bool operator!=(const CountingAllocator& other) const noexcept { + return !(*this == other); + } + +private: + std::atomic& memoryLimitRef_; + int64_t memoryLimitLowerBound = 0; + + // Grant access to other instances of CountingAllocator with different types + template + friend class CountingAllocator; +}; + +} // namespace allocators \ No newline at end of file diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index 565b6eb1c..527f09709 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -50,8 +50,8 @@ void ByteStream::doCopy(const ByteStream& rhs) if (fMaxLen < rlen) { - delete[] fBuf; - fBuf = new uint8_t[rlen + ISSOverhead]; + deallocate(fBuf); + fBuf = allocate(rlen + ISSOverhead); fMaxLen = rlen; } @@ -83,7 +83,7 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs) doCopy(rhs); else { - delete[] fBuf; + deallocate(fBuf); fBuf = fCurInPtr = fCurOutPtr = 0; fMaxLen = 0; // Clear `longStrings`. @@ -100,6 +100,13 @@ ByteStream::ByteStream(BSSizeType initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr( growBuf(initSize); } +ByteStream::ByteStream(allocators::CountingAllocator* allocator, uint32_t initSize) + : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0), allocator(allocator) +{ + if (initSize > 0) + growBuf(initSize); +} + void ByteStream::add(const uint8_t b) { if (fBuf == 0 || (static_cast(fCurInPtr - fBuf) == fMaxLen + ISSOverhead)) @@ -108,6 +115,26 @@ void ByteStream::add(const uint8_t b) *fCurInPtr++ = b; } +BSBufType* ByteStream::allocate(const size_t size) +{ + if (allocator) + { + auto* mem = allocator->allocate(size); + return new (mem) BSBufType[size]; + } + return new BSBufType[size]; +} + +void ByteStream::deallocate(BSBufType* ptr) +{ + if (allocator) + { + size_t count = (fMaxLen) ? fMaxLen + ISSOverhead : 0; + return allocator->deallocate(ptr, count); + } + return delete[] fBuf; +} + void ByteStream::growBuf(BSSizeType toSize) { if (fBuf == 0) @@ -117,7 +144,7 @@ void ByteStream::growBuf(BSSizeType toSize) else toSize = ((toSize + BlockSize - 1) / BlockSize) * BlockSize; - fBuf = new uint8_t[toSize + ISSOverhead]; + fBuf = allocate(toSize + ISSOverhead); #ifdef ZERO_ON_NEW memset(fBuf, 0, (toSize + ISSOverhead)); #endif @@ -137,14 +164,14 @@ void ByteStream::growBuf(BSSizeType toSize) // Make sure we at least double the allocation toSize = std::max(toSize, fMaxLen * 2); - uint8_t* t = new uint8_t[toSize + ISSOverhead]; + BSBufType* t = allocate(toSize + ISSOverhead); BSSizeType curOutOff = fCurOutPtr - fBuf; BSSizeType curInOff = fCurInPtr - fBuf; memcpy(t, fBuf, fCurInPtr - fBuf); #ifdef ZERO_ON_NEW memset(t + (fCurInPtr - fBuf), 0, (toSize + ISSOverhead) - (fCurInPtr - fBuf)); #endif - delete[] fBuf; + deallocate(fBuf); fBuf = t; fMaxLen = toSize; fCurInPtr = fBuf + curInOff; @@ -541,8 +568,8 @@ void ByteStream::load(const uint8_t* bp, BSSizeType len) if (len > fMaxLen) { - delete[] fBuf; - fBuf = new uint8_t[newMaxLen + ISSOverhead]; + deallocate(fBuf); + fBuf = allocate(newMaxLen + ISSOverhead); fMaxLen = newMaxLen; } @@ -575,8 +602,10 @@ void ByteStream::swap(ByteStream& rhs) std::swap(fCurOutPtr, rhs.fCurOutPtr); std::swap(fMaxLen, rhs.fMaxLen); std::swap(longStrings, rhs.longStrings); + std::swap(allocator, rhs.allocator); } +// WIP use allocator ifstream& operator>>(ifstream& ifs, ByteStream& bs) { int ifs_len; @@ -653,7 +682,6 @@ void ByteStream::needAtLeast(BSSizeType amount) growBuf(fMaxLen + amount); } - ByteStream& ByteStream::operator<<(const ByteStream& bs) { BSSizeType len = bs.length(); diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index 84256cf0f..09da4b44a 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -37,6 +37,7 @@ #include "serializeable.h" #include "any.hpp" #include "nullstring.h" +#include "countingallocator.h" class ByteStreamTestSuite; @@ -46,7 +47,7 @@ namespace messageqcpp { typedef boost::shared_ptr SBS; using BSSizeType = uint64_t; - +using BSBufType = uint8_t; /** * @brief A class to marshall bytes as a stream * @@ -78,6 +79,7 @@ class ByteStream : public Serializeable * default ctor */ EXPORT explicit ByteStream(BSSizeType initSize = 8192); // multiples of pagesize are best + explicit ByteStream(allocators::CountingAllocator* alloc, uint32_t initSize = 8192); /** * ctor with a uint8_t array and len initializer */ @@ -466,6 +468,9 @@ class ByteStream : public Serializeable void doCopy(const ByteStream& rhs); private: + BSBufType* allocate(const size_t size); + void deallocate(BSBufType* ptr); + // Put struct `MemChunk` declaration here, to avoid circular dependency. struct MemChunk { @@ -474,11 +479,13 @@ class ByteStream : public Serializeable uint8_t data[]; }; - uint8_t* fBuf; /// the start of the allocated buffer - uint8_t* fCurInPtr; // the point in fBuf where data is inserted next - uint8_t* fCurOutPtr; // the point in fBuf where data is extracted from next - BSSizeType fMaxLen; // how big fBuf is currently - std::vector> longStrings; // Stores `long strings`. + BSBufType* fBuf; /// the start of the allocated buffer + BSBufType* fCurInPtr; // the point in fBuf where data is inserted next + BSBufType* fCurOutPtr; // the point in fBuf where data is extracted from next + BSSizeType fMaxLen; // how big fBuf is currently + // Stores `long strings`. + std::vector> longStrings; + allocators::CountingAllocator* allocator = nullptr; }; template @@ -533,7 +540,7 @@ inline ByteStream::ByteStream(const uint8_t* bp, BSSizeType len) : fBuf(nullptr) } inline ByteStream::~ByteStream() { - delete[] fBuf; + deallocate(fBuf); } inline const uint8_t* ByteStream::buf() const @@ -558,7 +565,7 @@ inline BSSizeType ByteStream::lengthWithHdrOverhead() const } inline void ByteStream::reset() { - delete[] fBuf; + deallocate(fBuf); fMaxLen = 0; fCurInPtr = fCurOutPtr = fBuf = nullptr; } diff --git a/utils/messageqcpp/compressed_iss.cpp b/utils/messageqcpp/compressed_iss.cpp index c194a41ad..4cee215de 100644 --- a/utils/messageqcpp/compressed_iss.cpp +++ b/utils/messageqcpp/compressed_iss.cpp @@ -103,7 +103,7 @@ const SBS CompressedInetStreamSocket::read(const struct timespec* timeout, bool* uint32_t storedLen = *(uint32_t*)readBS->buf(); if (!storedLen) - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); uncompressedSize = storedLen; ret.reset(new ByteStream(uncompressedSize)); diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index d42958c9a..07b43e8c3 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -487,25 +487,25 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO // { // logIoError("InetStreamSocket::read: timeout during readToMagic", 0); // } - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); } // we need to read the 4-byte message length first. uint32_t msglen; if (!readFixedSizeData(pfd, reinterpret_cast(&msglen), sizeof(msglen), timeout, isTimeOut, stats, msecs)) - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); // Read the number of the `long strings`. uint32_t longStringSize; if (!readFixedSizeData(pfd, reinterpret_cast(&longStringSize), sizeof(longStringSize), timeout, isTimeOut, stats, msecs)) - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); // Read the actual data of the `ByteStream`. SBS res(new ByteStream(msglen)); if (!readFixedSizeData(pfd, res->getInputPtr(), msglen, timeout, isTimeOut, stats, msecs)) - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); res->advanceInputPtr(msglen); std::vector> longStrings; @@ -517,7 +517,7 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO rowgroup::StringStore::MemChunk memChunk; if (!readFixedSizeData(pfd, reinterpret_cast(&memChunk), sizeof(rowgroup::StringStore::MemChunk), timeout, isTimeOut, stats, msecs)) - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); // Allocate new memory for the `long string`. std::shared_ptr longString( @@ -532,7 +532,7 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO // Read the `long string`. if (!readFixedSizeData(pfd, memChunkPointer->data, memChunkPointer->currentSize, timeout, isTimeOut, stats, msecs)) - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); longStrings.push_back(longString); } @@ -540,7 +540,7 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO catch (std::bad_alloc& exception) { logIoError("InetStreamSocket::read: error during read for 'long strings' - 'bad_alloc'", 0); - return SBS(new ByteStream(0)); + return SBS(new ByteStream(0U)); } catch (std::exception& exception) { diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index 80370dcb5..1badf9ab4 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -358,7 +358,7 @@ Error: boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok; - sbs.reset(new ByteStream(0)); + sbs.reset(new ByteStream(0U)); for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) {