1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge branch 'develop' into columnstore-22.08.3-1

This commit is contained in:
David.Hall
2022-11-04 16:12:56 -05:00
committed by GitHub
90 changed files with 206086 additions and 203373 deletions

View File

@ -22,6 +22,7 @@ set(execplan_LIB_SRCS
functioncolumn.cpp
groupconcatcolumn.cpp
intervalcolumn.cpp
jsonarrayaggcolumn.cpp
logicoperator.cpp
mysqlexecutionplan.cpp
objectidmanager.cpp

View File

@ -74,6 +74,7 @@ class AggregateColumn : public ReturnedColumn
BIT_OR,
BIT_XOR,
GROUP_CONCAT,
JSON_ARRAYAGG,
UDAF,
MULTI_PARM
};

View File

@ -0,0 +1,171 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sstream>
#include <cstring>
using namespace std;
#include "bytestream.h"
using namespace messageqcpp;
#include "rowgroup.h"
using namespace rowgroup;
#include "joblisttypes.h"
using namespace joblist;
#include "simplefilter.h"
#include "constantfilter.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "objectreader.h"
#include "jsonarrayaggcolumn.h"
namespace execplan
{
/**
* Constructors/Destructors
*/
JsonArrayAggColumn::JsonArrayAggColumn() : AggregateColumn()
{
}
JsonArrayAggColumn::JsonArrayAggColumn(const uint32_t sessionID) : AggregateColumn(sessionID)
{
}
JsonArrayAggColumn::JsonArrayAggColumn(const JsonArrayAggColumn& rhs, const uint32_t sessionID)
: AggregateColumn(dynamic_cast<const AggregateColumn&>(rhs))
, fOrderCols(rhs.fOrderCols)
, fSeparator(rhs.fSeparator)
{
}
JsonArrayAggColumn::~JsonArrayAggColumn()
{
}
/**
* Methods
*/
const string JsonArrayAggColumn::toString() const
{
ostringstream output;
output << "JsonArrayAggColumn " << data() << endl;
output << AggregateColumn::toString() << endl;
output << "Json Array Order Columns: " << endl;
for (uint32_t i = 0; i < fOrderCols.size(); i++)
{
output << *fOrderCols[i];
}
return output.str();
}
ostream& operator<<(ostream& output, const JsonArrayAggColumn& rhs)
{
output << rhs.toString();
return output;
}
void JsonArrayAggColumn::serialize(messageqcpp::ByteStream& b) const
{
b << (uint8_t)ObjectReader::GROUPCONCATCOLUMN;
AggregateColumn::serialize(b);
CalpontSelectExecutionPlan::ReturnedColumnList::const_iterator rcit;
b << static_cast<uint32_t>(fOrderCols.size());
for (rcit = fOrderCols.begin(); rcit != fOrderCols.end(); ++rcit)
(*rcit)->serialize(b);
b << fSeparator;
}
void JsonArrayAggColumn::unserialize(messageqcpp::ByteStream& b)
{
ObjectReader::checkType(b, ObjectReader::GROUPCONCATCOLUMN);
AggregateColumn::unserialize(b);
fOrderCols.erase(fOrderCols.begin(), fOrderCols.end());
uint32_t size, i;
ReturnedColumn* rc;
b >> size;
for (i = 0; i < size; i++)
{
rc = dynamic_cast<ReturnedColumn*>(ObjectReader::createTreeNode(b));
SRCP srcp(rc);
fOrderCols.push_back(srcp);
}
b >> fSeparator;
}
bool JsonArrayAggColumn::operator==(const JsonArrayAggColumn& t) const
{
const AggregateColumn *rc1, *rc2;
rc1 = static_cast<const AggregateColumn*>(this);
rc2 = static_cast<const AggregateColumn*>(&t);
if (*rc1 != *rc2)
return false;
for (uint32_t i = 0; i < fOrderCols.size(); i++)
{
if (fOrderCols[i].get() != NULL)
{
if (t.fOrderCols[i] == NULL)
return false;
if (*(fOrderCols[i].get()) != t.fOrderCols[i].get())
return false;
}
else if (t.fOrderCols[i].get() != NULL)
return false;
}
return true;
}
bool JsonArrayAggColumn::operator==(const TreeNode* t) const
{
const JsonArrayAggColumn* ac;
ac = dynamic_cast<const JsonArrayAggColumn*>(t);
if (ac == NULL)
return false;
return *this == *ac;
}
bool JsonArrayAggColumn::operator!=(const JsonArrayAggColumn& t) const
{
return !(*this == t);
}
bool JsonArrayAggColumn::operator!=(const TreeNode* t) const
{
return !(*this == t);
}
} // namespace execplan

