1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/groupconcat.cpp

1598 lines
39 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019-2023 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: groupconcat.cpp 9705 2013-07-17 20:06:07Z pleblanc $
#include <algorithm>
#include <iostream>
// #define NDEBUG
#include <cassert>
#include <string>
#include "windowfunction/idborderby.h"
using namespace std;
#include "errorids.h"
using namespace logging;
#include "returnedcolumn.h"
#include "aggregatecolumn.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "constantcolumn.h"
#include "rowcolumn.h"
#include "groupconcatcolumn.h"
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "rowgroup.h"
#include "rowaggregation.h"
using namespace rowgroup;
#include "dataconvert.h"
using namespace dataconvert;
#include "groupconcat.h"
using namespace ordering;
#include "jobstep.h"
#include "jlf_common.h"
#include "mcs_decimal.h"
#include "utils/json/json.hpp"
using namespace nlohmann;
namespace joblist
{
// GroupConcatInfo class implementation
// Compiler-generated construction/destruction suffices: all members manage
// their own lifetime.
GroupConcatInfo::GroupConcatInfo() = default;
GroupConcatInfo::~GroupConcatInfo() = default;
// Walk every GROUP_CONCAT/JSON_ARRAYAGG column in the job: resolve each
// non-constant argument and order-by column to a tuple key, record constants
// separately (they need no projection), and register the set of projected
// columns this step depends on in fColumns.
void GroupConcatInfo::prepGroupConcat(JobInfo& jobInfo)
{
for (const auto& gccol : jobInfo.groupConcatCols)
{
auto* gcc = dynamic_cast<GroupConcatColumn*>(gccol.get());
// aggParms()[0] is the RowColumn wrapping the list of concatenated expressions.
const auto* rcp = dynamic_cast<const RowColumn*>(gcc->aggParms()[0].get());
SP_GroupConcat groupConcat(new GroupConcat(jobInfo.rm, jobInfo.umMemLimit));
groupConcat->fSeparator = gcc->separator();
groupConcat->fDistinct = gcc->distinct();
groupConcat->fSize = gcc->resultType().colWidth;
groupConcat->fTimeZone = jobInfo.timeZone;
int key = -1;
const vector<SRCP>& cols = rcp->columnVec();
// k counts only the projected (non-constant) columns; j is the position in
// the full argument list, remembered so constants can be re-interleaved later.
for (uint64_t j = 0, k = 0; j < cols.size(); j++)
{
const auto* cc = dynamic_cast<const ConstantColumn*>(cols[j].get());
if (cc == nullptr)
{
key = getColumnKey(cols[j], jobInfo);
fColumns.insert(key);
groupConcat->fGroupCols.emplace_back(key, k++);
}
else
{
groupConcat->fConstCols.emplace_back(cc->constval(), j);
}
}
vector<SRCP>& orderCols = gcc->orderCols();
for (const auto& orderCol : orderCols)
{
// Ordering by a constant has no effect; skip it.
if (dynamic_cast<const ConstantColumn*>(orderCol.get()) != nullptr)
continue;
key = getColumnKey(orderCol, jobInfo);
fColumns.insert(key);
groupConcat->fOrderCols.emplace_back(key, orderCol->asc());
}
groupConcat->id = fGroupConcat.size();
fGroupConcat.emplace_back(std::move(groupConcat));
}
// Rare case: all columns in group_concat are constant columns, use a column in column map.
if (!jobInfo.groupConcatCols.empty() && fColumns.empty())
{
int key = -1;
// Borrow the first available column from any table so the step still has
// something to project.
for (auto i = jobInfo.tableList.begin(); i != jobInfo.tableList.end() && key == -1; ++i)
{
if (!jobInfo.columnMap[*i].empty())
{
key = *(jobInfo.columnMap[*i].begin());
}
}
if (key != -1)
{
fColumns.insert(key);
}
else
{
throw runtime_error("Empty column map.");
}
}
}
// Map a returned-column expression to its tuple key. Simple columns resolve
// directly (with special handling for subquery columns and dictionary
// columns); arithmetic/function columns resolve through their expression id.
// Throws for any other column kind.
uint32_t GroupConcatInfo::getColumnKey(const SRCP& srcp, JobInfo& jobInfo) const
{
int colKey = -1;
const auto* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
if (sc != nullptr)
{
if (sc->schemaName().empty())
{
// bug3839, handle columns from subquery.
SimpleColumn tmp(*sc, jobInfo.sessionId);
tmp.oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
colKey = getTupleKey(jobInfo, &tmp);
}
else
{
colKey = getTupleKey(jobInfo, sc);
}
// check if this is a dictionary column
if (jobInfo.keyInfo->dictKeyMap.find(colKey) != jobInfo.keyInfo->dictKeyMap.end())
colKey = jobInfo.keyInfo->dictKeyMap[colKey];
}
else
{
const auto* ac = dynamic_cast<const ArithmeticColumn*>(srcp.get());
const auto* fc = dynamic_cast<const FunctionColumn*>(srcp.get());
if (ac != nullptr || fc != nullptr)
{
colKey = getExpTupleKey(jobInfo, srcp->expressionId());
}
else
{
cerr << "Unsupported GROUP_CONCAT/JSON_ARRAYAGG column. " << srcp->toString() << endl;
throw runtime_error("Unsupported GROUP_CONCAT/JSON_ARRAYAGG column.");
}
}
return colKey;
}
// For each group-concat descriptor, build the sub-RowGroup holding its
// concatenated columns followed by any extra order-by columns, translate
// order-by keys to positions inside that sub-RowGroup, and compute the
// projection->sub-RowGroup column mapping.
void GroupConcatInfo::mapColumns(const RowGroup& projRG)
{
// key -> position within the projected rowgroup.
map<uint32_t, uint32_t> projColumnMap;
const vector<uint32_t>& keysProj = projRG.getKeys();
for (uint64_t i = 0; i < projRG.getColumnCount(); i++)
projColumnMap[keysProj[i]] = i;
for (auto k = fGroupConcat.begin(); k != fGroupConcat.end(); k++)
{
vector<uint32_t> pos;
vector<uint32_t> oids;
vector<uint32_t> keys;
vector<uint32_t> scale;
vector<uint32_t> precision;
vector<CalpontSystemCatalog::ColDataType> types;
vector<uint32_t> csNums;
// First offset is 2: RowGroup data starts after the 2-byte RID slot.
pos.push_back(2);
// Concatenated columns come first, in argument order.
auto i1 = (*k)->fGroupCols.begin();
while (i1 != (*k)->fGroupCols.end())
{
map<uint32_t, uint32_t>::iterator j = projColumnMap.find(i1->first);
if (j == projColumnMap.end())
{
cerr << "Concat/ArrayAgg Key:" << i1->first << " is not projected." << endl;
throw runtime_error("Project error.");
}
pos.push_back(pos.back() + projRG.getColumnWidth(j->second));
oids.push_back(projRG.getOIDs()[j->second]);
keys.push_back(projRG.getKeys()[j->second]);
types.push_back(projRG.getColTypes()[j->second]);
csNums.push_back(projRG.getCharsetNumber(j->second));
scale.push_back(projRG.getScale()[j->second]);
precision.push_back(projRG.getPrecision()[j->second]);
++i1;
}
// Order-by columns are appended only if not already among the
// concatenated columns; either way their position index is recorded.
auto i2 = (*k)->fOrderCols.begin();
while (i2 != (*k)->fOrderCols.end())
{
auto j = projColumnMap.find(i2->first);
if (j == projColumnMap.end())
{
cerr << "Order Key:" << i2->first << " is not projected." << endl;
throw runtime_error("Project error.");
}
vector<uint32_t>::iterator i3 = find(keys.begin(), keys.end(), j->first);
int idx = 0;
if (i3 == keys.end())
{
idx = keys.size();
pos.push_back(pos.back() + projRG.getColumnWidth(j->second));
oids.push_back(projRG.getOIDs()[j->second]);
keys.push_back(projRG.getKeys()[j->second]);
types.push_back(projRG.getColTypes()[j->second]);
csNums.push_back(projRG.getCharsetNumber(j->second));
scale.push_back(projRG.getScale()[j->second]);
precision.push_back(projRG.getPrecision()[j->second]);
}
else
{
idx = std::distance(keys.begin(), i3);
}
(*k)->fOrderCond.push_back(make_pair(idx, i2->second));
++i2;
}
(*k)->fRowGroup = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
projRG.getStringTableThreshold(), false);
// MCOL-5429/MCOL-5491 Use stringstore if the datatype of the groupconcat/json_arrayagg
// field is a long string.
if ((*k)->fRowGroup.hasLongString())
{
(*k)->fRowGroup.setUseStringTable(true);
}
(*k)->fMapping = makeMapping(projRG, (*k)->fRowGroup);
}
}
// Build a per-output-column index into the input rowgroup by matching tuple
// keys. (The generic rowgroup mapping helpers do not behave correctly for
// this class, hence the hand-rolled key search.)
std::shared_ptr<int[]> GroupConcatInfo::makeMapping(const RowGroup& in, const RowGroup& out) const
{
  const uint64_t outCols = out.getColumnCount();
  const uint64_t inCols = in.getColumnCount();
  std::shared_ptr<int[]> mapping(new int[outCols]);

  for (uint64_t outIdx = 0; outIdx < outCols; outIdx++)
  {
    // Linear scan for the input column carrying the same key.
    uint64_t inIdx = 0;
    while (inIdx < inCols && out.getKeys()[outIdx] != in.getKeys()[inIdx])
      ++inIdx;

    if (inIdx < inCols)
      mapping[outIdx] = inIdx;
  }

  return mapping;
}
// Debug dump. Only a placeholder message for now; a detailed dump has not
// been implemented yet.
const string GroupConcatInfo::toString() const
{
  ostringstream out;
  out << "GroupConcatInfo: toString() to be implemented." << endl;
  return out.str();
}
// Per-group aggregator; isJsonArrayAgg selects JSON_ARRAYAGG output rules
// (brackets, quoting, nulls kept) over plain GROUP_CONCAT.
GroupConcatAg::GroupConcatAg(rowgroup::SP_GroupConcat& gcc, bool isJsonArrayAgg)
: fGroupConcat(gcc), fIsJsonArrayAgg(isJsonArrayAgg)
{
initialize();
}
GroupConcatAg::~GroupConcatAg() = default;
// Pick the concator implementation (ordered/distinct vs. plain append) and
// set up the single staging row that incoming rows are mapped into.
void GroupConcatAg::initialize()
{
// DISTINCT also needs the order-by machinery (its row store and hash map).
if (fGroupConcat->fDistinct || fGroupConcat->fOrderCols.size() > 0)
fConcator.reset(new GroupConcatOrderBy(fIsJsonArrayAgg));
else
fConcator.reset(new GroupConcatNoOrder(fIsJsonArrayAgg));
fConcator->initialize(fGroupConcat);
// MCOL-5429/MCOL-5491 Use stringstore if the datatype of the group_concat/json_arrayagg
// field is a long string.
if (fGroupConcat->fRowGroup.hasLongString())
{
// Long strings need a string-table backed one-row RGData as staging area.
fRowGroup = fGroupConcat->fRowGroup;
fRowGroup.setUseStringTable(true);
fRowGroup.setUseOnlyLongString(true);
fRowRGData.reinit(fRowGroup, 1);
fRowGroup.setData(&fRowRGData);
fRowGroup.resetRowGroup(0);
fRowGroup.initRow(&fRow);
fRowGroup.getRow(0, &fRow);
fMemSize = fRowGroup.getSizeWithStrings(1);
}
else
{
// Fixed-width rows fit in a plain heap buffer of one row.
fGroupConcat->fRowGroup.initRow(&fRow, true);
fData.reset(new uint8_t[fRow.getSize()]);
fRow.setData(rowgroup::Row::Pointer(fData.get()));
fMemSize = fRow.getSize();
}
}
// Copy the needed columns of a projected row into the staging row (fRow),
// then hand it to the concator for accumulation.
void GroupConcatAg::processRow(const rowgroup::Row& inRow)
{
applyMapping(fGroupConcat->fMapping, inRow);
fConcator->processRow(fRow);
}
// Merge the partial concatenation held by another aggregation row into this
// one; column i of inRow carries a pointer to the peer GroupConcatAg.
void GroupConcatAg::merge(const rowgroup::Row& inRow, uint64_t i)
{
auto* gccAg = dynamic_cast<joblist::GroupConcatAg*>(inRow.getAggregateData(i));
// Guard the downcast before dereferencing (consistent with
// GroupConcatNoOrder::merge): null here means the aggregate slot does not
// actually hold a GroupConcatAg.
assert(gccAg != nullptr);
fConcator->merge(gccAg->concator().get());
}
// Produce the final concatenated string (nullptr when the group is empty),
// joining rows with the configured separator.
uint8_t* GroupConcatAg::getResult()
{
return fConcator->getResult(fGroupConcat->fSeparator);
}
void GroupConcatAg::serialize(messageqcpp::ByteStream& bs) const
{
bs << (uint8_t)fIsJsonArrayAgg;
fGroupConcat->serialize(bs);
fConcator->serialize(bs);
if (fRowGroup.hasLongString())
{
bs << uint8_t(1);
fRowRGData.serialize(bs, fRowGroup.getDataSize(1));
}
else
{
bs << uint8_t(0);
bs.append(fData.get(), fRow.getSize());
}
}
// Inverse of serialize(): re-select the concator implementation from the
// deserialized descriptor, then restore the staging row.
void GroupConcatAg::deserialize(messageqcpp::ByteStream& bs)
{
uint8_t tmp8;
bs >> tmp8;
fIsJsonArrayAgg = tmp8;
fGroupConcat->deserialize(bs);
// Mirror initialize(): distinct/order-by implies the order-by concator.
if (fGroupConcat->fDistinct || !fGroupConcat->fOrderCols.empty())
{
fConcator.reset(new GroupConcatOrderBy(fIsJsonArrayAgg));
}
else
{
fConcator.reset(new GroupConcatNoOrder(fIsJsonArrayAgg));
}
fConcator->initialize(fGroupConcat);
fConcator->deserialize(bs);
bs >> tmp8;
if (tmp8)
{
// Long-string variant.
// NOTE(review): fRow.getSize() is used before fRowGroup.initRow(&fRow)
// below — presumably fRow was sized by fConcator->initialize(); confirm.
fRowRGData.deserialize(bs, fRow.getSize());
fRowGroup.setData(&fRowRGData);
fRowGroup.initRow(&fRow);
}
else
{
// Fixed-width variant: raw row bytes preceded by their size.
RGDataSizeType size;
bs >> size;
fData.reset(new uint8_t[size]);
memcpy(fData.get(), bs.buf(), size);
bs.advance(size);
fRow.setData(rowgroup::Row::Pointer(fData.get()));
}
}
// Total memory accounted for this aggregator: staging row plus the
// concator's accumulated data.
rowgroup::RGDataSizeType GroupConcatAg::getDataSize() const
{
return fMemSize + fConcator->getDataSize();
}
// Copy the mapped source columns of 'row' into the staging row fRow,
// dispatching on the destination column's width and type.
void GroupConcatAg::applyMapping(const std::shared_ptr<int[]>& mapping, const Row& row)
{
// For some reason the rowgroup mapping fcns don't work right in this class.
for (uint64_t i = 0; i < fRow.getColumnCount(); i++)
{
// Wide columns (> 8 bytes): strings, long double, wide decimal.
if (fRow.getColumnWidth(i) > datatypes::MAXLEGACYWIDTH)
{
if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::CHAR ||
fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::VARCHAR ||
fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::TEXT)
{
// TODO: free previous string if it is in the StringStorage
fRow.setStringField(row.getConstString(mapping[i]), i);
}
else if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE)
{
fRow.setLongDoubleField(row.getLongDoubleField(mapping[i]), i);
}
else if (datatypes::isWideDecimalType(fRow.getColType(i), fRow.getColumnWidth(i)))
{
row.copyBinaryField(fRow, i, mapping[i]);
}
}
else
{
// Narrow columns: short CHAR/VARCHAR are stored inline as unsigned,
// everything else as a signed integer payload.
if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::CHAR ||
fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::VARCHAR)
{
fRow.setIntField(row.getUintField(mapping[i]), i);
}
else
{
fRow.setIntField(row.getIntField(mapping[i]), i);
}
}
}
}
// GroupConcator class implementation
// Base initialization shared by both concator variants: copy limits,
// separator/constant lengths, timezone, and memory-accounting handles from
// the descriptor.
void GroupConcator::initialize(const rowgroup::SP_GroupConcat& gcc)
{
// MCOL-901 This value comes from the Server and it is
// too high(1MB or 3MB by default) to allocate it for every instance.
fGroupConcatLen = gcc->fSize;
size_t sepSize = gcc->fSeparator.size();
// Start fCurrentLength at -sepSize so the first row's separator charge
// cancels out (lengthEstimate adds a separator per row via fConstantLen).
fCurrentLength -= sepSize; // XXX Yet I have to find out why spearator has c_str() as nullptr here.
fTimeZone = gcc->fTimeZone;
fConstCols = gcc->fConstCols;
fConstantLen = sepSize;
fRm = gcc->fRm;
fSessionMemLimit = gcc->fSessionMemLimit;
// Per-row fixed cost: separator plus every constant argument's length.
for (const auto& str : gcc->fConstCols)
{
fConstantLen += str.first.length();
}
}
void GroupConcator::outputRow(std::ostringstream& oss, const rowgroup::Row& row)
{
const CalpontSystemCatalog::ColDataType* types = row.getColTypes();
vector<uint32_t>::iterator i = fConcatColumns.begin();
auto j = fConstCols.begin();
uint64_t groupColCount = fConcatColumns.size() + fConstCols.size();
for (uint64_t k = 0; k < groupColCount; k++)
{
if (j != fConstCols.end() && k == j->second)
{
oss << j->first.safeString();
j++;
continue;
}
switch (types[*i])
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
{
if (fIsJsonArrayAgg && row.isNullValue(*i))
{
oss << "null";
}
else
{
int64_t intVal = row.getIntField(*i);
oss << intVal;
}
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
if (fIsJsonArrayAgg && row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << fixed << row.getDecimalField(*i);
}
break;
}
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
{
if (fIsJsonArrayAgg && row.isNullValue(*i))
{
oss << "null";
}
else
{
uint64_t uintVal = row.getUintField(*i);
int scale = (int)row.getScale(*i);
if (scale == 0)
{
oss << uintVal;
}
else
{
oss << fixed
<< datatypes::Decimal(datatypes::TSInt128((int128_t)uintVal), scale,
datatypes::INT128MAXPRECISION);
}
}
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::TEXT:
{
if (fIsJsonArrayAgg)
{
auto maybeJson =
row.getStringField(*i).safeString("null"); // XXX: MULL??? it is not checked anywhere.
const auto j = json::parse(maybeJson, nullptr, false);
if (j.is_discarded())
{
oss << std::quoted(maybeJson.c_str());
}
else
{
oss << maybeJson.c_str();
}
}
else
{
oss << row.getStringField(*i).str();
}
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
if (fIsJsonArrayAgg && row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << setprecision(15) << row.getDoubleField(*i);
}
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
if (fIsJsonArrayAgg && row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << setprecision(15) << row.getLongDoubleField(*i);
}
break;
}
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
if (fIsJsonArrayAgg && row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << row.getFloatField(*i);
}
break;
}
case CalpontSystemCatalog::DATE:
{
if (fIsJsonArrayAgg)
{
if (row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << std::quoted(DataConvert::dateToString(row.getUintField(*i)));
}
}
else
{
oss << DataConvert::dateToString(row.getUintField(*i));
}
break;
}
case CalpontSystemCatalog::DATETIME:
{
if (fIsJsonArrayAgg)
{
if (row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << std::quoted(DataConvert::datetimeToString(row.getUintField(*i)));
}
}
else
{
oss << DataConvert::datetimeToString(row.getUintField(*i));
}
break;
}
case CalpontSystemCatalog::TIMESTAMP:
{
if (fIsJsonArrayAgg)
{
if (row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << std::quoted(DataConvert::timestampToString(row.getUintField(*i), fTimeZone));
}
}
else
{
oss << DataConvert::timestampToString(row.getUintField(*i), fTimeZone);
}
break;
}
case CalpontSystemCatalog::TIME:
{
if (fIsJsonArrayAgg)
{
if (row.isNullValue(*i))
{
oss << "null";
}
else
{
oss << std::quoted(DataConvert::timeToString(row.getUintField(*i)));
}
}
else
{
oss << DataConvert::timeToString(row.getUintField(*i));
}
break;
}
default:
{
break;
}
}
++i;
}
}
// A row is excluded from GROUP_CONCAT when any of its concatenated columns
// is NULL; returns true for such a row.
bool GroupConcator::concatColIsNull(const rowgroup::Row& row)
{
  for (const uint32_t colIdx : fConcatColumns)
  {
    if (row.isNullValue(colIdx))
      return true;
  }
  return false;
}
// Cheap lower-bound estimate of the formatted length of one row, used to
// enforce the group_concat_max_len budget without actually formatting.
int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
{
int64_t rowLen = fConstantLen; // fixed constant and separator length
const CalpontSystemCatalog::ColDataType* types = row.getColTypes();
// null values are already skipped.
for (vector<uint32_t>::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++)
{
if (row.isNullValue(*i))
continue;
int64_t fieldLen = 0;
switch (types[*i])
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
{
// Count digits (and a sign char for negatives).
int64_t v = row.getIntField(*i);
if (v < 0)
fieldLen++;
while ((v /= 10) != 0)
fieldLen++;
fieldLen += 1;
break;
}
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
{
uint64_t v = row.getUintField(*i);
while ((v /= 10) != 0)
fieldLen++;
fieldLen += 1;
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
// Deliberately conservative: decimals count as one character minimum.
fieldLen += 1;
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::TEXT:
{
fieldLen += row.getConstString(*i).length();
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
case CalpontSystemCatalog::LONGDOUBLE:
{
fieldLen = 1; // minimum length
break;
}
case CalpontSystemCatalog::DATE:
{
fieldLen = 10; // YYYY-MM-DD
break;
}
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::TIMESTAMP:
{
fieldLen = 19; // YYYY-MM-DD HH24:MI:SS
// Decimal point and milliseconds
uint64_t colPrecision = row.getPrecision(*i);
if (colPrecision > 0 && colPrecision < 7)
{
fieldLen += colPrecision + 1;
}
break;
}
case CalpontSystemCatalog::TIME:
{
fieldLen = 10; // -HHH:MI:SS
// Decimal point and milliseconds
uint64_t colPrecision = row.getPrecision(*i);
if (colPrecision > 0 && colPrecision < 7)
{
fieldLen += colPrecision + 1;
}
break;
}
default:
{
break;
}
}
rowLen += fieldLen;
}
return rowLen;
}
// Debug dump: result-size limit followed by the argument layout, printing 'c'
// for constant slots and the column index for projected slots.
const string GroupConcator::toString() const
{
  ostringstream oss;
  oss << "GroupConcat size-" << fGroupConcatLen;
  oss << "Concat cols: ";

  auto colIt = fConcatColumns.cbegin();
  auto constIt = fConstCols.cbegin();
  const uint64_t totalSlots = fConcatColumns.size() + fConstCols.size();

  for (uint64_t slot = 0; slot < totalSlots; slot++)
  {
    if (constIt != fConstCols.cend() && slot == constIt->second)
    {
      oss << 'c' << " ";
      ++constIt;
    }
    else
    {
      oss << (*colIt) << " ";
      ++colIt;
    }
  }

  oss << endl;
  return oss.str();
}
// Serialize the base concator state: concat column indexes, constant
// arguments (value + argument slot), running length, limits and timezone.
void GroupConcator::serialize(messageqcpp::ByteStream& bs) const
{
messageqcpp::serializeInlineVector(bs, fConcatColumns);
RGDataSizeType size = fConstCols.size();
bs << size;
for (const auto& [k, v] : fConstCols)
{
bs << k;
bs << v;
}
bs << fCurrentLength;
bs << fGroupConcatLen;
bs << fConstantLen;
bs << fTimeZone;
}
// Inverse of serialize(); fields must be read in exactly the same order.
void GroupConcator::deserialize(messageqcpp::ByteStream& bs)
{
fConstCols.clear();
messageqcpp::deserializeInlineVector(bs, fConcatColumns);
RGDataSizeType size;
bs >> size;
fConstCols.reserve(size);
for (RGDataSizeType i = 0; i < size; i++)
{
NullString f;
bs >> f;
uint32_t s;
bs >> s;
fConstCols.emplace_back(f, s);
}
bs >> fCurrentLength;
bs >> fGroupConcatLen;
bs >> fConstantLen;
bs >> fTimeZone;
}
// Lightweight handle to a stored row: a pointer into an RGData, the row's
// linear index (group id + row id packed), and the compare rule used for
// priority-queue ordering.
class GroupConcatOrderByRow
{
public:
GroupConcatOrderByRow(const rowgroup::Row& r, uint64_t rowIdx, ordering::CompareRule& c)
: fData(r.getPointer()), fIdx(rowIdx), fRule(&c)
{
}
// Delegates to the shared CompareRule so all queue entries sort consistently.
bool operator<(const GroupConcatOrderByRow& rhs) const
{
return fRule->less(fData, rhs.fData);
}
rowgroup::Row::Pointer fData;
uint64_t fIdx;
ordering::CompareRule* fRule;
};
// priority_queue extended with capacity control and read-only iteration over
// the underlying container (needed for serialization).
class GroupConcatOrderBy::SortingPQ
: public priority_queue<GroupConcatOrderByRow, vector<GroupConcatOrderByRow>, less<GroupConcatOrderByRow>>
{
public:
using BaseType =
std::priority_queue<GroupConcatOrderByRow, vector<GroupConcatOrderByRow>, less<GroupConcatOrderByRow>>;
using size_type = BaseType::size_type;
SortingPQ(size_type capacity) : BaseType()
{
reserve(capacity);
}
SortingPQ(const container_type& v) : BaseType(less<GroupConcatOrderByRow>(), v)
{
}
// Pre-size the protected underlying vector (this->c) to avoid reallocations.
void reserve(size_type capacity)
{
this->c.reserve(capacity);
}
size_type capacity() const
{
return this->c.capacity();
}
// Heap-order (not sorted) iteration over the stored entries.
container_type::const_iterator begin() const
{
return this->c.begin();
}
container_type::const_iterator end() const
{
return this->c.end();
}
using BaseType::empty;
using BaseType::pop;
using BaseType::push;
using BaseType::size;
using BaseType::top;
};
// GroupConcatOrderBy class implementation
// GroupConcatOrderBy class implementation
GroupConcatOrderBy::GroupConcatOrderBy(bool isJsonArrayAgg) : GroupConcator(isJsonArrayAgg)
{
// The compare rule needs a back-pointer to this object's rowgroup context.
fRule.fIdbCompare = this;
}
GroupConcatOrderBy::~GroupConcatOrderBy()
{
// delete compare objects; fCompares holds raw owning pointers created by
// compileRules().
for (auto& compare : fRule.fCompares)
{
delete compare;
compare = nullptr;
}
fRule.fCompares.clear();
}
// Set up the ordered/distinct accumulator: compile the order-by comparison
// rules, allocate the first RGData, and create the distinct hash map and the
// bounded priority queue.
void GroupConcatOrderBy::initialize(const rowgroup::SP_GroupConcat& gcc)
{
gcc->fRowGroup.setUseOnlyLongString(true);
ordering::IdbCompare::initialize(gcc->fRowGroup);
GroupConcator::initialize(gcc);
fOrderByCond.resize(0);
fOrderByCond.reserve(gcc->fOrderCond.size());
for (const auto& [idx, asc] : gcc->fOrderCond)
{
fOrderByCond.emplace_back(idx, asc);
}
fDistinct = gcc->fDistinct;
fConcatColumns.reserve(fConcatColumns.size() + gcc->fGroupCols.size());
for (auto& x : gcc->fGroupCols)
{
// x.second is the column's position inside this concator's sub-RowGroup.
fConcatColumns.emplace_back(x.second);
}
auto size = fRowGroup.getSizeWithStrings(fRowsPerRG);
fMemSize += size;
RGDataUnPtr rgdata(new RGData(fRowGroup, fRowsPerRG));
fRowGroup.setData(rgdata.get());
fRowGroup.resetRowGroup((0));
fRowGroup.initRow(&fRow0);
fRowGroup.getRow(0, &fRow0);
fDataVec.emplace_back(std::move(rgdata));
fRule.compileRules(fOrderByCond, fRowGroup);
// row1/row2 are scratch rows for the Hasher/Eq functors.
fRowGroup.initRow(&row1);
fRowGroup.initRow(&row2);
if (fDistinct)
{
fDistinctMap.reset(new DistinctMap(10, Hasher(this, getKeyLength()), Eq(this, getKeyLength())));
}
fOrderByQueue.reset(new SortingPQ(10));
}
// Number of leading columns that participate in DISTINCT comparison.
uint64_t GroupConcatOrderBy::getKeyLength() const
{
// only distinct the concatenated columns
return fConcatColumns.size(); // cols 0 to fConcatColumns.size() - 1 will be compared
}
// Serialize ordered-concator state: base state, order-by conditions, all
// RGDatas, then the distinct map and queue as row indexes (the row data
// itself travels in the RGDatas).
void GroupConcatOrderBy::serialize(messageqcpp::ByteStream& bs) const
{
GroupConcator::serialize(bs);
uint64_t sz = fOrderByCond.size();
bs << sz;
for (const auto& obcond : fOrderByCond)
{
bs << obcond.fIndex;
bs << obcond.fAsc;
bs << obcond.fNf;
}
sz = fDataVec.size();
bs << sz;
for (const auto& rgdata : fDataVec)
{
rgdata->serialize(bs, fRowGroup.getDataSize(fRowsPerRG));
}
bs << uint8_t(fDistinct);
if (fDistinct)
{
// Only the row index is written; the pointer keys are rebuilt on the
// receiving side from the deserialized RGDatas.
sz = fDistinctMap->size();
bs << sz;
for (const auto& idx : *fDistinctMap)
{
bs << idx.second;
}
}
sz = fOrderByQueue->size();
bs << sz;
for (const auto& obq : *fOrderByQueue)
{
bs << obq.fIdx;
}
}
// Reconstruct the ordered concator from the wire format written by
// serialize(): base state, order-by conditions, row data, then the distinct
// map and the priority queue, both stored as row indexes resolved back to
// row pointers inside the rebuilt RGDatas.
void GroupConcatOrderBy::deserialize(messageqcpp::ByteStream& bs)
{
GroupConcator::deserialize(bs);
fMemSize = 0;
uint64_t sz;
bs >> sz;
fOrderByCond.resize(sz);
uint8_t tmp8;
// Use a 64-bit loop index: sz is a 64-bit count, and the previous uint8_t
// counter would wrap (never terminating) for more than 255 conditions.
for (uint64_t i = 0; i < sz; ++i)
{
bs >> fOrderByCond[i].fIndex;
bs >> fOrderByCond[i].fAsc;
bs >> fOrderByCond[i].fNf;
}
bs >> sz;
fDataVec.resize(sz);
if (sz > 0)
{
for (uint64_t i = 0; i < sz; ++i)
{
fDataVec[i].reset(new rowgroup::RGData(fRowGroup, fRowsPerRG));
fDataVec[i]->deserialize(bs, fRowGroup.getDataSize(fRowsPerRG));
fRowGroup.setData(fDataVec[i].get());
auto rgsize = fRowGroup.getSizeWithStrings(fRowsPerRG);
fMemSize += rgsize;
}
// fRow0 resumes at the last row of the last RGData (the append position).
fRowGroup.initRow(&fRow0);
fRowGroup.getRow(fRowGroup.getRowCount() - 1, &fRow0);
}
else
{
createNewRGData();
}
// Recompile the compare rules against this instance, discarding any
// comparators owned from a previous compile.
fRule.fIdbCompare = this;
for (auto& compare : fRule.fCompares)
{
delete compare;
compare = nullptr;
}
fRule.fCompares.clear();
fRule.compileRules(fOrderByCond, fRowGroup);
fRowGroup.initRow(&row1);
fRowGroup.initRow(&row2);
bs >> tmp8;
fDistinct = tmp8;
if (fDistinct)
{
bs >> sz;
fDistinctMap.reset(new DistinctMap(sz, Hasher(this, getKeyLength()), Eq(this, getKeyLength())));
for (uint64_t i = 0; i < sz; ++i)
{
// Resolve the serialized linear row index back to a row pointer.
uint64_t idx;
bs >> idx;
auto [gid, rid] = rowIdxToGidRid(idx, fRowsPerRG);
rowgroup::Row row;
fRowGroup.setData(fDataVec[gid].get());
fRowGroup.initRow(&row);
fRowGroup.getRow(rid, &row);
fDistinctMap->emplace(row.getPointer(), idx);
}
}
bs >> sz;
fOrderByQueue.reset(new SortingPQ(sz));
for (uint64_t i = 0; i < sz; ++i)
{
uint64_t idx;
bs >> idx;
auto [gid, rid] = rowIdxToGidRid(idx, fRowsPerRG);
rowgroup::Row row;
fRowGroup.setData(fDataVec[gid].get());
fRowGroup.initRow(&row);
fRowGroup.getRow(rid, &row);
fOrderByQueue->push(GroupConcatOrderByRow(row, idx, fRule));
}
// Leave the rowgroup positioned at the append point of the last RGData.
fRowGroup.setData(fDataVec.back().get());
fRowGroup.getRow(fRowGroup.getRowCount() - 1, &fRow0);
}
// Append a fresh, empty RGData to the store and point fRow0 at its first row.
void GroupConcatOrderBy::createNewRGData()
{
auto newSize = fRowGroup.getSizeWithStrings(fRowsPerRG);
fMemSize += newSize;
fDataVec.emplace_back(make_unique<rowgroup::RGData>(fRowGroup, fRowsPerRG));
fRowGroup.setData(fDataVec.back().get());
fRowGroup.setUseOnlyLongString(true);
fRowGroup.resetRowGroup(0);
fRowGroup.initRow(&fRow0);
fRowGroup.getRow(0, &fRow0);
}
// Accounted memory: stored row data plus queue capacity plus an estimate of
// the distinct map's per-entry overhead.
rowgroup::RGDataSizeType GroupConcatOrderBy::getDataSize() const
{
return fMemSize + fOrderByQueue->capacity() * sizeof(GroupConcatOrderByRow) +
(fDistinct ? fDistinctMap->size() : 0) * 32 /* TODO: speculative unordered_map memory consumption per
item, replace it with counting allocator */
;
}
// Accumulate one row. While under the length budget rows are appended and
// pushed onto the heap; once over budget, a new row only displaces the
// current heap top if it sorts before it (top = last row in output order).
void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
{
// check if this is a distinct row
if (fDistinct && fDistinctMap->find(row.getPointer()) != fDistinctMap->end())
return;
// this row is skipped if any concatenated column is null.
if (!fIsJsonArrayAgg && concatColIsNull(row))
return;
// if the row count is less than the limit
if (fCurrentLength < fGroupConcatLen)
{
copyRow(row, &fRow0);
// the RID is no meaning here, use it to store the estimated length.
// NOTE(review): lengthEstimate returns int64_t; narrowing to int16_t
// assumes a single row never estimates above 32767 bytes — confirm.
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
fRowGroup.incRowCount();
GroupConcatOrderByRow newRow(fRow0, getCurrentRowIdx(), fRule);
fOrderByQueue->push(newRow);
fCurrentLength += estLen;
// add to the distinct map
if (fDistinct)
fDistinctMap->emplace(fRow0.getPointer(), getCurrentRowIdx());
fRow0.nextRow();
if (fRowGroup.getRowCount() >= fRowsPerRG)
{
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
uint64_t newSize = fRowGroup.getSizeWithStrings();
fMemSize += newSize;
rowgroup::RGDataUnPtr rgdata(new rowgroup::RGData(fRowGroup, fRowsPerRG));
fRowGroup.setData(rgdata.get());
fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow0);
fDataVec.emplace_back(std::move(rgdata));
}
}
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue->top().fData))
{
// Budget exhausted: replace the heap top in place (the stored row's slot
// is reused; only its contents change).
GroupConcatOrderByRow swapRow = fOrderByQueue->top();
fRow1.setData(swapRow.fData);
fOrderByQueue->pop();
fCurrentLength -= fRow1.getRelRid();
fRow2.setData(swapRow.fData);
if (!fDistinct)
{
copyRow(row, &fRow1);
}
else
{
// only the copyRow does useful work here
fDistinctMap->erase(swapRow.fData);
copyRow(row, &fRow2);
fDistinctMap->emplace(swapRow.fData, swapRow.fIdx);
}
int16_t estLen = lengthEstimate(fRow2);
fRow2.setRid(estLen);
fCurrentLength += estLen;
fOrderByQueue->push(swapRow);
}
}
// Merge another ordered concator: steal its RGDatas (shifting every row
// index by our group count) and re-apply the budget/distinct/replace logic
// while draining its queue.
void GroupConcatOrderBy::merge(GroupConcator* gc)
{
GroupConcatOrderBy* go = dynamic_cast<GroupConcatOrderBy*>(gc);
fMemSize += go->fMemSize;
go->fMemSize = 0;
// Incoming RGDatas are appended after ours; 'shift' rebases their indexes.
uint32_t shift = fDataVec.size();
auto& rgdatas = go->getRGDatas();
for (auto& rgdata : rgdatas)
{
fDataVec.emplace_back(std::move(rgdata));
}
rgdatas.clear();
auto* orderByQueue = go->getQueue();
while (!orderByQueue->empty())
{
GroupConcatOrderByRow row = orderByQueue->top();
row.fIdx = shiftGroupIdxBy(row.fIdx, shift);
row.fRule = &fRule;
// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
{
; // no op;
}
// if the row count is less than the limit
else if (fCurrentLength < fGroupConcatLen)
{
fOrderByQueue->push(row);
// RelRid holds the row's estimated length (set in processRow).
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();
// add to the distinct map
if (fDistinct)
fDistinctMap->emplace(row.fData, row.fIdx);
}
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue->top().fData))
{
// Over budget: incoming row displaces our current heap top.
GroupConcatOrderByRow swapRow = fOrderByQueue->top();
row1.setData(swapRow.fData);
fOrderByQueue->pop();
fCurrentLength -= row1.getRelRid();
if (fDistinct)
{
fDistinctMap->erase(swapRow.fData);
fDistinctMap->emplace(row.fData, row.fIdx);
}
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();
fOrderByQueue->push(row);
}
orderByQueue->pop();
}
}
// Render the final string. The max-heap yields rows in reverse output order,
// so they are pushed through a stack first; JSON mode wraps the list in [].
uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep)
{
ostringstream oss;
bool addSep = false;
// need to reverse the order
stack<GroupConcatOrderByRow> rowStack;
while (fOrderByQueue->size() > 0)
{
rowStack.push(fOrderByQueue->top());
fOrderByQueue->pop();
}
size_t prevResultSize = 0;
size_t rowsProcessed = 0;
// isNull stays true only when no row was emitted; getResult then returns nullptr.
bool isNull = true;
if (rowStack.size() > 0)
{
if (fIsJsonArrayAgg)
oss << "[";
while (rowStack.size() > 0)
{
if (addSep)
oss << sep;
else
addSep = true;
const GroupConcatOrderByRow& topRow = rowStack.top();
fRow0.setData(topRow.fData);
outputRow(oss, fRow0);
isNull = false;
rowStack.pop();
++rowsProcessed;
// Account the growing result buffer once per fRowsPerRG rows.
if (rowsProcessed >= fRowsPerRG)
{
size_t sizeDiff = oss.str().size() - prevResultSize;
prevResultSize = oss.str().size();
fMemSize += sizeDiff;
rowsProcessed = 0;
}
}
if (fIsJsonArrayAgg)
oss << "]";
}
return swapStreamWithStringAndReturnBuf(oss, isNull);
}
// Move the stream's buffer into an owned string and return its bytes, or
// nullptr for an empty (SQL NULL) result. Two NUL bytes are appended and, if
// the result exceeds the group_concat_max_len budget, NULs are planted at the
// limit so downstream strlen()-style consumers see a truncated C string.
uint8_t* GroupConcator::swapStreamWithStringAndReturnBuf(ostringstream& oss, bool isNull)
{
if (isNull)
{
outputBuf_.reset();
return nullptr;
}
// XXX: what is all this black magic for?
int64_t resultSize = oss.str().size();
oss << '\0' << '\0';
// Move (not copy) the stream's internal buffer into outputBuf_.
outputBuf_.reset(new std::string(std::move(*oss.rdbuf()).str()));
if (resultSize >= fGroupConcatLen + 1)
{
(*outputBuf_)[fGroupConcatLen] = '\0';
}
if (resultSize >= fGroupConcatLen + 2)
{
(*outputBuf_)[fGroupConcatLen + 1] = '\0';
}
// FIXME: a string_view can be returned here to get rid of strlen() later
return reinterpret_cast<uint8_t*>(outputBuf_->data());
}
// Public entry point; dispatches to the variant-specific getResultImpl().
uint8_t* GroupConcator::getResult(const string& sep)
{
return getResultImpl(sep);
}
// Debug dump: base info followed by the order-by specification and the
// distinct flag.
const string GroupConcatOrderBy::toString() const
{
  string baseStr = GroupConcator::toString();
  ostringstream oss;
  oss << "OrderBy cols: ";

  for (const auto& spec : fOrderByCond)
  {
    oss << "(" << spec.fIndex << "," << (spec.fAsc ? "Asc" : "Desc") << ","
        << (spec.fNf ? "null first" : "null last") << ") ";
  }

  if (fDistinct)
    oss << endl << " distinct";

  oss << endl;
  return (baseStr + oss.str());
}
// Hash a stored row over the distinct key columns (reuses ts->row1 as a
// scratch row; not thread-safe by construction).
uint64_t GroupConcatOrderBy::Hasher::operator()(const rowgroup::Row::Pointer& p) const
{
Row& row = ts->row1;
row.setPointer(p);
return row.hash(colCount - 1);
}
// Equality over the same distinct key columns, using both scratch rows.
bool GroupConcatOrderBy::Eq::operator()(const rowgroup::Row::Pointer& p1,
const rowgroup::Row::Pointer& p2) const
{
Row& r1 = ts->row1;
Row& r2 = ts->row2;
r1.setPointer(p1);
r2.setPointer(p2);
return r1.equals(r2, colCount - 1);
}
// Linear index of the most recently appended row (last group, last row).
uint64_t GroupConcatOrderBy::getCurrentRowIdx() const
{
return rowGidRidToIdx(fDataVec.size() - 1, fRowGroup.getRowCount() - 1, fRowsPerRG);
}
// Rebase a linear row index by 'shift' RGData groups (used when merging
// another concator's data after ours).
uint64_t GroupConcatOrderBy::shiftGroupIdxBy(uint64_t idx, uint32_t shift)
{
auto [gid, rid] = rowIdxToGidRid(idx, fRowsPerRG);
return rowGidRidToIdx(gid + shift, rid, fRowsPerRG);
}
// GroupConcatNoOrder class implementation
// No owned raw resources; members clean themselves up.
GroupConcatNoOrder::~GroupConcatNoOrder()
{
}
// Set up the unordered accumulator: copy the sub-RowGroup layout, record the
// concatenated column positions, and allocate the first RGData.
void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
{
GroupConcator::initialize(gcc);
fRowGroup = gcc->fRowGroup;
fRowGroup.setUseOnlyLongString(true);
fRowsPerRG = 128;
fConcatColumns.reserve(fConcatColumns.size() + gcc->fGroupCols.size());
for (auto& colIdx : gcc->fGroupCols)
{
// colIdx.second is the column's position inside the sub-RowGroup.
fConcatColumns.push_back(colIdx.second);
}
createNewRGData();
}
// Append one row while under the length budget; rows with NULL concatenated
// columns are dropped (except in JSON_ARRAYAGG mode, which keeps them).
void GroupConcatNoOrder::processRow(const rowgroup::Row& row)
{
// if the row count is less than the limit
if (fCurrentLength < fGroupConcatLen && (fIsJsonArrayAgg || concatColIsNull(row) == false))
{
copyRow(row, &fRow);
// the RID is no meaning here, use it to store the estimated length.
int16_t estLen = lengthEstimate(fRow);
fRow.setRid(estLen);
fCurrentLength += estLen;
fRowGroup.incRowCount();
fRow.nextRow();
// Account only the growth of the current RGData (string store may grow
// even when the row count stays under fRowsPerRG).
auto newSize = fRowGroup.getSizeWithStrings(fRowsPerRG);
if (newSize > fCurMemSize)
{
auto diff = newSize - fCurMemSize;
fCurMemSize = newSize;
fMemSize += diff;
}
if (fRowGroup.getRowCount() >= fRowsPerRG)
{
createNewRGData();
}
}
}
// Merge another unordered concator by stealing its RGDatas wholesale and
// resuming the append position at the end of the last stolen RGData.
void GroupConcatNoOrder::merge(GroupConcator* gc)
{
auto* in = dynamic_cast<GroupConcatNoOrder*>(gc);
assert(in != nullptr);
for (auto& i : in->getRGDatas())
{
fDataVec.emplace_back(std::move(i));
}
fRowGroup.setData(fDataVec.back().get());
fRowGroup.setUseOnlyLongString(true);
fRowGroup.initRow(&fRow);
// Position fRow one past the last stored row, ready for the next append.
fRowGroup.getRow(fRowGroup.getRowCount(), &fRow);
fMemSize += in->fMemSize;
fCurMemSize = in->fCurMemSize;
in->fMemSize = in->fCurMemSize = 0;
}
// Render the final string by streaming every stored row in insertion order;
// each RGData is released as soon as it has been formatted.
uint8_t* GroupConcatNoOrder::getResultImpl(const string& sep)
{
ostringstream oss;
bool addSep = false;
size_t prevResultSize = 0;
// isNull stays true only when no row was emitted; caller then gets nullptr.
bool isNull = true;
// addBrackets delays the JSON "[" until the first row, so an empty result
// stays NULL instead of "[]".
bool addBrackets = true;
for (auto& rgdata : fDataVec)
{
fRowGroup.setData(rgdata.get());
fRowGroup.initRow(&fRow);
fRowGroup.getRow(0, &fRow);
for (uint64_t i = 0; i < fRowGroup.getRowCount(); i++)
{
if (addBrackets && fIsJsonArrayAgg)
{
oss << "[";
addBrackets = false;
}
if (addSep)
oss << sep;
else
addSep = true;
outputRow(oss, fRow);
isNull = false;
fRow.nextRow();
}
// Charge the result string growth, then free the consumed RGData.
size_t sizeDiff = oss.str().size() - prevResultSize;
prevResultSize = oss.str().size();
fMemSize += sizeDiff;
rgdata.reset();
}
if (fIsJsonArrayAgg && !addBrackets)
oss << "]";
return swapStreamWithStringAndReturnBuf(oss, isNull);
}
// Serialize base state plus every stored RGData.
void GroupConcatNoOrder::serialize(messageqcpp::ByteStream& bs) const
{
GroupConcator::serialize(bs);
RGDataSizeType sz = fDataVec.size();
bs << sz;
// NOTE(review): null entries (reset by getResultImpl) are skipped here, but
// the count written above still includes them, while deserialize() reads
// exactly 'sz' RGDatas. Presumably serialize is never called after
// getResult — confirm.
for (auto& rgdata : fDataVec)
{
if (rgdata)
{
rgdata->serialize(bs, fRowGroup.getDataSize());
}
}
}
// Inverse of serialize(): rebuild the RGData vector and re-derive memory
// accounting from the deserialized data.
void GroupConcatNoOrder::deserialize(messageqcpp::ByteStream& bs)
{
GroupConcator::deserialize(bs);
RGDataSizeType sz;
bs >> sz;
fMemSize = fCurMemSize = 0;
fDataVec.resize(sz);
if (sz == 0)
{
createNewRGData();
}
else
{
for (RGDataSizeType i = 0; i < sz; i++)
{
fDataVec[i].reset(new RGData(fRowGroup, fRowsPerRG));
fDataVec[i]->deserialize(bs, fRowGroup.getDataSize(fRowsPerRG));
fRowGroup.setData(fDataVec[i].get());
fCurMemSize = fRowGroup.getSizeWithStrings(fRowsPerRG);
fMemSize += fCurMemSize;
}
}
}
// Debug dump; nothing variant-specific to add beyond the base dump.
const string GroupConcatNoOrder::toString() const
{
return GroupConcator::toString();
}
// Append a fresh, empty RGData to the store and point fRow at its first row.
void GroupConcatNoOrder::createNewRGData()
{
auto newSize = fRowGroup.getDataSize(fRowsPerRG);
fMemSize += newSize;
fCurMemSize = newSize;
fDataVec.emplace_back(make_unique<rowgroup::RGData>(fRowGroup, fRowsPerRG));
fRowGroup.setData(fDataVec.back().get());
fRowGroup.setUseOnlyLongString(true);
fRowGroup.resetRowGroup(0);
fRowGroup.initRow(&fRow);
fRowGroup.getRow(0, &fRow);
}
} // namespace joblist