/* Copyright (C) 2014 InfiniDB, Inc. Copyright (c) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #pragma once /** @file rowaggregation.h * Classes in this file are used to aggregate Rows in RowGroups. * RowAggregation is the class that performs the aggregation. * RowAggGroupByCol and RowAggFunctionCol are support classes used to describe * the columns involved in the aggregation. * @endcode */ #include #include #include #include #include #include #include #include #include #include "serializeable.h" #include "bytestream.h" #include "rowgroup.h" #include "hasher.h" #include "stlpoolallocator.h" #include "returnedcolumn.h" #include "mcsv1_udaf.h" #include "constantcolumn.h" #include "resourcemanager.h" #include "rowstorage.h" #include "nullstring.h" // To do: move code that depends on joblist to a proper subsystem. namespace joblist { class ResourceManager; } namespace rowgroup { /** @brief Enumerates aggregate functions supported by RowAggregation */ enum RowAggFunctionType { ROWAGG_FUNCT_UNDEFINE, // default ROWAGG_COUNT_ASTERISK, // COUNT(*) counts all rows including nulls ROWAGG_COUNT_COL_NAME, // COUNT(column_name) only counts non-null rows ROWAGG_SUM, ROWAGG_AVG, ROWAGG_MIN, ROWAGG_MAX, // Statistics Function, ROWAGG_STATS is the generic name. 
ROWAGG_STATS, ROWAGG_STDDEV_POP, ROWAGG_STDDEV_SAMP, ROWAGG_VAR_POP, ROWAGG_VAR_SAMP, // BIT Function, ROWAGG_BIT_OP is the generic name. ROWAGG_BIT_OP, ROWAGG_BIT_AND, ROWAGG_BIT_OR, ROWAGG_BIT_XOR, // GROUP_CONCAT ROWAGG_GROUP_CONCAT, ROWAGG_JSON_ARRAY, // DISTINCT: performed on UM only ROWAGG_COUNT_DISTINCT_COL_NAME, // COUNT(distinct column_name) only counts non-null rows ROWAGG_DISTINCT_SUM, ROWAGG_DISTINCT_AVG, // Constant ROWAGG_CONSTANT, // User Defined Aggregate Function ROWAGG_UDAF, // If an Aggregate has more than one parameter, this will be used for parameters after the first ROWAGG_MULTI_PARM, // internal function type to avoid duplicate the work // handling ROWAGG_COUNT_NO_OP, ROWAGG_DUP_FUNCT and ROWAGG_DUP_AVG is a little different // ROWAGG_COUNT_NO_OP : count done by AVG, no need to copy // ROWAGG_DUP_FUNCT : copy data before AVG calculation, because SUM may share by AVG // ROWAGG_DUP_AVG : copy data after AVG calculation ROWAGG_COUNT_NO_OP, // COUNT(column_name), but leave count() to AVG ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG and UDAF, in select ROWAGG_DUP_AVG, // duplicate AVG(column_name) in select ROWAGG_DUP_STATS, // duplicate statistics functions in select ROWAGG_DUP_UDAF, // duplicate UDAF function in select // a dummy "select some" aggregate needed for non-group-by values in SELECT's with GROUP BY's ROWAGG_SELECT_SOME }; //------------------------------------------------------------------------------ /** @brief Specifies a column in a RowGroup that is part of the aggregation * "GROUP BY" clause. */ //------------------------------------------------------------------------------ struct RowAggGroupByCol { /** @brief RowAggGroupByCol constructor * * @param inputColIndex(in) column index into input row * @param outputColIndex(in) column index into output row * outputColIndex argument should be omitted if this GroupBy * column is not to be included in the output. 
*/ explicit RowAggGroupByCol(int32_t inputColIndex, int32_t outputColIndex = -1) : fInputColumnIndex(inputColIndex), fOutputColumnIndex(outputColIndex) { } ~RowAggGroupByCol() = default; uint32_t fInputColumnIndex; uint32_t fOutputColumnIndex; }; inline messageqcpp::ByteStream& operator<<(messageqcpp::ByteStream& b, RowAggGroupByCol& o) { return (b << o.fInputColumnIndex << o.fOutputColumnIndex); } inline messageqcpp::ByteStream& operator>>(messageqcpp::ByteStream& b, RowAggGroupByCol& o) { return (b >> o.fInputColumnIndex >> o.fOutputColumnIndex); } //------------------------------------------------------------------------------ /** @brief Specifies a column in a RowGroup that is to be aggregated, and what * aggregation function is to be performed. * * If a column is aggregated more than once(ex: SELECT MIN(l_shipdate), * MAX(l_shipdate)...), then 2 RowAggFunctionCol objects should be created * with the same inputColIndex, one for the MIN function, and one for the * MAX function. */ //------------------------------------------------------------------------------ struct RowAggFunctionCol { /** @brief RowAggFunctionCol constructor * * @param aggFunction(in) aggregation function to be performed * @param inputColIndex(in) column index into input row * @param outputColIndex(in) column index into output row * @param auxColIndex(in) auxiliary index into output row for avg/count * @param stats(in) real statistics function where generic name in aggFunction */ RowAggFunctionCol(RowAggFunctionType aggFunction, RowAggFunctionType stats, int32_t inputColIndex, int32_t outputColIndex, int32_t auxColIndex = -1) : fAggFunction(aggFunction) , fStatsFunction(stats) , fInputColumnIndex(inputColIndex) , fOutputColumnIndex(outputColIndex) , fAuxColumnIndex(auxColIndex) , hasMultiParm(false) { } virtual ~RowAggFunctionCol() = default; virtual void serialize(messageqcpp::ByteStream& bs) const; virtual void deserialize(messageqcpp::ByteStream& bs); RowAggFunctionType fAggFunction; // 
aggregate function // statistics function stores ROWAGG_STATS in fAggFunction and real function in fStatsFunction RowAggFunctionType fStatsFunction; uint32_t fInputColumnIndex; uint32_t fOutputColumnIndex; // fAuxColumnIndex is used in 4 cases: // 1. for AVG - point to the count column, the fInputColumnIndex is for sum // 2. for statistics function - point to sum(x), +1 is sum(x**2) // 3. for UDAF - contain the context user data as binary // 4. for duplicate - point to the real aggretate column to be copied from // Set only on UM, the fAuxColumnIndex is defaulted to fOutputColumnIndex+1 on PM. uint32_t fAuxColumnIndex; // For UDAF that have more than one parameter and some parameters are constant. // There will be a series of RowAggFunctionCol created, one for each parameter. // The first will be a RowUDAFFunctionCol. Subsequent ones will be RowAggFunctionCol // with fAggFunction == ROWAGG_MULTI_PARM. Order is important. // If this parameter is constant, that value is here. execplan::SRCP fpConstCol; bool hasMultiParm; }; struct RowUDAFFunctionCol : public RowAggFunctionCol { RowUDAFFunctionCol(mcsv1sdk::mcsv1Context& context, int32_t inputColIndex, int32_t outputColIndex, int32_t auxColIndex = -1) : RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, inputColIndex, outputColIndex, auxColIndex) , fUDAFContext(context) , bInterrupted(false) { fUDAFContext.setInterrupted(&bInterrupted); } RowUDAFFunctionCol(int32_t inputColIndex, int32_t outputColIndex, int32_t auxColIndex = -1) : RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, inputColIndex, outputColIndex, auxColIndex) , bInterrupted(false) { } RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) : RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex, rhs.fOutputColumnIndex, rhs.fAuxColumnIndex) , fUDAFContext(rhs.fUDAFContext) , bInterrupted(false) { } ~RowUDAFFunctionCol() override = default; void serialize(messageqcpp::ByteStream& bs) const override; void 
deserialize(messageqcpp::ByteStream& bs) override; mcsv1sdk::mcsv1Context fUDAFContext; // The UDAF context bool bInterrupted; // Shared by all the threads }; inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const { bs << (uint8_t)fAggFunction; bs << fInputColumnIndex; bs << fOutputColumnIndex; if (fpConstCol) { bs << (uint8_t)1; fpConstCol.get()->serialize(bs); } else { bs << (uint8_t)0; } } inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs) { bs >> (uint8_t&)fAggFunction; bs >> fInputColumnIndex; bs >> fOutputColumnIndex; uint8_t t; bs >> t; if (t) { fpConstCol.reset(new execplan::ConstantColumn); fpConstCol.get()->unserialize(bs); } } inline void RowUDAFFunctionCol::serialize(messageqcpp::ByteStream& bs) const { RowAggFunctionCol::serialize(bs); fUDAFContext.serialize(bs); } inline void RowUDAFFunctionCol::deserialize(messageqcpp::ByteStream& bs) { // This deserialize is called when the function gets to PrimProc. // reset is called because we're starting a new sub-evaluate cycle. RowAggFunctionCol::deserialize(bs); fUDAFContext.unserialize(bs); fUDAFContext.setInterrupted(&bInterrupted); mcsv1sdk::mcsv1_UDAF::ReturnCode rc; rc = fUDAFContext.getFunction()->reset(&fUDAFContext); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { bInterrupted = true; throw logging::QueryDataExcept(fUDAFContext.getErrorMessage(), logging::aggregateFuncErr); } } struct ConstantAggData { utils::NullString fConstValue; std::string fUDAFName; // If a UDAF is called with constant. 
RowAggFunctionType fOp; ConstantAggData() : fOp(ROWAGG_FUNCT_UNDEFINE) { } ConstantAggData(utils::NullString v, RowAggFunctionType f, bool n) : fConstValue(v), fOp(f) { } ConstantAggData(utils::NullString v, std::string u, RowAggFunctionType f, bool n) : fConstValue(v), fUDAFName(u), fOp(f) { } bool isNull() const { return fConstValue.isNull(); } }; typedef boost::shared_ptr SP_ROWAGG_GRPBY_t; typedef boost::shared_ptr SP_ROWAGG_FUNC_t; struct GroupConcat { // GROUP_CONCAT(DISTINCT col1, 'const', col2 ORDER BY col3 desc SEPARATOR 'sep') std::vector> fGroupCols; // columns to concatenate, and position std::vector> fOrderCols; // columns to order by [asc/desc] std::string fSeparator; std::vector> fConstCols; // constant columns in group bool fDistinct; uint64_t fSize; RowGroup fRowGroup; std::shared_ptr fMapping; std::vector> fOrderCond; // position to order by [asc/desc] joblist::ResourceManager* fRm; // resource manager boost::shared_ptr fSessionMemLimit; long fTimeZone; GroupConcat() : fRm(nullptr) { } }; typedef boost::shared_ptr SP_GroupConcat; class GroupConcatAg { public: explicit GroupConcatAg(SP_GroupConcat&); virtual ~GroupConcatAg(); virtual void initialize(){}; virtual void processRow(const rowgroup::Row&){}; virtual void merge(const rowgroup::Row&, uint64_t){}; uint8_t* getResult() { return nullptr; } protected: rowgroup::SP_GroupConcat fGroupConcat; }; typedef boost::shared_ptr SP_GroupConcatAg; //------------------------------------------------------------------------------ /** @brief Class that aggregates RowGroups. */ //------------------------------------------------------------------------------ class RowAggregation : public messageqcpp::Serializeable { public: /** @brief RowAggregation default constructor * * @param rowAggGroupByCols(in) specify GroupBy columns and their * mapping from input to output. If vector is empty, then all the * rows will be aggregated into a single implied group. Order is * important here. 
The primary GroupBy column should be first, the * secondary GroupBy column should be second, etc. * @param rowAggFunctionCols(in) specify function columns and their * mapping from input to output. */ RowAggregation(); RowAggregation(const std::vector& rowAggGroupByCols, const std::vector& rowAggFunctionCols, joblist::ResourceManager* rm = nullptr, boost::shared_ptr sessMemLimit = {}, bool withRollup = false); RowAggregation(const RowAggregation& rhs); /** @brief RowAggregation default destructor */ ~RowAggregation() override; /** @brief clone this object for multi-thread use */ inline virtual RowAggregation* clone() const { return new RowAggregation(*this); } /** @brief Denotes end of data insertion following multiple calls to addRowGroup(). */ virtual void endOfInput(); /** @brief reset RowAggregation outputRowGroup and hashMap */ virtual void aggReset(); /** @brief Define content of data to be aggregated and its aggregated output. * * @param pRowGroupIn(in) contains definition of the input data. * @param pRowGroupOut(out) contains definition of the output data. */ virtual void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) { fRowGroupIn = pRowGroupIn; fRowGroupOut = pRowGroupOut; initialize(); } void clearRollup() { fRollupFlag = false; } bool hasRollup() const { return fRollupFlag; } /** @brief Define content of data to be joined * * This method must be call after setInputOutput() for PM hashjoin case. * * @param pSmallSideRG(in) contains definition of the small side data. * @param pLargeSideRG(in) contains definition of the large side data. 
*/ void setJoinRowGroups(std::vector* pSmallSideRG, RowGroup* pLargeSideRG); /** @brief Returns group by column vector * * This function is used to duplicate the RowAggregation object * * @returns a reference of the group by vector */ std::vector& getGroupByCols() { return fGroupByCols; } /** @brief Returns aggregate function vector * * This function is used to duplicate the RowAggregation object * * @returns a reference of the aggregation function vector */ std::vector& getAggFunctions() { return fFunctionCols; } /** @brief Add a group of rows to be aggregated. * * This function can be called to iteratively add RowGroups for aggregation. * * @parm pRowGroupIn(in) RowGroup to be added to aggregation. */ virtual void addRowGroup(const RowGroup* pRowGroupIn); virtual void addRowGroup(const RowGroup* pRowGroupIn, std::vector>& inRows); /** @brief Serialize RowAggregation object into a ByteStream. * * @parm bs(out) BytesStream that is to be written to. */ void serialize(messageqcpp::ByteStream& bs) const override; /** @brief Unserialize RowAggregation object from a ByteStream. * * @parm bs(in) BytesStream that is to be read from. */ void deserialize(messageqcpp::ByteStream& bs) override; /** @brief load result set into byte stream * * @parm bs(out) BytesStream that is to be written to. 
*/ void loadResult(messageqcpp::ByteStream& bs); void loadEmptySet(messageqcpp::ByteStream& bs); /** @brief get output rowgroup * * @returns a const pointer of the output rowgroup */ const RowGroup* getOutputRowGroup() const { return fRowGroupOut; } RowGroup* getOutputRowGroup() { return fRowGroupOut; } void append(RowAggregation* other); virtual void aggregateRow(Row& row, const uint64_t* hash = nullptr, std::vector* rgContextColl = nullptr); inline uint32_t aggMapKeyLength() const { return fAggMapKeyCount; } inline void timeZone(long timeZone) { fTimeZone = timeZone; } inline long timeZone() const { return fTimeZone; } inline std::vector* rgContextColl() { return &fRGContextColl; } void finalAggregation() { return fRowAggStorage->finalize([this](Row& row) { mergeEntries(row); }, fRow); } std::unique_ptr moveCurrentRGData() { return std::move(fCurRGData); } protected: virtual void initialize(bool hasGroupConcat = false); virtual void initMapData(const Row& row); virtual void attachGroupConcatAg(); virtual void updateEntry(const Row& row, std::vector* rgContextColl = nullptr); void mergeEntries(const Row& row); virtual void doMinMax(const Row&, int64_t, int64_t, int); virtual void doSelectSome(const Row& rowIn, int64_t colIn, int64_t colOut); virtual void doSum(const Row&, int64_t, int64_t, int); virtual void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false); virtual void doStatistics(const Row&, int64_t, int64_t, int64_t); void mergeStatistics(const Row&, uint64_t colOut, uint64_t colAux); virtual void doBitOp(const Row&, int64_t, int64_t, int); virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx, std::vector* rgContextColl = nullptr); virtual bool countSpecial(const RowGroup* pRG) { fRow.setUintField<8>(fRow.getUintField<8>(0) + pRG->getRowCount(), 0); return true; } void resetUDAF(RowUDAFFunctionCol* rowUDAF); void resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColIdx); inline bool isNull(const RowGroup* 
pRowGroup, const Row& row, int64_t col); inline void makeAggFieldsNull(Row& row); inline void copyNullRow(Row& row) { copyRow(fNullRow, &row); } inline void updateIntMinMax(int128_t val1, int128_t val2, int64_t col, int func); inline void updateIntMinMax(int64_t val1, int64_t val2, int64_t col, int func); inline void updateUintMinMax(uint64_t val1, uint64_t val2, int64_t col, int func); inline void updateCharMinMax(uint64_t val1, uint64_t val2, int64_t col, int func); inline void updateDoubleMinMax(double val1, double val2, int64_t col, int func); inline void updateLongDoubleMinMax(long double val1, long double val2, int64_t col, int func); inline void updateFloatMinMax(float val1, float val2, int64_t col, int func); inline void updateStringMinMax(utils::NullString val1, utils::NullString val2, int64_t col, int func); std::vector fGroupByCols; std::vector fFunctionCols; uint32_t fAggMapKeyCount; // the number of columns that make up the key RowGroup fRowGroupIn; RowGroup* fRowGroupOut; // for when the group by & distinct keys are not stored in the output rows rowgroup::RowGroup fKeyRG; Row fRow; Row fNullRow; Row* tmpRow; // used by the hashers & eq functors boost::scoped_array fNullRowData; rowgroup::RGData fNullRowRGData; rowgroup::RowGroup fNullRowGroup; std::unique_ptr fRowAggStorage; // for support PM aggregation after PM hashjoin std::vector* fSmallSideRGs; RowGroup* fLargeSideRG; std::shared_ptr[]> fSmallMappings; std::shared_ptr fLargeMapping; uint32_t fSmallSideCount; boost::scoped_array rowSmalls; // for 8k poc RowGroup fEmptyRowGroup; RGData fEmptyRowData; Row fEmptyRow; bool fKeyOnHeap = false; long fTimeZone; // We need a separate copy for each thread. 
mcsv1sdk::mcsv1Context fRGContext; std::vector fRGContextColl; // These are handy for testing the actual type of static_any for UDAF static const static_any::any& charTypeId; static const static_any::any& scharTypeId; static const static_any::any& shortTypeId; static const static_any::any& intTypeId; static const static_any::any& longTypeId; static const static_any::any& llTypeId; static const static_any::any& int128TypeId; static const static_any::any& ucharTypeId; static const static_any::any& ushortTypeId; static const static_any::any& uintTypeId; static const static_any::any& ulongTypeId; static const static_any::any& ullTypeId; static const static_any::any& floatTypeId; static const static_any::any& doubleTypeId; static const static_any::any& longdoubleTypeId; static const static_any::any& strTypeId; // For UDAF along with with multiple distinct columns std::vector* fOrigFunctionCols; joblist::ResourceManager* fRm = nullptr; boost::shared_ptr fSessionMemLimit; std::unique_ptr fCurRGData; bool fRollupFlag = false; std::string fTmpDir = config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates); std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression"); }; //------------------------------------------------------------------------------ /** @brief derived Class that aggregates multi-rowgroups on UM * One-phase case: aggregate from projected RG to final aggregated RG. 
*/ //------------------------------------------------------------------------------ class RowAggregationUM : public RowAggregation { public: /** @brief RowAggregationUM constructor */ RowAggregationUM() { } RowAggregationUM(const std::vector& rowAggGroupByCols, const std::vector& rowAggFunctionCols, joblist::ResourceManager*, boost::shared_ptr sessionMemLimit, bool withRollup); RowAggregationUM(const RowAggregationUM& rhs); /** @brief RowAggregationUM default destructor */ ~RowAggregationUM() override; /** @brief Denotes end of data insertion following multiple calls to addRowGroup(). */ void endOfInput() override; /** @brief Finializes the result set before sending back to the front end. */ void finalize(); /** @brief Returns aggregated rows in a RowGroup. * * This function should be called repeatedly until false is returned (meaning end of data). * * @returns true if more data, else false if no more data. */ bool nextRowGroup(); /** @brief Returns aggregated rows in a RowGroup as long as there are still not returned result RowGroups. * * This function should be called repeatedly until false is returned (meaning end of data). * Returns data from in-memory storage, as well as spilled data from disk. If disk-based aggregation is * happening, finalAggregation() should be called before returning result RowGroups to finalize the used * RowAggStorages, merge different spilled generations and obtain correct aggregation results. * * @returns True if there are more result RowGroups, else false if all results have been returned. 
*/ bool nextOutputRowGroup(); /** @brief Add an aggregator for DISTINCT aggregation */ void distinctAggregator(const boost::shared_ptr& da) { fDistinctAggregator = da; } /** @brief expressions to be evaluated after aggregation */ void expression(const std::vector& exp) { fExpression = exp; } const std::vector& expression() { return fExpression; } // for multi threaded joblist::ResourceManager* getRm() { return fRm; } inline RowAggregationUM* clone() const override { return new RowAggregationUM(*this); } /** @brief access the aggregate(constant) columns */ void constantAggregate(const std::vector& v) { fConstantAggregate = v; } const std::vector& constantAggregate() const { return fConstantAggregate; } /** @brief access the group_concat */ void groupConcat(const std::vector& v) { fGroupConcat = v; } const std::vector& groupConcat() const { return fGroupConcat; } void aggReset() override; void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override; protected: // virtual methods from base void initialize(bool hasGroupConcat = false) override; void attachGroupConcatAg() override; void updateEntry(const Row& row, std::vector* rgContextColl = nullptr) override; bool countSpecial(const RowGroup* pRG) override { fRow.setIntField<8>(fRow.getIntField<8>(fFunctionCols[0]->fOutputColumnIndex) + pRG->getRowCount(), fFunctionCols[0]->fOutputColumnIndex); return true; } // calculate the average after all rows received. UM only function. void calculateAvgColumns(); // calculate the statistics function all rows received. UM only function. void calculateStatisticsFunctions(); // Sets the value from valOut into column colOut, performing any conversions. void SetUDAFValue(static_any::any& valOut, int64_t colOut); // If the datatype returned by evaluate isn't what we expect, convert. void SetUDAFAnyValue(static_any::any& valOut, int64_t colOut); // calculate the UDAF function all rows received. UM only function. void calculateUDAFColumns(); // fix duplicates. 
UM only function. void fixDuplicates(RowAggFunctionType funct); // evaluate expressions virtual void evaluateExpression(); // fix the aggregate(constant) virtual void fixConstantAggregate(); virtual void doNullConstantAggregate(const ConstantAggData&, uint64_t); virtual void doNotNullConstantAggregate(const ConstantAggData&, uint64_t); // @bug3362, group_concat virtual void doGroupConcat(const Row&, int64_t, int64_t); virtual void doJsonAgg(const Row&, int64_t, int64_t); virtual void setGroupConcatString(); bool fHasAvg; bool fHasStatsFunc; bool fHasUDAF; boost::shared_ptr fDistinctAggregator; // for function on aggregation std::vector fExpression; /* Derived classes that use a lot of memory need to update totalMemUsage and request * the memory from rm in that order. */ uint64_t fTotalMemUsage; // @bug3475, aggregate(constant), sum(0), count(null), etc std::vector fConstantAggregate; // @bug3362, group_concat std::vector fGroupConcat; std::vector fGroupConcatAg; std::vector fFunctionColGc; private: uint64_t fLastMemUsage; }; //------------------------------------------------------------------------------ /** @brief derived Class that aggregates PM partially aggregated RowGroups on UM * Two-phase case: * phase 1 - aggregate from projected RG to partial aggregated RG on PM * The base RowAggregation handles the 1st phase. * phase 2 - aggregate from partially aggregated RG to final RG on UM * This class handles the 2nd phase. 
*/
//------------------------------------------------------------------------------
class RowAggregationUMP2 : public RowAggregationUM
{
 public:
  /** @brief RowAggregationUMP2 constructor
   */
  RowAggregationUMP2()
  {
  }
  RowAggregationUMP2(const std::vector& rowAggGroupByCols, const std::vector& rowAggFunctionCols,
                     joblist::ResourceManager*, boost::shared_ptr sessionMemLimit, bool withRollup);
  RowAggregationUMP2(const RowAggregationUMP2& rhs);