View File

@ -0,0 +1,143 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/** @file */
#pragma once
#include <string>
#include "calpontselectexecutionplan.h"
#include "aggregatecolumn.h"
namespace messageqcpp
{
class ByteStream;
}
/**
* Namespace
*/
namespace execplan
{
/**
* @brief A class to represent a aggregate return column
*
* This class is a specialization of class ReturnedColumn that
* handles an aggregate function call (e.g., SUM, COUNT, MIN, MAX).
*/
class JsonArrayAggColumn : public AggregateColumn
{
public:
/**
* Constructors
*/
JsonArrayAggColumn();
JsonArrayAggColumn(const uint32_t sessionID);
JsonArrayAggColumn(const JsonArrayAggColumn& rhs, const uint32_t sessionID = 0);
/**
* Destructors
*/
virtual ~JsonArrayAggColumn();
/**
* Overloaded stream operator
*/
virtual const std::string toString() const;
/** return a copy of this pointer
*
* deep copy of this pointer and return the copy
*/
virtual JsonArrayAggColumn* clone() const
{
return new JsonArrayAggColumn(*this);
}
/**
* Accessors and Mutators
*/
void orderCols(const std::vector<SRCP>& orderCols)
{
fOrderCols = orderCols;
}
std::vector<SRCP>& orderCols()
{
return fOrderCols;
}
void separator(const std::string& separator)
{
fSeparator = separator;
}
std::string& separator()
{
return fSeparator;
}
/**
* Serialize interface
*/
virtual void serialize(messageqcpp::ByteStream&) const;
virtual void unserialize(messageqcpp::ByteStream&);
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
*
* Do a deep, strict (as opposed to semantic) equivalence test.
* @return true iff every member of t is a duplicate copy of every member of this;
* false otherwise
*/
virtual bool operator==(const TreeNode* t) const;
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
*
* Do a deep, strict (as opposed to semantic) equivalence test.
* @return true iff every member of t is a duplicate copy of every member of this;
* false otherwise
*/
using AggregateColumn::operator==;
virtual bool operator==(const JsonArrayAggColumn& t) const;
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
*
* Do a deep, strict (as opposed to semantic) equivalence test.
* @return false iff every member of t is a duplicate copy of every member of this;
* true otherwise
*/
virtual bool operator!=(const TreeNode* t) const;
/** @brief Do a deep, strict (as opposed to semantic) equivalence test
*
* Do a deep, strict (as opposed to semantic) equivalence test.
* @return false iff every member of t is a duplicate copy of every member of this;
* true otherwise
*/
using AggregateColumn::operator!=;
virtual bool operator!=(const JsonArrayAggColumn& t) const;
private:
std::vector<SRCP> fOrderCols;
std::string fSeparator;
};
/**
* stream operator
*/
std::ostream& operator<<(std::ostream& os, const JsonArrayAggColumn& rhs);
} // namespace execplan

View File

