1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

Merge pull request #493 from mariadb-corporation/MCOL-1201-b

Mcol 1201 b
This commit is contained in:
Andrew Hutchings
2018-06-07 17:40:45 +01:00
committed by GitHub
55 changed files with 3120 additions and 1014 deletions

View File

@@ -4038,6 +4038,10 @@ ParseTree* buildParseTree(Item_func* item, gp_walk_info& gwi, bool& nonSupport)
ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{
// MCOL-1201 For UDAnF multiple parameters
vector<SRCP> selCols;
vector<SRCP> orderCols;
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
@@ -4054,6 +4058,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
// N.B. argument_count() is the # of formal parms to the agg fcn. InifniDB only supports 1 argument
// TODO: Support more than one parm
#if 0
if (isp->argument_count() != 1 && isp->sum_func() != Item_sum::GROUP_CONCAT_FUNC
&& isp->sum_func() != Item_sum::UDF_SUM_FUNC)
{
@@ -4061,7 +4066,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_MUL_ARG_AGG);
return NULL;
}
#endif
AggregateColumn* ac = NULL;
if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
@@ -4084,9 +4089,14 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "Non supported aggregate type on the select clause";
if (ac)
delete ac;
return NULL;
}
try
{
// special parsing for group_concat
if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
{
@@ -4103,7 +4113,11 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
rc = buildReturnedColumn(sfitempp[i], gwi, gwi.fatalParseError);
if (!rc || gwi.fatalParseError)
{
if (ac)
delete ac;
return NULL;
}
selCols.push_back(SRCP(rc));
}
@@ -4123,6 +4137,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
if (id->val_int() > (int)selCols.size())
{
gwi.fatalParseError = true;
if (ac)
delete ac;
return NULL;
}
@@ -4135,6 +4151,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
if (!rc || gwi.fatalParseError)
{
if (ac)
delete ac;
return NULL;
}
}
@@ -4147,6 +4165,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
rowCol->columnVec(selCols);
(dynamic_cast<GroupConcatColumn*>(ac))->orderCols(orderCols);
parm.reset(rowCol);
ac->aggParms().push_back(parm);
if (gc->str_separator())
{
@@ -4190,16 +4209,14 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
// treat as count(*)
if (ac->aggOp() == AggregateColumn::COUNT)
ac->aggOp(AggregateColumn::COUNT_ASTERISK);
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
parm.reset(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError));
ac->constCol(parm);
break;
}
case Item::NULL_ITEM:
{
//ac->aggOp(AggregateColumn::COUNT);
parm.reset(new ConstantColumn("", ConstantColumn::NULLDATA));
//ac->functionParms(parm);
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
break;
}
@@ -4259,7 +4276,6 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
if (gwi.fatalParseError)
break;
//ac->functionParms(parm);
break;
}
@@ -4270,7 +4286,6 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
if (rc)
{
parm.reset(rc);
//ac->functionParms(parm);
break;
}
}
@@ -4296,15 +4311,25 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_AGG_ARGS, args);
}
if (ac)
delete ac;
return NULL;
}
if (parm)
{
// MCOL-1201 multi-argument aggregate
ac->aggParms().push_back(parm);
}
}
}
if (parm)
// Get result type
// Modified for MCOL-1201 multi-argument aggregate
if (ac->aggParms().size() > 0)
{
ac->functionParms(parm);
// These are all one parm functions, so we can safely
// use the first parm for result type.
parm = ac->aggParms()[0];
if (isp->sum_func() == Item_sum::AVG_FUNC ||
isp->sum_func() == Item_sum::AVG_DISTINCT_FUNC)
{
@@ -4431,6 +4456,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
}
else
{
// UDAF result type will be set below.
ac->resultType(parm->resultType());
}
}
@@ -4462,18 +4488,24 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
// @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will
// be applied in ExeMgr. When the ExeMgr fix is available, this checking
// will be taken out.
if (isp->sum_func() != Item_sum::UDF_SUM_FUNC)
{
if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty())
{
gwi.fatalParseError = true;
gwi.parseErrorText = "No project column found for aggregate function";
if (ac)
delete ac;
return NULL;
}
else if (ac->constCol())
{
gwi.count_asterisk_list.push_back(ac);
}
}
// For UDAF, populate the context and call the UDAF init() function.
// The return type is (should be) set in context by init().
if (isp->sum_func() == Item_sum::UDF_SUM_FUNC)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
@@ -4489,26 +4521,50 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
context.setScale(udafc->resultType().scale);
context.setPrecision(udafc->resultType().precision);
COL_TYPES colTypes;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator cmIter;
// Build the column type vector. For now, there is only one
colTypes.push_back(make_pair(udafc->functionParms()->alias(), udafc->functionParms()->resultType().colDataType));
context.setParamCount(udafc->aggParms().size());
ColumnDatum colType;
ColumnDatum colTypes[udafc->aggParms().size()];
// Build the column type vector.
// Modified for MCOL-1201 multi-argument aggregate
for (uint32_t i = 0; i < udafc->aggParms().size(); ++i)
{
const execplan::CalpontSystemCatalog::ColType& resultType
= udafc->aggParms()[i]->resultType();
colType.dataType = resultType.colDataType;
colType.precision = resultType.precision;
colType.scale = resultType.scale;
colTypes[i] = colType;
}
// Call the user supplied init()
if (context.getFunction()->init(&context, colTypes) == mcsv1_UDAF::ERROR)
mcsv1sdk::mcsv1_UDAF* udaf = context.getFunction();
if (!udaf)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "Aggregate Function " + context.getName() + " doesn't exist in the ColumnStore engine";
if (ac)
delete ac;
return NULL;
}
if (udaf->init(&context, colTypes) == mcsv1_UDAF::ERROR)
{
gwi.fatalParseError = true;
gwi.parseErrorText = udafc->getContext().getErrorMessage();
if (ac)
delete ac;
return NULL;
}
// UDAF_OVER_REQUIRED means that this function is for Window
// Function only. Reject it here in aggregate land.
if (udafc->getContext().getRunFlag(UDAF_OVER_REQUIRED))
{
gwi.fatalParseError = true;
gwi.parseErrorText =
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_WINDOW_FUNC_ONLY,
context.getName());
if (ac)
delete ac;
return NULL;
}
@@ -4521,7 +4577,24 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
udafc->resultType(ct);
}
}
}
catch (std::logic_error e)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "error building Aggregate Function: ";
gwi.parseErrorText += e.what();
if (ac)
delete ac;
return NULL;
}
catch (...)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "error building Aggregate Function: Unspecified exception";
if (ac)
delete ac;
return NULL;
}
return ac;
}
@@ -4674,6 +4747,7 @@ void gp_walk(const Item* item, void* arg)
if (isp)
{
// @bug 3669. trim trailing spaces for the compare value
if (isp->result_type() == STRING_RESULT)
{
String val, *str = isp->val_str(&val);
@@ -4684,7 +4758,10 @@ void gp_walk(const Item* item, void* arg)
cval.assign(str->ptr(), str->length());
}
size_t spos = cval.find_last_not_of(" ");
if (spos != string::npos)
cval = cval.substr(0, spos + 1);
gwip->rcWorkStack.push(new ConstantColumn(cval));
break;
@@ -7838,8 +7915,15 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
(*coliter)->functionParms(minSc);
// Replace the last (presumably constant) object with minSc
if ((*coliter)->aggParms().empty())
{
(*coliter)->aggParms().push_back(minSc);
}
else
{
(*coliter)->aggParms()[0] = minSc;
}
}
std::vector<FunctionColumn*>::iterator funciter;
@@ -8005,9 +8089,9 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi)
gwi.thd = thd;
int status = getGroupPlan(gwi, select_lex, csep, gi);
cerr << "---------------- cp_get_group_plan EXECUTION PLAN ----------------" << endl;
cerr << *csep << endl ;
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
// cerr << "---------------- cp_get_group_plan EXECUTION PLAN ----------------" << endl;
// cerr << *csep << endl ;
// cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
if (status > 0)
return ER_INTERNAL_ERROR;
@@ -9949,7 +10033,15 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
return ER_CHECK_NOT_IMPLEMENTED;
}
(*coliter)->functionParms(minSc);
// Replace the last (presumably constant) object with minSc
if ((*coliter)->aggParms().empty())
{
(*coliter)->aggParms().push_back(minSc);
}
else
{
(*coliter)->aggParms()[0] = minSc;
}
}
std::vector<FunctionColumn*>::iterator funciter;