  /** @brief RowAggregationUMP2 default destructor
   */
  ~RowAggregationUMP2() override;

  /** @brief Returns a copy allocated with new; the caller is responsible for it. */
  inline RowAggregationUMP2* clone() const override
  {
    return new RowAggregationUMP2(*this);
  }

 protected:
  // virtual methods from base
  void updateEntry(const Row& row, std::vector* rgContextColl = nullptr) override;
  void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override;
  void doStatistics(const Row&, int64_t, int64_t, int64_t) override;
  void doGroupConcat(const Row&, int64_t, int64_t) override;
  void doJsonAgg(const Row&, int64_t, int64_t) override;
  void doBitOp(const Row&, int64_t, int64_t, int) override;
  void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
              std::vector* rgContextColl = nullptr) override;

  // Unlike the base-class versions, this override changes nothing and
  // returns false; pRG is not used.
  bool countSpecial(const RowGroup* pRG) override
  {
    return false;
  }
};

//------------------------------------------------------------------------------
/** @brief derived Class that aggregates on distinct columns on UM
 *   The internal aggregator will handle one or two phases aggregation
 */
//------------------------------------------------------------------------------
class RowAggregationDistinct : public RowAggregationUMP2
{
 public:
  /** @brief RowAggregationDistinct constructor
   */
  RowAggregationDistinct()
  {
  }
  RowAggregationDistinct(const std::vector& rowAggGroupByCols, const std::vector& rowAggFunctionCols,
                         joblist::ResourceManager*, boost::shared_ptr sessionMemLimit);