@ -28,6 +28,7 @@ set(joblist_LIB_SRCS
joblistfactory.cpp
jobstep.cpp
jobstepassociation.cpp
jsonarrayagg.cpp
lbidlist.cpp
limitedorderby.cpp
passthrucommand-jl.cpp

View File

@ -1021,7 +1021,6 @@ void GroupConcatNoOrder::getResult(uint8_t* buff, const string& sep)
{
ostringstream oss;
bool addSep = false;
fDataQueue.push(fData);
while (fDataQueue.size() > 0)

View File

@ -43,6 +43,7 @@
#include "joblist.h"
#include "jobstep.h"
#include "groupconcat.h"
#include "jsonarrayagg.h"
#include "jl_logger.h"
#include "resourcemanager.h"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,148 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/** @file */
#pragma once
#include <utility>
#include <set>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_array.hpp>
#include "groupconcat.h"
#if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace joblist
{
// forward reference
class JsonArrayAggregator;
class ResourceManager;
class JsonArrayInfo : public GroupConcatInfo
{
public:
void prepJsonArray(JobInfo&);
void mapColumns(const rowgroup::RowGroup&);
const std::string toString() const;
protected:
uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo);
boost::shared_array<int> makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&);
};
class JsonArrayAggregatAgUM : public GroupConcatAgUM
{
public:
EXPORT JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat&);
EXPORT ~JsonArrayAggregatAgUM();
using rowgroup::GroupConcatAg::merge;
void initialize();
void processRow(const rowgroup::Row&);
EXPORT void merge(const rowgroup::Row&, int64_t);
EXPORT void getResult(uint8_t*);
EXPORT uint8_t* getResult();
protected:
void applyMapping(const boost::shared_array<int>&, const rowgroup::Row&);
};
// JSON_ARRAYAGG base
class JsonArrayAggregator : public GroupConcator
{
public:
JsonArrayAggregator();
virtual ~JsonArrayAggregator();
virtual void initialize(const rowgroup::SP_GroupConcat&);
virtual void processRow(const rowgroup::Row&) = 0;
virtual const std::string toString() const;
protected:
virtual bool concatColIsNull(const rowgroup::Row&);
virtual void outputRow(std::ostringstream&, const rowgroup::Row&);
virtual int64_t lengthEstimate(const rowgroup::Row&);
};
// For JSON_ARRAYAGG withour distinct or orderby
class JsonArrayAggNoOrder : public JsonArrayAggregator
{
public:
JsonArrayAggNoOrder();
virtual ~JsonArrayAggNoOrder();
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
using GroupConcator::merge;
void merge(GroupConcator*);
using GroupConcator::getResult;
void getResult(uint8_t* buff, const std::string& sep);
const std::string toString() const;
protected:
rowgroup::RowGroup fRowGroup;
rowgroup::Row fRow;
rowgroup::RGData fData;
std::queue<rowgroup::RGData> fDataQueue;
uint64_t fRowsPerRG;
uint64_t fErrorCode;
uint64_t fMemSize;
ResourceManager* fRm;
boost::shared_ptr<int64_t> fSessionMemLimit;
};
// ORDER BY used in JSON_ARRAYAGG class
class JsonArrayAggOrderBy : public JsonArrayAggregator, public ordering::IdbOrderBy
{
public:
JsonArrayAggOrderBy();
virtual ~JsonArrayAggOrderBy();
using ordering::IdbOrderBy::initialize;
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
uint64_t getKeyLength() const;
using GroupConcator::merge;
void merge(GroupConcator*);
using GroupConcator::getResult;
void getResult(uint8_t* buff, const std::string& sep);
const std::string toString() const;
protected:
};
} // namespace joblist
#undef EXPORT

View File

