1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Non-working attempt to do MCOL-5227

This commit is contained in:
mariadb-AndreyPiskunov
2022-10-18 17:02:55 +03:00
parent d22627af7d
commit 1714b75434
13 changed files with 1664 additions and 14 deletions

View File

@ -22,6 +22,7 @@ set(execplan_LIB_SRCS
functioncolumn.cpp functioncolumn.cpp
groupconcatcolumn.cpp groupconcatcolumn.cpp
intervalcolumn.cpp intervalcolumn.cpp
jsonarrayaggcolumn.cpp
logicoperator.cpp logicoperator.cpp
mysqlexecutionplan.cpp mysqlexecutionplan.cpp
objectidmanager.cpp objectidmanager.cpp

View File

@ -74,6 +74,7 @@ class AggregateColumn : public ReturnedColumn
BIT_OR, BIT_OR,
BIT_XOR, BIT_XOR,
GROUP_CONCAT, GROUP_CONCAT,
JSON_ARRAYAGG,
UDAF, UDAF,
MULTI_PARM MULTI_PARM
}; };

View File

@ -0,0 +1,170 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sstream>
#include <cstring>
using namespace std;
#include "bytestream.h"
using namespace messageqcpp;
#include "rowgroup.h"
using namespace rowgroup;
#include "joblisttypes.h"
using namespace joblist;
#include "simplefilter.h"
#include "constantfilter.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "objectreader.h"
#include "jsonarrayaggcolumn.h"
namespace execplan
{
/**
* Constructors/Destructors
*/
/**
 * Default constructor.
 *
 * Only the AggregateColumn base is initialised explicitly; fOrderCols and
 * fSeparator are default-constructed and therefore start out empty.
 */
JsonArrayAggColumn::JsonArrayAggColumn()
 : AggregateColumn()
{
}
/**
 * Session-aware constructor.
 *
 * @param sessionID forwarded unchanged to the AggregateColumn constructor.
 *
 * fOrderCols and fSeparator are default-constructed (empty).
 */
JsonArrayAggColumn::JsonArrayAggColumn(const uint32_t sessionID)
 : AggregateColumn(sessionID)
{
}
/**
 * Copy constructor.
 *
 * @param rhs       column to copy from.
 * @param sessionID accepted to match the declared signature; it is not
 *                  forwarded to the base class by this constructor.
 *
 * The base sub-object is copy-constructed from rhs. The order-column vector
 * is copied element-wise (the SRCP handles are copied; clone() is not called
 * on the referenced columns here) and the separator string is copied.
 */
JsonArrayAggColumn::JsonArrayAggColumn(const JsonArrayAggColumn& rhs, const uint32_t sessionID)
 : AggregateColumn(static_cast<const AggregateColumn&>(rhs))
 , fOrderCols(rhs.fOrderCols)
 , fSeparator(rhs.fSeparator)
{
}
/**
 * Destructor. Nothing beyond member and base-class destruction is required.
 */
JsonArrayAggColumn::~JsonArrayAggColumn() = default;
/**
* Methods
*/
/**
 * Build a multi-line, human-readable description of this column.
 *
 * The text consists of a "JsonArrayAggColumn <data()>" header line, the
 * AggregateColumn::toString() output, a "Json Array Order Columns: " line and
 * then every order column streamed back to back.
 *
 * @return the assembled description.
 */
const string JsonArrayAggColumn::toString() const
{
  ostringstream text;

  text << "JsonArrayAggColumn " << data() << endl;
  text << AggregateColumn::toString() << endl;
  text << "Json Array Order Columns: " << endl;

  // Each entry is a smart-pointer handle; stream the column it refers to.
  for (const auto& orderCol : fOrderCols)
    text << *orderCol;

  return text.str();
}
/**
 * Stream insertion: writes rhs.toString() to the stream.
 *
 * @return the same stream, to allow chaining.
 */
ostream& operator<<(ostream& output, const JsonArrayAggColumn& rhs)
{
  return output << rhs.toString();
}
/**
 * Write this column to a ByteStream.
 *
 * Layout, in order:
 *   1. one-byte object tag (ObjectReader::GROUPCONCATCOLUMN),
 *   2. the AggregateColumn part,
 *   3. uint32_t count of order columns followed by each serialized column,
 *   4. the separator string.
 *
 * The separator member is written (rather than a hard-coded ',' character)
 * so that unserialize() can restore it and so that both functions consume
 * exactly the same bytes.
 *
 * NOTE(review): the GROUPCONCATCOLUMN tag is reused here; whether
 * ObjectReader should get a dedicated tag for this class cannot be decided
 * from this file alone.
 */
void JsonArrayAggColumn::serialize(messageqcpp::ByteStream& b) const
{
  b << (uint8_t)ObjectReader::GROUPCONCATCOLUMN;
  AggregateColumn::serialize(b);

  b << static_cast<uint32_t>(fOrderCols.size());

  CalpontSelectExecutionPlan::ReturnedColumnList::const_iterator rcit;
  for (rcit = fOrderCols.begin(); rcit != fOrderCols.end(); ++rcit)
    (*rcit)->serialize(b);

  b << fSeparator;
}

/**
 * Rebuild this column from a ByteStream produced by serialize().
 *
 * Verifies the object tag, restores the AggregateColumn part, replaces the
 * current order-column list with the deserialized one and finally reads the
 * separator string, mirroring the write order used by serialize().
 */
void JsonArrayAggColumn::unserialize(messageqcpp::ByteStream& b)
{
  ObjectReader::checkType(b, ObjectReader::GROUPCONCATCOLUMN);
  AggregateColumn::unserialize(b);

  // Drop whatever order columns this object held before.
  fOrderCols.clear();

  uint32_t size = 0;
  b >> size;
  fOrderCols.reserve(size);

  for (uint32_t i = 0; i < size; i++)
  {
    // createTreeNode() yields a generic tree node; keep it as a ReturnedColumn.
    ReturnedColumn* rc = dynamic_cast<ReturnedColumn*>(ObjectReader::createTreeNode(b));
    SRCP srcp(rc);
    fOrderCols.push_back(srcp);
  }

  b >> fSeparator;
}
/**
 * Strict member-wise equality.
 *
 * Two columns are equal when their AggregateColumn parts compare equal, they
 * have the same separator, they hold the same number of order columns, and
 * every pair of order columns at the same position is either both null or
 * compares equal.
 *
 * The size check must come before the element loop: without it the loop
 * would index past the end of t.fOrderCols when t has fewer entries, and
 * would report equality when t has extra trailing entries.
 */
bool JsonArrayAggColumn::operator==(const JsonArrayAggColumn& t) const
{
  const AggregateColumn* lhsBase = static_cast<const AggregateColumn*>(this);
  const AggregateColumn* rhsBase = static_cast<const AggregateColumn*>(&t);

  if (*lhsBase != *rhsBase)
    return false;

  if (fSeparator != t.fSeparator)
    return false;

  if (fOrderCols.size() != t.fOrderCols.size())
    return false;

  for (uint32_t i = 0; i < fOrderCols.size(); i++)
  {
    const bool lhsNull = (fOrderCols[i].get() == NULL);
    const bool rhsNull = (t.fOrderCols[i].get() == NULL);

    // A null entry only matches another null entry.
    if (lhsNull || rhsNull)
    {
      if (lhsNull != rhsNull)
        return false;

      continue;
    }

    if (*(fOrderCols[i].get()) != t.fOrderCols[i].get())
      return false;
  }

  return true;
}
/**
 * Equality against an arbitrary tree node.
 *
 * @return false when t is null or is not a JsonArrayAggColumn; otherwise the
 *         result of the JsonArrayAggColumn-to-JsonArrayAggColumn comparison.
 */
bool JsonArrayAggColumn::operator==(const TreeNode* t) const
{
  if (const JsonArrayAggColumn* other = dynamic_cast<const JsonArrayAggColumn*>(t))
    return *this == *other;

  return false;
}
/**
 * Inequality: the exact negation of operator==(const JsonArrayAggColumn&).
 */
bool JsonArrayAggColumn::operator!=(const JsonArrayAggColumn& t) const
{
  if (*this == t)
    return false;

  return true;
}
/**
 * Inequality against a tree node: the exact negation of
 * operator==(const TreeNode*).
 */
bool JsonArrayAggColumn::operator!=(const TreeNode* t) const
{
  if (*this == t)
    return false;

  return true;
}
} // namespace execplan

