1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1052 Generate execution plan for a aggregated function query call.

This commit is contained in:
Roman Nozdrin
2018-03-27 18:37:00 +03:00
parent cff504c8bf
commit fa4067b6f0
6 changed files with 1159 additions and 23 deletions

View File

@ -1160,7 +1160,7 @@ create_calpont_group_by_handler(THD *thd, Query *query)
Item *item;
List_iterator_fast<Item> it(*query->select);
if ( thd->infinidb_vtable.vtable_state != THD::INFINIDB_DISABLE_VTABLE )
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE )
{
handler = new ha_calpont_group_by_handler(thd, query);
}
@ -1172,10 +1172,9 @@ int ha_calpont_group_by_handler::init_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::init_scan");
int rc = ha_calpont_impl_group_by_init(query, table);
int rc = ha_calpont_impl_group_by_init(this, table);
DBUG_RETURN(rc);
// return 0;
}
int ha_calpont_group_by_handler::next_row()
@ -1183,7 +1182,7 @@ int ha_calpont_group_by_handler::next_row()
// if (!first_row)
// return(HA_ERR_END_OF_FILE);
DBUG_ENTER("ha_calpont_group_by_handler::next_row");
int rc = ha_calpont_impl_group_by_next(query, table);
int rc = ha_calpont_impl_group_by_next(this, table);
DBUG_RETURN(rc);
/*
@ -1199,7 +1198,7 @@ int ha_calpont_group_by_handler::end_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::end_scan");
int rc = ha_calpont_impl_group_by_end(query, table);
int rc = ha_calpont_impl_group_by_end(this, table);
DBUG_RETURN(rc);
// return 0;

View File

@ -260,14 +260,10 @@ class ha_calpont_group_by_handler: public group_by_handler
int init_scan();
int next_row();
int end_scan();
private:
List<Item> *fields;
TABLE_LIST *table_list;
bool first_row;
Query *query;
};
#endif //HA_CALPONT_H__

View File

@ -7831,6 +7831,10 @@ int cp_get_plan(THD* thd, SCSEP& csep)
else if (status < 0)
return status;
cerr << "---------------- cp_get_plan EXECUTION PLAN ----------------" << endl;
cerr << *csep << endl ;
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
// Derived table projection and filter optimization.
derivedTableOptimization(csep);
@ -7932,4 +7936,128 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti)
return 0;
}
int cp_get_group_plan(THD* thd, SCSEP& csep, cal_table_info& ti )
{
gp_walk_info* gwi = ti.condInfo;
List_iterator_fast<Item> it(*ti.groupByFields);
Item* item;
if (!gwi)
gwi = new gp_walk_info();
gwi->thd = thd;
LEX* lex = thd->lex;
idbassert(lex != 0);
uint32_t sessionID = csep->sessionID();
gwi->sessionid = sessionID;
TABLE* table = ti.msTablePtr;
boost::shared_ptr<CalpontSystemCatalog> csc =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
// get all columns that mysql needs to fetch
MY_BITMAP* read_set = table->read_set;
Field** f_ptr, *field;
gwi->columnMap.clear();
const CalpontSystemCatalog::TableAliasName tblAliasName=
make_aliastable(table->s->db.str, table->s->table_name.str, table->alias.c_ptr());
gwi->tbList.push_back(tblAliasName);
while ((item = it++))
{
Item *arg0;
Field *field;
ReturnedColumn* rc = buildAggregateColumn(item, *gwi);
//string alias(table->alias.c_ptr());
//rc->tableAlias(lower(alias));
assert (rc);
boost::shared_ptr<ReturnedColumn> sprc(rc);
gwi->returnedCols.push_back(sprc);
arg0=((Item_sum*) item)->get_arg(0);
field=((Item_field*) arg0)->field;
gwi->columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(field->field_name), sprc));
//arg0=((Item_sum*) item)->get_arg(0);
//field=((Item_field*) arg0)->field;
}
/*
for (f_ptr = table->field ; (field = *f_ptr) ; f_ptr++)
{
if (bitmap_is_set(read_set, field->field_index))
{
SimpleColumn* sc = new SimpleColumn(table->s->db.str, table->s->table_name.str, field->field_name, sessionID);
string alias(table->alias.c_ptr());
sc->tableAlias(lower(alias));
assert (sc);
boost::shared_ptr<SimpleColumn> spsc(sc);
gwi->returnedCols.push_back(spsc);
gwi->columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(field->field_name), spsc));
}
}
*/
if (gwi->columnMap.empty())
{
CalpontSystemCatalog::TableName tn = make_table(table->s->db.str, table->s->table_name.str);
CalpontSystemCatalog::TableAliasName tan = make_aliastable(table->s->db.str, table->s->table_name.str, table->alias.c_ptr());
SimpleColumn* sc = getSmallestColumn(csc, tn, tan, table, *gwi);
SRCP srcp(sc);
gwi->columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(sc->columnName(), srcp));
gwi->returnedCols.push_back(srcp);
}
// get filter
if (ti.condInfo)
{
gp_walk_info* gwi = ti.condInfo;
ParseTree* filters = 0;
ParseTree* ptp = 0;
ParseTree* rhs = 0;
while (!gwi->ptWorkStack.empty())
{
filters = gwi->ptWorkStack.top();
gwi->ptWorkStack.pop();
SimpleFilter* sf = dynamic_cast<SimpleFilter*>(filters->data());
if (sf && sf->op()->data() == "noop")
{
delete filters;
filters = 0;
if (gwi->ptWorkStack.empty())
break;
continue;
}
if (gwi->ptWorkStack.empty())
break;
ptp = new ParseTree(new LogicOperator("and"));
ptp->left(filters);
rhs = gwi->ptWorkStack.top();
gwi->ptWorkStack.pop();
ptp->right(rhs);
gwi->ptWorkStack.push(ptp);
}
csep->filters(filters);
}
csep->returnedCols(gwi->returnedCols);
csep->columnMap(gwi->columnMap);
CalpontSelectExecutionPlan::TableList tblist;
tblist.push_back(tblAliasName);
csep->tableList(tblist);
// @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering
csep->stringScanThreshold(gwi->thd->variables.infinidb_string_scan_threshold);
return 0;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -48,15 +48,16 @@ extern int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type);
extern int ha_calpont_impl_update_row();
extern int ha_calpont_impl_delete_row();
extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_calpont_impl_group_by_init(Query* query, TABLE* table);
extern int ha_calpont_impl_group_by_next(Query* query, TABLE* table);
extern int ha_calpont_impl_group_by_end(Query* query, TABLE* table);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
#endif
#ifdef NEED_CALPONT_INTERFACE
#include "ha_calpont_impl_if.h"
#include "calpontsystemcatalog.h"
#include "ha_calpont.h"
extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci);
extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted);
extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci);
@ -73,6 +74,9 @@ extern std::string ha_calpont_impl_droppartition_ (execplan::CalpontSystemCatal
extern std::string ha_calpont_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename);
extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_info& ci, uint64_t tableLockID);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
#endif
#endif

View File

@ -178,7 +178,8 @@ struct cal_table_info
msTablePtr(0),
conn_hndl(0),
condInfo(0),
moreRows(false)
moreRows(false),
groupByFields(0)
{ }
~cal_table_info() {}
sm::cpsm_tplh_t* tpl_ctx;
@ -189,6 +190,7 @@ struct cal_table_info
gp_walk_info* condInfo;
execplan::SCSEP csep;
bool moreRows; //are there more rows to consume (b/c of limit)
List<Item> *groupByFields; // MCOL-1052 For CSEP generation
};
typedef std::tr1::unordered_map<TABLE*, cal_table_info> CalTableMap;
@ -297,6 +299,7 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su
int cp_get_plan(THD* thd, execplan::SCSEP& csep);
int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti);
int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false);
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
void setError(THD* thd, uint32_t errcode, const std::string errmsg);