@ -42,13 +42,14 @@
// from blocksize.h
const int32_t DATA_BLOCK_SIZE = BLOCK_SIZE;
const int8_t COMPARE_NIL = 0x00;
const int8_t COMPARE_NIL = 0x00; // means c = NULL predicate
const int8_t COMPARE_LT = 0x01;
const int8_t COMPARE_EQ = 0x02;
const int8_t COMPARE_LE = (COMPARE_LT | COMPARE_EQ); // 0x03
const int8_t COMPARE_GT = 0x04;
const int8_t COMPARE_NE = (COMPARE_LT | COMPARE_GT); // 0x05
const int8_t COMPARE_GE = (COMPARE_GT | COMPARE_EQ); // 0x06
const int8_t COMPARE_NULLEQ = 0x07; // means c IS NULL(see column.cpp for details)
const int8_t COMPARE_NOT = 0x08;
const int8_t COMPARE_NLT = (COMPARE_LT | COMPARE_NOT); // 0x09
const int8_t COMPARE_NLE = (COMPARE_LE | COMPARE_NOT); // 0x0b
@ -884,4 +885,3 @@ struct LbidAtVer
#endif
#pragma pack(pop)

View File

@ -174,6 +174,8 @@ inline RowAggFunctionType functionIdMap(int planFuncId)
case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT;
case AggregateColumn::JSON_ARRAYAGG: return ROWAGG_JSON_ARRAY;
case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT;
case AggregateColumn::UDAF: return ROWAGG_UDAF;
@ -1075,7 +1077,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>&
continue;
}
if (aggOp == ROWAGG_GROUP_CONCAT)
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(key, jobInfo);
uint32_t ptrSize = sizeof(GroupConcatAg*);
@ -1696,7 +1698,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
continue;
// skip if this is a group_concat
if (aggOp == ROWAGG_GROUP_CONCAT)
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(aggKey, jobInfo);
uint32_t width = sizeof(GroupConcatAg*);
@ -2229,7 +2231,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
csNumAggDist.push_back(csNumAgg[colAgg]);
uint32_t width = widthAgg[colAgg];
if (aggOp == ROWAGG_GROUP_CONCAT)
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
@ -2320,7 +2322,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
}
#if 0
else if (aggOp == ROWAGG_GROUP_CONCAT)
else if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
@ -2840,7 +2842,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX ||
f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND ||
f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR ||
f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT))
f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT || f->fAggFunction == ROWAGG_JSON_ARRAY))
{
funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex,
f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms));

View File

@ -90,7 +90,8 @@ extern "C"
std::string_view secret, std::string_view key,
std::string_view region, std::string_view cmapi_host,
ulong cmapi_port, std::string_view cmapi_version,
std::string_view cmapi_key)
std::string_view cmapi_key, std::string_view terminated_by,
std::string_view enclosed_by, std::string_view escaped_by)
{
CURLcode res;
@ -105,6 +106,10 @@ extern "C"
j["secret"] = secret;
j["region"] = region;
j["database"] = database;
j["terminated_by"] = terminated_by;
j["enclosed_by"] = enclosed_by;
j["escaped_by"] = escaped_by;
std::string param = j.dump();
@ -157,8 +162,14 @@ extern "C"
const char* bucket = args->args[0];
const char* filename = args->args[1];
const char* database = args->args[2];
const char* table = args->args[3];
const char* terminated_by = args->args[4];
const char* enclosed_by = args->args[5];
const char* escaped_by = args->args[6];
ulong cmapi_port = get_cmapi_port(_current_thd());
const char* cmapi_host = get_cmapi_host(_current_thd());
const char* cmapi_version = get_cmapi_version(_current_thd());
@ -169,18 +180,18 @@ extern "C"
const char* secret = get_s3_secret(thd);
const char* key = get_s3_key(thd);
const char* region = get_s3_region(thd);
const char* database = args->args[2];
return columnstore_dataload_impl(initData->curl, initData->result, length, bucket, table, filename,
database, secret, key, region, cmapi_host, cmapi_port, cmapi_version,
::strlen(cmapi_key) == 0 ? parseCMAPIkey().c_str() : cmapi_key);
::strlen(cmapi_key) == 0 ? parseCMAPIkey().c_str() : cmapi_key,
terminated_by, enclosed_by, escaped_by);
}
my_bool columnstore_dataload_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 4)
if (args->arg_count != 7)
{
strcpy(message, "columnstore_dataload needs 4 arguments: (bucket, file_name, db_name, table)");
strcpy(message, "columnstore_dataload needs 7 arguments: (bucket, file_name, db_name, table, terminated_by, enclosed_by, escaped_by)");
return 1;
}

