Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git, synced 2025-04-18 21:44:02 +03:00
feat(runtime)!: MCOL-678 "GROUP BY ... WITH ROLLUP" support
Adds a special column that helps to differentiate data rows from rollups of various depths, plus simple logic in row aggregation to process the subtotals.
parent 5013717730
commit 920607520c
@ -93,6 +93,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location)
|
||||
, // 100MB mem usage for disk based join,
|
||||
fUMMemLimit(numeric_limits<int64_t>::max())
|
||||
, fIsDML(false)
|
||||
, fWithRollup(false)
|
||||
{
|
||||
fUuid = QueryTeleClient::genUUID();
|
||||
}
|
||||
@ -100,7 +101,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location)
|
||||
CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(
|
||||
const ReturnedColumnList& returnedCols, ParseTree* filters, const SelectList& subSelects,
|
||||
const GroupByColumnList& groupByCols, ParseTree* having, const OrderByColumnList& orderByCols,
|
||||
const string alias, const int location, const bool dependent)
|
||||
const string alias, const int location, const bool dependent, const bool withRollup)
|
||||
: fReturnedCols(returnedCols)
|
||||
, fFilters(filters)
|
||||
, fSubSelects(subSelects)
|
||||
@ -111,7 +112,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(
|
||||
, fLocation(location)
|
||||
, fDependent(dependent)
|
||||
, fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL)
|
||||
|
||||
, fWithRollup(withRollup)
|
||||
{
|
||||
fUuid = QueryTeleClient::genUUID();
|
||||
}
|
||||
@ -119,6 +120,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(
|
||||
CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(string data)
|
||||
: fData(data)
|
||||
, fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL)
|
||||
, fWithRollup(false)
|
||||
{
|
||||
fUuid = QueryTeleClient::genUUID();
|
||||
}
|
||||
@ -470,6 +472,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
|
||||
messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
|
||||
b << timeZone;
|
||||
b << fPron;
|
||||
b << (uint8_t)fWithRollup;
|
||||
}
|
||||
|
||||
void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
|
||||
@ -672,6 +675,8 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
|
||||
fTimeZone = timeZone;
|
||||
b >> fPron;
|
||||
utils::Pron::instance().pron(fPron);
|
||||
b >> tmp8;
|
||||
fWithRollup = tmp8;
|
||||
}
|
||||
|
||||
bool CalpontSelectExecutionPlan::operator==(const CalpontSelectExecutionPlan& t) const
|
||||
|
@ -151,7 +151,7 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
|
||||
CalpontSelectExecutionPlan(const ReturnedColumnList& returnedCols, ParseTree* filters,
|
||||
const SelectList& subSelects, const GroupByColumnList& groupByCols,
|
||||
ParseTree* having, const OrderByColumnList& orderByCols, const std::string alias,
|
||||
const int location, const bool dependent);
|
||||
const int location, const bool dependent, const bool withRollup);
|
||||
|
||||
CalpontSelectExecutionPlan(const std::string data);
|
||||
|
||||
@ -244,6 +244,18 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
|
||||
fGroupByCols = groupByCols;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subtotals.
|
||||
*/
|
||||
bool withRollup() const
|
||||
{
|
||||
return fWithRollup;
|
||||
}
|
||||
void withRollup(bool withRollup)
|
||||
{
|
||||
fWithRollup = withRollup;
|
||||
}
|
||||
|
||||
/**
|
||||
* order by column list
|
||||
*/
|
||||
@ -957,6 +969,10 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
|
||||
long fTimeZone = 0;
|
||||
std::vector<execplan::ParseTree*> fDynamicParseTreeVec;
|
||||
std::string fPron;
|
||||
/**
|
||||
* A flag to compute subtotals, related to GROUP BY operation.
|
||||
*/
|
||||
bool fWithRollup;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -296,6 +296,66 @@ void ConstantColumn::unserialize(messageqcpp::ByteStream& b)
|
||||
b >> (uint8_t&)fResult.decimalVal.precision;
|
||||
}
|
||||
|
||||
RollupMarkColumn::RollupMarkColumn()
|
||||
{
|
||||
fExpressionId = 0x55667788ULL;
|
||||
fResultType.colDataType = CalpontSystemCatalog::INT;
|
||||
fResultType.colWidth = 4;
|
||||
// no-op.
|
||||
}
|
||||
RollupMarkColumn::~RollupMarkColumn()
|
||||
{
|
||||
// no-op
|
||||
}
|
||||
void RollupMarkColumn::serialize(messageqcpp::ByteStream& b) const
|
||||
{
|
||||
b << (ObjectReader::id_t)ObjectReader::ROLLUPMARKCOLUMN;
|
||||
ReturnedColumn::serialize(b);
|
||||
messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
|
||||
b << timeZone;
|
||||
b << static_cast<ByteStream::doublebyte>(fReturnAll);
|
||||
b << (uint64_t)fResult.intVal;
|
||||
b << fResult.uintVal;
|
||||
b << fResult.doubleVal;
|
||||
b << fResult.longDoubleVal;
|
||||
b << fResult.floatVal;
|
||||
b << (uint8_t)fResult.boolVal;
|
||||
b << fResult.strVal;
|
||||
b << (uint64_t)fResult.decimalVal.value;
|
||||
b << fResult.decimalVal.s128Value;
|
||||
b << (uint8_t)fResult.decimalVal.scale;
|
||||
b << (uint8_t)fResult.decimalVal.precision;
|
||||
}
|
||||
void RollupMarkColumn::unserialize(messageqcpp::ByteStream& b)
|
||||
{
|
||||
ObjectReader::checkType(b, ObjectReader::ROLLUPMARKCOLUMN);
|
||||
ReturnedColumn::unserialize(b);
|
||||
// uint64_t val;
|
||||
|
||||
messageqcpp::ByteStream::octbyte timeZone;
|
||||
b >> timeZone;
|
||||
fTimeZone = timeZone;
|
||||
b >> reinterpret_cast<ByteStream::doublebyte&>(fReturnAll);
|
||||
b >> (uint64_t&)fResult.intVal;
|
||||
b >> fResult.uintVal;
|
||||
b >> fResult.doubleVal;
|
||||
b >> fResult.longDoubleVal;
|
||||
b >> fResult.floatVal;
|
||||
b >> (uint8_t&)fResult.boolVal;
|
||||
b >> fResult.strVal;
|
||||
b >> (uint64_t&)fResult.decimalVal.value;
|
||||
b >> fResult.decimalVal.s128Value;
|
||||
b >> (uint8_t&)fResult.decimalVal.scale;
|
||||
b >> (uint8_t&)fResult.decimalVal.precision;
|
||||
}
|
||||
static utils::NullString ns;
|
||||
const utils::NullString& RollupMarkColumn::getStrVal(rowgroup::Row& row, bool& isNull)
|
||||
{
|
||||
return ns;
|
||||
}
|
||||
|
||||
|
||||
|
||||
bool ConstantColumn::operator==(const ConstantColumn& t) const
|
||||
{
|
||||
const ReturnedColumn *rc1, *rc2;
|
||||
|
@ -468,4 +468,276 @@ class ConstantColumnTemporal : public ConstantColumn
|
||||
*/
|
||||
std::ostream& operator<<(std::ostream& output, const ConstantColumn& rhs);
|
||||
|
||||
class RollupMarkColumn : public ReturnedColumn
|
||||
{
|
||||
public:
|
||||
|
||||
/**
|
||||
* ctor
|
||||
*/
|
||||
RollupMarkColumn();
|
||||
/**
|
||||
* ctor
|
||||
*/
|
||||
|
||||
/**
|
||||
* dtor
|
||||
*/
|
||||
virtual ~RollupMarkColumn();
|
||||
|
||||
/**
|
||||
* accessor
|
||||
*/
|
||||
inline long timeZone() const
|
||||
{
|
||||
return fTimeZone;
|
||||
}
|
||||
/**
|
||||
* mutator
|
||||
*/
|
||||
inline void timeZone(const long timeZone)
|
||||
{
|
||||
fTimeZone = timeZone;
|
||||
}
|
||||
/**
|
||||
* accessor
|
||||
*/
|
||||
virtual const std::string data() const override
|
||||
{
|
||||
return "";
|
||||
}
|
||||
/**
|
||||
* accessor
|
||||
*/
|
||||
virtual void data(const std::string data) override
|
||||
{
|
||||
idbassert(0);
|
||||
}
|
||||
/**
|
||||
* accessor
|
||||
*/
|
||||
virtual const std::string toString() const override
|
||||
{
|
||||
return "RollupMarkColumn";
|
||||
}
|
||||
|
||||
virtual std::string toCppCode(IncludeSet& includes) const override
|
||||
{
|
||||
idbassert(0);
|
||||
}
|
||||
/** return a copy of this pointer
|
||||
*
|
||||
* deep copy of this pointer and return the copy
|
||||
*/
|
||||
inline virtual RollupMarkColumn* clone() const override
|
||||
{
|
||||
return new RollupMarkColumn();
|
||||
}
|
||||
|
||||
/*
|
||||
* The serialization interface
|
||||
*/
|
||||
/**
|
||||
* serialize
|
||||
*/
|
||||
virtual void serialize(messageqcpp::ByteStream&) const override;
|
||||
/**
|
||||
* unserialize
|
||||
*/
|
||||
virtual void unserialize(messageqcpp::ByteStream&) override;
|
||||
|
||||
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
|
||||
*
|
||||
* Do a deep, strict (as opposed to semantic) equivalence test.
|
||||
* @return true iff every member of t is a duplicate copy of every member of this; false otherwise
|
||||
*/
|
||||
virtual bool operator==(const TreeNode* t) const override
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
|
||||
*
|
||||
* Do a deep, strict (as opposed to semantic) equivalence test.
|
||||
* @return true iff every member of t is a duplicate copy of every member of this; false otherwise
|
||||
*/
|
||||
bool operator==(const RollupMarkColumn& t) const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
|
||||
*
|
||||
* Do a deep, strict (as opposed to semantic) equivalence test.
|
||||
* @return false iff every member of t is a duplicate copy of every member of this; true otherwise
|
||||
*/
|
||||
virtual bool operator!=(const TreeNode* t) const override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
|
||||
*
|
||||
* Do a deep, strict (as opposed to semantic) equivalence test.
|
||||
* @return false iff every member of t is a duplicate copy of every member of this; true otherwise
|
||||
*/
|
||||
bool operator!=(const RollupMarkColumn& t) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual bool hasWindowFunc() override
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/** A constant column in the filter can always be moved into the derived table */
|
||||
virtual void setDerivedTable() override
|
||||
{
|
||||
fDerivedTable = std::string("*");
|
||||
}
|
||||
|
||||
bool isNull() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string fData;
|
||||
long fTimeZone;
|
||||
|
||||
/***********************************************************
|
||||
* F&E framework *
|
||||
***********************************************************/
|
||||
public:
|
||||
using ReturnedColumn::evaluate;
|
||||
virtual void evaluate(rowgroup::Row& row)
|
||||
{
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual bool getBoolVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual const utils::NullString& getStrVal(rowgroup::Row& row, bool& isNull) override;
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual int64_t getIntVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
isNull = false;
|
||||
return 0x12340000UL;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual uint64_t getUintVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
return getIntVal(row, isNull);
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual float getFloatVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
return getIntVal(row, isNull);
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual double getDoubleVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
return getIntVal(row, isNull);
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual IDB_Decimal getDecimalVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
isNull = false;
|
||||
return fResult.decimalVal;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual int32_t getDateIntVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
isNull = false;
|
||||
|
||||
if (!fResult.valueConverted)
|
||||
{
|
||||
fResult.intVal = dataconvert::DataConvert::stringToDate(fResult.strVal.safeString());
|
||||
fResult.valueConverted = true;
|
||||
}
|
||||
|
||||
return fResult.intVal;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual int64_t getDatetimeIntVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
isNull = false;
|
||||
|
||||
if (!fResult.valueConverted)
|
||||
{
|
||||
isNull = isNull || fResult.strVal.isNull();
|
||||
fResult.intVal = dataconvert::DataConvert::stringToDatetime(fResult.strVal.safeString(""));
|
||||
fResult.valueConverted = true;
|
||||
}
|
||||
|
||||
return fResult.intVal;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual int64_t getTimestampIntVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
isNull = false;
|
||||
|
||||
if (!fResult.valueConverted)
|
||||
{
|
||||
isNull = isNull || fResult.strVal.isNull();
|
||||
fResult.intVal = dataconvert::DataConvert::stringToTimestamp(fResult.strVal.safeString(""), fTimeZone);
|
||||
fResult.valueConverted = true;
|
||||
}
|
||||
|
||||
return fResult.intVal;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
virtual int64_t getTimeIntVal(rowgroup::Row& row, bool& isNull) override
|
||||
{
|
||||
isNull = false;
|
||||
|
||||
if (!fResult.valueConverted)
|
||||
{
|
||||
fResult.intVal = dataconvert::DataConvert::stringToTime(fResult.strVal.safeString(""));
|
||||
fResult.valueConverted = true;
|
||||
}
|
||||
|
||||
return fResult.intVal;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
inline float getFloatVal() const
|
||||
{
|
||||
return fResult.floatVal;
|
||||
}
|
||||
/**
|
||||
* F&E
|
||||
*/
|
||||
inline double getDoubleVal() const
|
||||
{
|
||||
return fResult.doubleVal;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace execplan
|
||||
|
@ -114,6 +114,8 @@ TreeNode* ObjectReader::createTreeNode(messageqcpp::ByteStream& b)
|
||||
|
||||
case CONSTANTCOLUMN: ret = new ConstantColumn(); break;
|
||||
|
||||
case ROLLUPMARKCOLUMN: ret = new RollupMarkColumn(); break;
|
||||
|
||||
case FUNCTIONCOLUMN: ret = new FunctionColumn(); break;
|
||||
|
||||
case ROWCOLUMN: ret = new RowColumn(); break;
|
||||
|
@ -79,6 +79,7 @@ class ObjectReader
|
||||
GROUPCONCATCOLUMN,
|
||||
ARITHMETICCOLUMN,
|
||||
CONSTANTCOLUMN,
|
||||
ROLLUPMARKCOLUMN,
|
||||
FUNCTIONCOLUMN,
|
||||
ROWCOLUMN,
|
||||
WINDOWFUNCTIONCOLUMN,
|
||||
|
@ -197,6 +197,7 @@ struct JobInfo
|
||||
, constantCol(CONST_COL_NONE)
|
||||
, hasDistinct(false)
|
||||
, hasAggregation(false)
|
||||
, hasRollup(false)
|
||||
, limitStart(0)
|
||||
, limitCount(-1)
|
||||
, joinNum(0)
|
||||
@ -239,6 +240,7 @@ struct JobInfo
|
||||
// aggregation
|
||||
bool hasDistinct;
|
||||
bool hasAggregation;
|
||||
bool hasRollup;
|
||||
std::vector<uint32_t> groupByColVec;
|
||||
std::vector<uint32_t> distinctColVec;
|
||||
std::vector<uint32_t> expressionVec;
|
||||
|
@ -417,6 +417,8 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
|
||||
// last step can be tbps (no join) or thjs, either one can have a group 3 expression
|
||||
if (bps || thjs)
|
||||
{
|
||||
// This part may set FE2 (setFE23Output()) and may affect the behavior of
// PrimProc's batchprimitiveprocessor execute() function when processing aggregates.
|
||||
tjs->setOutputRowGroup(rg01);
|
||||
tjs->setFcnExpGroup3(exps);
|
||||
tjs->setFE23Output(rg1);
|
||||
@ -430,6 +432,9 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
|
||||
}
|
||||
else
|
||||
{
|
||||
// This may change behavior on the PrimProc side; look into
// primitives/primproc/batchprimitiveprocessor.
// This is especially important for aggregation.
|
||||
if (thjs && thjs->hasFcnExpGroup2())
|
||||
thjs->setFE23Output(rg1);
|
||||
else
|
||||
|
@ -250,6 +250,7 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
||||
const ArithmeticColumn* ac = NULL;
|
||||
const FunctionColumn* fc = NULL;
|
||||
const ConstantColumn* cc = NULL;
|
||||
const RollupMarkColumn* mc = NULL;
|
||||
uint64_t eid = -1;
|
||||
CalpontSystemCatalog::ColType ct;
|
||||
ExpressionStep* es = new ExpressionStep(jobInfo);
|
||||
@ -271,6 +272,11 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
||||
eid = cc->expressionId();
|
||||
ct = cc->resultType();
|
||||
}
|
||||
else if ((mc = dynamic_cast<const RollupMarkColumn*>(retCols[i].get())) != NULL)
|
||||
{
|
||||
eid = mc->expressionId();
|
||||
ct = mc->resultType();
|
||||
}
|
||||
else
|
||||
{
|
||||
std::ostringstream errmsg;
|
||||
@ -549,7 +555,13 @@ void checkGroupByCols(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
|
||||
{
|
||||
// skip constant columns
|
||||
if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
|
||||
{
|
||||
if (csep->withRollup())
|
||||
{
|
||||
throw runtime_error("constant GROUP BY columns are not supported when WITH ROLLUP is used");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
ReturnedColumn* rc = i->get();
|
||||
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
|
||||
@ -691,6 +703,8 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
const CalpontSelectExecutionPlan::GroupByColumnList& groupByCols = csep->groupByCols();
|
||||
uint64_t lastGroupByPos = 0;
|
||||
|
||||
jobInfo.hasRollup = csep->withRollup();
|
||||
|
||||
for (uint64_t i = 0; i < groupByCols.size(); i++)
|
||||
{
|
||||
pcv.push_back(groupByCols[i]);
|
||||
@ -699,6 +713,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(groupByCols[i].get());
|
||||
const ArithmeticColumn* ac = NULL;
|
||||
const FunctionColumn* fc = NULL;
|
||||
const RollupMarkColumn* mc = NULL;
|
||||
|
||||
if (sc != NULL)
|
||||
{
|
||||
@ -771,6 +786,18 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
if (find(projectKeys.begin(), projectKeys.end(), tupleKey) == projectKeys.end())
|
||||
projectKeys.push_back(tupleKey);
|
||||
}
|
||||
else if ((mc = dynamic_cast<const RollupMarkColumn*>(groupByCols[i].get())) != NULL)
|
||||
{
|
||||
uint64_t eid = mc->expressionId();
|
||||
CalpontSystemCatalog::ColType ct = mc->resultType();
|
||||
TupleInfo ti(setExpTupleInfo(ct, eid, mc->alias(), jobInfo));
|
||||
uint32_t tupleKey = ti.key;
|
||||
jobInfo.groupByColVec.push_back(tupleKey);
|
||||
if (find(projectKeys.begin(), projectKeys.end(), tupleKey) == projectKeys.end())
|
||||
{
|
||||
projectKeys.push_back(tupleKey);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::ostringstream errmsg;
|
||||
@ -898,6 +925,10 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
|
||||
if (gcc != NULL)
|
||||
{
|
||||
if (jobInfo.hasRollup)
|
||||
{
|
||||
throw runtime_error("GROUP_CONCAT and JSONARRAYAGG aggregations are not supported when WITH ROLLUP modifier is used");
|
||||
}
|
||||
jobInfo.groupConcatCols.push_back(retCols[i]);
|
||||
|
||||
uint64_t eid = gcc->expressionId();
|
||||
@ -1223,6 +1254,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
if (ac->windowfunctionColumnList().size() > 0)
|
||||
hasWndCols = true;
|
||||
}
|
||||
else if (dynamic_cast<const RollupMarkColumn*>(srcp.get()) != NULL)
|
||||
{
|
||||
}
|
||||
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
|
||||
{
|
||||
if (fc->aggColumnList().size() > 0)
|
||||
@ -1254,7 +1288,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
tupleKey = ti.key;
|
||||
|
||||
if (hasAggCols && !hasWndCols)
|
||||
{
|
||||
jobInfo.expressionVec.push_back(tupleKey);
|
||||
}
|
||||
}
|
||||
|
||||
// add to project list
|
||||
@ -1668,7 +1704,7 @@ void parseExecutionPlan(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobS
|
||||
}
|
||||
|
||||
// special case, select without a table, like: select 1;
|
||||
if (jobInfo.constantCol == CONST_COL_ONLY)
|
||||
if (jobInfo.constantCol == CONST_COL_ONLY) // XXX: WITH ROLLUP
|
||||
return;
|
||||
|
||||
// If there are no filters (select * from table;) then add one simple scan
|
||||
|
@ -3108,6 +3108,10 @@ void TupleBPS::setFE1Input(const RowGroup& feInput)
|
||||
void TupleBPS::setFcnExpGroup2(const boost::shared_ptr<funcexp::FuncExpWrapper>& fe,
|
||||
const rowgroup::RowGroup& rg, bool runFE2onPM)
|
||||
{
|
||||
// The presence of fe2 changes the rowgroup format that is used in PrimProc;
// please be aware of this if you are modifying several parts of the system.
// The relevant part is in primitives/primproc/batchprimitiveprocessor,
// in the execute() function, in the branch where aggregation is handled.
|
||||
fe2 = fe;
|
||||
fe2Output = rg;
|
||||
checkDupOutputColumns(rg);
|
||||
|
@ -904,7 +904,9 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
|
||||
if (distinctAgg == true)
|
||||
prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
|
||||
else
|
||||
{
|
||||
prep2PhasesAggregate(jobInfo, rgs, aggs);
|
||||
}
|
||||
}
|
||||
|
||||
if (tbps != NULL)
|
||||
@ -1501,7 +1503,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>&
|
||||
|
||||
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
|
||||
jobInfo.stringTableThreshold);
|
||||
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit));
|
||||
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
rowAgg->timeZone(jobInfo.timeZone);
|
||||
rowgroups.push_back(aggRG);
|
||||
aggregators.push_back(rowAgg);
|
||||
@ -2594,7 +2596,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
|
||||
|
||||
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
|
||||
jobInfo.stringTableThreshold);
|
||||
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit));
|
||||
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
rowAgg->timeZone(jobInfo.timeZone);
|
||||
|
||||
posAggDist.push_back(2); // rid
|
||||
@ -2861,7 +2863,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
|
||||
|
||||
// construct sub-aggregator
|
||||
SP_ROWAGG_UM_t subAgg(
|
||||
new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
|
||||
new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
subAgg->timeZone(jobInfo.timeZone);
|
||||
subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
|
||||
|
||||
@ -3177,11 +3179,14 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>
|
||||
colAggPm++;
|
||||
}
|
||||
|
||||
// PM: put the count column for avg next to the sum
|
||||
// let fall through to add a count column for average function
|
||||
if (aggOp != ROWAGG_AVG)
|
||||
break;
|
||||
/* fall through */
|
||||
// PM: put the count column for avg next to the sum
|
||||
// let fall through to add a count column for average function
|
||||
if (aggOp != ROWAGG_AVG)
|
||||
break;
|
||||
// The AVG aggregation gets special treatment everywhere.
// This is because AVG(column) is SUM(column)/COUNT(column),
// and those aggregations, if present, can be reused by AVG.
|
||||
/* fall through */
|
||||
|
||||
case ROWAGG_COUNT_ASTERISK:
|
||||
case ROWAGG_COUNT_COL_NAME:
|
||||
@ -3559,8 +3564,10 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>
|
||||
|
||||
// a duplicate group by column
|
||||
if (dupGroupbyIndex != -1)
|
||||
{
|
||||
functionVecUm.push_back(SP_ROWAGG_FUNC_t(
|
||||
new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3706,7 +3713,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>
|
||||
|
||||
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
|
||||
precisionAggUm, jobInfo.stringTableThreshold);
|
||||
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
|
||||
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
rowAggUm->timeZone(jobInfo.timeZone);
|
||||
rowgroups.push_back(aggRgUm);
|
||||
aggregators.push_back(rowAggUm);
|
||||
@ -3718,7 +3725,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>
|
||||
|
||||
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm,
|
||||
precisionAggPm, jobInfo.stringTableThreshold);
|
||||
SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
|
||||
SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm, nullptr, nullptr, jobInfo.hasRollup));
|
||||
rowAggPm->timeZone(jobInfo.timeZone);
|
||||
rowgroups.push_back(aggRgPm);
|
||||
aggregators.push_back(rowAggPm);
|
||||
@ -4803,7 +4810,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector<R
|
||||
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
|
||||
precisionAggUm, jobInfo.stringTableThreshold);
|
||||
SP_ROWAGG_UM_t rowAggUm(
|
||||
new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit));
|
||||
new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
rowAggUm->timeZone(jobInfo.timeZone);
|
||||
|
||||
posAggDist.push_back(2); // rid
|
||||
@ -5061,7 +5068,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector<R
|
||||
|
||||
// construct sub-aggregator
|
||||
SP_ROWAGG_UM_t subAgg(
|
||||
new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
|
||||
new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false));
|
||||
subAgg->timeZone(jobInfo.timeZone);
|
||||
|
||||
// add to rowAggDist
|
||||
@ -5308,7 +5315,6 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
|
||||
uint32_t rgVecShift = float(fNumOfBuckets) / fNumOfThreads * threadID;
|
||||
|
||||
RowAggregationMultiDistinct* multiDist = nullptr;
|
||||
|
||||
if (!fDoneAggregate)
|
||||
{
|
||||
if (fInputJobStepAssociation.outSize() == 0)
|
||||
@ -5437,6 +5443,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
|
||||
for (uint32_t i = 0; i < fNumOfBuckets; i++)
|
||||
{
|
||||
fAggregators[i].reset(fAggregator->clone());
|
||||
fAggregators[i]->clearRollup();
|
||||
fAggregators[i]->setInputOutput(fRowGroupIn, &fRowGroupOuts[i]);
|
||||
}
|
||||
}
|
||||
@ -5522,7 +5529,9 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
|
||||
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
|
||||
->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c]);
|
||||
else
|
||||
{
|
||||
fAggregators[c]->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c][0]);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -7392,13 +7392,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
|
||||
#endif
|
||||
int rc = 0;
|
||||
// rollup is currently not supported
|
||||
if (select_lex.olap == ROLLUP_TYPE)
|
||||
{
|
||||
gwi.fatalParseError = true;
|
||||
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_ROLLUP_NOT_SUPPORT);
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
bool withRollup = select_lex.olap == ROLLUP_TYPE;
|
||||
|
||||
setExecutionParams(gwi, csep);
|
||||
|
||||
@ -8331,6 +8325,11 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
if (withRollup)
|
||||
{
|
||||
SRCP rc(new RollupMarkColumn());
|
||||
gwi.groupByCols.insert(gwi.groupByCols.end(), rc);
|
||||
}
|
||||
}
|
||||
|
||||
// ORDER BY processing
|
||||
@ -8653,6 +8652,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
|
||||
gwi.additionalRetCols.end());
|
||||
|
||||
csep->groupByCols(gwi.groupByCols);
|
||||
csep->withRollup(withRollup);
|
||||
csep->orderByCols(gwi.orderByCols);
|
||||
csep->returnedCols(gwi.returnedCols);
|
||||
csep->columnMap(gwi.columnMap);
|
||||
@ -8907,13 +8907,15 @@ ConstantColumn* buildConstColFromFilter(SimpleColumn* originalSC, gp_walk_info&
|
||||
return result;
|
||||
}
|
||||
|
||||
// XXX: need to trigger that somehow.
|
||||
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_group_info& gi, bool isUnion)
|
||||
{
|
||||
#ifdef DEBUG_WALK_COND
|
||||
cerr << "getGroupPlan()" << endl;
|
||||
#endif
|
||||
|
||||
// rollup is currently not supported
|
||||
// XXX: rollup is currently not supported (not tested) in this part,
// but this path is not triggered by any of the tests.
|
||||
if (select_lex.olap == ROLLUP_TYPE)
|
||||
{
|
||||
gwi.fatalParseError = true;
|
||||
|
docs/with-rollup.md (new file, 23 lines)
@ -0,0 +1,23 @@
# What is WITH ROLLUP

If you are grouping and aggregating data, adding ``WITH ROLLUP`` after the ``GROUP BY`` clause lets you get subtotals as well.

Main page: https://mariadb.com/kb/en/select-with-rollup/

Subtotals are marked with NULLs instead of keys. E.g., if you run ``SELECT year, SUM(sales) FROM bookstores GROUP BY year WITH ROLLUP``, the result will contain one row per year with that year's total sales, plus a row with NULL as the year value holding the grand total.

It is handy (one query instead of three), and it is also required for TPC-DS.

# How it is implemented in other engines (speculation)

InnoDB outputs sorted results for ``GROUP BY`` queries. Thus, whenever the ``GROUP BY`` prefix changes, it can emit one subtotal and start the next one; no storage is required to hold the subtotals, however small that storage would be.
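
To make that concrete, here is a minimal, hypothetical sketch of the streaming approach (it is not Columnstore or InnoDB code; the hard-coded rows and the names ``acc`` and ``depth`` are purely illustrative). Because the input arrives sorted by the GROUP BY keys, one running accumulator per rollup level is enough, and a level is emitted as soon as its key prefix closes:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

struct Row { std::vector<int64_t> keys; int64_t value; };

int main()
{
  // Input already sorted by (key1, key2).
  std::vector<Row> rows = {{{1, 1}, 3}, {{1, 2}, 4}, {{1, 2}, 5}, {{2, 3}, 6}};
  const size_t depth = 2;                  // number of GROUP BY keys
  std::vector<int64_t> acc(depth + 1, 0);  // acc[k]: running sum with the first k keys fixed

  for (size_t i = 0; i < rows.size(); i++)
  {
    for (size_t k = 0; k <= depth; k++)
      acc[k] += rows[i].value;

    // Length of the key prefix shared with the next row (0 at end of input).
    size_t prefix = 0;
    if (i + 1 < rows.size())
      while (prefix < depth && rows[i].keys[prefix] == rows[i + 1].keys[prefix])
        prefix++;

    // Every level deeper than the shared prefix has just closed: emit and reset it.
    // acc[depth] is the ordinary GROUP BY row, acc[0] is the grand total.
    for (size_t k = depth; k > prefix; k--)
    {
      std::cout << "level " << k << " subtotal = " << acc[k] << "\n";
      acc[k] = 0;
    }
    if (i + 1 == rows.size())
      std::cout << "grand total = " << acc[0] << "\n";
  }
  return 0;
}
```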
# How it is implemented in Columnstore

Columnstore does not have sort-based aggregation; it has hash-based aggregation. The data can arrive in any order and come out in any order, so we have to keep all subtotals in our hash tables. We also have to distinguish between data with a NULL value in a ``GROUP BY`` key and a subtotal key.

Thus, if a ``WITH ROLLUP`` modifier is present, we add a hidden column to the keys of the ``GROUP BY`` part of the query. The column is represented by the RollupMarkColumn class, which is very similar to a ConstantColumn but has to be handled slightly differently; for example, it cannot be optimized away. During aggregation we process each row as usual, but then (see the sketch after this list):

1. we increase the RollupMarkColumn-provided value by one, to distinguish the row from raw data (i.e. to mark it as a subtotal row), and
2. we set the corresponding ``GROUP BY`` key to NULL and process the row again.
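
Below is a minimal, self-contained sketch of that loop. It uses a ``std::map`` in place of Columnstore's RowAggStorage and plain integer keys; names such as ``kMarkBase``, ``cnt`` and the sample rows are illustrative assumptions, while the bump-the-mark / NULL-one-more-key pattern mirrors ``RowAggregation::aggregateRow``:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

// A key is the GROUP BY columns plus the hidden mark column; NULL = nullopt.
using Key = std::vector<std::optional<int64_t>>;

int main()
{
  const int64_t kMarkBase = 0;  // the real RollupMarkColumn starts from a large constant
  std::map<Key, int64_t> sums;  // stands in for the aggregation hash table

  std::vector<std::pair<Key, int64_t>> input = {
      {{1, 1, kMarkBase}, 3}, {{1, 2, kMarkBase}, 4}, {{1, 2, kMarkBase}, 5}, {{2, 3, kMarkBase}, 6}};

  const size_t cnt = 3;  // two GROUP BY keys + the mark column
  for (auto [key, value] : input)
  {
    for (size_t z = 0; z < cnt; z++)
    {
      sums[key] += value;                 // "process the row as usual" (updateEntry)
      if (z + 1 < cnt)
      {
        *key[cnt - 1] += 1;               // bump the mark: the next pass is a subtotal
        key[cnt - 2 - z] = std::nullopt;  // NULL one more key, rightmost first
      }
    }
  }

  // One entry per (key1, key2) group with mark 0, one subtotal per key1 with
  // mark 1, and a single grand total with mark 2.
  for (const auto& [key, sum] : sums)
  {
    for (size_t i = 0; i + 1 < key.size(); i++)
      std::cout << (key[i] ? std::to_string(*key[i]) : std::string("NULL")) << " ";
    std::cout << "| mark=" << *key.back() << " sum=" << sum << "\n";
  }
  return 0;
}
```

Because the mark value grows with each level, a subtotal whose keys are all NULL never collides with an input row that genuinely has NULL GROUP BY keys.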
@ -15,10 +15,72 @@ year SUM(sales)
|
||||
2015 180518
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year WITH ROLLUP ORDER BY year;
|
||||
ERROR HY000: Incorrect usage of CUBE/ROLLUP and ORDER BY
|
||||
SELECT year, sales, MAX(country) FROM booksales GROUP BY year, sales;
|
||||
year sales MAX(country)
|
||||
2014 12234 Senegal
|
||||
2014 64980 Senegal
|
||||
2014 8760 Paraguay
|
||||
2014 87970 Paraguay
|
||||
2015 15647 Senegal
|
||||
2015 76940 Paraguay
|
||||
2015 78901 Senegal
|
||||
2015 9030 Paraguay
|
||||
SELECT year, MAX(country) FROM booksales GROUP BY year;
|
||||
year MAX(country)
|
||||
2014 Senegal
|
||||
2015 Senegal
|
||||
SELECT year, sales, MAX(country) FROM booksales GROUP BY year, sales WITH ROLLUP;
|
||||
year sales MAX(country)
|
||||
2014 12234 Senegal
|
||||
2014 64980 Senegal
|
||||
2014 8760 Paraguay
|
||||
2014 87970 Paraguay
|
||||
2014 NULL Senegal
|
||||
2015 15647 Senegal
|
||||
2015 76940 Paraguay
|
||||
2015 78901 Senegal
|
||||
2015 9030 Paraguay
|
||||
2015 NULL Senegal
|
||||
NULL NULL Senegal
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year WITH ROLLUP;
|
||||
ERROR 42000: The storage engine for the table doesn't support MCS-1014: Rollup is currently not supported.
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year ASC WITH ROLLUP;
|
||||
ERROR 42000: The storage engine for the table doesn't support MCS-1014: Rollup is currently not supported.
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year DESC WITH ROLLUP;
|
||||
ERROR 42000: The storage engine for the table doesn't support MCS-1014: Rollup is currently not supported.
|
||||
year SUM(sales)
|
||||
2014 173944
|
||||
2015 180518
|
||||
NULL 354462
|
||||
SELECT country, genre, SUM(sales) FROM booksales GROUP BY country, genre WITH ROLLUP;
|
||||
country genre SUM(sales)
|
||||
NULL NULL 354462
|
||||
Paraguay NULL 182700
|
||||
Paraguay fiction 164910
|
||||
Paraguay non-fiction 17790
|
||||
Senegal NULL 171762
|
||||
Senegal fiction 27881
|
||||
Senegal non-fiction 143881
|
||||
CREATE TABLE three_cols ( key1 INTEGER, key2 INTEGER, value DECIMAL(38)) ENGINE=COLUMNSTORE;
|
||||
INSERT INTO three_cols(key1, key2, value) VALUES
|
||||
(NULL, NULL, NULL)
|
||||
, (NULL, NULL, 1)
|
||||
, (NULL, 1, 2)
|
||||
, ( 1, 1, 3)
|
||||
, ( 1, 2, 4)
|
||||
, ( 1, 2, 5)
|
||||
, ( 2, 3, 6)
|
||||
, ( 2, 3, 7);
|
||||
SELECT key1, key2, SUM(value), AVG(value), MIN(value), MAX(value), COUNT(value), COUNT(*) FROM three_cols GROUP BY key1, key2 WITH ROLLUP;
|
||||
key1 key2 SUM(value) AVG(value) MIN(value) MAX(value) COUNT(value) COUNT(*)
|
||||
1 1 3 3.0000 3 3 1 1
|
||||
1 2 9 4.5000 4 5 2 2
|
||||
1 NULL 12 4.0000 3 5 3 3
|
||||
2 3 13 6.5000 6 7 2 2
|
||||
2 NULL 13 6.5000 6 7 2 2
|
||||
NULL 1 2 2.0000 2 2 1 1
|
||||
NULL NULL 1 1.0000 1 1 1 2
|
||||
NULL NULL 28 4.0000 1 7 7 8
|
||||
NULL NULL 3 1.5000 1 2 2 3
|
||||
SELECT key1, key2, GROUP_CONCAT(value) FROM three_cols GROUP BY key1, key2 WITH ROLLUP;
|
||||
ERROR HY000: Internal error: GROUP_CONCAT and JSONARRAYAGG aggregations are not supported when WITH ROLLUP modifier is used
|
||||
SELECT key1, key2, JSON_ARRAYAGG(value) FROM three_cols GROUP BY key1, key2 WITH ROLLUP;
|
||||
ERROR HY000: Internal error: GROUP_CONCAT and JSONARRAYAGG aggregations are not supported when WITH ROLLUP modifier is used
|
||||
SELECT 100, SUM(value) FROM three_cols GROUP BY 1 WITH ROLLUP;
|
||||
ERROR HY000: Internal error: constant GROUP BY columns are not supported when WITH ROLLUP is used
|
||||
DROP DATABASE IF EXISTS mcs84_db;
|
||||
|
@ -29,13 +29,41 @@ SELECT year, SUM(sales) FROM booksales GROUP BY year ORDER BY year;
|
||||
--error 1221
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year WITH ROLLUP ORDER BY year;
|
||||
|
||||
# WITH ROLLUP not supported yet. MCOL-678
|
||||
--error 1178
|
||||
--sorted_result
|
||||
SELECT year, sales, MAX(country) FROM booksales GROUP BY year, sales;
|
||||
--sorted_result
|
||||
SELECT year, MAX(country) FROM booksales GROUP BY year;
|
||||
--sorted_result
|
||||
SELECT year, sales, MAX(country) FROM booksales GROUP BY year, sales WITH ROLLUP;
|
||||
|
||||
--sorted_result
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year WITH ROLLUP;
|
||||
--error 1178
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year ASC WITH ROLLUP;
|
||||
--error 1178
|
||||
SELECT year, SUM(sales) FROM booksales GROUP BY year DESC WITH ROLLUP;
|
||||
|
||||
--sorted_result
|
||||
SELECT country, genre, SUM(sales) FROM booksales GROUP BY country, genre WITH ROLLUP;
|
||||
|
||||
CREATE TABLE three_cols ( key1 INTEGER, key2 INTEGER, value DECIMAL(38)) ENGINE=COLUMNSTORE;
|
||||
|
||||
INSERT INTO three_cols(key1, key2, value) VALUES
|
||||
(NULL, NULL, NULL)
|
||||
, (NULL, NULL, 1)
|
||||
, (NULL, 1, 2)
|
||||
, ( 1, 1, 3)
|
||||
, ( 1, 2, 4)
|
||||
, ( 1, 2, 5)
|
||||
, ( 2, 3, 6)
|
||||
, ( 2, 3, 7);
|
||||
|
||||
--sorted_result
|
||||
SELECT key1, key2, SUM(value), AVG(value), MIN(value), MAX(value), COUNT(value), COUNT(*) FROM three_cols GROUP BY key1, key2 WITH ROLLUP;
|
||||
|
||||
--error 1815
|
||||
SELECT key1, key2, GROUP_CONCAT(value) FROM three_cols GROUP BY key1, key2 WITH ROLLUP;
|
||||
--error 1815
|
||||
SELECT key1, key2, JSON_ARRAYAGG(value) FROM three_cols GROUP BY key1, key2 WITH ROLLUP;
|
||||
|
||||
--error 1815
|
||||
SELECT 100, SUM(value) FROM three_cols GROUP BY 1 WITH ROLLUP;
|
||||
|
||||
# Clean up
|
||||
DROP DATABASE IF EXISTS mcs84_db;
|
||||
|
@ -1672,6 +1672,8 @@ void BatchPrimitiveProcessor::execute()
|
||||
{
|
||||
*serialized << (uint8_t)1; // the "count this msg" var
|
||||
|
||||
// See TupleBPS::setFcnExpGroup2() and where it gets called;
// it sets fe2 there, on the other side of the communication.
|
||||
RowGroup& toAggregate = (fe2 ? fe2Output : outputRG);
|
||||
// toAggregate.convertToInlineDataInPlace();
|
||||
|
||||
|
@ -506,12 +506,13 @@ RowAggregation::RowAggregation()
|
||||
, fLargeSideRG(nullptr)
|
||||
, fSmallSideCount(0)
|
||||
, fOrigFunctionCols(nullptr)
|
||||
, fRollupFlag(false)
|
||||
{
|
||||
}
|
||||
|
||||
RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
||||
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
||||
joblist::ResourceManager* rm, boost::shared_ptr<int64_t> sl)
|
||||
joblist::ResourceManager* rm, boost::shared_ptr<int64_t> sl, bool withRollup)
|
||||
: fRowGroupOut(nullptr)
|
||||
, fSmallSideRGs(nullptr)
|
||||
, fLargeSideRG(nullptr)
|
||||
@ -519,6 +520,7 @@ RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCol
|
||||
, fOrigFunctionCols(nullptr)
|
||||
, fRm(rm)
|
||||
, fSessionMemLimit(std::move(sl))
|
||||
, fRollupFlag(withRollup)
|
||||
{
|
||||
fGroupByCols.assign(rowAggGroupByCols.begin(), rowAggGroupByCols.end());
|
||||
fFunctionCols.assign(rowAggFunctionCols.begin(), rowAggFunctionCols.end());
|
||||
@ -534,6 +536,7 @@ RowAggregation::RowAggregation(const RowAggregation& rhs)
|
||||
, fOrigFunctionCols(nullptr)
|
||||
, fRm(rhs.fRm)
|
||||
, fSessionMemLimit(rhs.fSessionMemLimit)
|
||||
, fRollupFlag(rhs.fRollupFlag)
|
||||
{
|
||||
fGroupByCols.assign(rhs.fGroupByCols.begin(), rhs.fGroupByCols.end());
|
||||
fFunctionCols.assign(rhs.fFunctionCols.begin(), rhs.fFunctionCols.end());
|
||||
@ -812,49 +815,67 @@ void RowAggregationUM::aggReset()
|
||||
void RowAggregation::aggregateRow(Row& row, const uint64_t* hash,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
uint32_t cnt = fRollupFlag ? fGroupByCols.size() : 1;
|
||||
for (uint32_t z = 0; z < cnt; z++) {
|
||||
// groupby column list is not empty, find the entry.
|
||||
if (!fGroupByCols.empty())
|
||||
{
|
||||
bool is_new_row;
|
||||
if (hash != nullptr)
|
||||
is_new_row = fRowAggStorage->getTargetRow(row, *hash, fRow);
|
||||
else
|
||||
is_new_row = fRowAggStorage->getTargetRow(row, fRow);
|
||||
|
||||
if (is_new_row)
|
||||
if (!fGroupByCols.empty())
|
||||
{
|
||||
initMapData(row);
|
||||
attachGroupConcatAg();
|
||||
bool is_new_row;
|
||||
if (hash != nullptr)
|
||||
is_new_row = fRowAggStorage->getTargetRow(row, *hash, fRow);
|
||||
else
|
||||
is_new_row = fRowAggStorage->getTargetRow(row, fRow);
|
||||
|
||||
// If there's UDAF involved, reset the user data.
|
||||
if (fOrigFunctionCols)
|
||||
if (is_new_row)
|
||||
{
|
||||
// This is a multi-distinct query and fFunctionCols may not
|
||||
// contain all the UDAF we need to reset
|
||||
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
|
||||
initMapData(row);
|
||||
attachGroupConcatAg();
|
||||
|
||||
// If there's UDAF involved, reset the user data.
|
||||
if (fOrigFunctionCols)
|
||||
{
|
||||
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
|
||||
// This is a multi-distinct query and fFunctionCols may not
|
||||
// contain all the UDAF we need to reset
|
||||
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
|
||||
{
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
else
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
updateEntry(row, rgContextColl);
|
||||
updateEntry(row, rgContextColl);
|
||||
// These quantities are unsigned, so comparing z with cnt - 1 would be incorrect
// when cnt is zero (cnt - 1 would underflow); hence the z + 1 < cnt form below.
|
||||
if ((z + 1 < cnt)) {
|
||||
// if we are rolling up, we mark appropriate field as NULL and also increment
|
||||
// value in the "mark" column, so that we can differentiate between data and
|
||||
// various rollups.
|
||||
row.setIntField(row.getIntField(cnt - 1) + 1, cnt - 1);
|
||||
// cnt is number of columns in GROUP BY with mark included:
|
||||
// year, store, MARK - cnt is 3, we have two columns to mark as NULL.
|
||||
// z=0 will NULL store, z=1 will NULL year. We will not NULL MARK as
|
||||
// z=2 does not satisfy z+1 < cnt.
|
||||
// In other words, the "z" counter goes from 0 to cnt-2, see the "if"
// condition above. Thus, the least value of cnt - 2 - z is zero.
|
||||
row.setToNull(cnt - 2 - z);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1593,6 +1614,7 @@ void RowAggregation::serialize(messageqcpp::ByteStream& bs) const
|
||||
|
||||
messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
|
||||
bs << timeZone;
|
||||
bs << (int8_t)fRollupFlag;
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1640,6 +1662,9 @@ void RowAggregation::deserialize(messageqcpp::ByteStream& bs)
|
||||
messageqcpp::ByteStream::octbyte timeZone;
|
||||
bs >> timeZone;
|
||||
fTimeZone = timeZone;
|
||||
uint8_t tmp8;
|
||||
bs >> tmp8;
|
||||
fRollupFlag = tmp8;
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -2368,8 +2393,8 @@ void RowAggregation::loadEmptySet(messageqcpp::ByteStream& bs)
|
||||
//------------------------------------------------------------------------------
|
||||
RowAggregationUM::RowAggregationUM(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
||||
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
||||
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit)
|
||||
: RowAggregation(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit)
|
||||
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit, bool withRollup)
|
||||
: RowAggregation(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
|
||||
, fHasAvg(false)
|
||||
, fHasStatsFunc(false)
|
||||
, fHasUDAF(false)
|
||||
@ -4085,8 +4110,8 @@ bool RowAggregationUM::nextRowGroup()
|
||||
//------------------------------------------------------------------------------
|
||||
RowAggregationUMP2::RowAggregationUMP2(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
||||
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
||||
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit)
|
||||
: RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit)
|
||||
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit, bool withRollup)
|
||||
: RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
|
||||
{
|
||||
}
|
||||
|
||||
@ -4476,7 +4501,7 @@ RowAggregationDistinct::RowAggregationDistinct(const vector<SP_ROWAGG_GRPBY_t>&
|
||||
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
||||
joblist::ResourceManager* r,
|
||||
boost::shared_ptr<int64_t> sessionLimit)
|
||||
: RowAggregationUMP2(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit)
|
||||
: RowAggregationUMP2(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -4675,7 +4700,7 @@ RowAggregationSubDistinct::RowAggregationSubDistinct(const vector<SP_ROWAGG_GRPB
|
||||
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
||||
joblist::ResourceManager* r,
|
||||
boost::shared_ptr<int64_t> sessionLimit)
|
||||
: RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit)
|
||||
: RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, false)
|
||||
{
|
||||
fKeyOnHeap = false;
|
||||
}
|
||||
|
@ -389,7 +389,7 @@ class RowAggregation : public messageqcpp::Serializeable
|
||||
RowAggregation();
|
||||
RowAggregation(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
||||
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
||||
joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessMemLimit = {});
|
||||
joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessMemLimit = {}, bool withRollup = false);
|
||||
RowAggregation(const RowAggregation& rhs);
|
||||
|
||||
/** @brief RowAggregation default destructor
|
||||
@ -423,6 +423,8 @@ class RowAggregation : public messageqcpp::Serializeable
|
||||
initialize();
|
||||
}
|
||||
|
||||
void clearRollup() { fRollupFlag = false; }
|
||||
|
||||
/** @brief Define content of data to be joined
|
||||
*
|
||||
* This method must be call after setInputOutput() for PM hashjoin case.
|
||||
@ -630,6 +632,7 @@ class RowAggregation : public messageqcpp::Serializeable
|
||||
joblist::ResourceManager* fRm = nullptr;
|
||||
boost::shared_ptr<int64_t> fSessionMemLimit;
|
||||
std::unique_ptr<RGData> fCurRGData;
|
||||
bool fRollupFlag = false;
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -647,7 +650,7 @@ class RowAggregationUM : public RowAggregation
|
||||
}
|
||||
RowAggregationUM(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
||||
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
|
||||
boost::shared_ptr<int64_t> sessionMemLimit);
|
||||
boost::shared_ptr<int64_t> sessionMemLimit, bool withRollup);
|
||||
RowAggregationUM(const RowAggregationUM& rhs);
|
||||
|
||||
/** @brief RowAggregationUM default destructor
|
||||
@ -812,7 +815,7 @@ class RowAggregationUMP2 : public RowAggregationUM
|
||||
}
|
||||
RowAggregationUMP2(const std::vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
||||
const std::vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols, joblist::ResourceManager*,
|
||||
boost::shared_ptr<int64_t> sessionMemLimit);
|
||||
boost::shared_ptr<int64_t> sessionMemLimit, bool withRollup);
|
||||
RowAggregationUMP2(const RowAggregationUMP2& rhs);
|
||||
|
||||
/** @brief RowAggregationUMP2 default destructor
|
||||
|
@ -322,6 +322,8 @@ RGData::RGData(const RowGroup& rg, uint32_t rowCount)
|
||||
memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily
|
||||
#endif
|
||||
memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily
|
||||
columnCount = rg.getColumnCount();
|
||||
rowSize = rg.getRowSize();
|
||||
}
|
||||
|
||||
RGData::RGData(const RowGroup& rg)
|
||||
@ -341,6 +343,8 @@ RGData::RGData(const RowGroup& rg)
|
||||
*/
|
||||
memset(rowData.get(), 0, rg.getMaxDataSize());
|
||||
#endif
|
||||
columnCount = rg.getColumnCount();
|
||||
rowSize = rg.getRowSize();
|
||||
}
|
||||
|
||||
void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
|
||||
@ -360,6 +364,8 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
|
||||
*/
|
||||
memset(rowData.get(), 0, rg.getDataSize(rowCount));
|
||||
#endif
|
||||
columnCount = rg.getColumnCount();
|
||||
rowSize = rg.getRowSize();
|
||||
}
|
||||
|
||||
void RGData::reinit(const RowGroup& rg)
|
||||
@ -372,6 +378,8 @@ void RGData::serialize(ByteStream& bs, uint32_t amount) const
|
||||
// cout << "serializing!\n";
|
||||
bs << (uint32_t)RGDATA_SIG;
|
||||
bs << (uint32_t)amount;
|
||||
bs << columnCount;
|
||||
bs << rowSize;
|
||||
bs.append(rowData.get(), amount);
|
||||
|
||||
if (strings)
|
||||
@ -402,6 +410,14 @@ void RGData::deserialize(ByteStream& bs, uint32_t defAmount)
|
||||
{
|
||||
bs >> sig;
|
||||
bs >> amount;
|
||||
uint32_t colCountTemp;
|
||||
uint32_t rowSizeTemp;
|
||||
bs >> colCountTemp;
|
||||
bs >> rowSizeTemp;
|
||||
if (rowSize != 0)
|
||||
{
|
||||
idbassert(colCountTemp == columnCount && rowSize == rowSizeTemp);
|
||||
}
|
||||
rowData.reset(new uint8_t[std::max(amount, defAmount)]);
|
||||
buf = bs.buf();
|
||||
memcpy(rowData.get(), buf, amount);
|
||||
|
@ -320,6 +320,8 @@ class RGData
|
||||
}
|
||||
|
||||
private:
|
||||
uint32_t rowSize = 0; // can't be.
|
||||
uint32_t columnCount = 0; // shouldn't be, but...
|
||||
std::shared_ptr<uint8_t[]> rowData;
|
||||
std::shared_ptr<StringStore> strings;
|
||||
std::shared_ptr<UserDataStore> userDataStore;
|
||||
@ -2195,6 +2197,7 @@ inline uint64_t StringStore::getSize() const
|
||||
|
||||
inline void RGData::getRow(uint32_t num, Row* row)
|
||||
{
|
||||
idbassert(columnCount == row->getColumnCount() && rowSize == row->getSize());
|
||||
uint32_t size = row->getSize();
|
||||
row->setData(
|
||||
Row::Pointer(&rowData[RowGroup::getHeaderSize() + (num * size)], strings.get(), userDataStore.get()));
|
||||
|