1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-20 09:07:44 +03:00
Gagan Goel 55d4214429
MCOL-5429 Fix high memory consumption in GROUP_CONCAT() processing. (#2823)
1. Input and output RowGroup's used in GROUP_CONCAT classes
are currently allocating a raw memory buffer of size equal
to the actual width of the string datatype. As an example,
for the following query:
  SELECT col1, GROUP_CONCAT(col2) FROM t GROUP BY col1;
If col2 is a TEXT field with default width, the input
RowGroup containing the target rows to be concatenated will
assign 64kb of memory for every input row in the RowGroup.
This is wasteful as actual field values in real workloads
would be much smaller. We fix this by enabling the
RowGroup to use the StringStore when the RowGroup contains
long strings.

2. RowAggregation::initialize() allocates a memory buffer
for a NULL row. The size of this buffer is equal to the
row size for the output RowGroup. For the above scenario,
using the default group_concat_max_len (which is a server
variable that sets the maximum length of the GROUP_CONCAT string)
value of 1mb, the buffer size would be
(1mb + 64kb + some additional metadata). If the user sets
group_concat_max_len to a higher value, say 3gb, this buffer
size would be ~3gb. Now if the runtime initiates several
instances of RowAggregation, total memory consumption by
PrimProc could exceed the hardware memory limits causing the
OS OOM to kill the process. We fix this problem by again
enabling the StringStore for the NULL row allocation.

3. In the plugin code in buildAggregateColumn(), there is
an integer overflow when the server group_concat_max_len
variable (which is an uint32_t) is set to a value > INT32_MAX
(such as 3gb) and is assigned to
CalpontSystemCatalog::ColType::colWidth (which is an int32_t).
As a short term fix, we saturate the assigned value to colWidth
to INT32_MAX. Proper fix would be to upgrade
CalpontSystemCatalog::ColType::colWidth to an uint32_t.
2023-04-22 00:43:29 +03:00

1016 lines
33 KiB
C++

/*
Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA.
*/
#pragma once
/** @file rowaggregation.h
* Classes in this file are used to aggregate Rows in RowGroups.
* RowAggregation is the class that performs the aggregation.
* RowAggGroupByCol and RowAggFunctionCol are support classes used to describe
* the columns involved in the aggregation.
* @endcode
*/
#include <cstring>
#include <cstdint>
#include <utility>
#include <vector>
#include <tr1/unordered_map>
#include <tr1/unordered_set>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <boost/scoped_ptr.hpp>
#include "serializeable.h"
#include "bytestream.h"
#include "rowgroup.h"
#include "hasher.h"
#include "stlpoolallocator.h"
#include "returnedcolumn.h"
#include "mcsv1_udaf.h"
#include "constantcolumn.h"
#include "resourcemanager.h"
#include "rowstorage.h"
// To do: move code that depends on joblist to a proper subsystem.
namespace joblist
{
class ResourceManager;
}
namespace rowgroup
{
/** @brief Enumerates aggregate functions supported by RowAggregation
*/
enum RowAggFunctionType
{
ROWAGG_FUNCT_UNDEFINE, // default
ROWAGG_COUNT_ASTERISK, // COUNT(*) counts all rows including nulls
ROWAGG_COUNT_COL_NAME, // COUNT(column_name) only counts non-null rows
ROWAGG_SUM,
ROWAGG_AVG,
ROWAGG_MIN,
ROWAGG_MAX,
// Statistics Function, ROWAGG_STATS is the generic name.
ROWAGG_STATS,
ROWAGG_STDDEV_POP,
ROWAGG_STDDEV_SAMP,
ROWAGG_VAR_POP,
ROWAGG_VAR_SAMP,
// BIT Function, ROWAGG_BIT_OP is the generic name.
ROWAGG_BIT_OP,
ROWAGG_BIT_AND,
ROWAGG_BIT_OR,
ROWAGG_BIT_XOR,
// GROUP_CONCAT
ROWAGG_GROUP_CONCAT,
ROWAGG_JSON_ARRAY,
// DISTINCT: performed on UM only
ROWAGG_COUNT_DISTINCT_COL_NAME, // COUNT(distinct column_name) only counts non-null rows
ROWAGG_DISTINCT_SUM,
ROWAGG_DISTINCT_AVG,
// Constant
ROWAGG_CONSTANT,
// User Defined Aggregate Function
ROWAGG_UDAF,
// If an Aggregate has more than one parameter, this will be used for parameters after the first
ROWAGG_MULTI_PARM,
// internal function type to avoid duplicate the work
// handling ROWAGG_COUNT_NO_OP, ROWAGG_DUP_FUNCT and ROWAGG_DUP_AVG is a little different
// ROWAGG_COUNT_NO_OP : count done by AVG, no need to copy
// ROWAGG_DUP_FUNCT : copy data before AVG calculation, because SUM may share by AVG
// ROWAGG_DUP_AVG : copy data after AVG calculation
ROWAGG_COUNT_NO_OP, // COUNT(column_name), but leave count() to AVG
ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG and UDAF, in select
ROWAGG_DUP_AVG, // duplicate AVG(column_name) in select
ROWAGG_DUP_STATS, // duplicate statistics functions in select
ROWAGG_DUP_UDAF // duplicate UDAF function in select
};
//------------------------------------------------------------------------------
/** @brief Specifies a column in a RowGroup that is part of the aggregation
* "GROUP BY" clause.
*/
//------------------------------------------------------------------------------
struct RowAggGroupByCol
{
/** @brief RowAggGroupByCol constructor
*
* @param inputColIndex(in) column index into input row
* @param outputColIndex(in) column index into output row
* outputColIndex argument should be omitted if this GroupBy
* column is not to be included in the output.
*/
explicit RowAggGroupByCol(int32_t inputColIndex, int32_t outputColIndex = -1)
: fInputColumnIndex(inputColIndex), fOutputColumnIndex(outputColIndex)
{
}
~RowAggGroupByCol() = default;
uint32_t fInputColumnIndex;
uint32_t fOutputColumnIndex;
};
inline messageqcpp::ByteStream& operator<<(messageqcpp::ByteStream& b, RowAggGroupByCol& o)
{
return (b << o.fInputColumnIndex << o.fOutputColumnIndex);
}
inline messageqcpp::ByteStream& operator>>(messageqcpp::ByteStream& b, RowAggGroupByCol& o)
{
return (b >> o.fInputColumnIndex >> o.fOutputColumnIndex);
}
//------------------------------------------------------------------------------
/** @brief Specifies a column in a RowGroup that is to be aggregated, and what
* aggregation function is to be performed.
*
* If a column is aggregated more than once(ex: SELECT MIN(l_shipdate),
* MAX(l_shipdate)...), then 2 RowAggFunctionCol objects should be created
* with the same inputColIndex, one for the MIN function, and one for the
* MAX function.
*/
//------------------------------------------------------------------------------
struct RowAggFunctionCol
{
/** @brief RowAggFunctionCol constructor
*
* @param aggFunction(in) aggregation function to be performed
* @param inputColIndex(in) column index into input row
* @param outputColIndex(in) column index into output row
* @param auxColIndex(in) auxiliary index into output row for avg/count
* @param stats(in) real statistics function where generic name in aggFunction
*/
RowAggFunctionCol(RowAggFunctionType aggFunction, RowAggFunctionType stats, int32_t inputColIndex,
int32_t outputColIndex, int32_t auxColIndex = -1)
: fAggFunction(aggFunction)
, fStatsFunction(stats)
, fInputColumnIndex(inputColIndex)
, fOutputColumnIndex(outputColIndex)
, fAuxColumnIndex(auxColIndex)
, hasMultiParm(false)
{
}
virtual ~RowAggFunctionCol() = default;
virtual void serialize(messageqcpp::ByteStream& bs) const;
virtual void deserialize(messageqcpp::ByteStream& bs);
RowAggFunctionType fAggFunction; // aggregate function
// statistics function stores ROWAGG_STATS in fAggFunction and real function in fStatsFunction
RowAggFunctionType fStatsFunction;
uint32_t fInputColumnIndex;
uint32_t fOutputColumnIndex;
// fAuxColumnIndex is used in 4 cases:
// 1. for AVG - point to the count column, the fInputColumnIndex is for sum
// 2. for statistics function - point to sum(x), +1 is sum(x**2)
// 3. for UDAF - contain the context user data as binary
// 4. for duplicate - point to the real aggretate column to be copied from
// Set only on UM, the fAuxColumnIndex is defaulted to fOutputColumnIndex+1 on PM.
uint32_t fAuxColumnIndex;
// For UDAF that have more than one parameter and some parameters are constant.
// There will be a series of RowAggFunctionCol created, one for each parameter.
// The first will be a RowUDAFFunctionCol. Subsequent ones will be RowAggFunctionCol
// with fAggFunction == ROWAGG_MULTI_PARM. Order is important.
// If this parameter is constant, that value is here.
execplan::SRCP fpConstCol;
bool hasMultiParm;
};
struct RowUDAFFunctionCol : public RowAggFunctionCol
{
RowUDAFFunctionCol(mcsv1sdk::mcsv1Context& context, int32_t inputColIndex, int32_t outputColIndex,
int32_t auxColIndex = -1)
: RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, inputColIndex, outputColIndex, auxColIndex)
, fUDAFContext(context)
, bInterrupted(false)
{
fUDAFContext.setInterrupted(&bInterrupted);
}
RowUDAFFunctionCol(int32_t inputColIndex, int32_t outputColIndex, int32_t auxColIndex = -1)
: RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, inputColIndex, outputColIndex, auxColIndex)
, bInterrupted(false)
{
}
RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs)
: RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex, rhs.fOutputColumnIndex,
rhs.fAuxColumnIndex)
, fUDAFContext(rhs.fUDAFContext)
, bInterrupted(false)
{
}
~RowUDAFFunctionCol() override = default;
void serialize(messageqcpp::ByteStream& bs) const override;
void deserialize(messageqcpp::ByteStream& bs) override;
mcsv1sdk::mcsv1Context fUDAFContext; // The UDAF context
bool bInterrupted; // Shared by all the threads
};
inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const
{
bs << (uint8_t)fAggFunction;
bs << fInputColumnIndex;
bs << fOutputColumnIndex;
if (fpConstCol)
{
bs << (uint8_t)1;
fpConstCol.get()->serialize(bs);
}
else
{
bs << (uint8_t)0;
}
}
inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
{
bs >> (uint8_t&)fAggFunction;
bs >> fInputColumnIndex;
bs >> fOutputColumnIndex;
uint8_t t;
bs >> t;
if (t)
{
fpConstCol.reset(new execplan::ConstantColumn);
fpConstCol.get()->unserialize(bs);
}
}
inline void RowUDAFFunctionCol::serialize(messageqcpp::ByteStream& bs) const
{
RowAggFunctionCol::serialize(bs);
fUDAFContext.serialize(bs);
}
inline void RowUDAFFunctionCol::deserialize(messageqcpp::ByteStream& bs)
{
// This deserialize is called when the function gets to PrimProc.
// reset is called because we're starting a new sub-evaluate cycle.
RowAggFunctionCol::deserialize(bs);
fUDAFContext.unserialize(bs);
fUDAFContext.setInterrupted(&bInterrupted);
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
rc = fUDAFContext.getFunction()->reset(&fUDAFContext);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
bInterrupted = true;
throw logging::QueryDataExcept(fUDAFContext.getErrorMessage(), logging::aggregateFuncErr);
}
}
struct ConstantAggData
{
std::string fConstValue;
std::string fUDAFName; // If a UDAF is called with constant.
RowAggFunctionType fOp;
bool fIsNull;
ConstantAggData() : fOp(ROWAGG_FUNCT_UNDEFINE), fIsNull(false)
{
}
ConstantAggData(std::string v, RowAggFunctionType f, bool n) : fConstValue(std::move(v)), fOp(f), fIsNull(n)
{
}
ConstantAggData(std::string v, std::string u, RowAggFunctionType f, bool n)
: fConstValue(std::move(v)), fUDAFName(std::move(u)), fOp(f), fIsNull(n)
{
}
};
typedef boost::shared_ptr<RowAggGroupByCol> SP_ROWAGG_GRPBY_t;
typedef boost::shared_ptr<RowAggFunctionCol> SP_ROWAGG_FUNC_t;
struct GroupConcat
{
// GROUP_CONCAT(DISTINCT col1, 'const', col2 ORDER BY col3 desc SEPARATOR 'sep')
std::vector<std::pair<uint32_t, uint32_t>> fGroupCols; // columns to concatenate, and position
std::vector<std::pair<uint32_t, bool>> fOrderCols; // columns to order by [asc/desc]
std::string fSeparator;
std::vector<std::pair<std::string, uint32_t>> fConstCols; // constant columns in group
bool fDistinct;
uint64_t fSize;
RowGroup fRowGroup;
std::shared_ptr<int[]> fMapping;
std::vector<std::pair<int, bool>> fOrderCond; // position to order by [asc/desc]
joblist::ResourceManager* fRm; // resource manager
boost::shared_ptr<int64_t> fSessionMemLimit;
long fTimeZone;
GroupConcat() : fRm(nullptr)
{
}
};
typedef boost::shared_ptr<GroupConcat> SP_GroupConcat;
class GroupConcatAg
{
public:
explicit GroupConcatAg(SP_GroupConcat&);
virtual ~GroupConcatAg();
virtual void initialize(){};
virtual void processRow(const rowgroup::Row&){};
virtual void merge(const rowgroup::Row&, uint64_t){};
void getResult(uint8_t*){};
uint8_t* getResult()
{
return nullptr;
}
protected:
rowgroup::SP_GroupConcat fGroupConcat;
};
typedef boost::shared_ptr<GroupConcatAg> SP_GroupConcatAg;
//------------------------------------------------------------------------------
/** @brief Class that aggregates RowGroups.
*/
//------------------------------------------------------------------------------
class RowAggregation : public messageqcpp::Serializeable
{
public:
/** @brief RowAggregation default constructor
*
* @param rowAggGroupByCols(in) specify GroupBy columns and their
* mapping from input to output. If vector is empty, then all the
* rows will be aggregated into a single implied group. Order is
* important here. The primary GroupBy column should be first, the
* secondary GroupBy column should be second, etc.
* @param rowAggFunctionCols(in) specify function columns and their
* mapping from input to output.
*/
RowAggregation();
RowAggregation(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessMemLimit = {});
RowAggregation(const RowAggregation& rhs);
/** @brief RowAggregation default destructor
*/
~RowAggregation() override;
/** @brief clone this object for multi-thread use
*/
inline virtual RowAggregation* clone() const
{
return new RowAggregation(*this);
}
/** @brief Denotes end of data insertion following multiple calls to addRowGroup().
*/
virtual void endOfInput();
/** @brief reset RowAggregation outputRowGroup and hashMap
*/
virtual void aggReset();
/** @brief Define content of data to be aggregated and its aggregated output.
*
* @param pRowGroupIn(in) contains definition of the input data.
* @param pRowGroupOut(out) contains definition of the output data.
*/
virtual void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut)
{
fRowGroupIn = pRowGroupIn;
fRowGroupOut = pRowGroupOut;
initialize();
}
/** @brief Define content of data to be joined
*
* This method must be call after setInputOutput() for PM hashjoin case.
*
* @param pSmallSideRG(in) contains definition of the small side data.
* @param pLargeSideRG(in) contains definition of the large side data.
*/
void setJoinRowGroups(std::vector<RowGroup>* pSmallSideRG, RowGroup* pLargeSideRG);
/** @brief Returns group by column vector
*
* This function is used to duplicate the RowAggregation object
*
* @returns a reference of the group by vector
*/
std::vector<SP_ROWAGG_GRPBY_t>& getGroupByCols()
{
return fGroupByCols;
}
/** @brief Returns aggregate function vector
*
* This function is used to duplicate the RowAggregation object
*
* @returns a reference of the aggregation function vector
*/
std::vector<SP_ROWAGG_FUNC_t>& getAggFunctions()
{
return fFunctionCols;
}
/** @brief Add a group of rows to be aggregated.
*
* This function can be called to iteratively add RowGroups for aggregation.
*
* @parm pRowGroupIn(in) RowGroup to be added to aggregation.
*/
virtual void addRowGroup(const RowGroup* pRowGroupIn);
virtual void addRowGroup(const RowGroup* pRowGroupIn,
std::vector<std::pair<Row::Pointer, uint64_t>>& inRows);
/** @brief Serialize RowAggregation object into a ByteStream.
*
* @parm bs(out) BytesStream that is to be written to.
*/
void serialize(messageqcpp::ByteStream& bs) const override;
/** @brief Unserialize RowAggregation object from a ByteStream.
*
* @parm bs(in) BytesStream that is to be read from.
*/
void deserialize(messageqcpp::ByteStream& bs) override;
/** @brief load result set into byte stream
*
* @parm bs(out) BytesStream that is to be written to.
*/
void loadResult(messageqcpp::ByteStream& bs);
void loadEmptySet(messageqcpp::ByteStream& bs);
/** @brief get output rowgroup
*
* @returns a const pointer of the output rowgroup
*/
const RowGroup* getOutputRowGroup() const
{
return fRowGroupOut;
}
RowGroup* getOutputRowGroup()
{
return fRowGroupOut;
}
void append(RowAggregation* other);
virtual void aggregateRow(Row& row, const uint64_t* hash = nullptr,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
inline uint32_t aggMapKeyLength() const
{
return fAggMapKeyCount;
}
inline void timeZone(long timeZone)
{
fTimeZone = timeZone;
}
inline long timeZone() const
{
return fTimeZone;
}
inline std::vector<mcsv1sdk::mcsv1Context>* rgContextColl()
{
return &fRGContextColl;
}
void finalAggregation()
{
return fRowAggStorage->finalize([this](Row& row) { mergeEntries(row); }, fRow);
}
std::unique_ptr<RGData> moveCurrentRGData()
{
return std::move(fCurRGData);
}
protected:
virtual void initialize(bool hasGroupConcat = false);
virtual void initMapData(const Row& row);
virtual void attachGroupConcatAg();
virtual void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
void mergeEntries(const Row& row);
virtual void doMinMax(const Row&, int64_t, int64_t, int);
virtual void doSum(const Row&, int64_t, int64_t, int);
virtual void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false);
virtual void doStatistics(const Row&, int64_t, int64_t, int64_t);
void mergeStatistics(const Row&, uint64_t colOut, uint64_t colAux);
virtual void doBitOp(const Row&, int64_t, int64_t, int);
virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
virtual bool countSpecial(const RowGroup* pRG)
{
fRow.setUintField<8>(fRow.getUintField<8>(0) + pRG->getRowCount(), 0);
return true;
}
void resetUDAF(RowUDAFFunctionCol* rowUDAF);
void resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColIdx);
inline bool isNull(const RowGroup* pRowGroup, const Row& row, int64_t col);
inline void makeAggFieldsNull(Row& row);
inline void copyNullRow(Row& row)
{
copyRow(fNullRow, &row);
}
inline void updateIntMinMax(int128_t val1, int128_t val2, int64_t col, int func);
inline void updateIntMinMax(int64_t val1, int64_t val2, int64_t col, int func);
inline void updateUintMinMax(uint64_t val1, uint64_t val2, int64_t col, int func);
inline void updateCharMinMax(uint64_t val1, uint64_t val2, int64_t col, int func);
inline void updateDoubleMinMax(double val1, double val2, int64_t col, int func);
inline void updateLongDoubleMinMax(long double val1, long double val2, int64_t col, int func);
inline void updateFloatMinMax(float val1, float val2, int64_t col, int func);
inline void updateStringMinMax(std::string val1, std::string val2, int64_t col, int func);
std::vector<SP_ROWAGG_GRPBY_t> fGroupByCols;
std::vector<SP_ROWAGG_FUNC_t> fFunctionCols;
uint32_t fAggMapKeyCount; // the number of columns that make up the key
RowGroup fRowGroupIn;
RowGroup* fRowGroupOut;
// for when the group by & distinct keys are not stored in the output rows
rowgroup::RowGroup fKeyRG;
Row fRow;
Row fNullRow;
Row* tmpRow; // used by the hashers & eq functors
boost::scoped_array<uint8_t> fNullRowData;
rowgroup::RGData fNullRowRGData;
rowgroup::RowGroup fNullRowGroup;
std::unique_ptr<RowAggStorage> fRowAggStorage;
// for support PM aggregation after PM hashjoin
std::vector<RowGroup>* fSmallSideRGs;
RowGroup* fLargeSideRG;
std::shared_ptr<std::shared_ptr<int[]>[]> fSmallMappings;
std::shared_ptr<int[]> fLargeMapping;
uint32_t fSmallSideCount;
boost::scoped_array<Row> rowSmalls;
// for 8k poc
RowGroup fEmptyRowGroup;
RGData fEmptyRowData;
Row fEmptyRow;
bool fKeyOnHeap = false;
long fTimeZone;
// We need a separate copy for each thread.
mcsv1sdk::mcsv1Context fRGContext;
std::vector<mcsv1sdk::mcsv1Context> fRGContextColl;
// These are handy for testing the actual type of static_any for UDAF
static const static_any::any& charTypeId;
static const static_any::any& scharTypeId;
static const static_any::any& shortTypeId;
static const static_any::any& intTypeId;
static const static_any::any& longTypeId;
static const static_any::any& llTypeId;
static const static_any::any& int128TypeId;
static const static_any::any& ucharTypeId;
static const static_any::any& ushortTypeId;
static const static_any::any& uintTypeId;
static const static_any::any& ulongTypeId;
static const static_any::any& ullTypeId;
static const static_any::any& floatTypeId;
static const static_any::any& doubleTypeId;
static const static_any::any& longdoubleTypeId;
static const static_any::any& strTypeId;
// For UDAF along with with multiple distinct columns
std::vector<SP_ROWAGG_FUNC_t>* fOrigFunctionCols;
joblist::ResourceManager* fRm = nullptr;
boost::shared_ptr<int64_t> fSessionMemLimit;
std::unique_ptr<RGData> fCurRGData;
};
//------------------------------------------------------------------------------
/** @brief derived Class that aggregates multi-rowgroups on UM
* One-phase case: aggregate from projected RG to final aggregated RG.
*/
//------------------------------------------------------------------------------
class RowAggregationUM : public RowAggregation
{
public:
/** @brief RowAggregationUM constructor
*/
RowAggregationUM()
{
}
RowAggregationUM(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
boost::shared_ptr<int64_t> sessionMemLimit);
RowAggregationUM(const RowAggregationUM& rhs);
/** @brief RowAggregationUM default destructor
*/
~RowAggregationUM() override;
/** @brief Denotes end of data insertion following multiple calls to addRowGroup().
*/
void endOfInput() override;
/** @brief Finializes the result set before sending back to the front end.
*/
void finalize();
/** @brief Returns aggregated rows in a RowGroup.
*
* This function should be called repeatedly until false is returned (meaning end of data).
*
* @returns true if more data, else false if no more data.
*/
bool nextRowGroup();
/** @brief Add an aggregator for DISTINCT aggregation
*/
void distinctAggregator(const boost::shared_ptr<RowAggregation>& da)
{
fDistinctAggregator = da;
}
/** @brief expressions to be evaluated after aggregation
*/
void expression(const std::vector<execplan::SRCP>& exp)
{
fExpression = exp;
}
const std::vector<execplan::SRCP>& expression()
{
return fExpression;
}
// for multi threaded
joblist::ResourceManager* getRm()
{
return fRm;
}
inline RowAggregationUM* clone() const override
{
return new RowAggregationUM(*this);
}
/** @brief access the aggregate(constant) columns
*/
void constantAggregate(const std::vector<ConstantAggData>& v)
{
fConstantAggregate = v;
}
const std::vector<ConstantAggData>& constantAggregate() const
{
return fConstantAggregate;
}
/** @brief access the group_concat
*/
void groupConcat(const std::vector<SP_GroupConcat>& v)
{
fGroupConcat = v;
}
const std::vector<SP_GroupConcat>& groupConcat() const
{
return fGroupConcat;
}
void aggReset() override;
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
protected:
// virtual methods from base
void initialize(bool hasGroupConcat = false) override;
void attachGroupConcatAg() override;
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
bool countSpecial(const RowGroup* pRG) override
{
fRow.setIntField<8>(fRow.getIntField<8>(fFunctionCols[0]->fOutputColumnIndex) + pRG->getRowCount(),
fFunctionCols[0]->fOutputColumnIndex);
return true;
}
// calculate the average after all rows received. UM only function.
void calculateAvgColumns();
// calculate the statistics function all rows received. UM only function.
void calculateStatisticsFunctions();
// Sets the value from valOut into column colOut, performing any conversions.
void SetUDAFValue(static_any::any& valOut, int64_t colOut);
// If the datatype returned by evaluate isn't what we expect, convert.
void SetUDAFAnyValue(static_any::any& valOut, int64_t colOut);
// calculate the UDAF function all rows received. UM only function.
void calculateUDAFColumns();
// fix duplicates. UM only function.
void fixDuplicates(RowAggFunctionType funct);
// evaluate expressions
virtual void evaluateExpression();
// fix the aggregate(constant)
virtual void fixConstantAggregate();
virtual void doNullConstantAggregate(const ConstantAggData&, uint64_t);
virtual void doNotNullConstantAggregate(const ConstantAggData&, uint64_t);
// @bug3362, group_concat
virtual void doGroupConcat(const Row&, int64_t, int64_t);
virtual void doJsonAgg(const Row&, int64_t, int64_t);
virtual void setGroupConcatString();
bool fHasAvg;
bool fHasStatsFunc;
bool fHasUDAF;
boost::shared_ptr<RowAggregation> fDistinctAggregator;
// for function on aggregation
std::vector<execplan::SRCP> fExpression;
/* Derived classes that use a lot of memory need to update totalMemUsage and request
* the memory from rm in that order. */
uint64_t fTotalMemUsage;
// @bug3475, aggregate(constant), sum(0), count(null), etc
std::vector<ConstantAggData> fConstantAggregate;
// @bug3362, group_concat
std::vector<SP_GroupConcat> fGroupConcat;
std::vector<SP_GroupConcatAg> fGroupConcatAg;
std::vector<SP_ROWAGG_FUNC_t> fFunctionColGc;
private:
uint64_t fLastMemUsage;
};
//------------------------------------------------------------------------------
/** @brief derived Class that aggregates PM partially aggregated RowGroups on UM
* Two-phase case:
* phase 1 - aggregate from projected RG to partial aggregated RG on PM
* The base RowAggregation handles the 1st phase.
* phase 2 - aggregate from partially aggregated RG to final RG on UM
* This class handles the 2nd phase.
*/
//------------------------------------------------------------------------------
class RowAggregationUMP2 : public RowAggregationUM
{
public:
/** @brief RowAggregationUM constructor
*/
RowAggregationUMP2()
{
}
RowAggregationUMP2(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
boost::shared_ptr<int64_t> sessionMemLimit);
RowAggregationUMP2(const RowAggregationUMP2& rhs);
/** @brief RowAggregationUMP2 default destructor
*/
~RowAggregationUMP2() override;
inline RowAggregationUMP2* clone() const override
{
return new RowAggregationUMP2(*this);
}
protected:
// virtual methods from base
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override;
void doStatistics(const Row&, int64_t, int64_t, int64_t) override;
void doGroupConcat(const Row&, int64_t, int64_t) override;
void doJsonAgg(const Row&, int64_t, int64_t) override;
void doBitOp(const Row&, int64_t, int64_t, int) override;
void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
bool countSpecial(const RowGroup* pRG) override
{
return false;
}
};
//------------------------------------------------------------------------------
/** @brief derived Class that aggregates on distinct columns on UM
* The internal aggregator will handle one or two phases aggregation
*/
//------------------------------------------------------------------------------
class RowAggregationDistinct : public RowAggregationUMP2
{
public:
/** @brief RowAggregationDistinct constructor
*/
RowAggregationDistinct()
{
}
RowAggregationDistinct(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
boost::shared_ptr<int64_t> sessionMemLimit);
/** @brief Copy Constructor for multi-threaded aggregation
*/
RowAggregationDistinct(const RowAggregationDistinct& rhs);
/** @brief RowAggregationDistinct default destructor
*/
~RowAggregationDistinct() override;
/** @brief Add an aggregator for pre-DISTINCT aggregation
*/
void addAggregator(const boost::shared_ptr<RowAggregation>& agg, const RowGroup& rg);
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
virtual void doDistinctAggregation();
virtual void doDistinctAggregation_rowVec(std::vector<std::pair<Row::Pointer, uint64_t>>& inRows);
void addRowGroup(const RowGroup* pRowGroupIn) override;
void addRowGroup(const RowGroup* pRowGroupIn,
std::vector<std::pair<Row::Pointer, uint64_t>>& inRows) override;
// multi-threade debug
boost::shared_ptr<RowAggregation>& aggregator()
{
return fAggregator;
}
void aggregator(boost::shared_ptr<RowAggregation> aggregator)
{
fAggregator = std::move(aggregator);
}
RowGroup& rowGroupDist()
{
return fRowGroupDist;
}
void rowGroupDist(RowGroup& rowGroupDist)
{
fRowGroupDist = rowGroupDist;
}
inline RowAggregationDistinct* clone() const override
{
return new RowAggregationDistinct(*this);
}
protected:
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
boost::shared_ptr<RowAggregation> fAggregator;
RowGroup fRowGroupDist;
RGData fDataForDist;
};
//------------------------------------------------------------------------------
/** @brief derived Class for aggregates multiple columns with distinct key word
* Get distinct values of the column per group by entry
*/
//------------------------------------------------------------------------------
class RowAggregationSubDistinct : public RowAggregationUM
{
public:
/** @brief RowAggregationSubDistinct constructor
*/
RowAggregationSubDistinct()
{
}
RowAggregationSubDistinct(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
joblist::ResourceManager*, boost::shared_ptr<int64_t> sessionMemLimit);
RowAggregationSubDistinct(const RowAggregationSubDistinct& rhs);
/** @brief RowAggregationSubDistinct default destructor
*/
~RowAggregationSubDistinct() override;
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
void addRowGroup(const RowGroup* pRowGroupIn) override;
inline RowAggregationSubDistinct* clone() const override
{
return new RowAggregationSubDistinct(*this);
}
void addRowGroup(const RowGroup* pRowGroupIn,
std::vector<std::pair<Row::Pointer, uint64_t>>& inRow) override;
protected:
// virtual methods from RowAggregationUM
void doGroupConcat(const Row&, int64_t, int64_t) override;
void doJsonAgg(const Row&, int64_t, int64_t) override;
// for groupby columns and the aggregated distinct column
Row fDistRow;
boost::scoped_array<uint8_t> fDistRowData;
};
//------------------------------------------------------------------------------
/** @brief derived Class that aggregates multiple columns with distinct key word
* Each distinct column will have its own aggregator
*/
//------------------------------------------------------------------------------
class RowAggregationMultiDistinct : public RowAggregationDistinct
{
public:
/** @brief RowAggregationMultiDistinct constructor
*/
RowAggregationMultiDistinct()
{
}
RowAggregationMultiDistinct(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
joblist::ResourceManager*, boost::shared_ptr<int64_t> sessionMemLimit);
RowAggregationMultiDistinct(const RowAggregationMultiDistinct& rhs);
/** @brief RowAggregationMultiDistinct default destructor
*/
~RowAggregationMultiDistinct() override;
/** @brief Add sub aggregators
*/
void addSubAggregator(const boost::shared_ptr<RowAggregationUM>& agg, const RowGroup& rg,
const std::vector<SP_ROWAGG_FUNC_t>& funct);
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
using RowAggregationDistinct::addRowGroup;
void addRowGroup(const RowGroup* pRowGroupIn) override;
using RowAggregationDistinct::doDistinctAggregation;
void doDistinctAggregation() override;
using RowAggregationDistinct::doDistinctAggregation_rowVec;
virtual void doDistinctAggregation_rowVec(
std::vector<std::vector<std::pair<Row::Pointer, uint64_t>>>& inRows);
inline RowAggregationMultiDistinct* clone() const override
{
return new RowAggregationMultiDistinct(*this);
}
void addRowGroup(const RowGroup* pRowGroupIn,
std::vector<std::vector<std::pair<Row::Pointer, uint64_t>>>& inRows);
std::vector<boost::shared_ptr<RowAggregationUM>>& subAggregators()
{
return fSubAggregators;
}
void subAggregators(std::vector<boost::shared_ptr<RowAggregationUM>>& subAggregators)
{
fSubAggregators = subAggregators;
}
protected:
// virtual methods from base
std::vector<boost::shared_ptr<RowAggregationUM>> fSubAggregators;
std::vector<RowGroup> fSubRowGroups;
std::vector<boost::shared_ptr<RGData>> fSubRowData;
std::vector<std::vector<SP_ROWAGG_FUNC_t>> fSubFunctions;
};
typedef boost::shared_ptr<RowAggregation> SP_ROWAGG_t;
typedef boost::shared_ptr<RowAggregation> SP_ROWAGG_PM_t;
typedef boost::shared_ptr<RowAggregationUM> SP_ROWAGG_UM_t;
typedef boost::shared_ptr<RowAggregationDistinct> SP_ROWAGG_DIST;
} // namespace rowgroup