You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
1. Input and output RowGroup's used in GROUP_CONCAT classes are currently allocating a raw memory buffer of size equal to the actual width of the string datatype. As an example, for the following query: SELECT col1, GROUP_CONCAT(col2) FROM t GROUP BY col1; If col2 is a TEXT field with default width, the input RowGroup containing the target rows to be concatenated will assign 64kb of memory for every input row in the RowGroup. This is wasteful as actual field values in real workloads would be much smaller. We fix this by enabling the RowGroup to use the StringStore when the RowGroup contains long strings. 2. RowAggregation::initialize() allocates a memory buffer for a NULL row. The size of this buffer is equal to the row size for the output RowGroup. For the above scenario, using the default group_concat_max_len (which is a server variable that sets the maximum length of the GROUP_CONCAT string) value of 1mb, the buffer size would be (1mb + 64kb + some additional metadata). If the user sets group_concat_max_len to a higher value, say 3gb, this buffer size would be ~3gb. Now if the runtime initiates several instances of RowAggregation, total memory consumption by PrimProc could exceed the hardware memory limits causing the OS OOM to kill the process. We fix this problem by again enabling the StringStore for the NULL row allocation. 3. In the plugin code in buildAggregateColumn(), there is an integer overflow when the server group_concat_max_len variable (which is an uint32_t) is set to a value > INT32_MAX (such as 3gb) and is assigned to CalpontSystemCatalog::ColType::colWidth (which is an int32_t). As a short term fix, we saturate the assigned value to colWidth to INT32_MAX. Proper fix would be to upgrade CalpontSystemCatalog::ColType::colWidth to an uint32_t.
1117 lines
28 KiB
C++
1117 lines
28 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2019-2023 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: groupconcat.cpp 9705 2013-07-17 20:06:07Z pleblanc $
|
|
|
|
#include <iostream>
|
|
// #define NDEBUG
|
|
#include <cassert>
|
|
#include <string>
|
|
using namespace std;
|
|
|
|
|
|
#include "errorids.h"
|
|
#include "exceptclasses.h"
|
|
using namespace logging;
|
|
|
|
#include "returnedcolumn.h"
|
|
#include "aggregatecolumn.h"
|
|
#include "arithmeticcolumn.h"
|
|
#include "functioncolumn.h"
|
|
#include "constantcolumn.h"
|
|
#include "rowcolumn.h"
|
|
#include "groupconcatcolumn.h"
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "rowgroup.h"
|
|
#include "rowaggregation.h"
|
|
using namespace rowgroup;
|
|
|
|
#include "dataconvert.h"
|
|
using namespace dataconvert;
|
|
|
|
#include "groupconcat.h"
|
|
|
|
using namespace ordering;
|
|
|
|
#include "jobstep.h"
|
|
#include "jlf_common.h"
|
|
#include "limitedorderby.h"
|
|
#include "mcs_decimal.h"
|
|
|
|
namespace joblist
|
|
{
|
|
// GroupConcatInfo class implementation
|
|
GroupConcatInfo::GroupConcatInfo()
|
|
{
|
|
}
|
|
|
|
GroupConcatInfo::~GroupConcatInfo()
|
|
{
|
|
}
|
|
|
|
void GroupConcatInfo::prepGroupConcat(JobInfo& jobInfo)
|
|
{
|
|
RetColsVector::iterator i = jobInfo.groupConcatCols.begin();
|
|
|
|
while (i != jobInfo.groupConcatCols.end())
|
|
{
|
|
GroupConcatColumn* gcc = dynamic_cast<GroupConcatColumn*>(i->get());
|
|
const RowColumn* rcp = dynamic_cast<const RowColumn*>(gcc->aggParms()[0].get());
|
|
|
|
SP_GroupConcat groupConcat(new GroupConcat);
|
|
groupConcat->fSeparator = gcc->separator();
|
|
groupConcat->fDistinct = gcc->distinct();
|
|
groupConcat->fSize = gcc->resultType().colWidth;
|
|
groupConcat->fRm = jobInfo.rm;
|
|
groupConcat->fSessionMemLimit = jobInfo.umMemLimit;
|
|
groupConcat->fTimeZone = jobInfo.timeZone;
|
|
|
|
int key = -1;
|
|
const vector<SRCP>& cols = rcp->columnVec();
|
|
|
|
for (uint64_t j = 0, k = 0; j < cols.size(); j++)
|
|
{
|
|
const ConstantColumn* cc = dynamic_cast<const ConstantColumn*>(cols[j].get());
|
|
|
|
if (cc == NULL)
|
|
{
|
|
key = getColumnKey(cols[j], jobInfo);
|
|
fColumns.insert(key);
|
|
groupConcat->fGroupCols.push_back(make_pair(key, k++));
|
|
}
|
|
else
|
|
{
|
|
groupConcat->fConstCols.push_back(make_pair(cc->constval(), j));
|
|
}
|
|
}
|
|
|
|
vector<SRCP>& orderCols = gcc->orderCols();
|
|
|
|
for (vector<SRCP>::iterator k = orderCols.begin(); k != orderCols.end(); k++)
|
|
{
|
|
if (dynamic_cast<const ConstantColumn*>(k->get()) != NULL)
|
|
continue;
|
|
|
|
key = getColumnKey(*k, jobInfo);
|
|
fColumns.insert(key);
|
|
groupConcat->fOrderCols.push_back(make_pair(key, k->get()->asc()));
|
|
}
|
|
|
|
fGroupConcat.push_back(groupConcat);
|
|
|
|
i++;
|
|
}
|
|
|
|
// Rare case: all columns in group_concat are constant columns, use a column in column map.
|
|
if (jobInfo.groupConcatCols.size() > 0 && fColumns.size() == 0)
|
|
{
|
|
int key = -1;
|
|
|
|
for (vector<uint32_t>::iterator i = jobInfo.tableList.begin(); i != jobInfo.tableList.end() && key == -1;
|
|
i++)
|
|
{
|
|
if (jobInfo.columnMap[*i].size() > 0)
|
|
{
|
|
key = *(jobInfo.columnMap[*i].begin());
|
|
}
|
|
}
|
|
|
|
if (key != -1)
|
|
{
|
|
fColumns.insert(key);
|
|
}
|
|
else
|
|
{
|
|
throw runtime_error("Empty column map.");
|
|
}
|
|
}
|
|
}
|
|
|
|
uint32_t GroupConcatInfo::getColumnKey(const SRCP& srcp, JobInfo& jobInfo)
|
|
{
|
|
int colKey = -1;
|
|
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
|
|
|
|
if (sc != NULL)
|
|
{
|
|
if (sc->schemaName().empty())
|
|
{
|
|
// bug3839, handle columns from subquery.
|
|
SimpleColumn tmp(*sc, jobInfo.sessionId);
|
|
tmp.oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
|
|
colKey = getTupleKey(jobInfo, &tmp);
|
|
}
|
|
else
|
|
{
|
|
colKey = getTupleKey(jobInfo, sc);
|
|
}
|
|
|
|
// check if this is a dictionary column
|
|
if (jobInfo.keyInfo->dictKeyMap.find(colKey) != jobInfo.keyInfo->dictKeyMap.end())
|
|
colKey = jobInfo.keyInfo->dictKeyMap[colKey];
|
|
}
|
|
else
|
|
{
|
|
const ArithmeticColumn* ac = dynamic_cast<const ArithmeticColumn*>(srcp.get());
|
|
const FunctionColumn* fc = dynamic_cast<const FunctionColumn*>(srcp.get());
|
|
|
|
if (ac != NULL || fc != NULL)
|
|
{
|
|
colKey = getExpTupleKey(jobInfo, srcp->expressionId());
|
|
}
|
|
else
|
|
{
|
|
cerr << "Unsupported GROUP_CONCAT column. " << srcp->toString() << endl;
|
|
throw runtime_error("Unsupported GROUP_CONCAT column.");
|
|
}
|
|
}
|
|
|
|
return colKey;
|
|
}
|
|
|
|
void GroupConcatInfo::mapColumns(const RowGroup& projRG)
|
|
{
|
|
map<uint32_t, uint32_t> projColumnMap;
|
|
const vector<uint32_t>& keysProj = projRG.getKeys();
|
|
|
|
for (uint64_t i = 0; i < projRG.getColumnCount(); i++)
|
|
projColumnMap[keysProj[i]] = i;
|
|
|
|
for (vector<SP_GroupConcat>::iterator k = fGroupConcat.begin(); k != fGroupConcat.end(); k++)
|
|
{
|
|
vector<uint32_t> pos;
|
|
vector<uint32_t> oids;
|
|
vector<uint32_t> keys;
|
|
vector<uint32_t> scale;
|
|
vector<uint32_t> precision;
|
|
vector<CalpontSystemCatalog::ColDataType> types;
|
|
vector<uint32_t> csNums;
|
|
pos.push_back(2);
|
|
|
|
vector<pair<uint32_t, uint32_t> >::iterator i1 = (*k)->fGroupCols.begin();
|
|
|
|
while (i1 != (*k)->fGroupCols.end())
|
|
{
|
|
map<uint32_t, uint32_t>::iterator j = projColumnMap.find(i1->first);
|
|
|
|
if (j == projColumnMap.end())
|
|
{
|
|
cerr << "Concat Key:" << i1->first << " is not projected." << endl;
|
|
throw runtime_error("Project error.");
|
|
}
|
|
|
|
pos.push_back(pos.back() + projRG.getColumnWidth(j->second));
|
|
oids.push_back(projRG.getOIDs()[j->second]);
|
|
keys.push_back(projRG.getKeys()[j->second]);
|
|
types.push_back(projRG.getColTypes()[j->second]);
|
|
csNums.push_back(projRG.getCharsetNumber(j->second));
|
|
scale.push_back(projRG.getScale()[j->second]);
|
|
precision.push_back(projRG.getPrecision()[j->second]);
|
|
|
|
i1++;
|
|
}
|
|
|
|
vector<pair<uint32_t, bool> >::iterator i2 = (*k)->fOrderCols.begin();
|
|
|
|
while (i2 != (*k)->fOrderCols.end())
|
|
{
|
|
map<uint32_t, uint32_t>::iterator j = projColumnMap.find(i2->first);
|
|
|
|
if (j == projColumnMap.end())
|
|
{
|
|
cerr << "Order Key:" << i2->first << " is not projected." << endl;
|
|
throw runtime_error("Project error.");
|
|
}
|
|
|
|
vector<uint32_t>::iterator i3 = find(keys.begin(), keys.end(), j->first);
|
|
int idx = 0;
|
|
|
|
if (i3 == keys.end())
|
|
{
|
|
idx = keys.size();
|
|
|
|
pos.push_back(pos.back() + projRG.getColumnWidth(j->second));
|
|
oids.push_back(projRG.getOIDs()[j->second]);
|
|
keys.push_back(projRG.getKeys()[j->second]);
|
|
types.push_back(projRG.getColTypes()[j->second]);
|
|
csNums.push_back(projRG.getCharsetNumber(j->second));
|
|
scale.push_back(projRG.getScale()[j->second]);
|
|
precision.push_back(projRG.getPrecision()[j->second]);
|
|
}
|
|
else
|
|
{
|
|
idx = std::distance(keys.begin(), i3);
|
|
}
|
|
|
|
(*k)->fOrderCond.push_back(make_pair(idx, i2->second));
|
|
|
|
i2++;
|
|
}
|
|
|
|
(*k)->fRowGroup = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
|
|
projRG.getStringTableThreshold(), false);
|
|
|
|
// MCOL-5429 Use stringstore if the datatype of the groupconcat
|
|
// field is a long string.
|
|
if ((*k)->fRowGroup.hasLongString())
|
|
{
|
|
(*k)->fRowGroup.setUseStringTable(true);
|
|
}
|
|
|
|
(*k)->fMapping = makeMapping(projRG, (*k)->fRowGroup);
|
|
}
|
|
}
|
|
|
|
std::shared_ptr<int[]> GroupConcatInfo::makeMapping(const RowGroup& in, const RowGroup& out)
|
|
{
|
|
// For some reason using the rowgroup mapping fcns don't work completely right in this class
|
|
std::shared_ptr<int[]> mapping(new int[out.getColumnCount()]);
|
|
|
|
for (uint64_t i = 0; i < out.getColumnCount(); i++)
|
|
{
|
|
for (uint64_t j = 0; j < in.getColumnCount(); j++)
|
|
{
|
|
if ((out.getKeys()[i] == in.getKeys()[j]))
|
|
{
|
|
mapping[i] = j;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return mapping;
|
|
}
|
|
|
|
const string GroupConcatInfo::toString() const
|
|
{
|
|
ostringstream oss;
|
|
oss << "GroupConcatInfo: toString() to be implemented.";
|
|
oss << endl;
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
GroupConcatAgUM::GroupConcatAgUM(rowgroup::SP_GroupConcat& gcc) : GroupConcatAg(gcc)
|
|
{
|
|
initialize();
|
|
}
|
|
|
|
GroupConcatAgUM::~GroupConcatAgUM()
|
|
{
|
|
}
|
|
|
|
void GroupConcatAgUM::initialize()
|
|
{
|
|
if (fGroupConcat->fDistinct || fGroupConcat->fOrderCols.size() > 0)
|
|
fConcator.reset(new GroupConcatOrderBy());
|
|
else
|
|
fConcator.reset(new GroupConcatNoOrder());
|
|
|
|
fConcator->initialize(fGroupConcat);
|
|
|
|
// MCOL-5429 Use stringstore if the datatype of the groupconcat
|
|
// field is a long string.
|
|
if (fGroupConcat->fRowGroup.hasLongString())
|
|
{
|
|
fRowGroup = fGroupConcat->fRowGroup;
|
|
fRowGroup.setUseStringTable(true);
|
|
fRowRGData.reinit(fRowGroup, 1);
|
|
fRowGroup.setData(&fRowRGData);
|
|
fRowGroup.resetRowGroup(0);
|
|
fRowGroup.initRow(&fRow);
|
|
fRowGroup.getRow(0, &fRow);
|
|
}
|
|
else
|
|
{
|
|
fGroupConcat->fRowGroup.initRow(&fRow, true);
|
|
fData.reset(new uint8_t[fRow.getSize()]);
|
|
fRow.setData(rowgroup::Row::Pointer(fData.get()));
|
|
}
|
|
}
|
|
|
|
void GroupConcatAgUM::processRow(const rowgroup::Row& inRow)
|
|
{
|
|
applyMapping(fGroupConcat->fMapping, inRow);
|
|
fConcator->processRow(fRow);
|
|
}
|
|
|
|
void GroupConcatAgUM::merge(const rowgroup::Row& inRow, int64_t i)
|
|
{
|
|
uint8_t* data = inRow.getData();
|
|
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + inRow.getOffset(i)));
|
|
|
|
fConcator->merge(gccAg->concator().get());
|
|
}
|
|
|
|
uint8_t* GroupConcatAgUM::getResult()
|
|
{
|
|
return fConcator->getResult(fGroupConcat->fSeparator);
|
|
}
|
|
|
|
void GroupConcatAgUM::applyMapping(const std::shared_ptr<int[]>& mapping, const Row& row)
|
|
{
|
|
// For some reason the rowgroup mapping fcns don't work right in this class.
|
|
for (uint64_t i = 0; i < fRow.getColumnCount(); i++)
|
|
{
|
|
if (fRow.getColumnWidth(i) > datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::CHAR ||
|
|
fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::VARCHAR ||
|
|
fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::TEXT)
|
|
{
|
|
fRow.setStringField(row.getConstString(mapping[i]), i);
|
|
}
|
|
else if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE)
|
|
{
|
|
fRow.setLongDoubleField(row.getLongDoubleField(mapping[i]), i);
|
|
}
|
|
else if (datatypes::isWideDecimalType(fRow.getColType(i), fRow.getColumnWidth(i)))
|
|
{
|
|
row.copyBinaryField(fRow, i, mapping[i]);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::CHAR ||
|
|
fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::VARCHAR)
|
|
{
|
|
fRow.setIntField(row.getUintField(mapping[i]), i);
|
|
}
|
|
else
|
|
{
|
|
fRow.setIntField(row.getIntField(mapping[i]), i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// GroupConcator class implementation
|
|
GroupConcator::GroupConcator() : fCurrentLength(0), fGroupConcatLen(0), fConstantLen(0)
|
|
{
|
|
}
|
|
|
|
GroupConcator::~GroupConcator()
|
|
{
|
|
}
|
|
|
|
void GroupConcator::initialize(const rowgroup::SP_GroupConcat& gcc)
|
|
{
|
|
// MCOL-901 This value comes from the Server and it is
|
|
// too high(1MB or 3MB by default) to allocate it for every instance.
|
|
fGroupConcatLen = gcc->fSize;
|
|
size_t sepSize = gcc->fSeparator.size();
|
|
fCurrentLength -= sepSize; // XXX Yet I have to find out why spearator has c_str() as nullptr here.
|
|
fTimeZone = gcc->fTimeZone;
|
|
|
|
fConstCols = gcc->fConstCols;
|
|
fConstantLen = sepSize;
|
|
|
|
for (uint64_t i = 0; i < fConstCols.size(); i++)
|
|
fConstantLen += strlen(fConstCols[i].first.str());
|
|
}
|
|
|
|
void GroupConcator::outputRow(std::ostringstream& oss, const rowgroup::Row& row)
|
|
{
|
|
const CalpontSystemCatalog::ColDataType* types = row.getColTypes();
|
|
vector<uint32_t>::iterator i = fConcatColumns.begin();
|
|
auto j = fConstCols.begin();
|
|
|
|
uint64_t groupColCount = fConcatColumns.size() + fConstCols.size();
|
|
|
|
for (uint64_t k = 0; k < groupColCount; k++)
|
|
{
|
|
if (j != fConstCols.end() && k == j->second)
|
|
{
|
|
oss << j->first.safeString();
|
|
j++;
|
|
continue;
|
|
}
|
|
|
|
switch (types[*i])
|
|
{
|
|
case CalpontSystemCatalog::TINYINT:
|
|
case CalpontSystemCatalog::SMALLINT:
|
|
case CalpontSystemCatalog::MEDINT:
|
|
case CalpontSystemCatalog::INT:
|
|
case CalpontSystemCatalog::BIGINT:
|
|
{
|
|
int64_t intVal = row.getIntField(*i);
|
|
|
|
oss << intVal;
|
|
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DECIMAL:
|
|
case CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
oss << fixed << row.getDecimalField(*i);
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::UTINYINT:
|
|
case CalpontSystemCatalog::USMALLINT:
|
|
case CalpontSystemCatalog::UMEDINT:
|
|
case CalpontSystemCatalog::UINT:
|
|
case CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
uint64_t uintVal = row.getUintField(*i);
|
|
int scale = (int)row.getScale(*i);
|
|
|
|
if (scale == 0)
|
|
{
|
|
oss << uintVal;
|
|
}
|
|
else
|
|
{
|
|
oss << fixed
|
|
<< datatypes::Decimal(datatypes::TSInt128((int128_t)uintVal), scale,
|
|
datatypes::INT128MAXPRECISION);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::CHAR:
|
|
case CalpontSystemCatalog::VARCHAR:
|
|
case CalpontSystemCatalog::TEXT:
|
|
{
|
|
oss << row.getStringField(*i).str();
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DOUBLE:
|
|
case CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
oss << setprecision(15) << row.getDoubleField(*i);
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
oss << setprecision(15) << row.getLongDoubleField(*i);
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::FLOAT:
|
|
case CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
oss << row.getFloatField(*i);
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DATE:
|
|
{
|
|
oss << DataConvert::dateToString(row.getUintField(*i));
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DATETIME:
|
|
{
|
|
oss << DataConvert::datetimeToString(row.getUintField(*i));
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
oss << DataConvert::timestampToString(row.getUintField(*i), fTimeZone);
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::TIME:
|
|
{
|
|
oss << DataConvert::timeToString(row.getUintField(*i));
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
i++;
|
|
}
|
|
}
|
|
|
|
bool GroupConcator::concatColIsNull(const rowgroup::Row& row)
|
|
{
|
|
bool ret = false;
|
|
|
|
for (vector<uint32_t>::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++)
|
|
{
|
|
if (row.isNullValue(*i))
|
|
{
|
|
ret = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
|
|
{
|
|
int64_t rowLen = fConstantLen; // fixed constant and separator length
|
|
const CalpontSystemCatalog::ColDataType* types = row.getColTypes();
|
|
|
|
// null values are already skipped.
|
|
for (vector<uint32_t>::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++)
|
|
{
|
|
if (row.isNullValue(*i))
|
|
continue;
|
|
|
|
int64_t fieldLen = 0;
|
|
|
|
switch (types[*i])
|
|
{
|
|
case CalpontSystemCatalog::TINYINT:
|
|
case CalpontSystemCatalog::SMALLINT:
|
|
case CalpontSystemCatalog::MEDINT:
|
|
case CalpontSystemCatalog::INT:
|
|
case CalpontSystemCatalog::BIGINT:
|
|
{
|
|
int64_t v = row.getIntField(*i);
|
|
|
|
if (v < 0)
|
|
fieldLen++;
|
|
|
|
while ((v /= 10) != 0)
|
|
fieldLen++;
|
|
|
|
fieldLen += 1;
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::UTINYINT:
|
|
case CalpontSystemCatalog::USMALLINT:
|
|
case CalpontSystemCatalog::UMEDINT:
|
|
case CalpontSystemCatalog::UINT:
|
|
case CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
uint64_t v = row.getUintField(*i);
|
|
|
|
while ((v /= 10) != 0)
|
|
fieldLen++;
|
|
|
|
fieldLen += 1;
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DECIMAL:
|
|
case CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
fieldLen += 1;
|
|
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::CHAR:
|
|
case CalpontSystemCatalog::VARCHAR:
|
|
case CalpontSystemCatalog::TEXT:
|
|
{
|
|
fieldLen += row.getConstString(*i).length();
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DOUBLE:
|
|
case CalpontSystemCatalog::UDOUBLE:
|
|
case CalpontSystemCatalog::FLOAT:
|
|
case CalpontSystemCatalog::UFLOAT:
|
|
case CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
fieldLen = 1; // minimum length
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DATE:
|
|
{
|
|
fieldLen = 10; // YYYY-MM-DD
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::DATETIME:
|
|
case CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
fieldLen = 19; // YYYY-MM-DD HH24:MI:SS
|
|
// Decimal point and milliseconds
|
|
uint64_t colPrecision = row.getPrecision(*i);
|
|
|
|
if (colPrecision > 0 && colPrecision < 7)
|
|
{
|
|
fieldLen += colPrecision + 1;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case CalpontSystemCatalog::TIME:
|
|
{
|
|
fieldLen = 10; // -HHH:MI:SS
|
|
// Decimal point and milliseconds
|
|
uint64_t colPrecision = row.getPrecision(*i);
|
|
|
|
if (colPrecision > 0 && colPrecision < 7)
|
|
{
|
|
fieldLen += colPrecision + 1;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
rowLen += fieldLen;
|
|
}
|
|
|
|
return rowLen;
|
|
}
|
|
|
|
const string GroupConcator::toString() const
|
|
{
|
|
ostringstream oss;
|
|
oss << "GroupConcat size-" << fGroupConcatLen;
|
|
oss << "Concat cols: ";
|
|
vector<uint32_t>::const_iterator i = fConcatColumns.begin();
|
|
auto j = fConstCols.begin();
|
|
uint64_t groupColCount = fConcatColumns.size() + fConstCols.size();
|
|
|
|
for (uint64_t k = 0; k < groupColCount; k++)
|
|
{
|
|
if (j != fConstCols.end() && k == j->second)
|
|
{
|
|
oss << 'c' << " ";
|
|
j++;
|
|
}
|
|
else
|
|
{
|
|
oss << (*i) << " ";
|
|
i++;
|
|
}
|
|
}
|
|
|
|
oss << endl;
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
// GroupConcatOrderBy class implementation
|
|
GroupConcatOrderBy::GroupConcatOrderBy()
|
|
{
|
|
fRule.fIdbCompare = this;
|
|
}
|
|
|
|
GroupConcatOrderBy::~GroupConcatOrderBy()
|
|
{
|
|
}
|
|
|
|
void GroupConcatOrderBy::initialize(const rowgroup::SP_GroupConcat& gcc)
|
|
{
|
|
GroupConcator::initialize(gcc);
|
|
|
|
fOrderByCond.resize(0);
|
|
|
|
for (uint64_t i = 0; i < gcc->fOrderCond.size(); i++)
|
|
fOrderByCond.push_back(IdbSortSpec(gcc->fOrderCond[i].first, gcc->fOrderCond[i].second));
|
|
|
|
fDistinct = gcc->fDistinct;
|
|
fRowsPerRG = 128;
|
|
fErrorCode = ERR_AGGREGATION_TOO_BIG;
|
|
fRm = gcc->fRm;
|
|
fSessionMemLimit = gcc->fSessionMemLimit;
|
|
|
|
vector<std::pair<uint32_t, uint32_t> >::iterator i = gcc->fGroupCols.begin();
|
|
while (i != gcc->fGroupCols.end())
|
|
{
|
|
auto x = (*i).second;
|
|
fConcatColumns.push_back(x);
|
|
i++;
|
|
}
|
|
|
|
IdbOrderBy::initialize(gcc->fRowGroup);
|
|
}
|
|
|
|
uint64_t GroupConcatOrderBy::getKeyLength() const
|
|
{
|
|
// only distinct the concatenated columns
|
|
return fConcatColumns.size(); // cols 0 to fConcatColumns.size() - 1 will be compared
|
|
}
|
|
|
|
void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
|
|
{
|
|
// check if this is a distinct row
|
|
if (fDistinct && fDistinctMap->find(row.getPointer()) != fDistinctMap->end())
|
|
return;
|
|
|
|
// this row is skipped if any concatenated column is null.
|
|
if (concatColIsNull(row))
|
|
return;
|
|
|
|
// if the row count is less than the limit
|
|
if (fCurrentLength < fGroupConcatLen)
|
|
{
|
|
copyRow(row, &fRow0);
|
|
// the RID is no meaning here, use it to store the estimated length.
|
|
int16_t estLen = lengthEstimate(fRow0);
|
|
fRow0.setRid(estLen);
|
|
OrderByRow newRow(fRow0, fRule);
|
|
fOrderByQueue.push(newRow);
|
|
fCurrentLength += estLen;
|
|
|
|
// add to the distinct map
|
|
if (fDistinct)
|
|
fDistinctMap->insert(fRow0.getPointer());
|
|
|
|
fRowGroup.incRowCount();
|
|
fRow0.nextRow();
|
|
|
|
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
|
{
|
|
fDataQueue.push(fData);
|
|
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
|
|
uint64_t newSize = fRowGroup.getSizeWithStrings();
|
|
|
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
|
{
|
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
|
throw IDBExcept(fErrorCode);
|
|
}
|
|
fMemSize += newSize;
|
|
|
|
fData.reinit(fRowGroup, fRowsPerRG);
|
|
fRowGroup.setData(&fData);
|
|
fRowGroup.resetRowGroup(0);
|
|
fRowGroup.getRow(0, &fRow0);
|
|
}
|
|
}
|
|
|
|
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
|
|
{
|
|
OrderByRow swapRow = fOrderByQueue.top();
|
|
fRow1.setData(swapRow.fData);
|
|
fOrderByQueue.pop();
|
|
fCurrentLength -= fRow1.getRelRid();
|
|
fRow2.setData(swapRow.fData);
|
|
|
|
if (!fDistinct)
|
|
{
|
|
copyRow(row, &fRow1);
|
|
}
|
|
else
|
|
{
|
|
// only the copyRow does useful work here
|
|
fDistinctMap->erase(swapRow.fData);
|
|
copyRow(row, &fRow2);
|
|
fDistinctMap->insert(swapRow.fData);
|
|
}
|
|
|
|
int16_t estLen = lengthEstimate(fRow2);
|
|
fRow2.setRid(estLen);
|
|
fCurrentLength += estLen;
|
|
|
|
fOrderByQueue.push(swapRow);
|
|
}
|
|
}
|
|
|
|
void GroupConcatOrderBy::merge(GroupConcator* gc)
|
|
{
|
|
GroupConcatOrderBy* go = dynamic_cast<GroupConcatOrderBy*>(gc);
|
|
|
|
while (go->fOrderByQueue.empty() == false)
|
|
{
|
|
const OrderByRow& row = go->fOrderByQueue.top();
|
|
|
|
// check if the distinct row already exists
|
|
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
|
|
{
|
|
; // no op;
|
|
}
|
|
|
|
// if the row count is less than the limit
|
|
else if (fCurrentLength < fGroupConcatLen)
|
|
{
|
|
fOrderByQueue.push(row);
|
|
row1.setData(row.fData);
|
|
fCurrentLength += row1.getRelRid();
|
|
|
|
// add to the distinct map
|
|
if (fDistinct)
|
|
fDistinctMap->insert(row.fData);
|
|
}
|
|
|
|
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
|
|
{
|
|
OrderByRow swapRow = fOrderByQueue.top();
|
|
row1.setData(swapRow.fData);
|
|
fOrderByQueue.pop();
|
|
fCurrentLength -= row1.getRelRid();
|
|
|
|
if (fDistinct)
|
|
{
|
|
fDistinctMap->erase(swapRow.fData);
|
|
fDistinctMap->insert(row.fData);
|
|
}
|
|
|
|
row1.setData(row.fData);
|
|
fCurrentLength += row1.getRelRid();
|
|
|
|
fOrderByQueue.push(row);
|
|
}
|
|
|
|
go->fOrderByQueue.pop();
|
|
}
|
|
}
|
|
|
|
uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep)
|
|
{
|
|
ostringstream oss;
|
|
bool addSep = false;
|
|
|
|
// need to reverse the order
|
|
stack<OrderByRow> rowStack;
|
|
while (fOrderByQueue.size() > 0)
|
|
{
|
|
rowStack.push(fOrderByQueue.top());
|
|
fOrderByQueue.pop();
|
|
}
|
|
|
|
size_t prevResultSize = 0;
|
|
size_t rowsProcessed = 0;
|
|
bool isNull = true;
|
|
while (rowStack.size() > 0)
|
|
{
|
|
if (addSep)
|
|
oss << sep;
|
|
else
|
|
addSep = true;
|
|
|
|
const OrderByRow& topRow = rowStack.top();
|
|
fRow0.setData(topRow.fData);
|
|
outputRow(oss, fRow0);
|
|
isNull = false;
|
|
rowStack.pop();
|
|
if (rowsProcessed >= fRowsPerRG)
|
|
{
|
|
size_t sizeDiff = oss.str().size() - prevResultSize;
|
|
prevResultSize = oss.str().size();
|
|
if (!fRm->getMemory(sizeDiff, fSessionMemLimit))
|
|
{
|
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
|
throw IDBExcept(fErrorCode);
|
|
}
|
|
fMemSize += sizeDiff;
|
|
rowsProcessed = 0;
|
|
}
|
|
}
|
|
|
|
return swapStreamWithStringAndReturnBuf(oss, isNull);
|
|
}
|
|
|
|
uint8_t* GroupConcator::swapStreamWithStringAndReturnBuf(ostringstream& oss, bool isNull)
|
|
{
|
|
if (isNull) {
|
|
outputBuf_.reset();
|
|
return nullptr;
|
|
}
|
|
int64_t resultSize = oss.str().size();
|
|
oss << '\0' << '\0';
|
|
outputBuf_.reset(new std::string(std::move(*oss.rdbuf()).str()));
|
|
|
|
if (resultSize >= fGroupConcatLen + 1)
|
|
{
|
|
(*outputBuf_)[fGroupConcatLen] = '\0';
|
|
}
|
|
if (resultSize >= fGroupConcatLen + 2)
|
|
{
|
|
(*outputBuf_)[fGroupConcatLen + 1] = '\0';
|
|
}
|
|
|
|
return reinterpret_cast<uint8_t*>(outputBuf_->data());
|
|
}
|
|
|
|
uint8_t* GroupConcator::getResult(const string& sep)
|
|
{
|
|
return getResultImpl(sep);
|
|
}
|
|
|
|
const string GroupConcatOrderBy::toString() const
|
|
{
|
|
string baseStr = GroupConcator::toString();
|
|
|
|
ostringstream oss;
|
|
oss << "OrderBy cols: ";
|
|
vector<IdbSortSpec>::const_iterator i = fOrderByCond.begin();
|
|
|
|
for (; i != fOrderByCond.end(); i++)
|
|
oss << "(" << i->fIndex << "," << ((i->fAsc) ? "Asc" : "Desc") << ","
|
|
<< ((i->fNf) ? "null first" : "null last") << ") ";
|
|
|
|
if (fDistinct)
|
|
oss << endl << " distinct";
|
|
|
|
oss << endl;
|
|
|
|
return (baseStr + oss.str());
|
|
}
|
|
|
|
// GroupConcatNoOrder class implementation
|
|
GroupConcatNoOrder::GroupConcatNoOrder()
|
|
: fRowsPerRG(128), fErrorCode(ERR_AGGREGATION_TOO_BIG), fMemSize(0), fRm(NULL)
|
|
{
|
|
}
|
|
|
|
GroupConcatNoOrder::~GroupConcatNoOrder()
|
|
{
|
|
if (fRm)
|
|
fRm->returnMemory(fMemSize, fSessionMemLimit);
|
|
}
|
|
|
|
void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
|
|
{
|
|
GroupConcator::initialize(gcc);
|
|
|
|
fRowGroup = gcc->fRowGroup;
|
|
fRowsPerRG = 128;
|
|
fErrorCode = ERR_AGGREGATION_TOO_BIG;
|
|
fRm = gcc->fRm;
|
|
fSessionMemLimit = gcc->fSessionMemLimit;
|
|
|
|
vector<std::pair<uint32_t, uint32_t> >::iterator i = gcc->fGroupCols.begin();
|
|
|
|
while (i != gcc->fGroupCols.end())
|
|
fConcatColumns.push_back((*(i++)).second);
|
|
|
|
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
|
|
|
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
|
{
|
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
|
throw IDBExcept(fErrorCode);
|
|
}
|
|
|
|
fMemSize += newSize;
|
|
|
|
fData.reinit(fRowGroup, fRowsPerRG);
|
|
fRowGroup.setData(&fData);
|
|
fRowGroup.resetRowGroup(0);
|
|
fRowGroup.initRow(&fRow);
|
|
fRowGroup.getRow(0, &fRow);
|
|
}
|
|
|
|
void GroupConcatNoOrder::processRow(const rowgroup::Row& row)
|
|
{
|
|
// if the row count is less than the limit
|
|
if (fCurrentLength < fGroupConcatLen && concatColIsNull(row) == false)
|
|
{
|
|
copyRow(row, &fRow);
|
|
|
|
// the RID is no meaning here, use it to store the estimated length.
|
|
int16_t estLen = lengthEstimate(fRow);
|
|
fRow.setRid(estLen);
|
|
fCurrentLength += estLen;
|
|
fRowGroup.incRowCount();
|
|
fRow.nextRow();
|
|
|
|
if (fRowGroup.getRowCount() >= fRowsPerRG)
|
|
{
|
|
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
|
|
uint64_t newSize = fRowGroup.getSizeWithStrings();
|
|
|
|
if (!fRm->getMemory(newSize, fSessionMemLimit))
|
|
{
|
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
|
throw IDBExcept(fErrorCode);
|
|
}
|
|
fMemSize += newSize;
|
|
|
|
fDataQueue.push(fData);
|
|
fData.reinit(fRowGroup, fRowsPerRG);
|
|
fRowGroup.setData(&fData);
|
|
fRowGroup.resetRowGroup(0);
|
|
fRowGroup.getRow(0, &fRow);
|
|
}
|
|
}
|
|
}
|
|
|
|
void GroupConcatNoOrder::merge(GroupConcator* gc)
|
|
{
|
|
GroupConcatNoOrder* in = dynamic_cast<GroupConcatNoOrder*>(gc);
|
|
|
|
while (in->fDataQueue.size() > 0)
|
|
{
|
|
fDataQueue.push(in->fDataQueue.front());
|
|
in->fDataQueue.pop();
|
|
}
|
|
|
|
fDataQueue.push(in->fData);
|
|
fMemSize += in->fMemSize;
|
|
in->fMemSize = 0;
|
|
}
|
|
|
|
uint8_t* GroupConcatNoOrder::getResultImpl(const string& sep)
|
|
{
|
|
ostringstream oss;
|
|
bool addSep = false;
|
|
|
|
fDataQueue.push(fData);
|
|
size_t prevResultSize = 0;
|
|
|
|
bool isNull = true;
|
|
while (fDataQueue.size() > 0)
|
|
{
|
|
fRowGroup.setData(&fDataQueue.front());
|
|
fRowGroup.getRow(0, &fRow);
|
|
|
|
for (uint64_t i = 0; i < fRowGroup.getRowCount(); i++)
|
|
{
|
|
if (addSep)
|
|
oss << sep;
|
|
else
|
|
addSep = true;
|
|
|
|
outputRow(oss, fRow);
|
|
isNull = false;
|
|
fRow.nextRow();
|
|
}
|
|
size_t sizeDiff = oss.str().size() - prevResultSize;
|
|
prevResultSize = oss.str().size();
|
|
if (!fRm->getMemory(sizeDiff, fSessionMemLimit))
|
|
{
|
|
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
|
|
throw IDBExcept(fErrorCode);
|
|
}
|
|
fMemSize += sizeDiff;
|
|
fDataQueue.pop();
|
|
}
|
|
|
|
return swapStreamWithStringAndReturnBuf(oss, isNull);
|
|
}
|
|
|
|
const string GroupConcatNoOrder::toString() const
|
|
{
|
|
return GroupConcator::toString();
|
|
}
|
|
|
|
} // namespace joblist
|