You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-707 Fix ExeMgr's memory accounting
ExeMgr uses ResourceManager to count memory usage. If a usage exceeded error occurs the counting wasn't reset and subsequent usage attempts in the same ExeMgr thread would error. This patch moves the in-class accounting for GroupConcat and others so that it happens before the error is detected. The memory usage counter is then decremented correctly on the class destructor.
This commit is contained in:
@@ -715,13 +715,13 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
|
|||||||
fDataQueue.push(fData);
|
fDataQueue.push(fData);
|
||||||
|
|
||||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||||
|
fMemSize += newSize;
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
||||||
<< " @" << __FILE__ << ":" << __LINE__;
|
<< " @" << __FILE__ << ":" << __LINE__;
|
||||||
throw IDBExcept(fErrorCode);
|
throw IDBExcept(fErrorCode);
|
||||||
}
|
}
|
||||||
fMemSize += newSize;
|
|
||||||
|
|
||||||
fData.reinit(fRowGroup, fRowsPerRG);
|
fData.reinit(fRowGroup, fRowsPerRG);
|
||||||
fRowGroup.setData(&fData);
|
fRowGroup.setData(&fData);
|
||||||
@@ -914,6 +914,7 @@ void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
|
|||||||
fConcatColumns.push_back((*(i++)).second);
|
fConcatColumns.push_back((*(i++)).second);
|
||||||
|
|
||||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||||
|
fMemSize += newSize;
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
||||||
@@ -921,7 +922,6 @@ void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
|
|||||||
throw IDBExcept(fErrorCode);
|
throw IDBExcept(fErrorCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
fMemSize += newSize;
|
|
||||||
//fData.reset(new uint8_t[fRowGroup.getDataSize(fRowsPerRG)]);
|
//fData.reset(new uint8_t[fRowGroup.getDataSize(fRowsPerRG)]);
|
||||||
fData.reinit(fRowGroup, fRowsPerRG);
|
fData.reinit(fRowGroup, fRowsPerRG);
|
||||||
fRowGroup.setData(&fData);
|
fRowGroup.setData(&fData);
|
||||||
@@ -950,13 +950,13 @@ void GroupConcatNoOrder::processRow(const rowgroup::Row& row)
|
|||||||
{
|
{
|
||||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||||
|
|
||||||
|
fMemSize += newSize;
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
||||||
<< " @" << __FILE__ << ":" << __LINE__;
|
<< " @" << __FILE__ << ":" << __LINE__;
|
||||||
throw IDBExcept(fErrorCode);
|
throw IDBExcept(fErrorCode);
|
||||||
}
|
}
|
||||||
fMemSize += newSize;
|
|
||||||
|
|
||||||
fDataQueue.push(fData);
|
fDataQueue.push(fData);
|
||||||
fData.reinit(fRowGroup, fRowsPerRG);
|
fData.reinit(fRowGroup, fRowsPerRG);
|
||||||
|
@@ -124,13 +124,13 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
|
|||||||
{
|
{
|
||||||
fDataQueue.push(fData);
|
fDataQueue.push(fData);
|
||||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||||
|
fMemSize += newSize;
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
||||||
<< " @" << __FILE__ << ":" << __LINE__;
|
<< " @" << __FILE__ << ":" << __LINE__;
|
||||||
throw IDBExcept(fErrorCode);
|
throw IDBExcept(fErrorCode);
|
||||||
}
|
}
|
||||||
fMemSize += newSize;
|
|
||||||
fData.reinit(fRowGroup, fRowsPerRG);
|
fData.reinit(fRowGroup, fRowsPerRG);
|
||||||
fRowGroup.setData(&fData);
|
fRowGroup.setData(&fData);
|
||||||
fRowGroup.resetRowGroup(0);
|
fRowGroup.resetRowGroup(0);
|
||||||
@@ -170,13 +170,13 @@ void LimitedOrderBy::finalize()
|
|||||||
if (fStart != 0)
|
if (fStart != 0)
|
||||||
{
|
{
|
||||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
||||||
|
fMemSize += newSize;
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
||||||
<< " @" << __FILE__ << ":" << __LINE__;
|
<< " @" << __FILE__ << ":" << __LINE__;
|
||||||
throw IDBExcept(fErrorCode);
|
throw IDBExcept(fErrorCode);
|
||||||
}
|
}
|
||||||
fMemSize += newSize;
|
|
||||||
fData.reinit(fRowGroup, fRowsPerRG);
|
fData.reinit(fRowGroup, fRowsPerRG);
|
||||||
fRowGroup.setData(&fData);
|
fRowGroup.setData(&fData);
|
||||||
fRowGroup.resetRowGroup(0);
|
fRowGroup.resetRowGroup(0);
|
||||||
@@ -196,13 +196,13 @@ void LimitedOrderBy::finalize()
|
|||||||
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
||||||
{
|
{
|
||||||
tempQueue.push(fData);
|
tempQueue.push(fData);
|
||||||
|
fMemSize += newSize;
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
{
|
{
|
||||||
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
|
||||||
<< " @" << __FILE__ << ":" << __LINE__;
|
<< " @" << __FILE__ << ":" << __LINE__;
|
||||||
throw IDBExcept(fErrorCode);
|
throw IDBExcept(fErrorCode);
|
||||||
}
|
}
|
||||||
fMemSize += newSize;
|
|
||||||
fData.reinit(fRowGroup, fRowsPerRG);
|
fData.reinit(fRowGroup, fRowsPerRG);
|
||||||
//fData.reset(new uint8_t[fRowGroup.getDataSize(fRowsPerRG)]);
|
//fData.reset(new uint8_t[fRowGroup.getDataSize(fRowsPerRG)]);
|
||||||
fRowGroup.setData(&fData);
|
fRowGroup.setData(&fData);
|
||||||
|
@@ -784,9 +784,9 @@ void WindowFunctionStep::execute()
|
|||||||
{
|
{
|
||||||
fInRowGroupData.push_back(rgData);
|
fInRowGroupData.push_back(rgData);
|
||||||
uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
|
uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
|
||||||
|
fMemUsage += memAdd;
|
||||||
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
|
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
|
||||||
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
|
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
|
||||||
fMemUsage += memAdd;
|
|
||||||
|
|
||||||
for (uint64_t j = 0; j < rowCnt; ++j)
|
for (uint64_t j = 0; j < rowCnt; ++j)
|
||||||
{
|
{
|
||||||
@@ -919,9 +919,9 @@ void WindowFunctionStep::doFunction()
|
|||||||
while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled())
|
while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled())
|
||||||
{
|
{
|
||||||
uint64_t memAdd = fRows.size() * sizeof(RowPosition);
|
uint64_t memAdd = fRows.size() * sizeof(RowPosition);
|
||||||
|
fMemUsage += memAdd;
|
||||||
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
|
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
|
||||||
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
|
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
|
||||||
fMemUsage += memAdd;
|
|
||||||
fFunctions[i]->setCallback(this, i);
|
fFunctions[i]->setCallback(this, i);
|
||||||
(*fFunctions[i].get())();
|
(*fFunctions[i].get())();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user