You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-4031 More accurate memory usage counting while sorting
This commit is contained in:
@@ -42,9 +42,10 @@ using namespace ordering;
|
|||||||
namespace joblist
|
namespace joblist
|
||||||
{
|
{
|
||||||
|
|
||||||
|
const uint64_t LimitedOrderBy::fMaxUncommited = 102400; // 100KiB - make it configurable?
|
||||||
|
|
||||||
// LimitedOrderBy class implementation
|
// LimitedOrderBy class implementation
|
||||||
LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1)
|
LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1), fUncommitedMemory(0)
|
||||||
{
|
{
|
||||||
fRule.fIdbCompare = this;
|
fRule.fIdbCompare = this;
|
||||||
}
|
}
|
||||||
@@ -122,6 +123,20 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
|
|||||||
OrderByRow newRow(fRow0, fRule);
|
OrderByRow newRow(fRow0, fRule);
|
||||||
fOrderByQueue.push(newRow);
|
fOrderByQueue.push(newRow);
|
||||||
|
|
||||||
|
uint64_t memSizeInc = sizeof(newRow);
|
||||||
|
fUncommitedMemory += memSizeInc;
|
||||||
|
if (fUncommitedMemory >= fMaxUncommited)
|
||||||
|
{
|
||||||
|
fMemSize += fUncommitedMemory;
|
||||||
|
if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
|
||||||
|
{
|
||||||
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @"
|
||||||
|
<< __FILE__ << ":" << __LINE__;
|
||||||
|
throw IDBExcept(fErrorCode);
|
||||||
|
}
|
||||||
|
fUncommitedMemory = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// add to the distinct map
|
// add to the distinct map
|
||||||
if (fDistinct)
|
if (fDistinct)
|
||||||
fDistinctMap->insert(fRow0.getPointer());
|
fDistinctMap->insert(fRow0.getPointer());
|
||||||
@@ -132,7 +147,7 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
|
|||||||
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
||||||
{
|
{
|
||||||
fDataQueue.push(fData);
|
fDataQueue.push(fData);
|
||||||
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t newSize = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
|
||||||
fMemSize += newSize;
|
fMemSize += newSize;
|
||||||
|
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
@@ -157,7 +172,7 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
|
|||||||
|
|
||||||
if (fDistinct)
|
if (fDistinct)
|
||||||
{
|
{
|
||||||
fDistinctMap->erase(fOrderByQueue.top().fData);
|
fDistinctMap->erase(fOrderByQueue.top().fData);
|
||||||
fDistinctMap->insert(row1.getPointer());
|
fDistinctMap->insert(row1.getPointer());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -173,6 +188,18 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
|
|||||||
*/
|
*/
|
||||||
void LimitedOrderBy::finalize()
|
void LimitedOrderBy::finalize()
|
||||||
{
|
{
|
||||||
|
if (fUncommitedMemory > 0)
|
||||||
|
{
|
||||||
|
fMemSize += fUncommitedMemory;
|
||||||
|
if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
|
||||||
|
{
|
||||||
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @"
|
||||||
|
<< __FILE__ << ":" << __LINE__;
|
||||||
|
throw IDBExcept(fErrorCode);
|
||||||
|
}
|
||||||
|
fUncommitedMemory = 0;
|
||||||
|
}
|
||||||
|
|
||||||
queue<RGData> tempQueue;
|
queue<RGData> tempQueue;
|
||||||
if (fRowGroup.getRowCount() > 0)
|
if (fRowGroup.getRowCount() > 0)
|
||||||
fDataQueue.push(fData);
|
fDataQueue.push(fData);
|
||||||
@@ -181,7 +208,7 @@ void LimitedOrderBy::finalize()
|
|||||||
{
|
{
|
||||||
// *DRRTUY Very memory intensive. CS needs to account active
|
// *DRRTUY Very memory intensive. CS needs to account active
|
||||||
// memory only and release memory if needed.
|
// memory only and release memory if needed.
|
||||||
uint64_t memSizeInc = fRowsPerRG * fRowGroup.getRowSize();
|
uint64_t memSizeInc = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
|
||||||
fMemSize += memSizeInc;
|
fMemSize += memSizeInc;
|
||||||
|
|
||||||
if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
|
if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
|
||||||
|
@@ -61,8 +61,10 @@ public:
|
|||||||
void finalize();
|
void finalize();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
uint64_t fStart;
|
uint64_t fStart;
|
||||||
uint64_t fCount;
|
uint64_t fCount;
|
||||||
|
uint64_t fUncommitedMemory;
|
||||||
|
static const uint64_t fMaxUncommited;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@@ -1435,6 +1435,7 @@ public:
|
|||||||
|
|
||||||
// this returns the size of the row data with the string table
|
// this returns the size of the row data with the string table
|
||||||
inline uint64_t getSizeWithStrings() const;
|
inline uint64_t getSizeWithStrings() const;
|
||||||
|
inline uint64_t getSizeWithStrings(uint64_t n) const;
|
||||||
|
|
||||||
// sets the row count to 0 and the baseRid to something
|
// sets the row count to 0 and the baseRid to something
|
||||||
// effectively initializing whatever chunk of memory
|
// effectively initializing whatever chunk of memory
|
||||||
@@ -1708,12 +1709,17 @@ inline uint32_t RowGroup::getRowSizeWithStrings() const
|
|||||||
return oldOffsets[columnCount];
|
return oldOffsets[columnCount];
|
||||||
}
|
}
|
||||||
|
|
||||||
inline uint64_t RowGroup::getSizeWithStrings() const
|
inline uint64_t RowGroup::getSizeWithStrings(uint64_t n) const
|
||||||
{
|
{
|
||||||
if (strings == NULL)
|
if (strings == NULL)
|
||||||
return getDataSize();
|
return getDataSize(n);
|
||||||
else
|
else
|
||||||
return getDataSize() + strings->getSize();
|
return getDataSize(n) + strings->getSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline uint64_t RowGroup::getSizeWithStrings() const
|
||||||
|
{
|
||||||
|
return getSizeWithStrings(getRowCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool RowGroup::isCharType(uint32_t colIndex) const
|
inline bool RowGroup::isCharType(uint32_t colIndex) const
|
||||||
|
@@ -772,7 +772,7 @@ void IdbOrderBy::initialize(const RowGroup& rg)
|
|||||||
// initialize rows
|
// initialize rows
|
||||||
IdbCompare::initialize(rg);
|
IdbCompare::initialize(rg);
|
||||||
|
|
||||||
uint64_t newSize = fRowsPerRG * rg.getRowSize();
|
uint64_t newSize = rg.getSizeWithStrings(fRowsPerRG);
|
||||||
fMemSize += newSize;
|
fMemSize += newSize;
|
||||||
|
|
||||||
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
||||||
|
Reference in New Issue
Block a user