View File

@@ -781,8 +781,11 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
//double double_val = *(double*)(&value);
//f2->store(double_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
if ((f2->decimals() == DECIMAL_NOT_SPECIFIED && row.getScale(s) > 0)
|| f2->decimals() < row.getScale(s))
{
f2->dec = row.getScale(s);
}
f2->store(dl);
@@ -5275,8 +5278,6 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
execplan::ParseTree* ptIt;
execplan::ReturnedColumn* rcIt;
for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
{
mapiter = ci->tableMap.find(tl->table);
@@ -5581,19 +5582,14 @@ internal_error:
*/
/***********************************************************
* DESCRIPTION:
* Return a result record for each
* group_by_handler::next_row() call.
* Return a result record for each group_by_handler::next_row() call.
* PARAMETERS:
* group_hand - group by handler, that preserves initial
* table and items lists. .
* table - TABLE pointer The table to save the result
* set in.
* group_hand - group by handler, that preserves initial table and items lists. .
* table - TABLE pointer The table to save the result set in.
* RETURN:
* 0 if success
* HA_ERR_END_OF_FILE if the record set has come to
* an end
* others if something went wrong whilst getting the
* result set
* HA_ERR_END_OF_FILE if the record set has come to an end
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table)
{

View File

@@ -340,6 +340,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
ac->distinct(item_sum->has_with_distinct());
Window_spec* win_spec = wf->window_spec;
SRCP srcp;
CalpontSystemCatalog::ColType ct; // For return type
// arguments
vector<SRCP> funcParms;
@@ -370,18 +371,25 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
context.setColWidth(rt.colWidth);
context.setScale(rt.scale);
context.setPrecision(rt.precision);
context.setParamCount(funcParms.size());
mcsv1sdk::ColumnDatum colType;
mcsv1sdk::ColumnDatum colTypes[funcParms.size()];
// Turn on the Analytic flag so the function is aware it is being called
// as a Window Function.
context.setContextFlag(CONTEXT_IS_ANALYTIC);
COL_TYPES colTypes;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator cmIter;
// Build the column type vector.
// Modified for MCOL-1201 multi-argument aggregate
for (size_t i = 0; i < funcParms.size(); ++i)
{
colTypes.push_back(make_pair(funcParms[i]->alias(), funcParms[i]->resultType().colDataType));
const execplan::CalpontSystemCatalog::ColType& resultType
= funcParms[i]->resultType();
colType.dataType = resultType.colDataType;
colType.precision = resultType.precision;
colType.scale = resultType.scale;
colTypes[i] = colType;
}
// Call the user supplied init()
@@ -401,7 +409,6 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
}
// Set the return type as set in init()
CalpontSystemCatalog::ColType ct;
ct.colDataType = context.getResultType();
ct.colWidth = context.getColWidth();
ct.scale = context.getScale();
@@ -419,10 +426,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
{
case Item_sum::UDF_SUM_FUNC:
{
uint64_t bIgnoreNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS));
char sIgnoreNulls[18];
sprintf(sIgnoreNulls, "%lu", bIgnoreNulls);
srcp.reset(new ConstantColumn(sIgnoreNulls, (uint64_t)bIgnoreNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT
uint64_t bRespectNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) ? 0 : 1;
char sRespectNulls[18];
sprintf(sRespectNulls, "%lu", bRespectNulls);
srcp.reset(new ConstantColumn(sRespectNulls, (uint64_t)bRespectNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT
funcParms.push_back(srcp);
break;
}
@@ -881,11 +888,13 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
return NULL;
}
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
ac->adjustResultType();
if (item_sum->sum_func() != Item_sum::UDF_SUM_FUNC)
{
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
ac->adjustResultType();
}
ac->expressionId(ci->expressionId++);

View File

@@ -84,6 +84,7 @@ CREATE FUNCTION idbpartition RETURNS STRING soname 'libcalmysql.so';
CREATE FUNCTION idblocalpm RETURNS INTEGER soname 'libcalmysql.so';
CREATE FUNCTION mcssystemready RETURNS INTEGER soname 'libcalmysql.so';
CREATE FUNCTION mcssystemreadonly RETURNS INTEGER soname 'libcalmysql.so';
CREATE AGGREGATE FUNCTION regr_avgx RETURNS REAL soname 'libcalmysql.dll';
CREATE DATABASE IF NOT EXISTS infinidb_vtable;
CREATE DATABASE IF NOT EXISTS infinidb_querystats;