You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-5385 This patch reduces RAM consumption and adds GROUP_CONCAT RAM accounting feature
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||
Copyright (C) 2019 MariaDB Corporation
|
||||
Copyright (C) 2019-2023 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
@ -19,7 +19,7 @@
|
||||
// $Id: groupconcat.cpp 9705 2013-07-17 20:06:07Z pleblanc $
|
||||
|
||||
#include <iostream>
|
||||
//#define NDEBUG
|
||||
// #define NDEBUG
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
using namespace std;
|
||||
@ -341,7 +341,7 @@ void GroupConcatAgUM::merge(const rowgroup::Row& inRow, int64_t i)
|
||||
|
||||
void GroupConcatAgUM::getResult(uint8_t* buff)
|
||||
{
|
||||
fConcator->getResult(buff, fGroupConcat->fSeparator);
|
||||
fConcator->getResultImpl(fGroupConcat->fSeparator);
|
||||
}
|
||||
|
||||
uint8_t* GroupConcatAgUM::getResult()
|
||||
@ -769,8 +769,8 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
|
||||
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
||||
{
|
||||
fDataQueue.push(fData);
|
||||
|
||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
|
||||
uint64_t newSize = fRowGroup.getSizeWithStrings();
|
||||
|
||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||
{
|
||||
@ -863,20 +863,21 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
|
||||
}
|
||||
}
|
||||
|
||||
void GroupConcatOrderBy::getResult(uint8_t* buff, const string& sep)
|
||||
uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep)
|
||||
{
|
||||
ostringstream oss;
|
||||
bool addSep = false;
|
||||
|
||||
// need to reverse the order
|
||||
stack<OrderByRow> rowStack;
|
||||
|
||||
while (fOrderByQueue.size() > 0)
|
||||
{
|
||||
rowStack.push(fOrderByQueue.top());
|
||||
fOrderByQueue.pop();
|
||||
}
|
||||
|
||||
size_t prevResultSize = 0;
|
||||
size_t rowsProcessed = 0;
|
||||
while (rowStack.size() > 0)
|
||||
{
|
||||
if (addSep)
|
||||
@ -888,21 +889,44 @@ void GroupConcatOrderBy::getResult(uint8_t* buff, const string& sep)
|
||||
fRow0.setData(topRow.fData);
|
||||
outputRow(oss, fRow0);
|
||||
rowStack.pop();
|
||||
if (rowsProcessed >= fRowsPerRG)
|
||||
{
|
||||
size_t sizeDiff = oss.str().size() - prevResultSize;
|
||||
prevResultSize = oss.str().size();
|
||||
if (!fRm->getMemory(sizeDiff, fSessionMemLimit))
|
||||
{
|
||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
||||
throw IDBExcept(fErrorCode);
|
||||
}
|
||||
fMemSize += sizeDiff;
|
||||
rowsProcessed = 0;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t resultSize = oss.str().size();
|
||||
resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize;
|
||||
fOutputString.reset(new uint8_t[resultSize + 2]);
|
||||
fOutputString[resultSize] = '\0';
|
||||
fOutputString[resultSize + 1] = '\0';
|
||||
return swapStreamWithStringAndReturnBuf(oss);
|
||||
}
|
||||
|
||||
strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize);
|
||||
uint8_t* GroupConcator::swapStreamWithStringAndReturnBuf(ostringstream& oss)
|
||||
{
|
||||
int64_t resultSize = oss.str().size();
|
||||
oss << '\0' << '\0';
|
||||
outputBuf_.reset(new std::string(std::move(*oss.rdbuf()).str()));
|
||||
|
||||
if (resultSize >= fGroupConcatLen + 1)
|
||||
{
|
||||
(*outputBuf_)[fGroupConcatLen] = '\0';
|
||||
}
|
||||
if (resultSize >= fGroupConcatLen + 2)
|
||||
{
|
||||
(*outputBuf_)[fGroupConcatLen + 1] = '\0';
|
||||
}
|
||||
|
||||
return reinterpret_cast<uint8_t*>(outputBuf_->data());
|
||||
}
|
||||
|
||||
uint8_t* GroupConcator::getResult(const string& sep)
|
||||
{
|
||||
getResult(fOutputString.get(), sep);
|
||||
return fOutputString.get();
|
||||
return getResultImpl(sep);
|
||||
}
|
||||
|
||||
const string GroupConcatOrderBy::toString() const
|
||||
@ -984,7 +1008,8 @@ void GroupConcatNoOrder::processRow(const rowgroup::Row& row)
|
||||
|
||||
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
||||
{
|
||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
|
||||
uint64_t newSize = fRowGroup.getSizeWithStrings();
|
||||
|
||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||
{
|
||||
@ -1017,11 +1042,12 @@ void GroupConcatNoOrder::merge(GroupConcator* gc)
|
||||
in->fMemSize = 0;
|
||||
}
|
||||
|
||||
void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep)
|
||||
uint8_t* GroupConcatNoOrder::getResultImpl(const string& sep)
|
||||
{
|
||||
ostringstream oss;
|
||||
bool addSep = false;
|
||||
fDataQueue.push(fData);
|
||||
size_t prevResultSize = 0;
|
||||
|
||||
while (fDataQueue.size() > 0)
|
||||
{
|
||||
@ -1038,17 +1064,18 @@ void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep)
|
||||
outputRow(oss, fRow);
|
||||
fRow.nextRow();
|
||||
}
|
||||
|
||||
size_t sizeDiff = oss.str().size() - prevResultSize;
|
||||
prevResultSize = oss.str().size();
|
||||
if (!fRm->getMemory(sizeDiff, fSessionMemLimit))
|
||||
{
|
||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
||||
throw IDBExcept(fErrorCode);
|
||||
}
|
||||
fMemSize += sizeDiff;
|
||||
fDataQueue.pop();
|
||||
}
|
||||
|
||||
int64_t resultSize = oss.str().size();
|
||||
resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize;
|
||||
fOutputString.reset(new uint8_t[resultSize + 2]);
|
||||
fOutputString[resultSize] = '\0';
|
||||
fOutputString[resultSize + 1] = '\0';
|
||||
|
||||
strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize);
|
||||
return swapStreamWithStringAndReturnBuf(oss);
|
||||
}
|
||||
|
||||
const string GroupConcatNoOrder::toString() const
|
||||
|
Reference in New Issue
Block a user