View File

@ -0,0 +1,143 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/** @file */
#pragma once
#include <string>
#include "calpontselectexecutionplan.h"
#include "aggregatecolumn.h"
namespace messageqcpp
{
class ByteStream;
}
/**
* Namespace
*/
namespace execplan
{
/**
* @brief Execution-plan column for the JSON_ARRAYAGG aggregate
*
* This class derives from AggregateColumn and adds two pieces of state:
* a list of order columns (fOrderCols) and a separator string (fSeparator).
* NOTE(review): the layout mirrors what a GROUP_CONCAT column needs; how the
* separator is used for JSON_ARRAYAGG is not visible in this header.
*/
class JsonArrayAggColumn : public AggregateColumn
{
public:
/**
* Constructors
*
* The copy constructor takes an optional sessionID (default 0).
*/
JsonArrayAggColumn();
JsonArrayAggColumn(const uint32_t sessionID);
JsonArrayAggColumn(const JsonArrayAggColumn& rhs, const uint32_t sessionID = 0);
/**
* Destructor
*/
virtual ~JsonArrayAggColumn();
/**
* Human-readable description of this column (defined in the .cpp file)
*/
virtual const std::string toString() const;
/** return a heap-allocated copy of this object
*
* The copy is produced with the copy constructor and allocated with new;
* the caller receives the raw pointer.
*/
virtual JsonArrayAggColumn* clone() const
{
return new JsonArrayAggColumn(*this);
}
/**
* Accessors and Mutators
*
* The no-argument overloads return mutable references to the members,
* so callers can modify the stored vector / string in place.
*/
void orderCols(const std::vector<SRCP>& orderCols)
{
fOrderCols = orderCols;
}
std::vector<SRCP>& orderCols()
{
return fOrderCols;
}
void separator(const std::string& separator)
{
fSeparator = separator;
}
std::string& separator()
{
return fSeparator;
}
/**
* Serialize interface
*/
virtual void serialize(messageqcpp::ByteStream&) const;
virtual void unserialize(messageqcpp::ByteStream&);
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
*
* Do a deep, strict (as opposed to semantic) equivalence test.
* @return true iff every member of t is a duplicate copy of every member of this;
* false otherwise
*/
virtual bool operator==(const TreeNode* t) const;
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
*
* Do a deep, strict (as opposed to semantic) equivalence test.
* The using-declaration keeps the AggregateColumn overloads of operator==
* visible alongside the overloads declared in this class.
* @return true iff every member of t is a duplicate copy of every member of this;
* false otherwise
*/
using AggregateColumn::operator==;
virtual bool operator==(const JsonArrayAggColumn& t) const;
/** @brief Negation of the strict equivalence test
*
* @return false iff every member of t is a duplicate copy of every member of this;
* true otherwise
*/
virtual bool operator!=(const TreeNode* t) const;
/** @brief Negation of the strict equivalence test
*
* The using-declaration keeps the AggregateColumn overloads of operator!=
* visible alongside the overloads declared in this class.
* @return false iff every member of t is a duplicate copy of every member of this;
* true otherwise
*/
using AggregateColumn::operator!=;
virtual bool operator!=(const JsonArrayAggColumn& t) const;
private:
// Columns listed for ordering; stored as SRCP handles.
std::vector<SRCP> fOrderCols;
// Separator string set through separator(const std::string&).
std::string fSeparator;
};
/**
* stream operator
*/
std::ostream& operator<<(std::ostream& os, const JsonArrayAggColumn& rhs);
} // namespace execplan

