From dc03621e9db092e0b7a576b62db093985dd3be70 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Sat, 24 Aug 2024 19:13:58 +0000 Subject: [PATCH] fix(rowgroup): RGData now uses a uint64_t counter for the fixed-size columns data buffer. The buffer can now utilize > 4GB of RAM, which is necessary for PM-side joins. The RGData ctor previously used uint32_t when allocating the data buffer, which caused an implicit heap overflow. --- dbcon/joblist/batchprimitiveprocessor-jl.cpp | 55 ++++------ dbcon/joblist/batchprimitiveprocessor-jl.h | 1 - dbcon/joblist/resourcemanager.cpp | 1 + dbcon/joblist/tuplehashjoin.cpp | 1 + utils/rowgroup/rowgroup.cpp | 103 +++++++------------ utils/rowgroup/rowgroup.h | 33 +++--- utils/rowgroup/rowstorage.cpp | 3 +- utils/windowfunction/idborderby.cpp | 2 +- 8 files changed, 78 insertions(+), 121 deletions(-) diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index d8f2943b6..2f1d79bac 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -72,7 +72,6 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm) , LBIDTrace(false) , tupleLength(0) , status(0) - , sendRowGroups(false) , valueColumn(0) , sendTupleJoinRowGroupData(false) , bop(BOP_AND) @@ -147,7 +146,7 @@ void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step) tableOID = step.tableOid(); - if (filterCount == 0 && !sendRowGroups) + if (filterCount == 0) { sendAbsRids = true; sendValues = true; @@ -244,7 +243,7 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step) if (utils::isWide(cc->getWidth())) wideColumnsWidths |= cc->getWidth(); - if (filterCount == 0 && !sendRowGroups) + if (filterCount == 0) sendValues = true; idbassert(sessionID == step.sessionId()); @@ -283,7 +282,7 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDic projectCount++; needStrValues = true; - if (filterCount == 0 && !sendRowGroups) + if
(filterCount == 0) { sendValues = true; sendAbsRids = true; @@ -1054,9 +1053,6 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const if (tJoiners.size() > 0) flags |= HAS_JOINER; - if (sendRowGroups) - flags |= HAS_ROWGROUP; - if (sendTupleJoinRowGroupData) flags |= JOIN_ROWGROUP_DATA; @@ -1071,12 +1067,6 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const bs << bop; bs << (uint8_t)(forHJ ? 1 : 0); - if (sendRowGroups) - { - bs << valueColumn; - bs << inputRG; - } - if (ot == ROW_GROUP) { bs << projectionRG; @@ -1248,6 +1238,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const * (projection count)x run msgs for projection Commands */ +// The deser counterpart function is BPP::resetBPP void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC) { ISMPacketHeader ism; @@ -1289,35 +1280,28 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isEx bs << sentByEM; if (_hasScan) + { idbassert(ridCount == 0); - else if (!sendRowGroups) + } + else + { idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids)); - else - idbassert(inputRG.getRowCount() > 0); - - if (sendRowGroups) - { - uint32_t rgSize = inputRG.getDataSize(); - bs << rgSize; - bs.append(inputRG.getData(), rgSize); } + + bs << ridCount; + + if (sendAbsRids) + bs.append((uint8_t*)absRids.get(), ridCount << 3); else { - bs << ridCount; - - if (sendAbsRids) - bs.append((uint8_t*)absRids.get(), ridCount << 3); - else - { - bs << ridMap; - bs << baseRid; - bs.append((uint8_t*)relRids, ridCount << 1); - } - - if (sendValues) - bs.append((uint8_t*)values, ridCount << 3); + bs << ridMap; + bs << baseRid; + bs.append((uint8_t*)relRids, ridCount << 1); } + if (sendValues) + bs.append((uint8_t*)values, ridCount << 3); + for (i = 0; i < filterCount; i++) filterSteps[i]->runCommand(bs); @@ -1667,7 +1651,6 @@ void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg) void 
BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg) { - sendRowGroups = true; sendAbsRids = false; sendValues = false; inputRG = rg; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 365ea4ad0..2b9e400b0 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -343,7 +343,6 @@ class BatchPrimitiveProcessorJL /* for RowGroup return type */ rowgroup::RowGroup inputRG, projectionRG; - bool sendRowGroups; uint32_t valueColumn; /* for PM Aggregation */ diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index e162f1a1f..e61da3c4d 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -368,6 +368,7 @@ bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr& sess return (ret1 && ret2); } // Don't care about session memory +// The signed amount type is unsafe only when amount is close to its max value, which is unrealistic in 2024. bool ResourceManager::getMemory(int64_t amount, bool patience) { bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index e6c6b9796..3f08f2790 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -417,6 +417,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* smallRG.initRow(&r); try { + // An unfortunate choice of type, dictated by the RM::getMemory parameter type.
ssize_t rgSize; bool gotMem; goto next; diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 5d05b9cdc..8fedc76e6 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -31,7 +31,6 @@ #include using namespace std; - #include #include "bytestream.h" @@ -49,8 +48,6 @@ namespace rowgroup { using cscType = execplan::CalpontSystemCatalog::ColDataType; - - StringStore::~StringStore() { #if 0 @@ -302,47 +299,27 @@ void UserDataStore::deserialize(ByteStream& bs) return; } - RGData::RGData(const RowGroup& rg, uint32_t rowCount) { - // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; - rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); + RGDataSizeType s = rg.getDataSize(rowCount); + rowData.reset(new uint8_t[s]); if (rg.usesStringTable() && rowCount > 0) strings.reset(new StringStore()); userDataStore.reset(); - - -#ifdef VALGRIND - /* In a PM-join, we can serialize entire tables; not every value has been - * filled in yet. Need to look into that. Valgrind complains that - * those bytes are uninitialized, this suppresses that error. - */ - memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily -#endif - memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } RGData::RGData(const RowGroup& rg) { - // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; rowData.reset(new uint8_t[rg.getMaxDataSize()]); if (rg.usesStringTable()) strings.reset(new StringStore()); userDataStore.reset(); - -#ifdef VALGRIND - /* In a PM-join, we can serialize entire tables; not every value has been - * filled in yet. Need to look into that. Valgrind complains that - * those bytes are uninitialized, this suppresses that error. 
- */ - memset(rowData.get(), 0, rg.getMaxDataSize()); -#endif columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } @@ -356,16 +333,6 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount) strings.reset(new StringStore()); else strings.reset(); - -#ifdef VALGRIND - /* In a PM-join, we can serialize entire tables; not every value has been - * filled in yet. Need to look into that. Valgrind complains that - * those bytes are uninitialized, this suppresses that error. - */ - memset(rowData.get(), 0, rg.getDataSize(rowCount)); -#endif - columnCount = rg.getColumnCount(); - rowSize = rg.getRowSize(); } void RGData::reinit(const RowGroup& rg) @@ -373,7 +340,7 @@ void RGData::reinit(const RowGroup& rg) reinit(rg, 8192); } -void RGData::serialize(ByteStream& bs, uint32_t amount) const +void RGData::serialize(ByteStream& bs, RGDataSizeType amount) const { // cout << "serializing!\n"; bs << (uint32_t)RGDATA_SIG; @@ -399,9 +366,10 @@ void RGData::serialize(ByteStream& bs, uint32_t amount) const bs << (uint8_t)0; } -void RGData::deserialize(ByteStream& bs, uint32_t defAmount) +void RGData::deserialize(ByteStream& bs, RGDataSizeType defAmount) { - uint32_t amount, sig; + uint32_t sig; + RGDataSizeType amount; uint8_t* buf; uint8_t tmp8; @@ -642,7 +610,7 @@ string Row::toCSV() const void Row::setToNull(uint32_t colIndex) { - setNullMark(colIndex, true); // mark as null. + setNullMark(colIndex, true); // mark as null. switch (types[colIndex]) { case CalpontSystemCatalog::TINYINT: data[offsets[colIndex]] = joblist::TINYINTNULL; break; @@ -665,11 +633,11 @@ void Row::setToNull(uint32_t colIndex) *((int32_t*)&data[offsets[colIndex]]) = static_cast(joblist::DATENULL); break; - case CalpontSystemCatalog::BIGINT: - if (precision[colIndex] != MagicPrecisionForCountAgg) - *((uint64_t*)&data[offsets[colIndex]]) = joblist::BIGINTNULL; - else // work around for count() in outer join result. 
- *((uint64_t*)&data[offsets[colIndex]]) = 0; + case CalpontSystemCatalog::BIGINT: + if (precision[colIndex] != MagicPrecisionForCountAgg) + *((uint64_t*)&data[offsets[colIndex]]) = joblist::BIGINTNULL; + else // work around for count() in outer join result. + *((uint64_t*)&data[offsets[colIndex]]) = 0; break; @@ -680,9 +648,13 @@ void Row::setToNull(uint32_t colIndex) *((long double*)&data[offsets[colIndex]]) = joblist::LONGDOUBLENULL; break; - case CalpontSystemCatalog::DATETIME: *((uint64_t*)&data[offsets[colIndex]]) = joblist::DATETIMENULL; break; + case CalpontSystemCatalog::DATETIME: + *((uint64_t*)&data[offsets[colIndex]]) = joblist::DATETIMENULL; + break; - case CalpontSystemCatalog::TIMESTAMP: *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMESTAMPNULL; break; + case CalpontSystemCatalog::TIMESTAMP: + *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMESTAMPNULL; + break; case CalpontSystemCatalog::TIME: *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMENULL; break; @@ -716,9 +688,7 @@ void Row::setToNull(uint32_t colIndex) case 7: case 8: *((uint64_t*)&data[offsets[colIndex]]) = joblist::CHAR8NULL; break; - default: - setNullMark(colIndex, true); - break; + default: setNullMark(colIndex, true); break; } break; @@ -751,7 +721,9 @@ void Row::setToNull(uint32_t colIndex) case CalpontSystemCatalog::UTINYINT: data[offsets[colIndex]] = joblist::UTINYINTNULL; break; - case CalpontSystemCatalog::USMALLINT: *((uint16_t*)&data[offsets[colIndex]]) = joblist::USMALLINTNULL; break; + case CalpontSystemCatalog::USMALLINT: + *((uint16_t*)&data[offsets[colIndex]]) = joblist::USMALLINTNULL; + break; case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: *((uint32_t*)&data[offsets[colIndex]]) = joblist::UINTNULL; break; @@ -760,8 +732,8 @@ void Row::setToNull(uint32_t colIndex) default: ostringstream os; - os << "Row::initToNull(): got bad column type (" << types[colIndex] << "). 
Width=" << getColumnWidth(colIndex) - << endl; + os << "Row::initToNull(): got bad column type (" << types[colIndex] + << "). Width=" << getColumnWidth(colIndex) << endl; os << toString(); throw logic_error(os.str()); } @@ -870,8 +842,8 @@ bool Row::isNullValue(uint32_t colIndex) const return strings->isNullValue(offset); } -// if (data[offsets[colIndex]] == 0) // empty string -// return true; + // if (data[offsets[colIndex]] == 0) // empty string + // return true; switch (len) { @@ -1120,7 +1092,6 @@ RowGroup::RowGroup(const RowGroup& r) offsets = &stOffsets[0]; else if (!useStringTable && !oldOffsets.empty()) offsets = &oldOffsets[0]; - } RowGroup& RowGroup::operator=(const RowGroup& r) @@ -1235,27 +1206,28 @@ void RowGroup::serializeRGData(ByteStream& bs) const rgData->serialize(bs, getDataSize()); } -uint32_t RowGroup::getDataSize() const +RGDataSizeType RowGroup::getDataSize() const { return getDataSize(getRowCount()); } -uint32_t RowGroup::getDataSize(uint64_t n) const +RGDataSizeType RowGroup::getDataSize(uint64_t n) const { - return headerSize + (n * getRowSize()); + return headerSize + (n * static_cast(getRowSize())); } -uint32_t RowGroup::getMaxDataSize() const +RGDataSizeType RowGroup::getMaxDataSize() const { - return headerSize + (8192 * getRowSize()); + return headerSize + (static_cast(rgCommonSize) * static_cast(getRowSize())); } -uint32_t RowGroup::getMaxDataSizeWithStrings() const +RGDataSizeType RowGroup::getMaxDataSizeWithStrings() const { - return headerSize + (8192 * (oldOffsets[columnCount] + columnCount)); + return headerSize + + (static_cast(rgCommonSize) * static_cast(getRowSizeWithStrings())); } -uint32_t RowGroup::getEmptySize() const +RGDataSizeType RowGroup::getEmptySize() const { return headerSize; } @@ -1325,9 +1297,8 @@ string RowGroup::toString(const std::vector& used) const os << "rowcount = " << getRowCount() << endl; if (!used.empty()) { - uint64_t cnt = - std::accumulate(used.begin(), used.end(), 0ULL, - [](uint64_t a, uint64_t 
bits) { return a + __builtin_popcountll(bits); }); + uint64_t cnt = std::accumulate(used.begin(), used.end(), 0ULL, [](uint64_t a, uint64_t bits) + { return a + __builtin_popcountll(bits); }); os << "sparse row count = " << cnt << endl; } os << "base rid = " << getBaseRid() << endl; diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 41af8a0a8..a531be2c2 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -62,6 +62,7 @@ namespace rowgroup { const int16_t rgCommonSize = 8192; +using RGDataSizeType = uint64_t; /* The RowGroup family of classes encapsulate the data moved through the @@ -270,14 +271,14 @@ class RGData // amount should be the # returned by RowGroup::getDataSize() - void serialize(messageqcpp::ByteStream&, uint32_t amount) const; + void serialize(messageqcpp::ByteStream&, RGDataSizeType amount) const; // the 'hasLengthField' is there b/c PM aggregation (and possibly others) currently sends // inline data with a length field. Once that's converted to string table format, that // option can go away. 
- void deserialize(messageqcpp::ByteStream&, uint32_t amount = 0); // returns the # of bytes read + void deserialize(messageqcpp::ByteStream&, RGDataSizeType amount = 0); // returns the # of bytes read - inline uint64_t getStringTableMemUsage(); + inline RGDataSizeType getStringTableMemUsage(); void clear(); void reinit(const RowGroup& rg); void reinit(const RowGroup& rg, uint32_t rowCount); @@ -1496,15 +1497,15 @@ class RowGroup : public messageqcpp::Serializeable uint32_t getDBRoot() const; void setDBRoot(uint32_t); - uint32_t getDataSize() const; - uint32_t getDataSize(uint64_t n) const; - uint32_t getMaxDataSize() const; - uint32_t getMaxDataSizeWithStrings() const; - uint32_t getEmptySize() const; + RGDataSizeType getDataSize() const; + RGDataSizeType getDataSize(uint64_t n) const; + RGDataSizeType getMaxDataSize() const; + RGDataSizeType getMaxDataSizeWithStrings() const; + RGDataSizeType getEmptySize() const; // this returns the size of the row data with the string table - inline uint64_t getSizeWithStrings() const; - inline uint64_t getSizeWithStrings(uint64_t n) const; + inline RGDataSizeType getSizeWithStrings() const; + inline RGDataSizeType getSizeWithStrings(uint64_t n) const; // sets the row count to 0 and the baseRid to something // effectively initializing whatever chunk of memory @@ -1625,11 +1626,11 @@ class RowGroup : public messageqcpp::Serializeable uint32_t sTableThreshold = 20; std::shared_ptr forceInline; - static const uint32_t headerSize = 18; - static const uint32_t rowCountOffset = 0; - static const uint32_t baseRidOffset = 4; - static const uint32_t statusOffset = 12; - static const uint32_t dbRootOffset = 14; + static const uint64_t headerSize = 18; + static const uint64_t rowCountOffset = 0; + static const uint64_t baseRidOffset = 4; + static const uint64_t statusOffset = 12; + static const uint64_t dbRootOffset = 14; }; inline uint64_t convertToRid(const uint32_t& partNum, const uint16_t& segNum, const uint8_t& extentNum, @@ -1775,7 
+1776,7 @@ inline uint32_t RowGroup::getRowSizeWithStrings() const return oldOffsets[columnCount] + columnCount; } -inline uint64_t RowGroup::getSizeWithStrings(uint64_t n) const +inline RGDataSizeType RowGroup::getSizeWithStrings(uint64_t n) const { if (strings == nullptr) return getDataSize(n); diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 585dc8ed0..bcdd7f240 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -635,6 +635,7 @@ class RowGroupStorage if (fRGDatas[rgid]) { fRowGroupOut->setData(fRGDatas[rgid].get()); + // An implicit unsigned-to-signed type cast (RGDataSizeType to int64_t). int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); if (!fMM->acquire(memSz)) { @@ -792,7 +793,7 @@ class RowGroupStorage while (rgid >= fRGDatas.size()) { - int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); + auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); if (!fMM->acquire(memSz)) { throw logging::IDBExcept( diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index ec4008062..1c6fd9528 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -753,7 +753,7 @@ void IdbOrderBy::initialize(const RowGroup& rg) // initialize rows IdbCompare::initialize(rg); - uint64_t newSize = rg.getSizeWithStrings(fRowsPerRG); + auto newSize = rg.getSizeWithStrings(fRowsPerRG); if (fRm && !fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;