  /** @brief Copy Constructor for multi-threaded aggregation
   */
  RowAggregationDistinct(const RowAggregationDistinct& rhs);

  /** @brief RowAggregationDistinct default destructor
   */
  ~RowAggregationDistinct() override;

  /** @brief Add an aggregator for pre-DISTINCT aggregation
   */
  void addAggregator(const boost::shared_ptr& agg, const RowGroup& rg);

  void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;

  virtual void doDistinctAggregation();
  virtual void doDistinctAggregation_rowVec(std::vector>& inRows);
  void addRowGroup(const RowGroup* pRowGroupIn) override;
  void addRowGroup(const RowGroup* pRowGroupIn, std::vector>& inRows) override;

  // multi-threaded debug
  // Accessors for the internal aggregator: the getter exposes the stored
  // pointer by reference, the setter takes its argument by value and moves it in.
  boost::shared_ptr& aggregator()
  {
    return fAggregator;
  }
  void aggregator(boost::shared_ptr aggregator)
  {
    fAggregator = std::move(aggregator);
  }

  // Accessors for fRowGroupDist: the getter returns a mutable reference, the
  // setter copy-assigns from its argument.
  RowGroup& rowGroupDist()
  {
    return fRowGroupDist;
  }
  void rowGroupDist(RowGroup& rowGroupDist)
  {
    fRowGroupDist = rowGroupDist;
  }