View File

@ -123,13 +123,17 @@ END //
CREATE OR REPLACE PROCEDURE load_from_s3 (in bucket varchar(256) CHARACTER SET utf8,
in filename varchar(256) CHARACTER SET utf8,
in dbname varchar(256) CHARACTER SET utf8,
in table_name varchar(256) CHARACTER SET utf8)
in table_name varchar(256) CHARACTER SET utf8,
in terminated_by varchar(256) CHARACTER SET utf8,
in enclosed_by varchar(1) CHARACTER SET utf8,
in escaped_by varchar(1) CHARACTER SET utf8
)
LANGUAGE SQL
NOT DETERMINISTIC
MODIFIES SQL DATA
SQL SECURITY INVOKER
BEGIN
select columnstore_dataload(bucket, filename, dbname, table_name);
select columnstore_dataload(bucket, filename, dbname, table_name, terminated_by, enclosed_by, escaped_by);
END //

View File

@ -92,7 +92,7 @@ class StoreFieldMariaDB : public StoreField
{
size_t ll = length * 2;
boost::scoped_array<char> sca(new char[ll]);
ConstString(str, length).bin2hex(sca.get());
utils::ConstString(str, length).bin2hex(sca.get());
return m_field->store_binary(sca.get(), ll);
}
return m_field->store_binary(str, length);
@ -103,7 +103,7 @@ class StoreFieldMariaDB : public StoreField
return m_field->store(val, 0);
}
int store_ulonglong(uint64_t val)override
int store_ulonglong(uint64_t val) override
{
return m_field->store(static_cast<int64_t>(val), 1);
}
@ -844,4 +844,3 @@ class WriteBatchFieldMariaDB : public WriteBatchField
};
} // end of namespace datatypes

View File