View File

@ -28,6 +28,7 @@ set(joblist_LIB_SRCS
joblistfactory.cpp joblistfactory.cpp
jobstep.cpp jobstep.cpp
jobstepassociation.cpp jobstepassociation.cpp
jsonarrayagg.cpp
lbidlist.cpp lbidlist.cpp
limitedorderby.cpp limitedorderby.cpp
passthrucommand-jl.cpp passthrucommand-jl.cpp

View File

@ -1021,7 +1021,6 @@ void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep)
{ {
ostringstream oss; ostringstream oss;
bool addSep = false; bool addSep = false;
fDataQueue.push(fData); fDataQueue.push(fData);
while (fDataQueue.size() > 0) while (fDataQueue.size() > 0)

View File

@ -43,6 +43,7 @@
#include "joblist.h" #include "joblist.h"
#include "jobstep.h" #include "jobstep.h"
#include "groupconcat.h" #include "groupconcat.h"
#include "jsonarrayagg.h"
#include "jl_logger.h" #include "jl_logger.h"
#include "resourcemanager.h" #include "resourcemanager.h"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,174 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/** @file */
#pragma once
#include <utility>
#include <set>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_array.hpp>
#include "groupconcat.h"
#if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace joblist
{
// forward reference
struct JobInfo;
class JsonArrayAggregator;
class ResourceManager;
// Holds a set of uint32_t values (fColumns) and a list of
// rowgroup::SP_GroupConcat descriptors (fGroupConcat) and exposes both
// through mutable-reference accessors. All other members are only declared
// here; their definitions are not part of this header.
// NOTE(review): looks like the JSON_ARRAYAGG counterpart of the GROUP_CONCAT
// info class from groupconcat.h -- confirm against jsonarrayagg.cpp.
class JsonArrayInfo
{
public:
JsonArrayInfo();
virtual ~JsonArrayInfo();
// Declared only; takes the JobInfo by non-const reference.
void prepJsonArray(JobInfo&);
// Declared only; takes a row group by const reference.
void mapColumns(const rowgroup::RowGroup&);
// Mutable access to the stored uint32_t set
// (presumably column keys, cf. getColumnKey -- TODO confirm).
std::set<uint32_t>& columns()
{
return fColumns;
}
// Mutable access to the stored SP_GroupConcat descriptors.
std::vector<rowgroup::SP_GroupConcat>& groupConcat()
{
return fGroupConcat;
}
const std::string toString() const;
protected:
// Helpers, declared only.
uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo);
boost::shared_array<int> makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&);
std::set<uint32_t> fColumns;
std::vector<rowgroup::SP_GroupConcat> fGroupConcat;
};
// Aggregation object for JSON_ARRAYAGG, derived from GroupConcatAgUM.
// It is constructed from an SP_GroupConcat descriptor and re-declares
// initialize / processRow / merge / getResult; the definitions are not part
// of this header. The members that are commented out below are not declared
// by this class.
// NOTE(review): presumably the base class provides the equivalent state
// (concator, data buffer, row) -- confirm against groupconcat.h.
class JsonArrayAggregatAgUM : public GroupConcatAgUM
{
public:
EXPORT JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat&);
EXPORT ~JsonArrayAggregatAgUM();
// Keep the rowgroup::GroupConcatAg overload(s) of merge visible next to the
// merge declared below.
using rowgroup::GroupConcatAg::merge;
void initialize();
void processRow(const rowgroup::Row&);
EXPORT void merge(const rowgroup::Row&, int64_t);
/*boost::scoped_ptr<JsonArrayAggregator>& concator()
{
return fConcator;
}
*/
// Two result accessors: one takes a caller-supplied uint8_t pointer, the
// other returns a uint8_t pointer.
EXPORT void getResult(uint8_t*);
EXPORT uint8_t* getResult();
protected:
void applyMapping(const boost::shared_array<int>&, const rowgroup::Row&);
/*
boost::scoped_ptr<JsonArrayAggregator> fConcator;
boost::scoped_array<uint8_t> fData;
rowgroup::Row fRow;
bool fNoOrder;
*/
};
// JSON_ARRAYAGG base class.
// Abstract: processRow() is pure virtual, so only derived classes can be
// instantiated. Derives from GroupConcator; every member is declared here
// and defined elsewhere.
class JsonArrayAggregator : public GroupConcator
{
public:
JsonArrayAggregator();
virtual ~JsonArrayAggregator();
virtual void initialize(const rowgroup::SP_GroupConcat&);
// Must be implemented by derived classes.
virtual void processRow(const rowgroup::Row&) = 0;
virtual const std::string toString() const;
protected:
// Per-row helpers, declared only; behaviour is defined in the .cpp file.
virtual bool concatColIsNull(const rowgroup::Row&);
virtual void outputRow(std::ostringstream&, const rowgroup::Row&);
virtual int64_t lengthEstimate(const rowgroup::Row&);
};
// For JSON_ARRAYAGG without DISTINCT or ORDER BY.
// Concrete JsonArrayAggregator: declares processRow(), merge() and
// getResult() and owns its own row-group state. Definitions are not part of
// this header.
class JsonArrayAggNoOrder : public JsonArrayAggregator
{
public:
JsonArrayAggNoOrder();
virtual ~JsonArrayAggNoOrder();
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
// Keep the GroupConcator overload(s) of merge visible next to merge(GroupConcator*).
using GroupConcator::merge;
void merge(GroupConcator*);
// Keep the GroupConcator overload(s) of getResult visible next to the one below.
using GroupConcator::getResult;
void getResult(uint8_t* buff, const std::string& sep);
const std::string toString() const;
protected:
// Row-group, current row and backing data, plus a queue of further RGData.
rowgroup::RowGroup fRowGroup;
rowgroup::Row fRow;
rowgroup::RGData fData;
std::queue<rowgroup::RGData> fDataQueue;
// NOTE(review): names suggest rows-per-row-group, an error code and a memory
// usage counter -- units and exact meaning need confirming in the .cpp file.
uint64_t fRowsPerRG;
uint64_t fErrorCode;
uint64_t fMemSize;
// Raw, non-smart pointer to a ResourceManager; ownership is not visible here.
ResourceManager* fRm;
boost::shared_ptr<int64_t> fSessionMemLimit;
};
// JSON_ARRAYAGG variant used when an ORDER BY is present.
// Inherits from both JsonArrayAggregator and ordering::IdbOrderBy and adds no
// data members of its own (the protected section is empty). Definitions are
// not part of this header.
class JsonArrayAggOrderBy : public JsonArrayAggregator, public ordering::IdbOrderBy
{
public:
JsonArrayAggOrderBy();
virtual ~JsonArrayAggOrderBy();
// Both bases declare initialize; keep the IdbOrderBy overload(s) visible
// next to the SP_GroupConcat overload declared below.
using ordering::IdbOrderBy::initialize;
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
uint64_t getKeyLength() const;
// Keep the GroupConcator overload(s) of merge visible next to merge(GroupConcator*).
using GroupConcator::merge;
void merge(GroupConcator*);
// Keep the GroupConcator overload(s) of getResult visible next to the one below.
using GroupConcator::getResult;
void getResult(uint8_t* buff, const std::string& sep);
const std::string toString() const;
protected:
};
} // namespace joblist
#undef EXPORT