  /** @brief Returns a copy allocated with new; the caller is responsible for it. */
  inline RowAggregationDistinct* clone() const override
  {
    return new RowAggregationDistinct(*this);
  }

 protected:
  void updateEntry(const Row& row, std::vector* rgContextColl = nullptr) override;

  boost::shared_ptr fAggregator;  // the internal (pre-DISTINCT) aggregator, see aggregator()
  RowGroup fRowGroupDist;
  RGData fDataForDist;
};

//------------------------------------------------------------------------------
/** @brief derived Class for aggregates multiple columns with distinct key word
 *   Get distinct values of the column per group by entry
 */
//------------------------------------------------------------------------------
class RowAggregationSubDistinct : public RowAggregationUM
{
 public:
  /** @brief RowAggregationSubDistinct constructor
   */
  RowAggregationSubDistinct()
  {
  }
  RowAggregationSubDistinct(const std::vector& rowAggGroupByCols, const std::vector& rowAggFunctionCols,
                            joblist::ResourceManager*, boost::shared_ptr sessionMemLimit);
  RowAggregationSubDistinct(const RowAggregationSubDistinct& rhs);

  /** @brief RowAggregationSubDistinct default destructor
   */
  ~RowAggregationSubDistinct() override;

  void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;
  void addRowGroup(const RowGroup* pRowGroupIn) override;

