diff --git a/dbcon/execplan/CMakeLists.txt b/dbcon/execplan/CMakeLists.txt index bc8daac03..45d8f43c7 100755 --- a/dbcon/execplan/CMakeLists.txt +++ b/dbcon/execplan/CMakeLists.txt @@ -22,6 +22,7 @@ set(execplan_LIB_SRCS functioncolumn.cpp groupconcatcolumn.cpp intervalcolumn.cpp + jsonarrayaggcolumn.cpp logicoperator.cpp mysqlexecutionplan.cpp objectidmanager.cpp diff --git a/dbcon/execplan/aggregatecolumn.h b/dbcon/execplan/aggregatecolumn.h index 9b0a75c93..cb33b9468 100644 --- a/dbcon/execplan/aggregatecolumn.h +++ b/dbcon/execplan/aggregatecolumn.h @@ -74,6 +74,7 @@ class AggregateColumn : public ReturnedColumn BIT_OR, BIT_XOR, GROUP_CONCAT, + JSON_ARRAYAGG, UDAF, MULTI_PARM }; diff --git a/dbcon/execplan/jsonarrayaggcolumn.cpp b/dbcon/execplan/jsonarrayaggcolumn.cpp new file mode 100644 index 000000000..33809c68e --- /dev/null +++ b/dbcon/execplan/jsonarrayaggcolumn.cpp @@ -0,0 +1,170 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include + +using namespace std; + +#include "bytestream.h" +using namespace messageqcpp; + +#include "rowgroup.h" +using namespace rowgroup; + +#include "joblisttypes.h" +using namespace joblist; + +#include "simplefilter.h" +#include "constantfilter.h" +#include "arithmeticcolumn.h" +#include "functioncolumn.h" +#include "objectreader.h" +#include "jsonarrayaggcolumn.h" + +namespace execplan +{ +/** + * Constructors/Destructors + */ +JsonArrayAggColumn::JsonArrayAggColumn() : AggregateColumn() +{ +} + +JsonArrayAggColumn::JsonArrayAggColumn(const uint32_t sessionID) : AggregateColumn(sessionID) +{ +} + +JsonArrayAggColumn::JsonArrayAggColumn(const JsonArrayAggColumn& rhs, const uint32_t sessionID) + : AggregateColumn(dynamic_cast(rhs)) + , fOrderCols(rhs.fOrderCols) + , fSeparator(rhs.fSeparator) +{ +} + +JsonArrayAggColumn::~JsonArrayAggColumn() +{ +} + +/** + * Methods + */ + +const string JsonArrayAggColumn::toString() const +{ + ostringstream output; + output << "JsonArrayAggColumn " << data() << endl; + output << AggregateColumn::toString() << endl; + output << "Json Array Order Columns: " << endl; + + for (uint32_t i = 0; i < fOrderCols.size(); i++) + { + output << *fOrderCols[i]; + } + + return output.str(); +} + +ostream& operator<<(ostream& output, const JsonArrayAggColumn& rhs) +{ + output << rhs.toString(); + return output; +} + +void JsonArrayAggColumn::serialize(messageqcpp::ByteStream& b) const +{ + b << (uint8_t)ObjectReader::GROUPCONCATCOLUMN; + AggregateColumn::serialize(b); + + CalpontSelectExecutionPlan::ReturnedColumnList::const_iterator rcit; + b << static_cast(fOrderCols.size()); + + for (rcit = fOrderCols.begin(); rcit != fOrderCols.end(); ++rcit) + (*rcit)->serialize(b); + + b << ','; +} + +void JsonArrayAggColumn::unserialize(messageqcpp::ByteStream& b) +{ + ObjectReader::checkType(b, ObjectReader::GROUPCONCATCOLUMN); + AggregateColumn::unserialize(b); + 
fOrderCols.erase(fOrderCols.begin(), fOrderCols.end()); + + uint32_t size, i; + ReturnedColumn* rc; + b >> size; + + for (i = 0; i < size; i++) + { + rc = dynamic_cast(ObjectReader::createTreeNode(b)); + SRCP srcp(rc); + fOrderCols.push_back(srcp); + } + +} + +bool JsonArrayAggColumn::operator==(const JsonArrayAggColumn& t) const +{ + const AggregateColumn *rc1, *rc2; + + rc1 = static_cast(this); + rc2 = static_cast(&t); + + if (*rc1 != *rc2) + return false; + + for (uint32_t i = 0; i < fOrderCols.size(); i++) + { + if (fOrderCols[i].get() != NULL) + { + if (t.fOrderCols[i] == NULL) + return false; + + if (*(fOrderCols[i].get()) != t.fOrderCols[i].get()) + return false; + } + else if (t.fOrderCols[i].get() != NULL) + return false; + } + + return true; +} + +bool JsonArrayAggColumn::operator==(const TreeNode* t) const +{ + const JsonArrayAggColumn* ac; + + ac = dynamic_cast(t); + + if (ac == NULL) + return false; + + return *this == *ac; +} + +bool JsonArrayAggColumn::operator!=(const JsonArrayAggColumn& t) const +{ + return !(*this == t); +} + +bool JsonArrayAggColumn::operator!=(const TreeNode* t) const +{ + return !(*this == t); +} + +} // namespace execplan diff --git a/dbcon/execplan/jsonarrayaggcolumn.h b/dbcon/execplan/jsonarrayaggcolumn.h new file mode 100644 index 000000000..c72b4c6c4 --- /dev/null +++ b/dbcon/execplan/jsonarrayaggcolumn.h @@ -0,0 +1,143 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +/** @file */ + +#pragma once +#include + +#include "calpontselectexecutionplan.h" +#include "aggregatecolumn.h" + +namespace messageqcpp +{ +class ByteStream; +} + +/** + * Namespace + */ +namespace execplan +{ +/** + * @brief A class to represent a aggregate return column + * + * This class is a specialization of class ReturnedColumn that + * handles an aggregate function call (e.g., SUM, COUNT, MIN, MAX). 
+ */ +class JsonArrayAggColumn : public AggregateColumn +{ + public: + /** + * Constructors + */ + JsonArrayAggColumn(); + + JsonArrayAggColumn(const uint32_t sessionID); + + JsonArrayAggColumn(const JsonArrayAggColumn& rhs, const uint32_t sessionID = 0); + + /** + * Destructors + */ + virtual ~JsonArrayAggColumn(); + + /** + * Overloaded stream operator + */ + virtual const std::string toString() const; + + /** return a copy of this pointer + * + * deep copy of this pointer and return the copy + */ + virtual JsonArrayAggColumn* clone() const + { + return new JsonArrayAggColumn(*this); + } + + /** + * Accessors and Mutators + */ + void orderCols(const std::vector& orderCols) + { + fOrderCols = orderCols; + } + std::vector& orderCols() + { + return fOrderCols; + } + void separator(const std::string& separator) + { + fSeparator = separator; + } + std::string& separator() + { + return fSeparator; + } + + /** + * Serialize interface + */ + virtual void serialize(messageqcpp::ByteStream&) const; + virtual void unserialize(messageqcpp::ByteStream&); + + /** @brief Do a deep, strict (as opposed to semantic) equivalence test + * + * Do a deep, strict (as opposed to semantic) equivalence test. + * @return true iff every member of t is a duplicate copy of every member of this; + * false otherwise + */ + virtual bool operator==(const TreeNode* t) const; + + /** @brief Do a deep, strict (as opposed to semantic) equivalence test + * + * Do a deep, strict (as opposed to semantic) equivalence test. + * @return true iff every member of t is a duplicate copy of every member of this; + * false otherwise + */ + using AggregateColumn::operator==; + virtual bool operator==(const JsonArrayAggColumn& t) const; + + /** @brief Do a deep, strict (as opposed to semantic) equivalence test + * + * Do a deep, strict (as opposed to semantic) equivalence test. + * @return false iff every member of t is a duplicate copy of every member of this; + * true otherwise + */ + virtual bool operator!=(const TreeNode* t) const; + + /** @brief Do a deep, strict (as opposed to semantic) equivalence test + * + * Do a deep, strict (as opposed to semantic) equivalence test. 
+ * @return false iff every member of t is a duplicate copy of every member of this; + * true otherwise + */ + using AggregateColumn::operator!=; + virtual bool operator!=(const JsonArrayAggColumn& t) const; + + private: + std::vector fOrderCols; + std::string fSeparator; +}; + +/** + * stream operator + */ +std::ostream& operator<<(std::ostream& os, const JsonArrayAggColumn& rhs); + +} // namespace execplan diff --git a/dbcon/joblist/CMakeLists.txt b/dbcon/joblist/CMakeLists.txt index b025005b8..a98937767 100644 --- a/dbcon/joblist/CMakeLists.txt +++ b/dbcon/joblist/CMakeLists.txt @@ -28,6 +28,7 @@ set(joblist_LIB_SRCS joblistfactory.cpp jobstep.cpp jobstepassociation.cpp + jsonarrayagg.cpp lbidlist.cpp limitedorderby.cpp passthrucommand-jl.cpp diff --git a/dbcon/joblist/groupconcat.cpp b/dbcon/joblist/groupconcat.cpp index b693a2eab..7e463a5ea 100644 --- a/dbcon/joblist/groupconcat.cpp +++ b/dbcon/joblist/groupconcat.cpp @@ -1021,7 +1021,6 @@ void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep) { ostringstream oss; bool addSep = false; - fDataQueue.push(fData); while (fDataQueue.size() > 0) diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index 30a939843..a15b8e449 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -43,6 +43,7 @@ #include "joblist.h" #include "jobstep.h" #include "groupconcat.h" +#include "jsonarrayagg.h" #include "jl_logger.h" #include "resourcemanager.h" diff --git a/dbcon/joblist/jsonarrayagg.cpp b/dbcon/joblist/jsonarrayagg.cpp new file mode 100644 index 000000000..8d64a06a1 --- /dev/null +++ b/dbcon/joblist/jsonarrayagg.cpp @@ -0,0 +1,1065 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + + +#include +//#define NDEBUG +#include +#include +using namespace std; + +#include +using namespace boost; + +#include "errorids.h" +#include "exceptclasses.h" +using namespace logging; + +#include "returnedcolumn.h" +#include "aggregatecolumn.h" +#include "arithmeticcolumn.h" +#include "functioncolumn.h" +#include "constantcolumn.h" +#include "rowcolumn.h" +#include "groupconcatcolumn.h" +#include "jsonarrayaggcolumn.h" +#include "calpontsystemcatalog.h" +using namespace execplan; + +#include "rowgroup.h" +#include "rowaggregation.h" +using namespace rowgroup; + +#include "dataconvert.h" +using namespace dataconvert; + +#include "jsonarrayagg.h" + +using namespace ordering; + +#include "jobstep.h" +#include "jlf_common.h" +#include "limitedorderby.h" +#include "mcs_decimal.h" + +namespace joblist +{ + +// GroupConcatInfo class implementation +JsonArrayInfo::JsonArrayInfo() +{ +} + +JsonArrayInfo::~JsonArrayInfo() +{ +} + +void JsonArrayInfo::prepJsonArray(JobInfo& jobInfo) +{ + RetColsVector::iterator i = jobInfo.groupConcatCols.begin(); + + while (i != jobInfo.groupConcatCols.end()) + { + JsonArrayAggColumn* gcc = dynamic_cast(i->get()); + const RowColumn* rcp = dynamic_cast(gcc->aggParms()[0].get()); + + SP_GroupConcat groupConcat(new GroupConcat); + groupConcat->fSeparator = gcc->separator(); // or ,? + groupConcat->fDistinct = gcc->distinct(); + groupConcat->fSize = gcc->resultType().colWidth; + groupConcat->fRm = jobInfo.rm; + groupConcat->fSessionMemLimit = jobInfo.umMemLimit; + groupConcat->fTimeZone = jobInfo.timeZone; + + int key = -1; + const vector& cols = rcp->columnVec(); + + for (uint64_t j = 0, k = 0; j < cols.size(); j++) + { + const ConstantColumn* cc = dynamic_cast(cols[j].get()); + + if (cc == NULL) + { + key = getColumnKey(cols[j], jobInfo); + fColumns.insert(key); + groupConcat->fGroupCols.push_back(make_pair(key, k++)); + } + else + { + groupConcat->fConstCols.push_back(make_pair(cc->constval(), j)); + } + } + + vector& orderCols = gcc->orderCols(); + + for (vector::iterator k = orderCols.begin(); k != orderCols.end(); k++) + { + if (dynamic_cast(k->get()) != NULL) + continue; + + key = getColumnKey(*k, jobInfo); + fColumns.insert(key); + groupConcat->fOrderCols.push_back(make_pair(key, k->get()->asc())); + } + + fGroupConcat.push_back(groupConcat); + + i++; + } + + // Rare case: all columns in group_concat are constant columns, use a column in column map. + if (jobInfo.groupConcatCols.size() > 0 && fColumns.size() == 0) + { + int key = -1; + + for (vector::iterator i = jobInfo.tableList.begin(); i != jobInfo.tableList.end() && key == -1; + i++) + { + if (jobInfo.columnMap[*i].size() > 0) + { + key = *(jobInfo.columnMap[*i].begin()); + } + } + + if (key != -1) + { + fColumns.insert(key); + } + else + { + throw runtime_error("Empty column map."); + } + } +} + +uint32_t JsonArrayInfo::getColumnKey(const SRCP& srcp, JobInfo& jobInfo) +{ + int colKey = -1; + const SimpleColumn* sc = dynamic_cast(srcp.get()); + + if (sc != NULL) + { + if (sc->schemaName().empty()) + { + // bug3839, handle columns from subquery. 
+ SimpleColumn tmp(*sc, jobInfo.sessionId); + tmp.oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); + colKey = getTupleKey(jobInfo, &tmp); + } + else + { + colKey = getTupleKey(jobInfo, sc); + } + + // check if this is a dictionary column + if (jobInfo.keyInfo->dictKeyMap.find(colKey) != jobInfo.keyInfo->dictKeyMap.end()) + colKey = jobInfo.keyInfo->dictKeyMap[colKey]; + } + else + { + const ArithmeticColumn* ac = dynamic_cast(srcp.get()); + const FunctionColumn* fc = dynamic_cast(srcp.get()); + + if (ac != NULL || fc != NULL) + { + colKey = getExpTupleKey(jobInfo, srcp->expressionId()); + } + else + { + cerr << "Unsupported JSON_ARRAYAGG column. " << srcp->toString() << endl; + throw runtime_error("Unsupported JSON_ARRAYAGG column."); + } + } + + return colKey; +} + +void JsonArrayInfo::mapColumns(const RowGroup& projRG) +{ + map projColumnMap; + const vector& keysProj = projRG.getKeys(); + + for (uint64_t i = 0; i < projRG.getColumnCount(); i++) + projColumnMap[keysProj[i]] = i; + + for (vector::iterator k = fGroupConcat.begin(); k != fGroupConcat.end(); k++) + { + vector pos; + vector oids; + vector keys; + vector scale; + vector precision; + vector types; + vector csNums; + pos.push_back(2); + + vector >::iterator i1 = (*k)->fGroupCols.begin(); + + while (i1 != (*k)->fGroupCols.end()) + { + map::iterator j = projColumnMap.find(i1->first); + + if (j == projColumnMap.end()) + { + cerr << "Arrayagg Key:" << i1->first << " is not projected." << endl; + throw runtime_error("Project error."); + } + + pos.push_back(pos.back() + projRG.getColumnWidth(j->second)); + oids.push_back(projRG.getOIDs()[j->second]); + keys.push_back(projRG.getKeys()[j->second]); + types.push_back(projRG.getColTypes()[j->second]); + csNums.push_back(projRG.getCharsetNumber(j->second)); + scale.push_back(projRG.getScale()[j->second]); + precision.push_back(projRG.getPrecision()[j->second]); + + i1++; + } + + vector >::iterator i2 = (*k)->fOrderCols.begin(); + + while (i2 != (*k)->fOrderCols.end()) + { + map::iterator j = projColumnMap.find(i2->first); + + if (j == projColumnMap.end()) + { + cerr << "Order Key:" << i2->first << " is not projected." 
<< endl; + throw runtime_error("Project error."); + } + + vector::iterator i3 = find(keys.begin(), keys.end(), j->first); + int idx = 0; + + if (i3 == keys.end()) + { + idx = keys.size(); + + pos.push_back(pos.back() + projRG.getColumnWidth(j->second)); + oids.push_back(projRG.getOIDs()[j->second]); + keys.push_back(projRG.getKeys()[j->second]); + types.push_back(projRG.getColTypes()[j->second]); + csNums.push_back(projRG.getCharsetNumber(j->second)); + scale.push_back(projRG.getScale()[j->second]); + precision.push_back(projRG.getPrecision()[j->second]); + } + else + { + idx = std::distance(keys.begin(), i3); + } + + (*k)->fOrderCond.push_back(make_pair(idx, i2->second)); + + i2++; + } + + (*k)->fRowGroup = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, + projRG.getStringTableThreshold(), false); + (*k)->fMapping = makeMapping(projRG, (*k)->fRowGroup); + } +} + +shared_array JsonArrayInfo::makeMapping(const RowGroup& in, const RowGroup& out) +{ + // For some reason using the rowgroup mapping fcns don't work completely right in this class + shared_array mapping(new int[out.getColumnCount()]); + + for (uint64_t i = 0; i < out.getColumnCount(); i++) + { + for (uint64_t j = 0; j < in.getColumnCount(); j++) + { + if ((out.getKeys()[i] == in.getKeys()[j])) + { + mapping[i] = j; + break; + } + } + } + + return mapping; +} + +const string JsonArrayInfo::toString() const +{ + ostringstream oss; + oss << "JsonArrayInfo: toString() to be implemented."; + oss << endl; + + return oss.str(); +} + + +JsonArrayAggregatAgUM::JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat& gcc) : GroupConcatAgUM(gcc) +{ + initialize(); +} + + +JsonArrayAggregatAgUM::~JsonArrayAggregatAgUM() +{ +} + +void JsonArrayAggregatAgUM::initialize() +{ + if (fGroupConcat->fDistinct || fGroupConcat->fOrderCols.size() > 0) + fConcator.reset(new JsonArrayAggOrderBy()); + else + fConcator.reset(new JsonArrayAggNoOrder()); + + fConcator->initialize(fGroupConcat); + + fGroupConcat->fRowGroup.initRow(&fRow, true); + fData.reset(new uint8_t[fRow.getSize()]); + fRow.setData(fData.get()); +} + +void JsonArrayAggregatAgUM::processRow(const rowgroup::Row& inRow) +{ + applyMapping(fGroupConcat->fMapping, inRow); + fConcator->processRow(fRow); +} + +void JsonArrayAggregatAgUM::merge(const rowgroup::Row& inRow, int64_t i) +{ + uint8_t* data = inRow.getData(); + joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + inRow.getOffset(i))); + + fConcator->merge(gccAg->concator().get()); +} + +void JsonArrayAggregatAgUM::getResult(uint8_t* buff) +{ + fConcator->getResult(buff, fGroupConcat->fSeparator); +} + +uint8_t* JsonArrayAggregatAgUM::getResult() +{ + return fConcator->getResult(fGroupConcat->fSeparator); +} + +void JsonArrayAggregatAgUM::applyMapping(const boost::shared_array& mapping, const Row& row) +{ + // For some reason the rowgroup mapping fcns don't work right in this class. 
+ for (uint64_t i = 0; i < fRow.getColumnCount(); i++) + { + if (fRow.getColumnWidth(i) > datatypes::MAXLEGACYWIDTH) + { + if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::CHAR || + fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::VARCHAR || + fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::TEXT) + { + fRow.setStringField(row.getConstString(mapping[i]), i); + } + else if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE) + { + fRow.setLongDoubleField(row.getLongDoubleField(mapping[i]), i); + } + else if (datatypes::isWideDecimalType(fRow.getColType(i), fRow.getColumnWidth(i))) + { + row.copyBinaryField(fRow, i, mapping[i]); + } + } + else + { + if (fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::CHAR || + fRow.getColTypes()[i] == execplan::CalpontSystemCatalog::VARCHAR) + { + fRow.setIntField(row.getUintField(mapping[i]), i); + } + else + { + fRow.setIntField(row.getIntField(mapping[i]), i); + } + } + } +} + + +JsonArrayAggregator::JsonArrayAggregator() : GroupConcator() +{ +} + +JsonArrayAggregator::~JsonArrayAggregator() +{ +} + +void JsonArrayAggregator::initialize(const rowgroup::SP_GroupConcat& gcc) +{ + fGroupConcatLen = gcc->fSize; + fCurrentLength -= strlen(gcc->fSeparator.c_str()); + fTimeZone = gcc->fTimeZone; + + fConstCols = gcc->fConstCols; + fConstantLen = strlen(gcc->fSeparator.c_str()); + + for (uint64_t i = 0; i < fConstCols.size(); i++) + fConstantLen += strlen(fConstCols[i].first.c_str()); +} + + +void JsonArrayAggregator::outputRow(std::ostringstream& oss, const rowgroup::Row& row) +{ + const CalpontSystemCatalog::ColDataType* types = row.getColTypes(); + vector::iterator i = fConcatColumns.begin(); + vector >::iterator j = fConstCols.begin(); + + uint64_t groupColCount = fConcatColumns.size() + fConstCols.size(); + + for (uint64_t k = 0; k < groupColCount; k++) + { + if (j != fConstCols.end() && k == j->second) + { + oss << j->first; + j++; + continue; + } + + switch (types[*i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + int64_t intVal = row.getIntField(*i); + + oss << intVal; + + break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + oss << fixed << row.getDecimalField(*i); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + { + uint64_t uintVal = row.getUintField(*i); + int scale = (int)row.getScale(*i); + + if (scale == 0) + { + oss << uintVal; + } + else + { + oss << fixed + << datatypes::Decimal(datatypes::TSInt128((int128_t)uintVal), scale, + datatypes::INT128MAXPRECISION); + } + + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::TEXT: + { + oss << "\"" << row.getStringField(*i).c_str() << "\""; + break; + } + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + oss << setprecision(15) << row.getDoubleField(*i); + break; + } + + case CalpontSystemCatalog::LONGDOUBLE: + { + oss << setprecision(15) << row.getLongDoubleField(*i); + break; + } + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + { + oss << row.getFloatField(*i); + break; + } + + case CalpontSystemCatalog::DATE: + { + oss << "\"" << DataConvert::dateToString(row.getUintField(*i)) << "\""; 
+ break; + } + + case CalpontSystemCatalog::DATETIME: + { + oss << "\"" << DataConvert::datetimeToString(row.getUintField(*i)) << "\""; + break; + } + + case CalpontSystemCatalog::TIMESTAMP: + { + oss << "\"" << DataConvert::timestampToString(row.getUintField(*i), fTimeZone) << "\""; + break; + } + + case CalpontSystemCatalog::TIME: + { + oss << "\"" << DataConvert::timeToString(row.getUintField(*i)) << "\""; + break; + } + + default: + { + break; + } + } + + i++; + } +} + +bool JsonArrayAggregator::concatColIsNull(const rowgroup::Row& row) +{ + bool ret = false; + + for (vector::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++) + { + if (row.isNullValue(*i)) + { + ret = true; + break; + } + } + + return ret; +} + +int64_t JsonArrayAggregator::lengthEstimate(const rowgroup::Row& row) +{ + int64_t rowLen = fConstantLen; // fixed constant and separator length + const CalpontSystemCatalog::ColDataType* types = row.getColTypes(); + + // null values are already skipped. + for (vector::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++) + { + if (row.isNullValue(*i)) + continue; + + int64_t fieldLen = 0; + + switch (types[*i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + int64_t v = row.getIntField(*i); + + if (v < 0) + fieldLen++; + + while ((v /= 10) != 0) + fieldLen++; + + fieldLen += 1; + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + { + uint64_t v = row.getUintField(*i); + + while ((v /= 10) != 0) + fieldLen++; + + fieldLen += 1; + break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + fieldLen += 1; + + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::TEXT: + { + fieldLen += row.getConstString(*i).length(); + break; + } + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + case CalpontSystemCatalog::LONGDOUBLE: + { + fieldLen = 1; // minimum length + break; + } + + case CalpontSystemCatalog::DATE: + { + fieldLen = 10; // YYYY-MM-DD + break; + } + + case CalpontSystemCatalog::DATETIME: + case CalpontSystemCatalog::TIMESTAMP: + { + fieldLen = 19; // YYYY-MM-DD HH24:MI:SS + // Decimal point and milliseconds + uint64_t colPrecision = row.getPrecision(*i); + + if (colPrecision > 0 && colPrecision < 7) + { + fieldLen += colPrecision + 1; + } + + break; + } + + case CalpontSystemCatalog::TIME: + { + fieldLen = 10; // -HHH:MI:SS + // Decimal point and milliseconds + uint64_t colPrecision = row.getPrecision(*i); + + if (colPrecision > 0 && colPrecision < 7) + { + fieldLen += colPrecision + 1; + } + + break; + } + + default: + { + break; + } + } + + rowLen += fieldLen; + } + + return rowLen; +} + +const string JsonArrayAggregator::toString() const +{ + ostringstream oss; + oss << "JsonArray size-" << fGroupConcatLen; + oss << "Concat cols: "; + vector::const_iterator i = fConcatColumns.begin(); + vector >::const_iterator j = fConstCols.begin(); + uint64_t groupColCount = fConcatColumns.size() + fConstCols.size(); + + for (uint64_t k = 0; k < groupColCount; k++) + { + if (j != fConstCols.end() && k == j->second) + { + oss << 'c' << " "; + j++; + } + else + { + 
oss << (*i) << " "; + i++; + } + } + + oss << endl; + + return oss.str(); +} + + +// GroupConcatOrderBy class implementation +JsonArrayAggOrderBy::JsonArrayAggOrderBy() +{ + fRule.fIdbCompare = this; +} + +JsonArrayAggOrderBy::~JsonArrayAggOrderBy() +{ +} + +void JsonArrayAggOrderBy::initialize(const rowgroup::SP_GroupConcat& gcc) +{ + JsonArrayAggregator::initialize(gcc); + + fOrderByCond.resize(0); + + for (uint64_t i = 0; i < gcc->fOrderCond.size(); i++) + fOrderByCond.push_back(IdbSortSpec(gcc->fOrderCond[i].first, gcc->fOrderCond[i].second)); + + fDistinct = gcc->fDistinct; + fRowsPerRG = 128; + fErrorCode = ERR_AGGREGATION_TOO_BIG; + fRm = gcc->fRm; + fSessionMemLimit = gcc->fSessionMemLimit; + + vector >::iterator i = gcc->fGroupCols.begin(); + + while (i != gcc->fGroupCols.end()) + fConcatColumns.push_back((*(i++)).second); + + IdbOrderBy::initialize(gcc->fRowGroup); +} + +uint64_t JsonArrayAggOrderBy::getKeyLength() const +{ + // only distinct the concatenated columns + return fConcatColumns.size(); // cols 0 to fConcatColumns.size() - 1 will be compared +} + +void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row) +{ + // check if this is a distinct row + if (fDistinct && fDistinctMap->find(row.getPointer()) != fDistinctMap->end()) + return; + + // this row is skipped if any concatenated column is null. + if (concatColIsNull(row)) + return; + + // if the row count is less than the limit + if (fCurrentLength < fGroupConcatLen) + { + copyRow(row, &fRow0); + // the RID is no meaning here, use it to store the estimated length. + int16_t estLen = lengthEstimate(fRow0); + fRow0.setRid(estLen); + OrderByRow newRow(fRow0, fRule); + fOrderByQueue.push(newRow); + fCurrentLength += estLen; + + // add to the distinct map + if (fDistinct) + fDistinctMap->insert(fRow0.getPointer()); + + fRowGroup.incRowCount(); + fRow0.nextRow(); + + if (fRowGroup.getRowCount() >= fRowsPerRG) + { + fDataQueue.push(fData); + + uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); + + if (!fRm->getMemory(newSize, fSessionMemLimit)) + { + cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; + throw IDBExcept(fErrorCode); + } + fMemSize += newSize; + + fData.reinit(fRowGroup, fRowsPerRG); + fRowGroup.setData(&fData); + fRowGroup.resetRowGroup(0); + fRowGroup.getRow(0, &fRow0); + } + } + + else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData)) + { + OrderByRow swapRow = fOrderByQueue.top(); + fRow1.setData(swapRow.fData); + fOrderByQueue.pop(); + fCurrentLength -= fRow1.getRelRid(); + fRow2.setData(swapRow.fData); + + if (!fDistinct) + { + copyRow(row, &fRow1); + } + else + { + // only the copyRow does useful work here + fDistinctMap->erase(swapRow.fData); + copyRow(row, &fRow2); + fDistinctMap->insert(swapRow.fData); + } + + int16_t estLen = lengthEstimate(fRow2); + fRow2.setRid(estLen); + fCurrentLength += estLen; + + fOrderByQueue.push(swapRow); + } +} + +void JsonArrayAggOrderBy::merge(GroupConcator* gc) +{ + JsonArrayAggOrderBy* go = dynamic_cast(gc); + + while (go->fOrderByQueue.empty() == false) + { + const OrderByRow& row = go->fOrderByQueue.top(); + + // check if the distinct row already exists + if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end()) + { + ; // no op; + } + + // if the row count is less than the limit + else if (fCurrentLength < fGroupConcatLen) + { + fOrderByQueue.push(row); + row1.setData(row.fData); + fCurrentLength += row1.getRelRid(); + + // add to the distinct map + if (fDistinct) 
+ fDistinctMap->insert(row.fData); + } + + else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData)) + { + OrderByRow swapRow = fOrderByQueue.top(); + row1.setData(swapRow.fData); + fOrderByQueue.pop(); + fCurrentLength -= row1.getRelRid(); + + if (fDistinct) + { + fDistinctMap->erase(swapRow.fData); + fDistinctMap->insert(row.fData); + } + + row1.setData(row.fData); + fCurrentLength += row1.getRelRid(); + + fOrderByQueue.push(row); + } + + go->fOrderByQueue.pop(); + } +} + +void JsonArrayAggOrderBy::getResult(uint8_t* buff, const string&) +{ + ostringstream oss; + bool addSep = false; + + // need to reverse the order + stack rowStack; + if (fOrderByQueue.size() > 0) + { + oss << '['; + while (fOrderByQueue.size() > 0) + { + rowStack.push(fOrderByQueue.top()); + fOrderByQueue.pop(); + } + + while (rowStack.size() > 0) + { + if (addSep) + oss << ','; + else + addSep = true; + + const OrderByRow& topRow = rowStack.top(); + fRow0.setData(topRow.fData); + outputRow(oss, fRow0); + rowStack.pop(); + } + oss << ']'; + } + int64_t resultSize = oss.str().size(); + resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize; + fOutputString.reset(new uint8_t[resultSize + 2]); + fOutputString[resultSize] = '\0'; + fOutputString[resultSize + 1] = '\0'; + + strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize); +} + + +const string JsonArrayAggOrderBy::toString() const +{ + string baseStr = JsonArrayAggregator::toString(); + + ostringstream oss; + oss << "OrderBy cols: "; + vector::const_iterator i = fOrderByCond.begin(); + + for (; i != fOrderByCond.end(); i++) + oss << "(" << i->fIndex << "," << ((i->fAsc) ? "Asc" : "Desc") << "," + << ((i->fNf) ? "null first" : "null last") << ") "; + + if (fDistinct) + oss << endl << " distinct"; + + oss << endl; + + return (baseStr + oss.str()); +} + + +// GroupConcatNoOrder class implementation +JsonArrayAggNoOrder::JsonArrayAggNoOrder() + : fRowsPerRG(128), fErrorCode(ERR_AGGREGATION_TOO_BIG), fMemSize(0), fRm(NULL) +{ +} + +JsonArrayAggNoOrder::~JsonArrayAggNoOrder() +{ + if (fRm) + fRm->returnMemory(fMemSize, fSessionMemLimit); +} + +void JsonArrayAggNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc) +{ + JsonArrayAggregator::initialize(gcc); + + fRowGroup = gcc->fRowGroup; + fRowsPerRG = 128; + fErrorCode = ERR_AGGREGATION_TOO_BIG; + fRm = gcc->fRm; + fSessionMemLimit = gcc->fSessionMemLimit; + + vector >::iterator i = gcc->fGroupCols.begin(); + + while (i != gcc->fGroupCols.end()) + fConcatColumns.push_back((*(i++)).second); + + uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); + + if (!fRm->getMemory(newSize, fSessionMemLimit)) + { + cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; + throw IDBExcept(fErrorCode); + } + fMemSize += newSize; + + fData.reinit(fRowGroup, fRowsPerRG); + fRowGroup.setData(&fData); + fRowGroup.resetRowGroup(0); + fRowGroup.initRow(&fRow); + fRowGroup.getRow(0, &fRow); +} + +void JsonArrayAggNoOrder::processRow(const rowgroup::Row& row) +{ + // if the row count is less than the limit + if (fCurrentLength < fGroupConcatLen && concatColIsNull(row) == false) + { + copyRow(row, &fRow); + + // the RID is no meaning here, use it to store the estimated length. 
+ int16_t estLen = lengthEstimate(fRow); + fRow.setRid(estLen); + fCurrentLength += estLen; + fRowGroup.incRowCount(); + fRow.nextRow(); + + if (fRowGroup.getRowCount() >= fRowsPerRG) + { + uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); + + if (!fRm->getMemory(newSize, fSessionMemLimit)) + { + cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; + throw IDBExcept(fErrorCode); + } + fMemSize += newSize; + + fDataQueue.push(fData); + fData.reinit(fRowGroup, fRowsPerRG); + fRowGroup.setData(&fData); + fRowGroup.resetRowGroup(0); + fRowGroup.getRow(0, &fRow); + } + } +} + +void JsonArrayAggNoOrder::merge(GroupConcator* gc) +{ + JsonArrayAggNoOrder* in = dynamic_cast(gc); + + while (in->fDataQueue.size() > 0) + { + fDataQueue.push(in->fDataQueue.front()); + in->fDataQueue.pop(); + } + + fDataQueue.push(in->fData); + fMemSize += in->fMemSize; + in->fMemSize = 0; +} + +void JsonArrayAggNoOrder::getResult(uint8_t* buff, const string&) +{ + ostringstream oss; + bool addSep = false; + if (fDataQueue.size() > 0) + { + oss << '['; + fDataQueue.push(fData); + + while (fDataQueue.size() > 0) + { + fRowGroup.setData(&fDataQueue.front()); + fRowGroup.getRow(0, &fRow); + + for (uint64_t i = 0; i < fRowGroup.getRowCount(); i++) + { + if (addSep) + oss << ','; + else + addSep = true; + + outputRow(oss, fRow); + fRow.nextRow(); + } + + fDataQueue.pop(); + } + oss << ']'; + } + int64_t resultSize = oss.str().size(); + resultSize = (resultSize > fGroupConcatLen) ? fGroupConcatLen : resultSize; + fOutputString.reset(new uint8_t[resultSize + 2]); + fOutputString[resultSize] = '\0'; + fOutputString[resultSize + 1] = '\0'; + + strncpy((char*)fOutputString.get(), oss.str().c_str(), resultSize); +} + +const string JsonArrayAggNoOrder::toString() const +{ + return JsonArrayAggregator::toString(); +} + +} // namespace joblist diff --git a/dbcon/joblist/jsonarrayagg.h b/dbcon/joblist/jsonarrayagg.h new file mode 100644 index 000000000..7c9235e23 --- /dev/null +++ b/dbcon/joblist/jsonarrayagg.h @@ -0,0 +1,174 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + + +/** @file */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "groupconcat.h" + +#if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT) +#define EXPORT __declspec(dllexport) +#else +#define EXPORT +#endif + +namespace joblist +{ +// forward reference +struct JobInfo; +class JsonArrayAggregator; +class ResourceManager; + + +class JsonArrayInfo +{ + public: + JsonArrayInfo(); + virtual ~JsonArrayInfo(); + + void prepJsonArray(JobInfo&); + void mapColumns(const rowgroup::RowGroup&); + + std::set& columns() + { + return fColumns; + } + std::vector& groupConcat() + { + return fGroupConcat; + } + + const std::string toString() const; + + protected: + uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo); + boost::shared_array makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&); + + std::set fColumns; + std::vector fGroupConcat; +}; + + +class JsonArrayAggregatAgUM : public GroupConcatAgUM +{ + public: + EXPORT JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat&); + EXPORT ~JsonArrayAggregatAgUM(); + + using rowgroup::GroupConcatAg::merge; + void initialize(); + void processRow(const rowgroup::Row&); + EXPORT void merge(const rowgroup::Row&, int64_t); + /*boost::scoped_ptr& concator() + { + return fConcator; + } + */ + EXPORT void getResult(uint8_t*); + EXPORT uint8_t* getResult(); + + protected: + void applyMapping(const boost::shared_array&, const rowgroup::Row&); + +/* + boost::scoped_ptr fConcator; + boost::scoped_array fData; + rowgroup::Row fRow; + bool fNoOrder; +*/ +}; + +// JSON_ARRAYAGG base +class JsonArrayAggregator : public GroupConcator +{ + public: + JsonArrayAggregator(); + virtual ~JsonArrayAggregator(); + + virtual void initialize(const rowgroup::SP_GroupConcat&); + virtual void processRow(const rowgroup::Row&) = 0; + + virtual const std::string toString() const; + + protected: + virtual bool concatColIsNull(const rowgroup::Row&); + virtual void outputRow(std::ostringstream&, const rowgroup::Row&); + virtual int64_t lengthEstimate(const rowgroup::Row&); +}; + +// For JSON_ARRAYAGG withour distinct or orderby +class JsonArrayAggNoOrder : public JsonArrayAggregator +{ + public: + JsonArrayAggNoOrder(); + virtual ~JsonArrayAggNoOrder(); + + void initialize(const rowgroup::SP_GroupConcat&); + void processRow(const rowgroup::Row&); + + using GroupConcator::merge; + void merge(GroupConcator*); + using GroupConcator::getResult; + void getResult(uint8_t* buff, const std::string& sep); + + const std::string toString() const; + + protected: + rowgroup::RowGroup fRowGroup; + rowgroup::Row fRow; + rowgroup::RGData fData; + std::queue fDataQueue; + uint64_t fRowsPerRG; + uint64_t fErrorCode; + uint64_t fMemSize; + ResourceManager* fRm; + boost::shared_ptr fSessionMemLimit; +}; + +// ORDER BY used in JSON_ARRAYAGG class +class JsonArrayAggOrderBy : public JsonArrayAggregator, public ordering::IdbOrderBy +{ + public: + JsonArrayAggOrderBy(); + virtual ~JsonArrayAggOrderBy(); + + using ordering::IdbOrderBy::initialize; + void initialize(const rowgroup::SP_GroupConcat&); + void processRow(const rowgroup::Row&); + uint64_t getKeyLength() const; + + using GroupConcator::merge; + void merge(GroupConcator*); + using GroupConcator::getResult; + void getResult(uint8_t* buff, const std::string& sep); + + const std::string toString() const; + + protected: +}; + +} // namespace joblist + +#undef EXPORT diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 60f641a68..03093c18c 100644 --- 
a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -174,6 +174,8 @@ inline RowAggFunctionType functionIdMap(int planFuncId) case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT; + case AggregateColumn::JSON_ARRAYAGG: return ROWAGG_JSON_ARRAY; + case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT; case AggregateColumn::UDAF: return ROWAGG_UDAF; @@ -1075,7 +1077,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector& continue; } - if (aggOp == ROWAGG_GROUP_CONCAT) + if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY) { TupleInfo ti = getTupleInfo(key, jobInfo); uint32_t ptrSize = sizeof(GroupConcatAg*); @@ -1696,7 +1698,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vectorfAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX || f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND || f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR || - f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT)) + f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT || f->fAggFunction == ROWAGG_JSON_ARRAY)) { funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex, f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms)); diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index eab9ea4be..e33371a6a 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -81,6 +81,7 @@ using namespace cal_impl_if; #include "selectfilter.h" #include "existsfilter.h" #include "groupconcatcolumn.h" +#include "jsonarrayaggcolumn.h" #include "outerjoinonfilter.h" #include "intervalcolumn.h" #include "udafcolumn.h" @@ -2904,6 +2905,14 @@ uint32_t setAggOp(AggregateColumn* ac, Item_sum* isp) return rc; } + case Item_sum::JSON_ARRAYAGG_FUNC: + { + Item_func_group_concat* gc = (Item_func_group_concat*)isp; + ac->aggOp(AggregateColumn::JSON_ARRAYAGG); + ac->distinct(gc->get_distinct()); + return rc; + } + case Item_sum::SUM_BIT_FUNC: { string funcName = isp->func_name(); @@ -4922,6 +4931,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) { ac = new GroupConcatColumn(gwi.sessionid); } + else if (isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC) + { + ac = new JsonArrayAggColumn(gwi.sessionid); + } else if (isp->sum_func() == Item_sum::UDF_SUM_FUNC) { ac = new UDAFColumn(gwi.sessionid); @@ -4950,7 +4963,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) try { // special parsing for group_concat - if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) + if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC || isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC) { Item_func_group_concat* gc = (Item_func_group_concat*)isp; vector orderCols; @@ -5018,7 +5031,14 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) } rowCol->columnVec(selCols); - (dynamic_cast(ac))->orderCols(orderCols); + if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) + { + (dynamic_cast(ac))->orderCols(orderCols); + } + else + { + (dynamic_cast(ac))->orderCols(orderCols); + } parm.reset(rowCol); ac->aggParms().push_back(parm); @@ -5026,7 +5046,14 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) { string separator; separator.assign(gc->get_separator()->ptr(), gc->get_separator()->length()); - (dynamic_cast(ac))->separator(separator); + if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) + { + 
(dynamic_cast(ac))->separator(separator); + } + else + { + (dynamic_cast(ac))->separator(separator); + } } } else if (isSupportedAggregateWithOneConstArg(isp, sfitempp)) @@ -5244,7 +5271,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) ct.precision = -16; // borrowed to indicate skip null value check on connector ac->resultType(ct); } - else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) + else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC || + isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC) { // Item_func_group_concat* gc = (Item_func_group_concat*)isp; CalpontSystemCatalog::ColType ct; diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 1fc1cdd74..2193e623a 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -35,6 +35,7 @@ #include "mcs_basic_types.h" #include "resourcemanager.h" #include "groupconcat.h" +#include "jsonarrayagg.h" #include "blocksize.h" #include "errorcodes.h" @@ -656,7 +657,8 @@ void RowAggregation::initialize() bool allow_gen = true; for (auto& fun : fFunctionCols) { - if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT) + if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT || + fun->fAggFunction == ROWAGG_JSON_ARRAY) { allow_gen = false; break; @@ -732,7 +734,8 @@ void RowAggregation::aggReset() bool allow_gen = true; for (auto& fun : fFunctionCols) { - if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT) + if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT || + fun->fAggFunction == ROWAGG_JSON_ARRAY) { allow_gen = false; break; @@ -972,7 +975,7 @@ void RowAggregation::makeAggFieldsNull(Row& row) if (fFunctionCol->fAggFunction == ROWAGG_COUNT_ASTERISK || fFunctionCol->fAggFunction == ROWAGG_COUNT_COL_NAME || fFunctionCol->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || - fFunctionCol->fAggFunction == ROWAGG_COUNT_NO_OP || + fFunctionCol->fAggFunction == ROWAGG_COUNT_NO_OP || fFunctionCol->fAggFunction == ROWAGG_JSON_ARRAY || fFunctionCol->fAggFunction == ROWAGG_GROUP_CONCAT || fFunctionCol->fAggFunction == ROWAGG_STATS) { continue; @@ -1668,6 +1671,7 @@ void RowAggregation::updateEntry(const Row& rowIn, std::vectorfAggFunction == ROWAGG_JSON_ARRAY) + { + // save the object's address in the result row + SP_GroupConcatAg gcc(new joblist::JsonArrayAggregatAgUM(fGroupConcat[j++])); + fGroupConcatAg.push_back(gcc); + *((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get(); + } } } } @@ -2543,6 +2556,12 @@ void RowAggregationUM::updateEntry(const Row& rowIn, std::vectorprocessRow(rowIn); } +void RowAggregationUM::doJsonAgg(const Row& rowIn, int64_t, int64_t o) +{ + uint8_t* data = fRow.getData(); + joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o))); + gccAg->processRow(rowIn); +} + //------------------------------------------------------------------------------ // After all PM rowgroups received, calculate the average value. 
//------------------------------------------------------------------------------ @@ -3989,6 +4015,16 @@ void RowAggregationUM::setGroupConcatString() fRow.setStringField((char*)gcString, fFunctionCols[j]->fOutputColumnIndex); // gccAg->getResult(buff); } + + if (fFunctionCols[j]->fAggFunction == ROWAGG_JSON_ARRAY) + { + uint8_t* buff = data + fRow.getOffset(fFunctionCols[j]->fOutputColumnIndex); + uint8_t* gcString; + joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)buff); + gcString = gccAg->getResult(); + fRow.setStringField((char*)gcString, fFunctionCols[j]->fOutputColumnIndex); + // gccAg->getResult(buff); + } } } } @@ -4105,6 +4141,12 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn, std::vectormerge(rowIn, i); } +void RowAggregationUMP2::doJsonAgg(const Row& rowIn, int64_t i, int64_t o) +{ + uint8_t* data = fRow.getData(); + joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o))); + gccAg->merge(rowIn, i); +} + //------------------------------------------------------------------------------ // Update the and/or/xor fields if input is not null. // rowIn(in) - Row to be included in aggregation. @@ -4572,6 +4621,12 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn, std::vectormerge(rowIn, i); } +void RowAggregationSubDistinct::doJsonAgg(const Row& rowIn, int64_t i, int64_t o) +{ + uint8_t* data = fRow.getData(); + joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o))); + gccAg->merge(rowIn, i); +} //------------------------------------------------------------------------------ // Constructor / destructor //------------------------------------------------------------------------------ diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index 18cb9e748..185e6c101 100644 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -92,6 +92,8 @@ enum RowAggFunctionType // GROUP_CONCAT ROWAGG_GROUP_CONCAT, + ROWAGG_JSON_ARRAY, + // DISTINCT: performed on UM only ROWAGG_COUNT_DISTINCT_COL_NAME, // COUNT(distinct column_name) only counts non-null rows ROWAGG_DISTINCT_SUM, @@ -763,6 +765,7 @@ class RowAggregationUM : public RowAggregation // @bug3362, group_concat virtual void doGroupConcat(const Row&, int64_t, int64_t); + virtual void doJsonAgg(const Row&, int64_t, int64_t); virtual void setGroupConcatString(); bool fHasAvg; @@ -826,6 +829,7 @@ class RowAggregationUMP2 : public RowAggregationUM void doAvg(const Row&, int64_t, int64_t, int64_t, bool merge = false) override; void doStatistics(const Row&, int64_t, int64_t, int64_t) override; void doGroupConcat(const Row&, int64_t, int64_t) override; + void doJsonAgg(const Row&, int64_t, int64_t) override; void doBitOp(const Row&, int64_t, int64_t, int) override; void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx, std::vector* rgContextColl = nullptr) override; @@ -937,7 +941,7 @@ class RowAggregationSubDistinct : public RowAggregationUM protected: // virtual methods from RowAggregationUM void doGroupConcat(const Row&, int64_t, int64_t) override; - + void doJsonAgg(const Row&, int64_t, int64_t) override; // for groupby columns and the aggregated distinct column Row fDistRow; boost::scoped_array fDistRowData;
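For reference, the output the new aggregator builds per group is a single JSON array string: rows are comma-separated inside '[' and ']', character columns are double-quoted, numeric columns are emitted bare, and the whole string is truncated to the configured aggregate length (fGroupConcatLen). Below is a minimal standalone sketch of that output shape only; it is not code from the patch, and the Field type and maxLen cap are illustrative assumptions.

#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <variant>
#include <vector>

using Field = std::variant<int64_t, double, std::string>;

// Build "[v1,v2,...]" the way JsonArrayAggregator::outputRow()/getResult() do:
// strings quoted, numerics bare, whole result capped at maxLen characters.
std::string jsonArrayAgg(const std::vector<Field>& rows, std::size_t maxLen)
{
  std::ostringstream oss;
  oss << '[';
  bool addSep = false;

  for (const auto& f : rows)
  {
    if (addSep)
      oss << ',';
    addSep = true;

    if (const auto* s = std::get_if<std::string>(&f))
      oss << '"' << *s << '"';       // character types are wrapped in quotes
    else if (const auto* i = std::get_if<int64_t>(&f))
      oss << *i;                     // integer types appended as-is
    else
      oss << std::get<double>(f);    // floating point appended as-is
  }

  oss << ']';

  std::string result = oss.str();
  if (result.size() > maxLen)        // mirrors the fGroupConcatLen truncation
    result.resize(maxLen);

  return result;
}

int main()
{
  // prints [1,"a",2.5]
  std::cout << jsonArrayAgg({Field(int64_t(1)), Field(std::string("a")), Field(2.5)}, 256) << '\n';
}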
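The rowaggregation.cpp changes reuse the existing GROUP_CONCAT mechanism for carrying per-group state: the output row reserves a pointer-sized slot for the JSON_ARRAYAGG column, a JsonArrayAggregatAgUM is allocated per group (held by an SP_GroupConcatAg) and its address is written into that slot, and doJsonAgg()/setGroupConcatString() later read the pointer back to feed rows in and extract the final string. The sketch below shows only that pointer-stash pattern in isolation; PerGroupAgg is a hypothetical stand-in, not a ColumnStore class.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for joblist::JsonArrayAggregatAgUM: accumulates one group's values.
struct PerGroupAgg
{
  std::vector<std::string> vals;
  void processRow(const std::string& v) { vals.push_back(v); }
};

int main()
{
  PerGroupAgg agg;                                      // owned elsewhere (SP_GroupConcatAg in the patch)
  std::vector<uint8_t> rowSlot(sizeof(PerGroupAgg*));   // pointer-sized field in the output row

  // Setup (the patch's ROWAGG_JSON_ARRAY branch): store the aggregator's address in the row.
  PerGroupAgg* p = &agg;
  std::memcpy(rowSlot.data(), &p, sizeof(p));

  // doJsonAgg(): load the pointer back out of the row and push the incoming value.
  PerGroupAgg* loaded = nullptr;
  std::memcpy(&loaded, rowSlot.data(), sizeof(loaded));
  loaded->processRow("x");

  std::cout << loaded->vals.size() << '\n';             // prints 1
}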