@ -81,6 +81,7 @@ using namespace cal_impl_if;
#include "selectfilter.h"
#include "existsfilter.h"
#include "groupconcatcolumn.h"
#include "jsonarrayaggcolumn.h"
#include "outerjoinonfilter.h"
#include "intervalcolumn.h"
#include "udafcolumn.h"
@ -2904,6 +2905,14 @@ uint32_t setAggOp(AggregateColumn* ac, Item_sum* isp)
return rc;
}
case Item_sum::JSON_ARRAYAGG_FUNC:
{
Item_func_json_arrayagg* gc = (Item_func_json_arrayagg*)isp;
ac->aggOp(AggregateColumn::JSON_ARRAYAGG);
ac->distinct(gc->get_distinct());
return rc;
}
case Item_sum::SUM_BIT_FUNC:
{
string funcName = isp->func_name();
@ -4907,9 +4916,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
gwi.aggOnSelect = true;
// Argument_count() is the # of formal parms to the agg fcn. Columnstore
// only supports 1 argument except UDAnF, COUNT(DISTINC) and GROUP_CONCAT
// only supports 1 argument except UDAnF, COUNT(DISTINC), GROUP_CONCAT and JSON_ARRAYAGG
if (isp->argument_count() != 1 && isp->sum_func() != Item_sum::COUNT_DISTINCT_FUNC &&
isp->sum_func() != Item_sum::GROUP_CONCAT_FUNC && isp->sum_func() != Item_sum::UDF_SUM_FUNC)
isp->sum_func() != Item_sum::GROUP_CONCAT_FUNC && isp->sum_func() != Item_sum::UDF_SUM_FUNC &&
isp->sum_func() != Item_sum::JSON_ARRAYAGG_FUNC)
{
gwi.fatalParseError = true;
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_MUL_ARG_AGG);
@ -4922,6 +4932,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{
ac = new GroupConcatColumn(gwi.sessionid);
}
else if (isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC)
{
ac = new JsonArrayAggColumn(gwi.sessionid);
}
else if (isp->sum_func() == Item_sum::UDF_SUM_FUNC)
{
ac = new UDAFColumn(gwi.sessionid);
@ -5029,6 +5043,85 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
(dynamic_cast<GroupConcatColumn*>(ac))->separator(separator);
}
}
else if (isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC)
{
Item_func_json_arrayagg* gc = (Item_func_json_arrayagg*)isp;
vector<SRCP> orderCols;
RowColumn* rowCol = new RowColumn();
vector<SRCP> selCols;
uint32_t select_ctn = gc->get_count_field();
ReturnedColumn* rc = NULL;
for (uint32_t i = 0; i < select_ctn; i++)
{
rc = buildReturnedColumn(sfitempp[i], gwi, gwi.fatalParseError);
if (!rc || gwi.fatalParseError)
{
if (ac)
delete ac;
return NULL;
}
selCols.push_back(SRCP(rc));
}
ORDER **order_item, **end;
for (order_item = gc->get_order(), end = order_item + gc->get_order_field(); order_item < end;
order_item++)
{
Item* ord_col = *(*order_item)->item;
if (ord_col->type() == Item::CONST_ITEM && ord_col->cmp_type() == INT_RESULT)
{
Item_int* id = (Item_int*)ord_col;
if (id->val_int() > (int)selCols.size())
{
gwi.fatalParseError = true;
if (ac)
delete ac;
return NULL;
}
rc = selCols[id->val_int() - 1]->clone();
rc->orderPos(id->val_int() - 1);
}
else
{
rc = buildReturnedColumn(ord_col, gwi, gwi.fatalParseError);
if (!rc || gwi.fatalParseError)
{
if (ac)
delete ac;
return NULL;
}
}
// 10.2 TODO: direction is now a tri-state flag
rc->asc((*order_item)->direction == ORDER::ORDER_ASC ? true : false);
orderCols.push_back(SRCP(rc));
}
rowCol->columnVec(selCols);
(dynamic_cast<JsonArrayAggColumn*>(ac))->orderCols(orderCols);
parm.reset(rowCol);
ac->aggParms().push_back(parm);
if (gc->get_separator())
{
string separator;
separator.assign(gc->get_separator()->ptr(), gc->get_separator()->length());
(dynamic_cast<JsonArrayAggColumn*>(ac))->separator(separator);
}
}
else if (isSupportedAggregateWithOneConstArg(isp, sfitempp))
{
processAggregateColumnConstArg(gwi, parm, ac, sfitempp[0], constArgParam);
@ -5113,17 +5206,17 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
// @bug 3603. for cases like max(rand()). try to build function first.
if (!rc)
{
rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
}
if (!rc)
{
gwi.fatalParseError = true;
}
parm.reset(rc);
gwi.clauseType = clauseType;
if (gwi.fatalParseError)
break;
break;
}
case Item::REF_ITEM:
{
ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError);
@ -5244,7 +5337,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
ct.precision = -16; // borrowed to indicate skip null value check on connector
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC ||
isp->sum_func() == Item_sum::JSON_ARRAYAGG_FUNC)
{
// Item_func_group_concat* gc = (Item_func_group_concat*)isp;
CalpontSystemCatalog::ColType ct;

View File

@ -2206,7 +2206,9 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
gp_walk_info gwi(timeZoneOffset);
gwi.thd = thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) ||
thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return 0;
// check whether the system is ready to process statement.
@ -2580,7 +2582,9 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table, long timeZone)
{
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) ||
thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
@ -2659,7 +2663,9 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
int rc = 0;
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) ||
thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return 0;
cal_connection_info* ci = nullptr;