  /** @brief Returns a copy allocated with new; the caller is responsible for it. */
  inline RowAggregationSubDistinct* clone() const override
  {
    return new RowAggregationSubDistinct(*this);
  }

  void addRowGroup(const RowGroup* pRowGroupIn, std::vector>& inRow) override;

 protected:
  // virtual methods from RowAggregationUM
  void doGroupConcat(const Row&, int64_t, int64_t) override;
  void doJsonAgg(const Row&, int64_t, int64_t) override;

  // for groupby columns and the aggregated distinct column
  Row fDistRow;
  boost::scoped_array fDistRowData;
};

//------------------------------------------------------------------------------
/** @brief derived Class that aggregates multiple columns with distinct key word
 *   Each distinct column will have its own aggregator
 */
//------------------------------------------------------------------------------
class RowAggregationMultiDistinct : public RowAggregationDistinct
{
 public:
  /** @brief RowAggregationMultiDistinct constructor
   */
  RowAggregationMultiDistinct()
  {
  }
  RowAggregationMultiDistinct(const std::vector& rowAggGroupByCols, const std::vector& rowAggFunctionCols,
                              joblist::ResourceManager*, boost::shared_ptr sessionMemLimit);
  RowAggregationMultiDistinct(const RowAggregationMultiDistinct& rhs);