View File

@ -174,6 +174,8 @@ inline RowAggFunctionType functionIdMap(int planFuncId)
case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT; case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT;
case AggregateColumn::JSON_ARRAYAGG: return ROWAGG_JSON_ARRAY;
case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT; case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT;
case AggregateColumn::UDAF: return ROWAGG_UDAF; case AggregateColumn::UDAF: return ROWAGG_UDAF;
@ -1075,7 +1077,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>&
continue; continue;
} }
if (aggOp == ROWAGG_GROUP_CONCAT) if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{ {
TupleInfo ti = getTupleInfo(key, jobInfo); TupleInfo ti = getTupleInfo(key, jobInfo);
uint32_t ptrSize = sizeof(GroupConcatAg*); uint32_t ptrSize = sizeof(GroupConcatAg*);
@ -1696,7 +1698,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
continue; continue;
// skip if this is a group_concat // skip if this is a group_concat
if (aggOp == ROWAGG_GROUP_CONCAT) if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{ {
TupleInfo ti = getTupleInfo(aggKey, jobInfo); TupleInfo ti = getTupleInfo(aggKey, jobInfo);
uint32_t width = sizeof(GroupConcatAg*); uint32_t width = sizeof(GroupConcatAg*);
@ -2229,7 +2231,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
csNumAggDist.push_back(csNumAgg[colAgg]); csNumAggDist.push_back(csNumAgg[colAgg]);
uint32_t width = widthAgg[colAgg]; uint32_t width = widthAgg[colAgg];
if (aggOp == ROWAGG_GROUP_CONCAT) if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{ {
TupleInfo ti = getTupleInfo(retKey, jobInfo); TupleInfo ti = getTupleInfo(retKey, jobInfo);
@ -2320,7 +2322,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
} }
#if 0 #if 0
else if (aggOp == ROWAGG_GROUP_CONCAT) else if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{ {
TupleInfo ti = getTupleInfo(retKey, jobInfo); TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid); oidsAggDist.push_back(ti.oid);
@ -2840,7 +2842,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX || f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX ||
f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND || f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND ||
f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR || f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR ||
f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT)) f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT || f->fAggFunction == ROWAGG_JSON_ARRAY))
{ {
funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex, funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex,
f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms)); f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms));

