diff --git a/dbcon/joblist/groupconcat.cpp b/dbcon/joblist/groupconcat.cpp index 7e463a5ea..f56a22672 100644 --- a/dbcon/joblist/groupconcat.cpp +++ b/dbcon/joblist/groupconcat.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2019 MariaDB Corporation + Copyright (C) 2019-2023 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -19,7 +19,7 @@ // $Id: groupconcat.cpp 9705 2013-07-17 20:06:07Z pleblanc $ #include -//#define NDEBUG +// #define NDEBUG #include #include using namespace std; @@ -341,7 +341,7 @@ void GroupConcatAgUM::merge(const rowgroup::Row& inRow, int64_t i) void GroupConcatAgUM::getResult(uint8_t* buff) { - fConcator->getResult(buff, fGroupConcat->fSeparator); + fConcator->getResultImpl(fGroupConcat->fSeparator); } uint8_t* GroupConcatAgUM::getResult() @@ -769,8 +769,8 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row) if (fRowGroup.getRowCount() >= fRowsPerRG) { fDataQueue.push(fData); - - uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); + // A "postfix" but accurate RAM accounting that sums up sizes of RGDatas. + uint64_t newSize = fRowGroup.getSizeWithStrings(); if (!fRm->getMemory(newSize, fSessionMemLimit)) { @@ -863,20 +863,21 @@ void GroupConcatOrderBy::merge(GroupConcator* gc) } } -void GroupConcatOrderBy::getResult(uint8_t* buff, const string& sep) +uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep) { ostringstream oss; bool addSep = false; // need to reverse the order stack rowStack; - while (fOrderByQueue.size() > 0) { rowStack.push(fOrderByQueue.top()); fOrderByQueue.pop(); } + size_t prevResultSize = 0; + size_t rowsProcessed = 0; while (rowStack.size() > 0) { if (addSep) @@ -888,21 +889,44 @@ void GroupConcatOrderBy::getResult(uint8_t* buff, const string& sep) fRow0.setData(topRow.fData); outputRow(oss, fRow0); rowStack.pop(); + if (rowsProcessed >= fRowsPerRG) + { + size_t sizeDiff = oss.str().size() - prevResultSize; + prevResultSize = oss.str().size(); + if (!fRm->getMemory(sizeDiff, fSessionMemLimit)) + { + cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; + throw IDBExcept(fErrorCode); + } + fMemSize += sizeDiff; + rowsProcessed = 0; + } } - int64_t resultSize = oss.str().size(); - resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize; - fOutputString.reset(new uint8_t[resultSize + 2]); - fOutputString[resultSize] = '\0'; - fOutputString[resultSize + 1] = '\0'; + return swapStreamWithStringAndReturnBuf(oss); +} - strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize); +uint8_t* GroupConcator::swapStreamWithStringAndReturnBuf(ostringstream& oss) +{ + int64_t resultSize = oss.str().size(); + oss << '\0' << '\0'; + outputBuf_.reset(new std::string(std::move(*oss.rdbuf()).str())); + + if (resultSize >= fGroupConcatLen + 1) + { + (*outputBuf_)[fGroupConcatLen] = '\0'; + } + if (resultSize >= fGroupConcatLen + 2) + { + (*outputBuf_)[fGroupConcatLen + 1] = '\0'; + } + + return reinterpret_cast(outputBuf_->data()); } uint8_t* GroupConcator::getResult(const string& sep) { - getResult(fOutputString.get(), sep); - return fOutputString.get(); + return getResultImpl(sep); } const string GroupConcatOrderBy::toString() const @@ -984,7 +1008,8 @@ void GroupConcatNoOrder::processRow(const rowgroup::Row& row) if (fRowGroup.getRowCount() >= fRowsPerRG) { - uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); + // A "postfix" but accurate RAM accounting that sums up sizes of RGDatas. + uint64_t newSize = fRowGroup.getSizeWithStrings(); if (!fRm->getMemory(newSize, fSessionMemLimit)) { @@ -1017,11 +1042,12 @@ void GroupConcatNoOrder::merge(GroupConcator* gc) in->fMemSize = 0; } -void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep) +uint8_t* GroupConcatNoOrder::getResultImpl(const string& sep) { ostringstream oss; bool addSep = false; fDataQueue.push(fData); + size_t prevResultSize = 0; while (fDataQueue.size() > 0) { @@ -1038,17 +1064,18 @@ void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep) outputRow(oss, fRow); fRow.nextRow(); } - + size_t sizeDiff = oss.str().size() - prevResultSize; + prevResultSize = oss.str().size(); + if (!fRm->getMemory(sizeDiff, fSessionMemLimit)) + { + cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; + throw IDBExcept(fErrorCode); + } + fMemSize += sizeDiff; fDataQueue.pop(); } - int64_t resultSize = oss.str().size(); - resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize; - fOutputString.reset(new uint8_t[resultSize + 2]); - fOutputString[resultSize] = '\0'; - fOutputString[resultSize + 1] = '\0'; - - strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize); + return swapStreamWithStringAndReturnBuf(oss); } const string GroupConcatNoOrder::toString() const diff --git a/dbcon/joblist/groupconcat.h b/dbcon/joblist/groupconcat.h index 6db944b57..6c1341a75 100644 --- a/dbcon/joblist/groupconcat.h +++ b/dbcon/joblist/groupconcat.h @@ -111,8 +111,9 @@ class GroupConcator virtual void processRow(const rowgroup::Row&) = 0; virtual void merge(GroupConcator*) = 0; - virtual void getResult(uint8_t* buff, const std::string& sep) = 0; + virtual uint8_t* getResultImpl(const std::string& sep) = 0; virtual uint8_t* getResult(const std::string& sep); + virtual uint8_t* swapStreamWithStringAndReturnBuf(ostringstream& oss); virtual const std::string toString() const; @@ -126,7 +127,7 @@ class GroupConcator int64_t fCurrentLength; int64_t fGroupConcatLen; int64_t fConstantLen; - boost::scoped_array fOutputString; + std::unique_ptr outputBuf_; long fTimeZone; }; @@ -142,7 +143,7 @@ class GroupConcatNoOrder : public GroupConcator void merge(GroupConcator*); using GroupConcator::getResult; - void getResult(uint8_t* buff, const std::string& sep); + uint8_t* getResultImpl(const std::string& sep); const std::string toString() const; @@ -173,7 +174,7 @@ class GroupConcatOrderBy : public GroupConcator, public ordering::IdbOrderBy void merge(GroupConcator*); using GroupConcator::getResult; - void getResult(uint8_t* buff, const std::string& sep); + uint8_t* getResultImpl(const std::string& sep); const std::string toString() const; diff --git a/dbcon/joblist/jsonarrayagg.cpp b/dbcon/joblist/jsonarrayagg.cpp index 30da5b73d..dd59af021 100644 --- a/dbcon/joblist/jsonarrayagg.cpp +++ b/dbcon/joblist/jsonarrayagg.cpp @@ -15,9 +15,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ - #include -//#define NDEBUG +// #define NDEBUG #include #include using namespace std; @@ -59,7 +58,6 @@ using namespace ordering; #include "utils/json/json.hpp" using namespace nlohmann; - namespace joblist { @@ -73,7 +71,7 @@ void JsonArrayInfo::prepJsonArray(JobInfo& jobInfo) const RowColumn* rcp = dynamic_cast(gcc->aggParms()[0].get()); SP_GroupConcat groupConcat(new GroupConcat); - groupConcat->fSeparator = gcc->separator(); // or ,? + groupConcat->fSeparator = gcc->separator(); // or ,? groupConcat->fDistinct = gcc->distinct(); groupConcat->fSize = gcc->resultType().colWidth; groupConcat->fRm = jobInfo.rm; @@ -297,13 +295,11 @@ const string JsonArrayInfo::toString() const return oss.str(); } - JsonArrayAggregatAgUM::JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat& gcc) : GroupConcatAgUM(gcc) { initialize(); } - JsonArrayAggregatAgUM::~JsonArrayAggregatAgUM() { } @@ -338,7 +334,7 @@ void JsonArrayAggregatAgUM::merge(const rowgroup::Row& inRow, int64_t i) void JsonArrayAggregatAgUM::getResult(uint8_t* buff) { - fConcator->getResult(buff, fGroupConcat->fSeparator); + fConcator->getResultImpl(fGroupConcat->fSeparator); } uint8_t* JsonArrayAggregatAgUM::getResult() @@ -383,7 +379,6 @@ void JsonArrayAggregatAgUM::applyMapping(const boost::shared_array& mapping } } - JsonArrayAggregator::JsonArrayAggregator() : GroupConcator() { } @@ -405,7 +400,6 @@ void JsonArrayAggregator::initialize(const rowgroup::SP_GroupConcat& gcc) fConstantLen += strlen(fConstCols[i].first.c_str()); } - void JsonArrayAggregator::outputRow(std::ostringstream& oss, const rowgroup::Row& row) { const CalpontSystemCatalog::ColDataType* types = row.getColTypes(); @@ -704,8 +698,6 @@ const string JsonArrayAggregator::toString() const return oss.str(); } - - JsonArrayAggOrderBy::JsonArrayAggOrderBy() { fRule.fIdbCompare = this; @@ -869,7 +861,7 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc) } } -void JsonArrayAggOrderBy::getResult(uint8_t* buff, const string&) +uint8_t* JsonArrayAggOrderBy::getResultImpl(const string&) { ostringstream oss; bool addSep = false; @@ -899,16 +891,10 @@ void JsonArrayAggOrderBy::getResult(uint8_t* buff, const string&) } oss << ']'; } - int64_t resultSize = oss.str().size(); - resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize; - fOutputString.reset(new uint8_t[resultSize + 2]); - fOutputString[resultSize] = '\0'; - fOutputString[resultSize + 1] = '\0'; - strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize); + return swapStreamWithStringAndReturnBuf(oss); } - const string JsonArrayAggOrderBy::toString() const { string baseStr = JsonArrayAggregator::toString(); @@ -929,7 +915,6 @@ const string JsonArrayAggOrderBy::toString() const return (baseStr + oss.str()); } - JsonArrayAggNoOrder::JsonArrayAggNoOrder() : fRowsPerRG(128), fErrorCode(ERR_AGGREGATION_TOO_BIG), fMemSize(0), fRm(NULL) { @@ -1021,7 +1006,7 @@ void JsonArrayAggNoOrder::merge(GroupConcator* gc) in->fMemSize = 0; } -void JsonArrayAggNoOrder::getResult(uint8_t* buff, const string&) +uint8_t* JsonArrayAggNoOrder::getResultImpl(const string&) { ostringstream oss; bool addSep = false; @@ -1050,13 +1035,7 @@ void JsonArrayAggNoOrder::getResult(uint8_t* buff, const string&) } oss << ']'; } - int64_t resultSize = oss.str().size(); - resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize; - fOutputString.reset(new uint8_t[resultSize + 2]); - fOutputString[resultSize] = '\0'; - fOutputString[resultSize + 1] = '\0'; - - strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize); + return swapStreamWithStringAndReturnBuf(oss); } const string JsonArrayAggNoOrder::toString() const diff --git a/dbcon/joblist/jsonarrayagg.h b/dbcon/joblist/jsonarrayagg.h index 7a4dc2edd..83c02f5c3 100644 --- a/dbcon/joblist/jsonarrayagg.h +++ b/dbcon/joblist/jsonarrayagg.h @@ -15,7 +15,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ - /** @file */ #pragma once @@ -40,7 +39,6 @@ namespace joblist class JsonArrayAggregator; class ResourceManager; - class JsonArrayInfo : public GroupConcatInfo { public: @@ -52,10 +50,8 @@ class JsonArrayInfo : public GroupConcatInfo protected: uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo); boost::shared_array makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&); - }; - class JsonArrayAggregatAgUM : public GroupConcatAgUM { public: @@ -105,7 +101,7 @@ class JsonArrayAggNoOrder : public JsonArrayAggregator using GroupConcator::merge; void merge(GroupConcator*); using GroupConcator::getResult; - void getResult(uint8_t* buff, const std::string& sep); + uint8_t* getResultImpl(const std::string& sep); const std::string toString() const; @@ -136,7 +132,7 @@ class JsonArrayAggOrderBy : public JsonArrayAggregator, public ordering::IdbOrde using GroupConcator::merge; void merge(GroupConcator*); using GroupConcator::getResult; - void getResult(uint8_t* buff, const std::string& sep); + uint8_t* getResultImpl(const std::string& sep); const std::string toString() const;