  /** @brief RowAggregationMultiDistinct default destructor
   */
  ~RowAggregationMultiDistinct() override;

  /** @brief Add sub aggregators
   */
  void addSubAggregator(const boost::shared_ptr& agg, const RowGroup& rg, const std::vector& funct);

  void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut) override;

  // The using-declarations keep the RowAggregationDistinct overloads visible
  // next to the overloads declared in this class.
  using RowAggregationDistinct::addRowGroup;
  void addRowGroup(const RowGroup* pRowGroupIn) override;

  using RowAggregationDistinct::doDistinctAggregation;
  void doDistinctAggregation() override;

  using RowAggregationDistinct::doDistinctAggregation_rowVec;
  virtual void doDistinctAggregation_rowVec(std::vector>>& inRows);

  /** @brief Returns a copy allocated with new; the caller is responsible for it. */
  inline RowAggregationMultiDistinct* clone() const override
  {
    return new RowAggregationMultiDistinct(*this);
  }

  void addRowGroup(const RowGroup* pRowGroupIn, std::vector>>& inRows);

  // Accessors for the sub-aggregator list: the getter returns a mutable
  // reference, the setter copy-assigns from its argument.
  std::vector>& subAggregators()
  {
    return fSubAggregators;
  }
  void subAggregators(std::vector>& subAggregators)
  {
    fSubAggregators = subAggregators;
  }

 protected:
  // Sub-aggregator state.
  // NOTE(review): presumably one entry per addSubAggregator() call - confirm in the implementation file.
  std::vector> fSubAggregators;
  std::vector fSubRowGroups;
  std::vector> fSubRowData;
  std::vector> fSubFunctions;
};

typedef boost::shared_ptr SP_ROWAGG_t;
typedef boost::shared_ptr SP_ROWAGG_PM_t;
typedef boost::shared_ptr SP_ROWAGG_UM_t;
typedef boost::shared_ptr SP_ROWAGG_DIST;

}  // namespace rowgroup