View File

@ -81,6 +81,7 @@ using namespace cal_impl_if;
#include "selectfilter.h" #include "selectfilter.h"
#include "existsfilter.h" #include "existsfilter.h"
#include "groupconcatcolumn.h" #include "groupconcatcolumn.h"
#include "jsonarrayaggcolumn.h"
#include "outerjoinonfilter.h" #include "outerjoinonfilter.h"
#include "intervalcolumn.h" #include "intervalcolumn.h"
#include "udafcolumn.h" #include "udafcolumn.h"
@ -2904,6 +2905,14 @@ uint32_t setAggOp(AggregateColumn* ac, Item_sum* isp)
return rc; return rc;
} }
case Item_sum::JSON_ARRAYAGG_FUNC:
{
Item_func_group_concat* gc = (Item_func_group_concat*)isp;
ac->aggOp(AggregateColumn::JSON_ARRAYAGG);
ac->distinct(gc->get_distinct());
return rc;
}
case Item_sum::SUM_BIT_FUNC: case Item_sum::SUM_BIT_FUNC:
{ {
string funcName = isp->func_name(); string funcName = isp->func_name();
@ -4922,6 +4931,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{ {
ac = new GroupConcatColumn(gwi.sessionid); ac = new GroupConcatColumn(gwi.sessionid);
} }
else if (isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC)
{
ac = new JsonArrayAggColumn(gwi.sessionid);
}
else if (isp->sum_func() == Item_sum::UDF_SUM_FUNC) else if (isp->sum_func() == Item_sum::UDF_SUM_FUNC)
{ {
ac = new UDAFColumn(gwi.sessionid); ac = new UDAFColumn(gwi.sessionid);
@ -4950,7 +4963,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
try try
{ {
// special parsing for group_concat // special parsing for group_concat
if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC || isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC)
{ {
Item_func_group_concat* gc = (Item_func_group_concat*)isp; Item_func_group_concat* gc = (Item_func_group_concat*)isp;
vector<SRCP> orderCols; vector<SRCP> orderCols;
@ -5018,7 +5031,14 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
} }
rowCol->columnVec(selCols); rowCol->columnVec(selCols);
(dynamic_cast<GroupConcatColumn*>(ac))->orderCols(orderCols); if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
{
(dynamic_cast<GroupConcatColumn*>(ac))->orderCols(orderCols);
}
else
{
(dynamic_cast<JsonArrayAggColumn*>(ac))->orderCols(orderCols);
}
parm.reset(rowCol); parm.reset(rowCol);
ac->aggParms().push_back(parm); ac->aggParms().push_back(parm);
@ -5026,7 +5046,14 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{ {
string separator; string separator;
separator.assign(gc->get_separator()->ptr(), gc->get_separator()->length()); separator.assign(gc->get_separator()->ptr(), gc->get_separator()->length());
(dynamic_cast<GroupConcatColumn*>(ac))->separator(separator); if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
{
(dynamic_cast<GroupConcatColumn*>(ac))->separator(separator);
}
else
{
(dynamic_cast<JsonArrayAggColumn*>(ac))->separator(separator);
}
} }
} }
else if (isSupportedAggregateWithOneConstArg(isp, sfitempp)) else if (isSupportedAggregateWithOneConstArg(isp, sfitempp))
@ -5244,7 +5271,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
ct.precision = -16; // borrowed to indicate skip null value check on connector ct.precision = -16; // borrowed to indicate skip null value check on connector
ac->resultType(ct); ac->resultType(ct);
} }
else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC ||
isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC)
{ {
// Item_func_group_concat* gc = (Item_func_group_concat*)isp; // Item_func_group_concat* gc = (Item_func_group_concat*)isp;
CalpontSystemCatalog::ColType ct; CalpontSystemCatalog::ColType ct;

View File

@ -35,6 +35,7 @@
#include "mcs_basic_types.h" #include "mcs_basic_types.h"
#include "resourcemanager.h" #include "resourcemanager.h"
#include "groupconcat.h" #include "groupconcat.h"
#include "jsonarrayagg.h"
#include "blocksize.h" #include "blocksize.h"
#include "errorcodes.h" #include "errorcodes.h"
@ -656,7 +657,8 @@ void RowAggregation::initialize()
bool allow_gen = true; bool allow_gen = true;
for (auto& fun : fFunctionCols) for (auto& fun : fFunctionCols)
{ {
if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT) if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT ||
fun->fAggFunction == ROWAGG_JSON_ARRAY)
{ {
allow_gen = false; allow_gen = false;
break; break;
@ -732,7 +734,8 @@ void RowAggregation::aggReset()
bool allow_gen = true; bool allow_gen = true;
for (auto& fun : fFunctionCols) for (auto& fun : fFunctionCols)
{ {
if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT) if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT ||
fun->fAggFunction == ROWAGG_JSON_ARRAY)
{ {
allow_gen = false; allow_gen = false;
break; break;
@ -972,7 +975,7 @@ void RowAggregation::makeAggFieldsNull(Row& row)
if (fFunctionCol->fAggFunction == ROWAGG_COUNT_ASTERISK || if (fFunctionCol->fAggFunction == ROWAGG_COUNT_ASTERISK ||
fFunctionCol->fAggFunction == ROWAGG_COUNT_COL_NAME || fFunctionCol->fAggFunction == ROWAGG_COUNT_COL_NAME ||
fFunctionCol->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || fFunctionCol->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME ||
fFunctionCol->fAggFunction == ROWAGG_COUNT_NO_OP || fFunctionCol->fAggFunction == ROWAGG_COUNT_NO_OP || fFunctionCol->fAggFunction == ROWAGG_JSON_ARRAY ||
fFunctionCol->fAggFunction == ROWAGG_GROUP_CONCAT || fFunctionCol->fAggFunction == ROWAGG_STATS) fFunctionCol->fAggFunction == ROWAGG_GROUP_CONCAT || fFunctionCol->fAggFunction == ROWAGG_STATS)
{ {
continue; continue;
@ -1668,6 +1671,7 @@ void RowAggregation::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1Co
case ROWAGG_DUP_STATS: case ROWAGG_DUP_STATS:
case ROWAGG_DUP_UDAF: case ROWAGG_DUP_UDAF:
case ROWAGG_CONSTANT: case ROWAGG_CONSTANT:
case ROWAGG_JSON_ARRAY:
case ROWAGG_GROUP_CONCAT: break; case ROWAGG_GROUP_CONCAT: break;
case ROWAGG_UDAF: case ROWAGG_UDAF:
@ -1730,6 +1734,7 @@ void RowAggregation::mergeEntries(const Row& rowIn)
case ROWAGG_DUP_STATS: case ROWAGG_DUP_STATS:
case ROWAGG_DUP_UDAF: case ROWAGG_DUP_UDAF:
case ROWAGG_CONSTANT: case ROWAGG_CONSTANT:
case ROWAGG_JSON_ARRAY:
case ROWAGG_GROUP_CONCAT: break; case ROWAGG_GROUP_CONCAT: break;
case ROWAGG_UDAF: doUDAF(rowIn, colOut, colOut, colOut + 1, i); break; case ROWAGG_UDAF: doUDAF(rowIn, colOut, colOut, colOut + 1, i); break;
@ -2480,6 +2485,14 @@ void RowAggregationUM::attachGroupConcatAg()
fGroupConcatAg.push_back(gcc); fGroupConcatAg.push_back(gcc);
*((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get(); *((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get();
} }
if (fFunctionColGc[i]->fAggFunction == ROWAGG_JSON_ARRAY)
{
// save the object's address in the result row
SP_GroupConcatAg gcc(new joblist::JsonArrayAggregatAgUM(fGroupConcat[j++]));
fGroupConcatAg.push_back(gcc);
*((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get();
}
} }
} }
} }
@ -2543,6 +2556,12 @@ void RowAggregationUM::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1
break; break;
} }
case ROWAGG_JSON_ARRAY:
{
doJsonAgg(rowIn, colIn, colOut);
break;
}
case ROWAGG_COUNT_NO_OP: case ROWAGG_COUNT_NO_OP:
case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_FUNCT:
case ROWAGG_DUP_AVG: case ROWAGG_DUP_AVG:
@ -2581,6 +2600,13 @@ void RowAggregationUM::doGroupConcat(const Row& rowIn, int64_t, int64_t o)
gccAg->processRow(rowIn); gccAg->processRow(rowIn);
} }
void RowAggregationUM::doJsonAgg(const Row& rowIn, int64_t, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
gccAg->processRow(rowIn);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// After all PM rowgroups received, calculate the average value. // After all PM rowgroups received, calculate the average value.
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -3989,6 +4015,16 @@ void RowAggregationUM::setGroupConcatString()
fRow.setStringField((char*)gcString, fFunctionCols[j]->fOutputColumnIndex); fRow.setStringField((char*)gcString, fFunctionCols[j]->fOutputColumnIndex);
// gccAg->getResult(buff); // gccAg->getResult(buff);
} }
if (fFunctionCols[j]->fAggFunction == ROWAGG_JSON_ARRAY)
{
uint8_t* buff = data + fRow.getOffset(fFunctionCols[j]->fOutputColumnIndex);
uint8_t* gcString;
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)buff);
gcString = gccAg->getResult();
fRow.setStringField((char*)gcString, fFunctionCols[j]->fOutputColumnIndex);
// gccAg->getResult(buff);
}
} }
} }
} }
@ -4105,6 +4141,12 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcs
break; break;
} }
case ROWAGG_JSON_ARRAY:
{
doJsonAgg(rowIn, colIn, colOut);
break;
}
case ROWAGG_COUNT_NO_OP: case ROWAGG_COUNT_NO_OP:
case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_FUNCT:
case ROWAGG_DUP_AVG: case ROWAGG_DUP_AVG:
@ -4323,6 +4365,13 @@ void RowAggregationUMP2::doGroupConcat(const Row& rowIn, int64_t i, int64_t o)
gccAg->merge(rowIn, i); gccAg->merge(rowIn, i);
} }
void RowAggregationUMP2::doJsonAgg(const Row& rowIn, int64_t i, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
gccAg->merge(rowIn, i);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Update the and/or/xor fields if input is not null. // Update the and/or/xor fields if input is not null.
// rowIn(in) - Row to be included in aggregation. // rowIn(in) - Row to be included in aggregation.
@ -4572,6 +4621,12 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn, std::vector<mcsv1sdk:
break; break;
} }
case ROWAGG_JSON_ARRAY:
{
doJsonAgg(rowIn, colIn, colOut);
break;
}
case ROWAGG_COUNT_NO_OP: case ROWAGG_COUNT_NO_OP:
case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_FUNCT:
case ROWAGG_DUP_AVG: case ROWAGG_DUP_AVG:
@ -4700,6 +4755,12 @@ void RowAggregationSubDistinct::doGroupConcat(const Row& rowIn, int64_t i, int64
gccAg->merge(rowIn, i); gccAg->merge(rowIn, i);
} }
void RowAggregationSubDistinct::doJsonAgg(const Row& rowIn, int64_t i, int64_t o)
{
uint8_t* data = fRow.getData();
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
gccAg->merge(rowIn, i);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Constructor / destructor // Constructor / destructor
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@ -92,6 +92,8 @@ enum RowAggFunctionType
// GROUP_CONCAT // GROUP_CONCAT
ROWAGG_GROUP_CONCAT, ROWAGG_GROUP_CONCAT,
ROWAGG_JSON_ARRAY,
// DISTINCT: performed on UM only // DISTINCT: performed on UM only
ROWAGG_COUNT_DISTINCT_COL_NAME, // COUNT(distinct column_name) only counts non-null rows ROWAGG_COUNT_DISTINCT_COL_NAME, // COUNT(distinct column_name) only counts non-null rows
ROWAGG_DISTINCT_SUM, ROWAGG_DISTINCT_SUM,
@ -763,6 +765,7 @@ class RowAggregationUM : public RowAggregation
// @bug3362, group_concat // @bug3362, group_concat
virtual void doGroupConcat(const Row&, int64_t, int64_t); virtual void doGroupConcat(const Row&, int64_t, int64_t);
virtual void doJsonAgg(const Row&, int64_t, int64_t);
virtual void setGroupConcatString(); virtual void setGroupConcatString();
bool fHasAvg; bool fHasAvg;
@ -826,6 +829,7 @@ class RowAggregationUMP2 : public RowAggregationUM
void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override; void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override;
void doStatistics(const Row&, int64_t, int64_t, int64_t) override; void doStatistics(const Row&, int64_t, int64_t, int64_t) override;
void doGroupConcat(const Row&, int64_t, int64_t) override; void doGroupConcat(const Row&, int64_t, int64_t) override;
void doJsonAgg(const Row&, int64_t, int64_t) override;
void doBitOp(const Row&, int64_t, int64_t, int) override; void doBitOp(const Row&, int64_t, int64_t, int) override;
void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx, void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx,
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override; std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
@ -937,7 +941,7 @@ class RowAggregationSubDistinct : public RowAggregationUM
protected: protected:
// virtual methods from RowAggregationUM // virtual methods from RowAggregationUM
void doGroupConcat(const Row&, int64_t, int64_t) override; void doGroupConcat(const Row&, int64_t, int64_t) override;
void doJsonAgg(const Row&, int64_t, int64_t) override;
// for groupby columns and the aggregated distinct column // for groupby columns and the aggregated distinct column
Row fDistRow; Row fDistRow;
boost::scoped_array<uint8_t> fDistRowData; boost::scoped_array<uint8_t> fDistRowData;