mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-21 19:45:56 +03:00
This patch fixes the problem in MCOL-4234 and also generally improves behavior of GROUP BY. It does so by introducing a "dummy" aggregate and by wrapping columns into it. This allows for columns that are not in GROUP BY to be used more freely, for example, in SELECT * FROM tbl GROUP BY col - all columns that are not "col" will be wrapped into an aggregate and query will proceed to execution. The dummy aggregate itself does nothing more than remember last value passed into it. There also an additional error message that tries to explain what types of expressions can be wrapped into an aggregate.
1043 lines
34 KiB
C++
1043 lines
34 KiB
C++
/*
|
|
Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (c) 2019 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
/** @file rowaggregation.h
|
|
* Classes in this file are used to aggregate Rows in RowGroups.
|
|
* RowAggregation is the class that performs the aggregation.
|
|
* RowAggGroupByCol and RowAggFunctionCol are support classes used to describe
|
|
* the columns involved in the aggregation.
|
|
* @endcode
|
|
*/
|
|
|
|
#include <cstring>
|
|
#include <cstdint>
|
|
#include <utility>
|
|
#include <vector>
|
|
#include <tr1/unordered_map>
|
|
#include <tr1/unordered_set>
|
|
#include <boost/shared_ptr.hpp>
|
|
|
|
#include <boost/scoped_array.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
|
|
#include "serializeable.h"
|
|
#include "bytestream.h"
|
|
#include "rowgroup.h"
|
|
#include "hasher.h"
|
|
#include "stlpoolallocator.h"
|
|
#include "returnedcolumn.h"
|
|
#include "mcsv1_udaf.h"
|
|
#include "constantcolumn.h"
|
|
|
|
#include "resourcemanager.h"
|
|
#include "rowstorage.h"
|
|
#include "nullstring.h"
|
|
|
|
// To do: move code that depends on joblist to a proper subsystem.
|
|
namespace joblist
|
|
{
|
|
class ResourceManager;
|
|
}
|
|
|
|
namespace rowgroup
|
|
{
|
|
/** @brief Enumerates aggregate functions supported by RowAggregation
|
|
*/
|
|
enum RowAggFunctionType
|
|
{
|
|
ROWAGG_FUNCT_UNDEFINE, // default
|
|
ROWAGG_COUNT_ASTERISK, // COUNT(*) counts all rows including nulls
|
|
ROWAGG_COUNT_COL_NAME, // COUNT(column_name) only counts non-null rows
|
|
ROWAGG_SUM,
|
|
ROWAGG_AVG,
|
|
ROWAGG_MIN,
|
|
ROWAGG_MAX,
|
|
|
|
// Statistics Function, ROWAGG_STATS is the generic name.
|
|
ROWAGG_STATS,
|
|
ROWAGG_STDDEV_POP,
|
|
ROWAGG_STDDEV_SAMP,
|
|
ROWAGG_VAR_POP,
|
|
ROWAGG_VAR_SAMP,
|
|
|
|
// BIT Function, ROWAGG_BIT_OP is the generic name.
|
|
ROWAGG_BIT_OP,
|
|
ROWAGG_BIT_AND,
|
|
ROWAGG_BIT_OR,
|
|
ROWAGG_BIT_XOR,
|
|
|
|
// GROUP_CONCAT
|
|
ROWAGG_GROUP_CONCAT,
|
|
|
|
ROWAGG_JSON_ARRAY,
|
|
|
|
// DISTINCT: performed on UM only
|
|
ROWAGG_COUNT_DISTINCT_COL_NAME, // COUNT(distinct column_name) only counts non-null rows
|
|
ROWAGG_DISTINCT_SUM,
|
|
ROWAGG_DISTINCT_AVG,
|
|
|
|
// Constant
|
|
ROWAGG_CONSTANT,
|
|
|
|
// User Defined Aggregate Function
|
|
ROWAGG_UDAF,
|
|
|
|
// If an Aggregate has more than one parameter, this will be used for parameters after the first
|
|
ROWAGG_MULTI_PARM,
|
|
|
|
// internal function type to avoid duplicate the work
|
|
// handling ROWAGG_COUNT_NO_OP, ROWAGG_DUP_FUNCT and ROWAGG_DUP_AVG is a little different
|
|
// ROWAGG_COUNT_NO_OP : count done by AVG, no need to copy
|
|
// ROWAGG_DUP_FUNCT : copy data before AVG calculation, because SUM may share by AVG
|
|
// ROWAGG_DUP_AVG : copy data after AVG calculation
|
|
ROWAGG_COUNT_NO_OP, // COUNT(column_name), but leave count() to AVG
|
|
ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG and UDAF, in select
|
|
ROWAGG_DUP_AVG, // duplicate AVG(column_name) in select
|
|
ROWAGG_DUP_STATS, // duplicate statistics functions in select
|
|
ROWAGG_DUP_UDAF, // duplicate UDAF function in select
|
|
|
|
// a dummy "select some" aggregate needed for non-group-by values in SELECT's with GROUP BY's
|
|
ROWAGG_SELECT_SOME
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief Specifies a column in a RowGroup that is part of the aggregation
|
|
* "GROUP BY" clause.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
struct RowAggGroupByCol
|
|
{
|
|
/** @brief RowAggGroupByCol constructor
|
|
*
|
|
* @param inputColIndex(in) column index into input row
|
|
* @param outputColIndex(in) column index into output row
|
|
* outputColIndex argument should be omitted if this GroupBy
|
|
* column is not to be included in the output.
|
|
*/
|
|
explicit RowAggGroupByCol(int32_t inputColIndex, int32_t outputColIndex = -1)
|
|
: fInputColumnIndex(inputColIndex), fOutputColumnIndex(outputColIndex)
|
|
{
|
|
}
|
|
~RowAggGroupByCol() = default;
|
|
|
|
uint32_t fInputColumnIndex;
|
|
uint32_t fOutputColumnIndex;
|
|
};
|
|
|
|
inline messageqcpp::ByteStream& operator<<(messageqcpp::ByteStream& b, RowAggGroupByCol& o)
|
|
{
|
|
return (b << o.fInputColumnIndex << o.fOutputColumnIndex);
|
|
}
|
|
inline messageqcpp::ByteStream& operator>>(messageqcpp::ByteStream& b, RowAggGroupByCol& o)
|
|
{
|
|
return (b >> o.fInputColumnIndex >> o.fOutputColumnIndex);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief Specifies a column in a RowGroup that is to be aggregated, and what
|
|
* aggregation function is to be performed.
|
|
*
|
|
* If a column is aggregated more than once(ex: SELECT MIN(l_shipdate),
|
|
* MAX(l_shipdate)...), then 2 RowAggFunctionCol objects should be created
|
|
* with the same inputColIndex, one for the MIN function, and one for the
|
|
* MAX function.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
struct RowAggFunctionCol
|
|
{
|
|
/** @brief RowAggFunctionCol constructor
|
|
*
|
|
* @param aggFunction(in) aggregation function to be performed
|
|
* @param inputColIndex(in) column index into input row
|
|
* @param outputColIndex(in) column index into output row
|
|
* @param auxColIndex(in) auxiliary index into output row for avg/count
|
|
* @param stats(in) real statistics function where generic name in aggFunction
|
|
*/
|
|
RowAggFunctionCol(RowAggFunctionType aggFunction, RowAggFunctionType stats, int32_t inputColIndex,
|
|
int32_t outputColIndex, int32_t auxColIndex = -1)
|
|
: fAggFunction(aggFunction)
|
|
, fStatsFunction(stats)
|
|
, fInputColumnIndex(inputColIndex)
|
|
, fOutputColumnIndex(outputColIndex)
|
|
, fAuxColumnIndex(auxColIndex)
|
|
, hasMultiParm(false)
|
|
{
|
|
}
|
|
virtual ~RowAggFunctionCol() = default;
|
|
|
|
virtual void serialize(messageqcpp::ByteStream& bs) const;
|
|
virtual void deserialize(messageqcpp::ByteStream& bs);
|
|
|
|
RowAggFunctionType fAggFunction; // aggregate function
|
|
// statistics function stores ROWAGG_STATS in fAggFunction and real function in fStatsFunction
|
|
RowAggFunctionType fStatsFunction;
|
|
|
|
uint32_t fInputColumnIndex;
|
|
uint32_t fOutputColumnIndex;
|
|
|
|
// fAuxColumnIndex is used in 4 cases:
|
|
// 1. for AVG - point to the count column, the fInputColumnIndex is for sum
|
|
// 2. for statistics function - point to sum(x), +1 is sum(x**2)
|
|
// 3. for UDAF - contain the context user data as binary
|
|
// 4. for duplicate - point to the real aggretate column to be copied from
|
|
// Set only on UM, the fAuxColumnIndex is defaulted to fOutputColumnIndex+1 on PM.
|
|
uint32_t fAuxColumnIndex;
|
|
|
|
// For UDAF that have more than one parameter and some parameters are constant.
|
|
// There will be a series of RowAggFunctionCol created, one for each parameter.
|
|
// The first will be a RowUDAFFunctionCol. Subsequent ones will be RowAggFunctionCol
|
|
// with fAggFunction == ROWAGG_MULTI_PARM. Order is important.
|
|
// If this parameter is constant, that value is here.
|
|
execplan::SRCP fpConstCol;
|
|
|
|
bool hasMultiParm;
|
|
};
|
|
|
|
struct RowUDAFFunctionCol : public RowAggFunctionCol
|
|
{
|
|
RowUDAFFunctionCol(mcsv1sdk::mcsv1Context& context, int32_t inputColIndex, int32_t outputColIndex,
|
|
int32_t auxColIndex = -1)
|
|
: RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, inputColIndex, outputColIndex, auxColIndex)
|
|
, fUDAFContext(context)
|
|
, bInterrupted(false)
|
|
{
|
|
fUDAFContext.setInterrupted(&bInterrupted);
|
|
}
|
|
|
|
RowUDAFFunctionCol(int32_t inputColIndex, int32_t outputColIndex, int32_t auxColIndex = -1)
|
|
: RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, inputColIndex, outputColIndex, auxColIndex)
|
|
, bInterrupted(false)
|
|
{
|
|
}
|
|
RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs)
|
|
: RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex, rhs.fOutputColumnIndex,
|
|
rhs.fAuxColumnIndex)
|
|
, fUDAFContext(rhs.fUDAFContext)
|
|
, bInterrupted(false)
|
|
{
|
|
}
|
|
|
|
~RowUDAFFunctionCol() override = default;
|
|
|
|
void serialize(messageqcpp::ByteStream& bs) const override;
|
|
void deserialize(messageqcpp::ByteStream& bs) override;
|
|
|
|
mcsv1sdk::mcsv1Context fUDAFContext; // The UDAF context
|
|
bool bInterrupted; // Shared by all the threads
|
|
};
|
|
|
|
inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const
|
|
{
|
|
bs << (uint8_t)fAggFunction;
|
|
bs << fInputColumnIndex;
|
|
bs << fOutputColumnIndex;
|
|
|
|
if (fpConstCol)
|
|
{
|
|
bs << (uint8_t)1;
|
|
fpConstCol.get()->serialize(bs);
|
|
}
|
|
else
|
|
{
|
|
bs << (uint8_t)0;
|
|
}
|
|
}
|
|
|
|
inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
|
|
{
|
|
bs >> (uint8_t&)fAggFunction;
|
|
bs >> fInputColumnIndex;
|
|
bs >> fOutputColumnIndex;
|
|
uint8_t t;
|
|
bs >> t;
|
|
|
|
if (t)
|
|
{
|
|
fpConstCol.reset(new execplan::ConstantColumn);
|
|
fpConstCol.get()->unserialize(bs);
|
|
}
|
|
}
|
|
|
|
inline void RowUDAFFunctionCol::serialize(messageqcpp::ByteStream& bs) const
|
|
{
|
|
RowAggFunctionCol::serialize(bs);
|
|
fUDAFContext.serialize(bs);
|
|
}
|
|
|
|
inline void RowUDAFFunctionCol::deserialize(messageqcpp::ByteStream& bs)
|
|
{
|
|
// This deserialize is called when the function gets to PrimProc.
|
|
// reset is called because we're starting a new sub-evaluate cycle.
|
|
RowAggFunctionCol::deserialize(bs);
|
|
fUDAFContext.unserialize(bs);
|
|
fUDAFContext.setInterrupted(&bInterrupted);
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
rc = fUDAFContext.getFunction()->reset(&fUDAFContext);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
bInterrupted = true;
|
|
throw logging::QueryDataExcept(fUDAFContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
}
|
|
|
|
struct ConstantAggData
|
|
{
|
|
utils::NullString fConstValue;
|
|
std::string fUDAFName; // If a UDAF is called with constant.
|
|
RowAggFunctionType fOp;
|
|
|
|
ConstantAggData() : fOp(ROWAGG_FUNCT_UNDEFINE)
|
|
{
|
|
}
|
|
|
|
ConstantAggData(utils::NullString v, RowAggFunctionType f, bool n) : fConstValue(v), fOp(f)
|
|
{
|
|
}
|
|
|
|
ConstantAggData(utils::NullString v, std::string u, RowAggFunctionType f, bool n)
|
|
: fConstValue(v), fUDAFName(u), fOp(f)
|
|
{
|
|
}
|
|
bool isNull() const
|
|
{
|
|
return fConstValue.isNull();
|
|
}
|
|
};
|
|
|
|
typedef boost::shared_ptr<RowAggGroupByCol> SP_ROWAGG_GRPBY_t;
|
|
typedef boost::shared_ptr<RowAggFunctionCol> SP_ROWAGG_FUNC_t;
|
|
|
|
struct GroupConcat
|
|
{
|
|
// GROUP_CONCAT(DISTINCT col1, 'const', col2 ORDER BY col3 desc SEPARATOR 'sep')
|
|
std::vector<std::pair<uint32_t, uint32_t>> fGroupCols; // columns to concatenate, and position
|
|
std::vector<std::pair<uint32_t, bool>> fOrderCols; // columns to order by [asc/desc]
|
|
std::string fSeparator;
|
|
std::vector<std::pair<utils::NullString, uint32_t>> fConstCols; // constant columns in group
|
|
bool fDistinct;
|
|
uint64_t fSize;
|
|
|
|
RowGroup fRowGroup;
|
|
std::shared_ptr<int[]> fMapping;
|
|
std::vector<std::pair<int, bool>> fOrderCond; // position to order by [asc/desc]
|
|
joblist::ResourceManager* fRm; // resource manager
|
|
boost::shared_ptr<int64_t> fSessionMemLimit;
|
|
long fTimeZone;
|
|
|
|
GroupConcat() : fRm(nullptr)
|
|
{
|
|
}
|
|
};
|
|
|
|
typedef boost::shared_ptr<GroupConcat> SP_GroupConcat;
|
|
|
|
class GroupConcatAg
|
|
{
|
|
public:
|
|
explicit GroupConcatAg(SP_GroupConcat&);
|
|
virtual ~GroupConcatAg();
|
|
|
|
virtual void initialize(){};
|
|
virtual void processRow(const rowgroup::Row&){};
|
|
virtual void merge(const rowgroup::Row&, uint64_t){};
|
|
|
|
uint8_t* getResult()
|
|
{
|
|
return nullptr;
|
|
}
|
|
|
|
protected:
|
|
rowgroup::SP_GroupConcat fGroupConcat;
|
|
};
|
|
|
|
typedef boost::shared_ptr<GroupConcatAg> SP_GroupConcatAg;
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief Class that aggregates RowGroups.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class RowAggregation : public messageqcpp::Serializeable
|
|
{
|
|
public:
|
|
/** @brief RowAggregation default constructor
|
|
*
|
|
* @param rowAggGroupByCols(in) specify GroupBy columns and their
|
|
* mapping from input to output. If vector is empty, then all the
|
|
* rows will be aggregated into a single implied group. Order is
|
|
* important here. The primary GroupBy column should be first, the
|
|
* secondary GroupBy column should be second, etc.
|
|
* @param rowAggFunctionCols(in) specify function columns and their
|
|
* mapping from input to output.
|
|
*/
|
|
RowAggregation();
|
|
RowAggregation(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
|
joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessMemLimit = {}, bool withRollup = false);
|
|
RowAggregation(const RowAggregation& rhs);
|
|
|
|
/** @brief RowAggregation default destructor
|
|
*/
|
|
~RowAggregation() override;
|
|
|
|
/** @brief clone this object for multi-thread use
|
|
*/
|
|
inline virtual RowAggregation* clone() const
|
|
{
|
|
return new RowAggregation(*this);
|
|
}
|
|
|
|
/** @brief Denotes end of data insertion following multiple calls to addRowGroup().
|
|
*/
|
|
virtual void endOfInput();
|
|
|
|
/** @brief reset RowAggregation outputRowGroup and hashMap
|
|
*/
|
|
virtual void aggReset();
|
|
|
|
/** @brief Define content of data to be aggregated and its aggregated output.
|
|
*
|
|
* @param pRowGroupIn(in) contains definition of the input data.
|
|
* @param pRowGroupOut(out) contains definition of the output data.
|
|
*/
|
|
virtual void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut)
|
|
{
|
|
fRowGroupIn = pRowGroupIn;
|
|
fRowGroupOut = pRowGroupOut;
|
|
initialize();
|
|
}
|
|
|
|
void clearRollup() { fRollupFlag = false; }
|
|
|
|
bool hasRollup() const { return fRollupFlag; }
|
|
|
|
/** @brief Define content of data to be joined
|
|
*
|
|
* This method must be call after setInputOutput() for PM hashjoin case.
|
|
*
|
|
* @param pSmallSideRG(in) contains definition of the small side data.
|
|
* @param pLargeSideRG(in) contains definition of the large side data.
|
|
*/
|
|
void setJoinRowGroups(std::vector<RowGroup>* pSmallSideRG, RowGroup* pLargeSideRG);
|
|
|
|
/** @brief Returns group by column vector
|
|
*
|
|
* This function is used to duplicate the RowAggregation object
|
|
*
|
|
* @returns a reference of the group by vector
|
|
*/
|
|
std::vector<SP_ROWAGG_GRPBY_t>& getGroupByCols()
|
|
{
|
|
return fGroupByCols;
|
|
}
|
|
|
|
/** @brief Returns aggregate function vector
|
|
*
|
|
* This function is used to duplicate the RowAggregation object
|
|
*
|
|
* @returns a reference of the aggregation function vector
|
|
*/
|
|
std::vector<SP_ROWAGG_FUNC_t>& getAggFunctions()
|
|
{
|
|
return fFunctionCols;
|
|
}
|
|
|
|
/** @brief Add a group of rows to be aggregated.
|
|
*
|
|
* This function can be called to iteratively add RowGroups for aggregation.
|
|
*
|
|
* @parm pRowGroupIn(in) RowGroup to be added to aggregation.
|
|
*/
|
|
virtual void addRowGroup(const RowGroup* pRowGroupIn);
|
|
virtual void addRowGroup(const RowGroup* pRowGroupIn,
|
|
std::vector<std::pair<Row::Pointer, uint64_t>>& inRows);
|
|
|
|
/** @brief Serialize RowAggregation object into a ByteStream.
|
|
*
|
|
* @parm bs(out) BytesStream that is to be written to.
|
|
*/
|
|
void serialize(messageqcpp::ByteStream& bs) const override;
|
|
|
|
/** @brief Unserialize RowAggregation object from a ByteStream.
|
|
*
|
|
* @parm bs(in) BytesStream that is to be read from.
|
|
*/
|
|
void deserialize(messageqcpp::ByteStream& bs) override;
|
|
|
|
/** @brief load result set into byte stream
|
|
*
|
|
* @parm bs(out) BytesStream that is to be written to.
|
|
*/
|
|
void loadResult(messageqcpp::ByteStream& bs);
|
|
void loadEmptySet(messageqcpp::ByteStream& bs);
|
|
|
|
/** @brief get output rowgroup
|
|
*
|
|
* @returns a const pointer of the output rowgroup
|
|
*/
|
|
const RowGroup* getOutputRowGroup() const
|
|
{
|
|
return fRowGroupOut;
|
|
}
|
|
RowGroup* getOutputRowGroup()
|
|
{
|
|
return fRowGroupOut;
|
|
}
|
|
|
|
void append(RowAggregation* other);
|
|
|
|
virtual void aggregateRow(Row& row, const uint64_t* hash = nullptr,
|
|
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
|
inline uint32_t aggMapKeyLength() const
|
|
{
|
|
return fAggMapKeyCount;
|
|
}
|
|
|
|
inline void timeZone(long timeZone)
|
|
{
|
|
fTimeZone = timeZone;
|
|
}
|
|
inline long timeZone() const
|
|
{
|
|
return fTimeZone;
|
|
}
|
|
inline std::vector<mcsv1sdk::mcsv1Context>* rgContextColl()
|
|
{
|
|
return &fRGContextColl;
|
|
}
|
|
|
|
void finalAggregation()
|
|
{
|
|
return fRowAggStorage->finalize([this](Row& row) { mergeEntries(row); }, fRow);
|
|
}
|
|
|
|
std::unique_ptr<RGData> moveCurrentRGData()
|
|
{
|
|
return std::move(fCurRGData);
|
|
}
|
|
|
|
protected:
|
|
virtual void initialize(bool hasGroupConcat = false);
|
|
virtual void initMapData(const Row& row);
|
|
virtual void attachGroupConcatAg();
|
|
|
|
virtual void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
|
void mergeEntries(const Row& row);
|
|
virtual void doMinMax(const Row&, int64_t, int64_t, int);
|
|
virtual void doSelectSome(const Row& rowIn, int64_t colIn, int64_t colOut);
|
|
virtual void doSum(const Row&, int64_t, int64_t, int);
|
|
virtual void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false);
|
|
virtual void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
|
void mergeStatistics(const Row&, uint64_t colOut, uint64_t colAux);
|
|
virtual void doBitOp(const Row&, int64_t, int64_t, int);
|
|
virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
|
|
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
|
virtual bool countSpecial(const RowGroup* pRG)
|
|
{
|
|
fRow.setUintField<8>(fRow.getUintField<8>(0) + pRG->getRowCount(), 0);
|
|
return true;
|
|
}
|
|
|
|
void resetUDAF(RowUDAFFunctionCol* rowUDAF);
|
|
void resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColIdx);
|
|
|
|
inline bool isNull(const RowGroup* pRowGroup, const Row& row, int64_t col);
|
|
inline void makeAggFieldsNull(Row& row);
|
|
inline void copyNullRow(Row& row)
|
|
{
|
|
copyRow(fNullRow, &row);
|
|
}
|
|
|
|
inline void updateIntMinMax(int128_t val1, int128_t val2, int64_t col, int func);
|
|
inline void updateIntMinMax(int64_t val1, int64_t val2, int64_t col, int func);
|
|
inline void updateUintMinMax(uint64_t val1, uint64_t val2, int64_t col, int func);
|
|
inline void updateCharMinMax(uint64_t val1, uint64_t val2, int64_t col, int func);
|
|
inline void updateDoubleMinMax(double val1, double val2, int64_t col, int func);
|
|
inline void updateLongDoubleMinMax(long double val1, long double val2, int64_t col, int func);
|
|
inline void updateFloatMinMax(float val1, float val2, int64_t col, int func);
|
|
inline void updateStringMinMax(utils::NullString val1, utils::NullString val2, int64_t col, int func);
|
|
std::vector<SP_ROWAGG_GRPBY_t> fGroupByCols;
|
|
std::vector<SP_ROWAGG_FUNC_t> fFunctionCols;
|
|
uint32_t fAggMapKeyCount; // the number of columns that make up the key
|
|
RowGroup fRowGroupIn;
|
|
RowGroup* fRowGroupOut;
|
|
|
|
// for when the group by & distinct keys are not stored in the output rows
|
|
rowgroup::RowGroup fKeyRG;
|
|
|
|
Row fRow;
|
|
Row fNullRow;
|
|
Row* tmpRow; // used by the hashers & eq functors
|
|
boost::scoped_array<uint8_t> fNullRowData;
|
|
rowgroup::RGData fNullRowRGData;
|
|
rowgroup::RowGroup fNullRowGroup;
|
|
|
|
std::unique_ptr<RowAggStorage> fRowAggStorage;
|
|
|
|
// for support PM aggregation after PM hashjoin
|
|
std::vector<RowGroup>* fSmallSideRGs;
|
|
RowGroup* fLargeSideRG;
|
|
std::shared_ptr<std::shared_ptr<int[]>[]> fSmallMappings;
|
|
std::shared_ptr<int[]> fLargeMapping;
|
|
uint32_t fSmallSideCount;
|
|
boost::scoped_array<Row> rowSmalls;
|
|
|
|
// for 8k poc
|
|
RowGroup fEmptyRowGroup;
|
|
RGData fEmptyRowData;
|
|
Row fEmptyRow;
|
|
|
|
bool fKeyOnHeap = false;
|
|
|
|
long fTimeZone;
|
|
|
|
// We need a separate copy for each thread.
|
|
mcsv1sdk::mcsv1Context fRGContext;
|
|
std::vector<mcsv1sdk::mcsv1Context> fRGContextColl;
|
|
|
|
// These are handy for testing the actual type of static_any for UDAF
|
|
static const static_any::any& charTypeId;
|
|
static const static_any::any& scharTypeId;
|
|
static const static_any::any& shortTypeId;
|
|
static const static_any::any& intTypeId;
|
|
static const static_any::any& longTypeId;
|
|
static const static_any::any& llTypeId;
|
|
static const static_any::any& int128TypeId;
|
|
static const static_any::any& ucharTypeId;
|
|
static const static_any::any& ushortTypeId;
|
|
static const static_any::any& uintTypeId;
|
|
static const static_any::any& ulongTypeId;
|
|
static const static_any::any& ullTypeId;
|
|
static const static_any::any& floatTypeId;
|
|
static const static_any::any& doubleTypeId;
|
|
static const static_any::any& longdoubleTypeId;
|
|
static const static_any::any& strTypeId;
|
|
|
|
// For UDAF along with with multiple distinct columns
|
|
std::vector<SP_ROWAGG_FUNC_t>* fOrigFunctionCols;
|
|
|
|
joblist::ResourceManager* fRm = nullptr;
|
|
boost::shared_ptr<int64_t> fSessionMemLimit;
|
|
std::unique_ptr<RGData> fCurRGData;
|
|
bool fRollupFlag = false;
|
|
|
|
std::string fTmpDir = config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
|
|
std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression");
|
|
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief derived Class that aggregates multi-rowgroups on UM
|
|
* One-phase case: aggregate from projected RG to final aggregated RG.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class RowAggregationUM : public RowAggregation
|
|
{
|
|
public:
|
|
/** @brief RowAggregationUM constructor
|
|
*/
|
|
RowAggregationUM()
|
|
{
|
|
}
|
|
RowAggregationUM(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
|
|
boost::shared_ptr<int64_t> sessionMemLimit, bool withRollup);
|
|
RowAggregationUM(const RowAggregationUM& rhs);
|
|
|
|
/** @brief RowAggregationUM default destructor
|
|
*/
|
|
~RowAggregationUM() override;
|
|
|
|
/** @brief Denotes end of data insertion following multiple calls to addRowGroup().
|
|
*/
|
|
void endOfInput() override;
|
|
|
|
/** @brief Finializes the result set before sending back to the front end.
|
|
*/
|
|
void finalize();
|
|
|
|
/** @brief Returns aggregated rows in a RowGroup.
|
|
*
|
|
* This function should be called repeatedly until false is returned (meaning end of data).
|
|
*
|
|
* @returns true if more data, else false if no more data.
|
|
*/
|
|
bool nextRowGroup();
|
|
|
|
/** @brief Returns aggregated rows in a RowGroup as long as there are still not returned result RowGroups.
|
|
*
|
|
* This function should be called repeatedly until false is returned (meaning end of data).
|
|
* Returns data from in-memory storage, as well as spilled data from disk. If disk-based aggregation is
|
|
* happening, finalAggregation() should be called before returning result RowGroups to finalize the used
|
|
* RowAggStorages, merge different spilled generations and obtain correct aggregation results.
|
|
*
|
|
* @returns True if there are more result RowGroups, else false if all results have been returned.
|
|
*/
|
|
bool nextOutputRowGroup();
|
|
|
|
/** @brief Add an aggregator for DISTINCT aggregation
|
|
*/
|
|
void distinctAggregator(const boost::shared_ptr<RowAggregation>& da)
|
|
{
|
|
fDistinctAggregator = da;
|
|
}
|
|
|
|
/** @brief expressions to be evaluated after aggregation
|
|
*/
|
|
void expression(const std::vector<execplan::SRCP>& exp)
|
|
{
|
|
fExpression = exp;
|
|
}
|
|
const std::vector<execplan::SRCP>& expression()
|
|
{
|
|
return fExpression;
|
|
}
|
|
|
|
// for multi threaded
|
|
joblist::ResourceManager* getRm()
|
|
{
|
|
return fRm;
|
|
}
|
|
inline RowAggregationUM* clone() const override
|
|
{
|
|
return new RowAggregationUM(*this);
|
|
}
|
|
|
|
/** @brief access the aggregate(constant) columns
|
|
*/
|
|
void constantAggregate(const std::vector<ConstantAggData>& v)
|
|
{
|
|
fConstantAggregate = v;
|
|
}
|
|
const std::vector<ConstantAggData>& constantAggregate() const
|
|
{
|
|
return fConstantAggregate;
|
|
}
|
|
|
|
/** @brief access the group_concat
|
|
*/
|
|
void groupConcat(const std::vector<SP_GroupConcat>& v)
|
|
{
|
|
fGroupConcat = v;
|
|
}
|
|
const std::vector<SP_GroupConcat>& groupConcat() const
|
|
{
|
|
return fGroupConcat;
|
|
}
|
|
|
|
void aggReset() override;
|
|
|
|
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
|
|
|
|
protected:
|
|
// virtual methods from base
|
|
void initialize(bool hasGroupConcat = false) override;
|
|
|
|
void attachGroupConcatAg() override;
|
|
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
|
bool countSpecial(const RowGroup* pRG) override
|
|
{
|
|
fRow.setIntField<8>(fRow.getIntField<8>(fFunctionCols[0]->fOutputColumnIndex) + pRG->getRowCount(),
|
|
fFunctionCols[0]->fOutputColumnIndex);
|
|
return true;
|
|
}
|
|
|
|
// calculate the average after all rows received. UM only function.
|
|
void calculateAvgColumns();
|
|
|
|
// calculate the statistics function all rows received. UM only function.
|
|
void calculateStatisticsFunctions();
|
|
|
|
// Sets the value from valOut into column colOut, performing any conversions.
|
|
void SetUDAFValue(static_any::any& valOut, int64_t colOut);
|
|
|
|
// If the datatype returned by evaluate isn't what we expect, convert.
|
|
void SetUDAFAnyValue(static_any::any& valOut, int64_t colOut);
|
|
|
|
// calculate the UDAF function all rows received. UM only function.
|
|
void calculateUDAFColumns();
|
|
|
|
// fix duplicates. UM only function.
|
|
void fixDuplicates(RowAggFunctionType funct);
|
|
|
|
// evaluate expressions
|
|
virtual void evaluateExpression();
|
|
|
|
// fix the aggregate(constant)
|
|
virtual void fixConstantAggregate();
|
|
virtual void doNullConstantAggregate(const ConstantAggData&, uint64_t);
|
|
virtual void doNotNullConstantAggregate(const ConstantAggData&, uint64_t);
|
|
|
|
// @bug3362, group_concat
|
|
virtual void doGroupConcat(const Row&, int64_t, int64_t);
|
|
virtual void doJsonAgg(const Row&, int64_t, int64_t);
|
|
virtual void setGroupConcatString();
|
|
|
|
bool fHasAvg;
|
|
bool fHasStatsFunc;
|
|
bool fHasUDAF;
|
|
|
|
boost::shared_ptr<RowAggregation> fDistinctAggregator;
|
|
|
|
// for function on aggregation
|
|
std::vector<execplan::SRCP> fExpression;
|
|
|
|
/* Derived classes that use a lot of memory need to update totalMemUsage and request
|
|
* the memory from rm in that order. */
|
|
uint64_t fTotalMemUsage;
|
|
|
|
// @bug3475, aggregate(constant), sum(0), count(null), etc
|
|
std::vector<ConstantAggData> fConstantAggregate;
|
|
|
|
// @bug3362, group_concat
|
|
std::vector<SP_GroupConcat> fGroupConcat;
|
|
std::vector<SP_GroupConcatAg> fGroupConcatAg;
|
|
std::vector<SP_ROWAGG_FUNC_t> fFunctionColGc;
|
|
|
|
private:
|
|
uint64_t fLastMemUsage;
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief derived Class that aggregates PM partially aggregated RowGroups on UM
|
|
* Two-phase case:
|
|
* phase 1 - aggregate from projected RG to partial aggregated RG on PM
|
|
* The base RowAggregation handles the 1st phase.
|
|
* phase 2 - aggregate from partially aggregated RG to final RG on UM
|
|
* This class handles the 2nd phase.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class RowAggregationUMP2 : public RowAggregationUM
|
|
{
|
|
public:
|
|
/** @brief RowAggregationUM constructor
|
|
*/
|
|
RowAggregationUMP2()
|
|
{
|
|
}
|
|
RowAggregationUMP2(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
|
|
boost::shared_ptr<int64_t> sessionMemLimit, bool withRollup);
|
|
RowAggregationUMP2(const RowAggregationUMP2& rhs);
|
|
|
|
/** @brief RowAggregationUMP2 default destructor
|
|
*/
|
|
~RowAggregationUMP2() override;
|
|
inline RowAggregationUMP2* clone() const override
|
|
{
|
|
return new RowAggregationUMP2(*this);
|
|
}
|
|
|
|
protected:
|
|
// virtual methods from base
|
|
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
|
void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override;
|
|
void doStatistics(const Row&, int64_t, int64_t, int64_t) override;
|
|
void doGroupConcat(const Row&, int64_t, int64_t) override;
|
|
void doJsonAgg(const Row&, int64_t, int64_t) override;
|
|
void doBitOp(const Row&, int64_t, int64_t, int) override;
|
|
void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
|
|
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
|
bool countSpecial(const RowGroup* pRG) override
|
|
{
|
|
return false;
|
|
}
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief derived Class that aggregates on distinct columns on UM
|
|
* The internal aggregator will handle one or two phases aggregation
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class RowAggregationDistinct : public RowAggregationUMP2
|
|
{
|
|
public:
|
|
/** @brief RowAggregationDistinct constructor
|
|
*/
|
|
RowAggregationDistinct()
|
|
{
|
|
}
|
|
RowAggregationDistinct(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
|
|
boost::shared_ptr<int64_t> sessionMemLimit);
|
|
|
|
/** @brief Copy Constructor for multi-threaded aggregation
|
|
*/
|
|
RowAggregationDistinct(const RowAggregationDistinct& rhs);
|
|
|
|
/** @brief RowAggregationDistinct default destructor
|
|
*/
|
|
~RowAggregationDistinct() override;
|
|
|
|
/** @brief Add an aggregator for pre-DISTINCT aggregation
|
|
*/
|
|
void addAggregator(const boost::shared_ptr<RowAggregation>& agg, const RowGroup& rg);
|
|
|
|
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
|
|
|
|
virtual void doDistinctAggregation();
|
|
virtual void doDistinctAggregation_rowVec(std::vector<std::pair<Row::Pointer, uint64_t>>& inRows);
|
|
void addRowGroup(const RowGroup* pRowGroupIn) override;
|
|
void addRowGroup(const RowGroup* pRowGroupIn,
|
|
std::vector<std::pair<Row::Pointer, uint64_t>>& inRows) override;
|
|
|
|
// multi-threade debug
|
|
boost::shared_ptr<RowAggregation>& aggregator()
|
|
{
|
|
return fAggregator;
|
|
}
|
|
void aggregator(boost::shared_ptr<RowAggregation> aggregator)
|
|
{
|
|
fAggregator = std::move(aggregator);
|
|
}
|
|
RowGroup& rowGroupDist()
|
|
{
|
|
return fRowGroupDist;
|
|
}
|
|
void rowGroupDist(RowGroup& rowGroupDist)
|
|
{
|
|
fRowGroupDist = rowGroupDist;
|
|
}
|
|
inline RowAggregationDistinct* clone() const override
|
|
{
|
|
return new RowAggregationDistinct(*this);
|
|
}
|
|
|
|
protected:
|
|
void updateEntry(const Row& row, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
|
|
|
boost::shared_ptr<RowAggregation> fAggregator;
|
|
RowGroup fRowGroupDist;
|
|
RGData fDataForDist;
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief derived Class for aggregates multiple columns with distinct key word
|
|
* Get distinct values of the column per group by entry
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class RowAggregationSubDistinct : public RowAggregationUM
|
|
{
|
|
public:
|
|
/** @brief RowAggregationSubDistinct constructor
|
|
*/
|
|
RowAggregationSubDistinct()
|
|
{
|
|
}
|
|
RowAggregationSubDistinct(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
|
joblist::ResourceManager*, boost::shared_ptr<int64_t> sessionMemLimit);
|
|
RowAggregationSubDistinct(const RowAggregationSubDistinct& rhs);
|
|
|
|
/** @brief RowAggregationSubDistinct default destructor
|
|
*/
|
|
~RowAggregationSubDistinct() override;
|
|
|
|
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
|
|
void addRowGroup(const RowGroup* pRowGroupIn) override;
|
|
inline RowAggregationSubDistinct* clone() const override
|
|
{
|
|
return new RowAggregationSubDistinct(*this);
|
|
}
|
|
|
|
void addRowGroup(const RowGroup* pRowGroupIn,
|
|
std::vector<std::pair<Row::Pointer, uint64_t>>& inRow) override;
|
|
|
|
protected:
|
|
// virtual methods from RowAggregationUM
|
|
void doGroupConcat(const Row&, int64_t, int64_t) override;
|
|
void doJsonAgg(const Row&, int64_t, int64_t) override;
|
|
// for groupby columns and the aggregated distinct column
|
|
Row fDistRow;
|
|
boost::scoped_array<uint8_t> fDistRowData;
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief derived Class that aggregates multiple columns with distinct key word
|
|
* Each distinct column will have its own aggregator
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class RowAggregationMultiDistinct : public RowAggregationDistinct
|
|
{
|
|
public:
|
|
/** @brief RowAggregationMultiDistinct constructor
|
|
*/
|
|
RowAggregationMultiDistinct()
|
|
{
|
|
}
|
|
RowAggregationMultiDistinct(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
|
joblist::ResourceManager*, boost::shared_ptr<int64_t> sessionMemLimit);
|
|
RowAggregationMultiDistinct(const RowAggregationMultiDistinct& rhs);
|
|
|
|
/** @brief RowAggregationMultiDistinct default destructor
|
|
*/
|
|
~RowAggregationMultiDistinct() override;
|
|
|
|
/** @brief Add sub aggregators
|
|
*/
|
|
void addSubAggregator(const boost::shared_ptr<RowAggregationUM>& agg, const RowGroup& rg,
|
|
const std::vector<SP_ROWAGG_FUNC_t>& funct);
|
|
|
|
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
|
|
using RowAggregationDistinct::addRowGroup;
|
|
void addRowGroup(const RowGroup* pRowGroupIn) override;
|
|
|
|
using RowAggregationDistinct::doDistinctAggregation;
|
|
void doDistinctAggregation() override;
|
|
using RowAggregationDistinct::doDistinctAggregation_rowVec;
|
|
virtual void doDistinctAggregation_rowVec(
|
|
std::vector<std::vector<std::pair<Row::Pointer, uint64_t>>>& inRows);
|
|
|
|
inline RowAggregationMultiDistinct* clone() const override
|
|
{
|
|
return new RowAggregationMultiDistinct(*this);
|
|
}
|
|
|
|
void addRowGroup(const RowGroup* pRowGroupIn,
|
|
std::vector<std::vector<std::pair<Row::Pointer, uint64_t>>>& inRows);
|
|
|
|
std::vector<boost::shared_ptr<RowAggregationUM>>& subAggregators()
|
|
{
|
|
return fSubAggregators;
|
|
}
|
|
|
|
void subAggregators(std::vector<boost::shared_ptr<RowAggregationUM>>& subAggregators)
|
|
{
|
|
fSubAggregators = subAggregators;
|
|
}
|
|
|
|
protected:
|
|
// virtual methods from base
|
|
std::vector<boost::shared_ptr<RowAggregationUM>> fSubAggregators;
|
|
std::vector<RowGroup> fSubRowGroups;
|
|
std::vector<boost::shared_ptr<RGData>> fSubRowData;
|
|
std::vector<std::vector<SP_ROWAGG_FUNC_t>> fSubFunctions;
|
|
};
|
|
|
|
typedef boost::shared_ptr<RowAggregation> SP_ROWAGG_t;
|
|
typedef boost::shared_ptr<RowAggregation> SP_ROWAGG_PM_t;
|
|
typedef boost::shared_ptr<RowAggregationUM> SP_ROWAGG_UM_t;
|
|
typedef boost::shared_ptr<RowAggregationDistinct> SP_ROWAGG_DIST;
|
|
|
|
} // namespace rowgroup
|