You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-5429 Fix high memory consumption in GROUP_CONCAT() processing.
1. The input and output RowGroups used in the GROUP_CONCAT classes currently allocate a raw memory buffer whose size equals the full width of the string datatype. As an example, for the following query: SELECT col1, GROUP_CONCAT(col2) FROM t GROUP BY col1; if col2 is a TEXT field with default width, the input RowGroup containing the target rows to be concatenated will allocate 64KB of memory for every input row in the RowGroup. This is wasteful, as actual field values in real workloads would be much smaller. We fix this by enabling the RowGroup to use the StringStore when the RowGroup contains long strings. 2. RowAggregation::initialize() allocates a memory buffer for a NULL row. The size of this buffer is equal to the row size of the output RowGroup. For the above scenario, using the default group_concat_max_len (a server variable that sets the maximum length of the GROUP_CONCAT string) value of 1MB, the buffer size would be (1MB + 64KB + some additional metadata). If the user sets group_concat_max_len to a higher value, say 3GB, this buffer size would be ~3GB. Now if the runtime creates several instances of RowAggregation, total memory consumption by PrimProc could exceed the hardware memory limits, causing the OS OOM killer to kill the process. We fix this problem by again enabling the StringStore for the NULL row allocation. 3. In the plugin code in buildAggregateColumn(), there is an integer overflow when the server group_concat_max_len variable (which is a uint32_t) is set to a value > INT32_MAX (such as 3GB) and is assigned to CalpontSystemCatalog::ColType::colWidth (which is an int32_t). As a short-term fix, we saturate the value assigned to colWidth at INT32_MAX. The proper fix would be to change CalpontSystemCatalog::ColType::colWidth to a uint32_t.
This commit is contained in:
@ -267,6 +267,14 @@ void GroupConcatInfo::mapColumns(const RowGroup& projRG)
|
|||||||
|
|
||||||
(*k)->fRowGroup = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
|
(*k)->fRowGroup = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
|
||||||
projRG.getStringTableThreshold(), false);
|
projRG.getStringTableThreshold(), false);
|
||||||
|
|
||||||
|
// MCOL-5429 Use stringstore if the datatype of the groupconcat
|
||||||
|
// field is a long string.
|
||||||
|
if ((*k)->fRowGroup.hasLongString())
|
||||||
|
{
|
||||||
|
(*k)->fRowGroup.setUseStringTable(true);
|
||||||
|
}
|
||||||
|
|
||||||
(*k)->fMapping = makeMapping(projRG, (*k)->fRowGroup);
|
(*k)->fMapping = makeMapping(projRG, (*k)->fRowGroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -318,10 +326,24 @@ void GroupConcatAgUM::initialize()
|
|||||||
|
|
||||||
fConcator->initialize(fGroupConcat);
|
fConcator->initialize(fGroupConcat);
|
||||||
|
|
||||||
fGroupConcat->fRowGroup.initRow(&fRow, true);
|
// MCOL-5429 Use stringstore if the datatype of the groupconcat
|
||||||
fData.reset(new uint8_t[fRow.getSize()]);
|
// field is a long string.
|
||||||
|
if (fGroupConcat->fRowGroup.hasLongString())
|
||||||
fRow.setData(rowgroup::Row::Pointer(fData.get()));
|
{
|
||||||
|
fRowGroup = fGroupConcat->fRowGroup;
|
||||||
|
fRowGroup.setUseStringTable(true);
|
||||||
|
fRowRGData.reinit(fRowGroup, 1);
|
||||||
|
fRowGroup.setData(&fRowRGData);
|
||||||
|
fRowGroup.resetRowGroup(0);
|
||||||
|
fRowGroup.initRow(&fRow);
|
||||||
|
fRowGroup.getRow(0, &fRow);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fGroupConcat->fRowGroup.initRow(&fRow, true);
|
||||||
|
fData.reset(new uint8_t[fRow.getSize()]);
|
||||||
|
fRow.setData(rowgroup::Row::Pointer(fData.get()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void GroupConcatAgUM::processRow(const rowgroup::Row& inRow)
|
void GroupConcatAgUM::processRow(const rowgroup::Row& inRow)
|
||||||
@ -392,7 +414,7 @@ GroupConcator::~GroupConcator()
|
|||||||
void GroupConcator::initialize(const rowgroup::SP_GroupConcat& gcc)
|
void GroupConcator::initialize(const rowgroup::SP_GroupConcat& gcc)
|
||||||
{
|
{
|
||||||
// MCOL-901 This value comes from the Server and it is
|
// MCOL-901 This value comes from the Server and it is
|
||||||
// too high(3MB) to allocate it for every instance.
|
// too high(1MB or 3MB by default) to allocate it for every instance.
|
||||||
fGroupConcatLen = gcc->fSize;
|
fGroupConcatLen = gcc->fSize;
|
||||||
size_t sepSize = gcc->fSeparator.size();
|
size_t sepSize = gcc->fSeparator.size();
|
||||||
fCurrentLength -= sepSize; // XXX I have yet to find out why the separator has c_str() as nullptr here.
|
fCurrentLength -= sepSize; // XXX I have yet to find out why the separator has c_str() as nullptr here.
|
||||||
|
@ -92,6 +92,8 @@ class GroupConcatAgUM : public rowgroup::GroupConcatAg
|
|||||||
boost::scoped_ptr<GroupConcator> fConcator;
|
boost::scoped_ptr<GroupConcator> fConcator;
|
||||||
boost::scoped_array<uint8_t> fData;
|
boost::scoped_array<uint8_t> fData;
|
||||||
rowgroup::Row fRow;
|
rowgroup::Row fRow;
|
||||||
|
rowgroup::RGData fRowRGData;
|
||||||
|
rowgroup::RowGroup fRowGroup;
|
||||||
bool fNoOrder;
|
bool fNoOrder;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -5348,7 +5348,24 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
|
|||||||
// Item_func_group_concat* gc = (Item_func_group_concat*)isp;
|
// Item_func_group_concat* gc = (Item_func_group_concat*)isp;
|
||||||
CalpontSystemCatalog::ColType ct;
|
CalpontSystemCatalog::ColType ct;
|
||||||
ct.colDataType = CalpontSystemCatalog::VARCHAR;
|
ct.colDataType = CalpontSystemCatalog::VARCHAR;
|
||||||
ct.colWidth = isp->max_length;
|
|
||||||
|
// MCOL-5429 CalpontSystemCatalog::ColType::colWidth is currently
|
||||||
|
// stored as an int32_t (see calpontsystemcatalog.h). However,
|
||||||
|
// Item_sum::max_length is a uint32_t. This means there will be an
|
||||||
|
// integer overflow when Item_sum::max_length > INT32_MAX. This ultimately
|
||||||
|
// causes an array index out of bound in GroupConcator::swapStreamWithStringAndReturnBuf()
|
||||||
|
// in groupconcat.cpp when ExeMgr processes groupconcat. As a temporary
|
||||||
|
// fix, we cap off the max groupconcat length to std::numeric_limits<int32_t>::max().
|
||||||
|
// The proper fix would be to change colWidth type to uint32_t.
|
||||||
|
if (isp->max_length <= std::numeric_limits<int32_t>::max())
|
||||||
|
{
|
||||||
|
ct.colWidth = isp->max_length;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ct.colWidth = std::numeric_limits<int32_t>::max();
|
||||||
|
}
|
||||||
|
|
||||||
ct.precision = 0;
|
ct.precision = 0;
|
||||||
ac->resultType(ct);
|
ac->resultType(ct);
|
||||||
}
|
}
|
||||||
|
@ -656,7 +656,7 @@ void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColsIdx
|
|||||||
// Initialize the data members to meaningful values, set up the hashmap.
|
// Initialize the data members to meaningful values, set up the hashmap.
|
||||||
// The fRowGroupOut must have a valid data pointer before this.
|
// The fRowGroupOut must have a valid data pointer before this.
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void RowAggregation::initialize()
|
void RowAggregation::initialize(bool hasGroupConcat)
|
||||||
{
|
{
|
||||||
// Calculate the length of the hashmap key.
|
// Calculate the length of the hashmap key.
|
||||||
fAggMapKeyCount = fGroupByCols.size();
|
fAggMapKeyCount = fGroupByCols.size();
|
||||||
@ -694,9 +694,25 @@ void RowAggregation::initialize()
|
|||||||
makeAggFieldsNull(fRow);
|
makeAggFieldsNull(fRow);
|
||||||
|
|
||||||
// Keep a copy of the null row to initialize new map entries.
|
// Keep a copy of the null row to initialize new map entries.
|
||||||
fRowGroupOut->initRow(&fNullRow, true);
|
// MCOL-5429 Use stringstore if the datatype of the groupconcat
|
||||||
fNullRowData.reset(new uint8_t[fNullRow.getSize()]);
|
// field is a long string.
|
||||||
fNullRow.setData(rowgroup::Row::Pointer(fNullRowData.get()));
|
if (hasGroupConcat && fRowGroupOut->hasLongString())
|
||||||
|
{
|
||||||
|
fNullRowGroup = *fRowGroupOut;
|
||||||
|
fNullRowGroup.setUseStringTable(true);
|
||||||
|
fNullRowRGData.reinit(fNullRowGroup, 1);
|
||||||
|
fNullRowGroup.setData(&fNullRowRGData);
|
||||||
|
fNullRowGroup.resetRowGroup(0);
|
||||||
|
fNullRowGroup.initRow(&fNullRow);
|
||||||
|
fNullRowGroup.getRow(0, &fNullRow);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fRowGroupOut->initRow(&fNullRow, true);
|
||||||
|
fNullRowData.reset(new uint8_t[fNullRow.getSize()]);
|
||||||
|
fNullRow.setData(rowgroup::Row::Pointer(fNullRowData.get()));
|
||||||
|
}
|
||||||
|
|
||||||
copyRow(fRow, &fNullRow);
|
copyRow(fRow, &fNullRow);
|
||||||
|
|
||||||
// Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx
|
// Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx
|
||||||
@ -2413,7 +2429,7 @@ void RowAggregationUM::endOfInput()
|
|||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
// Initialize the Group Concat data
|
// Initialize the Group Concat data
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void RowAggregationUM::initialize()
|
void RowAggregationUM::initialize(bool hasGroupConcat)
|
||||||
{
|
{
|
||||||
if (fGroupConcat.size() > 0)
|
if (fGroupConcat.size() > 0)
|
||||||
fFunctionColGc = fFunctionCols;
|
fFunctionColGc = fFunctionCols;
|
||||||
@ -2423,7 +2439,7 @@ void RowAggregationUM::initialize()
|
|||||||
fKeyRG = fRowGroupIn.truncate(fGroupByCols.size());
|
fKeyRG = fRowGroupIn.truncate(fGroupByCols.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
RowAggregation::initialize();
|
RowAggregation::initialize(fGroupConcat.size() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -529,7 +529,7 @@ class RowAggregation : public messageqcpp::Serializeable
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void initialize();
|
virtual void initialize(bool hasGroupConcat = false);
|
||||||
virtual void initMapData(const Row& row);
|
virtual void initMapData(const Row& row);
|
||||||
virtual void attachGroupConcatAg();
|
virtual void attachGroupConcatAg();
|
||||||
|
|
||||||
@ -580,6 +580,8 @@ class RowAggregation : public messageqcpp::Serializeable
|
|||||||
Row fNullRow;
|
Row fNullRow;
|
||||||
Row* tmpRow; // used by the hashers & eq functors
|
Row* tmpRow; // used by the hashers & eq functors
|
||||||
boost::scoped_array<uint8_t> fNullRowData;
|
boost::scoped_array<uint8_t> fNullRowData;
|
||||||
|
rowgroup::RGData fNullRowRGData;
|
||||||
|
rowgroup::RowGroup fNullRowGroup;
|
||||||
|
|
||||||
std::unique_ptr<RowAggStorage> fRowAggStorage;
|
std::unique_ptr<RowAggStorage> fRowAggStorage;
|
||||||
|
|
||||||
@ -724,7 +726,7 @@ class RowAggregationUM : public RowAggregation
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
// virtual methods from base
|
// virtual methods from base
|
||||||
void initialize() override;
|
void initialize(bool hasGroupConcat = false) override;
|
||||||
|
|
||||||
void attachGroupConcatAg() override;
|
void attachGroupConcatAg() override;
|
||||||
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
||||||
|
@ -1544,6 +1544,11 @@ class RowGroup : public messageqcpp::Serializeable
|
|||||||
inline bool usesStringTable() const;
|
inline bool usesStringTable() const;
|
||||||
inline void setUseStringTable(bool);
|
inline void setUseStringTable(bool);
|
||||||
|
|
||||||
|
bool hasLongString() const
|
||||||
|
{
|
||||||
|
return hasLongStringField;
|
||||||
|
}
|
||||||
|
|
||||||
void serializeRGData(messageqcpp::ByteStream&) const;
|
void serializeRGData(messageqcpp::ByteStream&) const;
|
||||||
inline uint32_t getStringTableThreshold() const;
|
inline uint32_t getStringTableThreshold() const;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user