diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index c74e19343..0aa6ddcc0 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -22,6 +22,8 @@ SET ( libcalmysql_SRCS ha_pseudocolumn.cpp) add_definitions(-DMYSQL_DYNAMIC_PLUGIN) +add_definitions(-DEBUG_WALK_COND) +add_definitions(-DINFINIDB_DEBUG) set_source_files_properties(ha_calpont.cpp PROPERTIES COMPILE_FLAGS "-fno-rtti -fno-implicit-templates") diff --git a/dbcon/mysql/ha_calpont.cpp b/dbcon/mysql/ha_calpont.cpp index 552a895d6..f1b375480 100644 --- a/dbcon/mysql/ha_calpont.cpp +++ b/dbcon/mysql/ha_calpont.cpp @@ -21,6 +21,7 @@ #define NEED_CALPONT_EXTERNS #include "ha_calpont_impl.h" +#include "ha_mcs_pushdown.h" static handler* calpont_create_handler(handlerton* hton, TABLE_SHARE* table, @@ -32,9 +33,17 @@ static int calpont_rollback(handlerton* hton, THD* thd, bool all); static int calpont_close_connection ( handlerton* hton, THD* thd ); handlerton* calpont_hton; +// handlers creation function for hton. +// Look into ha_mcs_pushdown.* for more details. static group_by_handler* create_calpont_group_by_handler(THD* thd, Query* query); +static derived_handler* +create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived); + +static select_handler* +create_columnstore_select_handler(THD* thd, SELECT_LEX* sel); + /* Variables for example share methods */ /* @@ -131,6 +140,8 @@ static int columnstore_init_func(void* p) calpont_hton->rollback = calpont_rollback; calpont_hton->close_connection = calpont_close_connection; calpont_hton->create_group_by = create_calpont_group_by_handler; + calpont_hton->create_derived = create_columnstore_derived_handler; + calpont_hton->create_select = create_columnstore_select_handler; DBUG_RETURN(0); } @@ -159,6 +170,10 @@ static int infinidb_init_func(void* p) calpont_hton->commit = calpont_commit; calpont_hton->rollback = calpont_rollback; calpont_hton->close_connection = calpont_close_connection; + calpont_hton->create_group_by = create_calpont_group_by_handler; + calpont_hton->create_derived = create_columnstore_derived_handler; + calpont_hton->create_select = create_columnstore_select_handler; + DBUG_RETURN(0); } @@ -929,256 +944,7 @@ struct st_mysql_storage_engine columnstore_storage_engine = struct st_mysql_storage_engine infinidb_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; -/*@brief check_walk - It traverses filter conditions*/ -/************************************************************ - * DESCRIPTION: - * It traverses filter predicates looking for unsupported - * JOIN types: non-equi JOIN, e.g t1.c1 > t2.c2; - * logical OR. - * PARAMETERS: - * thd - THD pointer. - * derived - TABLE_LIST* to work with. - * RETURN: - * derived_handler if possible - * NULL in other case - ***********************************************************/ -void check_walk(const Item* item, void* arg) -{ - bool* unsupported_feature = static_cast(arg); - if ( *unsupported_feature ) - return; - switch (item->type()) - { - case Item::FUNC_ITEM: - { - const Item_func* ifp = static_cast(item); - - if ( ifp->functype() != Item_func::EQ_FUNC ) // NON-equi JOIN - { - if ( ifp->argument_count() == 2 && - ifp->arguments()[0]->type() == Item::FIELD_ITEM && - ifp->arguments()[1]->type() == Item::FIELD_ITEM ) - { - Item_field* left= static_cast(ifp->arguments()[0]); - Item_field* right= static_cast(ifp->arguments()[1]); - - if ( left->field->table != right->field->table ) - { - *unsupported_feature = true; - return; - } - } - else // IN + correlated subquery - { - if ( ifp->functype() == Item_func::NOT_FUNC - && ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM ) - { - check_walk(ifp->arguments()[0], arg); - } - } - } - break; - } - - case Item::EXPR_CACHE_ITEM: // IN + correlated subquery - { - const Item_cache_wrapper* icw = static_cast(item); - if ( icw->get_orig_item()->type() == Item::FUNC_ITEM ) - { - const Item_func *ifp = static_cast(icw->get_orig_item()); - if ( ifp->argument_count() == 2 && - ( ifp->arguments()[0]->type() == Item::Item::SUBSELECT_ITEM - || ifp->arguments()[1]->type() == Item::Item::SUBSELECT_ITEM )) - { - *unsupported_feature = true; - return; - } - } - break; - } - - case Item::COND_ITEM: // OR in cods is unsupported yet - { - Item_cond* icp = (Item_cond*)item; - if ( is_cond_or(icp) ) - { - *unsupported_feature = true; - } - break; - } - default: - { - break; - } - } -} - -/*@brief create_calpont_group_by_handler- Creates handler*/ -/*********************************************************** - * DESCRIPTION: - * Creates a group_by pushdown handler if there is no: - * non-equi JOIN, e.g * t1.c1 > t2.c2 - * logical OR in the filter predicates - * Impossible WHERE - * Impossible HAVING - * and there is either GROUP BY or aggregation function - * exists at the top level. - * Valid queries with the last two crashes the server if - * processed. - * Details are in server/sql/group_by_handler.h - * PARAMETERS: - * thd - THD pointer - * query - Query structure LFM in group_by_handler.h - * RETURN: - * group_by_handler if success - * NULL in other case - ***********************************************************/ -static group_by_handler* -create_calpont_group_by_handler(THD* thd, Query* query) -{ - ha_calpont_group_by_handler* handler = NULL; - // same as thd->lex->current_select - SELECT_LEX *select_lex = query->from->select_lex; - - // Create a handler if query is valid. See comments for details. - if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE - && ( thd->variables.infinidb_vtable_mode == 0 - || thd->variables.infinidb_vtable_mode == 2 ) - && ( query->group_by || select_lex->with_sum_func ) ) - { - bool unsupported_feature = false; - // revisit SELECT_LEX for all units - for(TABLE_LIST* tl = query->from; !unsupported_feature && tl; tl = tl->next_global) - { - select_lex = tl->select_lex; - // Correlation subquery. Comming soon so fail on this yet. - unsupported_feature = select_lex->is_correlated; - - // Impossible HAVING or WHERE - if ( ( !unsupported_feature && query->having && select_lex->having_value == Item::COND_FALSE ) - || ( select_lex->cond_count > 0 - && select_lex->cond_value == Item::COND_FALSE ) ) - { - unsupported_feature = true; - } - - // Unsupported JOIN conditions - if ( !unsupported_feature ) - { - JOIN *join = select_lex->join; - Item_cond *icp = 0; - - if (join != 0) - icp = reinterpret_cast(join->conds); - - if ( unsupported_feature == false - && icp ) - { - icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); - } - - // Optimizer could move some join conditions into where - if (select_lex->where != 0) - icp = reinterpret_cast(select_lex->where); - - if ( unsupported_feature == false - && icp ) - { - icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); - } - - } - } // unsupported features check ends here - - if ( !unsupported_feature ) - { - handler = new ha_calpont_group_by_handler(thd, query); - - // Notify the server, that CS handles GROUP BY, ORDER BY and HAVING clauses. - query->group_by = NULL; - query->order_by = NULL; - query->having = NULL; - } - } - - return handler; -} - -/*********************************************************** - * DESCRIPTION: - * GROUP BY handler constructor - * PARAMETERS: - * thd - THD pointer. - * query - Query describing structure - ***********************************************************/ -ha_calpont_group_by_handler::ha_calpont_group_by_handler(THD* thd_arg, Query* query) - : group_by_handler(thd_arg, calpont_hton), - select(query->select), - table_list(query->from), - distinct(query->distinct), - where(query->where), - group_by(query->group_by), - order_by(query->order_by), - having(query->having) -{ -} - -/*********************************************************** - * DESCRIPTION: - * GROUP BY destructor - ***********************************************************/ -ha_calpont_group_by_handler::~ha_calpont_group_by_handler() -{ -} - -/*********************************************************** - * DESCRIPTION: - * Makes the plan and prepares the data - * RETURN: - * int rc - ***********************************************************/ -int ha_calpont_group_by_handler::init_scan() -{ - DBUG_ENTER("ha_calpont_group_by_handler::init_scan"); - - // Save vtable_state to restore the after we inited. - THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; - // MCOL-1052 Should be removed after cleaning the code up. - thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; - int rc = ha_calpont_impl_group_by_init(this, table); - thd->infinidb_vtable.vtable_state = oldState; - - DBUG_RETURN(rc); -} - -/*********************************************************** - * DESCRIPTION: - * Fetches a row and saves it to a temporary table. - * RETURN: - * int rc - ***********************************************************/ -int ha_calpont_group_by_handler::next_row() -{ - DBUG_ENTER("ha_calpont_group_by_handler::next_row"); - int rc = ha_calpont_impl_group_by_next(this, table); - - DBUG_RETURN(rc); -} - -/*********************************************************** - * DESCRIPTION: - * Shuts the scan down. - * RETURN: - * int rc - ***********************************************************/ -int ha_calpont_group_by_handler::end_scan() -{ - DBUG_ENTER("ha_calpont_group_by_handler::end_scan"); - - int rc = ha_calpont_impl_group_by_end(this, table); - - DBUG_RETURN(rc); -} +#include "ha_mcs_pushdown.cpp" mysql_declare_plugin(columnstore) { diff --git a/dbcon/mysql/ha_calpont.h b/dbcon/mysql/ha_calpont.h index bab756851..070f5380b 100644 --- a/dbcon/mysql/ha_calpont.h +++ b/dbcon/mysql/ha_calpont.h @@ -24,7 +24,7 @@ extern handlerton* calpont_hton; /** @brief - This structure will be shared among all open handlers. + INFINIDB_SHARE is a structure that will be shared among all open handlers. This example implements the minimum of what you will probably need. */ typedef struct st_calpont_share @@ -228,51 +228,4 @@ public: } }; - -/*@brief group_by_handler class*/ -/*********************************************************** - * DESCRIPTION: - * Provides server with group_by_handler API methods. - * One should read comments in server/sql/group_by_handler.h - * Attributes: - * select - attribute contains all GROUP BY, HAVING, ORDER items and calls it - * an extended SELECT list according to comments in - * server/sql/group_handler.cc. - * So the temporary table for - * select count(*) from b group by a having a > 3 order by a - * will have 4 columns not 1. - * However server ignores all NULLs used in - * GROUP BY, HAVING, ORDER. - * select_list_descr - contains Item description returned by Item->print() - * that is used in lookup for corresponding columns in - * extended SELECT list. - * table_list - contains all tables involved. Must be CS tables only. - * distinct - looks like a useless thing for now. Couldn't get it set by server. - * where - where items. - * group_by - group by ORDER items. - * order_by - order by ORDER items. - * having - having Item. - * Methods: - * init_scan - get plan and send it to ExeMgr. Get the execution result. - * next_row - get a row back from sm. - * end_scan - finish and clean the things up. - ***********************************************************/ -class ha_calpont_group_by_handler: public group_by_handler -{ -public: - ha_calpont_group_by_handler(THD* thd_arg, Query* query); - ~ha_calpont_group_by_handler(); - int init_scan(); - int next_row(); - int end_scan(); - - List* select; - TABLE_LIST* table_list; - bool distinct; - Item* where; - ORDER* group_by; - ORDER* order_by; - Item* having; -}; #endif //HA_CALPONT_H__ - diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index d40584109..c8b6aa058 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -192,6 +192,150 @@ bool nonConstFunc(Item_func* ifp) return false; } +/*@brief getColNameFromItem - builds a name from an Item */ +/*********************************************************** + * DESCRIPTION: + * This f() looks for a first proper Item_ident and populate + * ostream with schema, table and column names. + * Used to build db.table.field tuple for debugging output + * in getSelectPlan(). TBD getGroupPlan must use this also. + * PARAMETERS: + * item source Item* + * ostream output stream + * RETURNS + * void + ***********************************************************/ +void getColNameFromItem(std::ostringstream& ostream, Item* item) +{ +// MCOL-2121 WIP +// Item_func doesn't have proper db.table.field values +// inherited from Item_ident. TBD what is the valid output. +// !!!dynamic_cast fails compilation + ostream << "'"; + + if (item->type() != Item::FIELD_ITEM) + { + ostream << "unknown db" << '.'; + ostream << "unknown table" << '.'; + ostream << "unknown field"; + } + else + { + Item_ident* iip = reinterpret_cast(item); + + if (iip->db_name) + ostream << iip->db_name << '.'; + else + ostream << "unknown db" << '.'; + + if (iip->table_name) + ostream << iip->table_name << '.'; + else + ostream << "unknown table" << '.'; + + if (iip->field_name.length) + ostream << iip->field_name.str; + else + ostream << "unknown field"; + } + + ostream << "'"; + return; +} + +/*@brf sortItemIsInGroupRec - seeks for an item in grouping*/ +/*********************************************************** + * DESCRIPTION: + * This f() recursively traverses grouping items and looks + * for an FUNC_ITEM, REF_ITEM or FIELD_ITEM. + * f() is used by sortItemIsInGrouping(). + * PARAMETERS: + * sort_item Item* used to build aggregation. + * group_item GROUP BY item. + * RETURNS + * bool + ***********************************************************/ +bool sortItemIsInGroupRec(Item* sort_item, Item* group_item) +{ + bool found = false; + // If ITEM_REF::ref is NULL + if (sort_item == NULL) + { + return found; + } + + Item_func* ifp_sort = reinterpret_cast(sort_item); + + // base cases for Item_field and Item_ref. The second arg is binary cmp switch + found = group_item->eq(sort_item, false); + if (!found && sort_item->type() == Item::REF_ITEM) + { + Item_ref* ifp_sort_ref = reinterpret_cast(sort_item); + found = sortItemIsInGroupRec(*ifp_sort_ref->ref, group_item); + } + + // seeking for a group_item match + for (uint32_t i = 0; !found && i < ifp_sort->argument_count(); i++) + { + Item* ifp_sort_arg = ifp_sort->arguments()[i]; + if (ifp_sort_arg->type() == Item::FUNC_ITEM + || ifp_sort_arg->type() == Item::FIELD_ITEM) + { + Item* ifp_sort_arg = ifp_sort->arguments()[i]; + found = sortItemIsInGroupRec(ifp_sort_arg, group_item); + } + else if (ifp_sort_arg->type() == Item::REF_ITEM) + { + // dereference the Item + Item_ref* ifp_sort_ref = reinterpret_cast(ifp_sort_arg); + found = sortItemIsInGroupRec(*ifp_sort_ref->ref, group_item); + } + } + + return found; +} + +/*@brief sortItemIsInGrouping- seeks for an item in grouping*/ +/*********************************************************** + * DESCRIPTION: + * This f() traverses grouping items and looks for an item. + * only Item_fields, Item_func are considered. However there + * could be Item_ref down the tree. + * f() is used in sorting parsing by getSelectPlan(). + * PARAMETERS: + * sort_item Item* used to build aggregation. + * groupcol GROUP BY items from this unit. + * RETURNS + * bool + ***********************************************************/ +bool sortItemIsInGrouping(Item* sort_item, ORDER* groupcol) +{ + bool found = false; + + if(sort_item->type() == Item::SUM_FUNC_ITEM) + { + found = true; + } + + for (; !found && groupcol; groupcol = groupcol->next) + { + Item* group_item = *(groupcol->item); + found = (group_item->eq(sort_item, false)) ? true : false; + // Detect aggregation functions first then traverse + // if sort field is a Func and group field + // is either Field or Func + // Consider nonConstFunc() check here + if(!found && sort_item->type() == Item::FUNC_ITEM + && (group_item->type() == Item::FUNC_ITEM + || group_item->type() == Item::FIELD_ITEM)) + { + found = sortItemIsInGroupRec(sort_item, group_item); + } + } + + return found; +} + /*@brief buildAggFrmTempField- build aggr func from extSELECT list item*/ /*********************************************************** * DESCRIPTION: @@ -230,7 +374,6 @@ ReturnedColumn* buildAggFrmTempField(Item* item, gp_walk_info& gwi) std::vector::iterator iter = gwi.extSelAggColsItems.begin(); for ( ; iter != gwi.extSelAggColsItems.end(); iter++ ) { - //Item* temp_isfp = *iter; isfp = reinterpret_cast(*iter); if ( isfp->type() == Item::SUM_FUNC_ITEM && @@ -4079,10 +4222,10 @@ SimpleColumn* buildSimpleColumn(Item_field* ifp, gp_walk_info& gwi) { // check foreign engine if (ifp->cached_table && ifp->cached_table->table) - infiniDB = isInfiniDB(ifp->cached_table->table); + infiniDB = isMCSTable(ifp->cached_table->table); // @bug4509. ifp->cached_table could be null for myisam sometimes else if (ifp->field && ifp->field->table) - infiniDB = isInfiniDB(ifp->field->table); + infiniDB = isMCSTable(ifp->field->table); if (infiniDB) { @@ -5732,7 +5875,7 @@ void parse_item (Item* item, vector& field_vec, } } -bool isInfiniDB(TABLE* table_ptr) +bool isMCSTable(TABLE* table_ptr) { #if (defined(_MSC_VER) && defined(_DEBUG)) || defined(SAFE_MUTEX) @@ -5754,14 +5897,18 @@ bool isInfiniDB(TABLE* table_ptr) return false; } -int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion) +int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, + SCSEP& csep, + bool isUnion, + bool isPushdownHand) { #ifdef DEBUG_WALK_COND cerr << "getSelectPlan()" << endl; #endif // by pass the derived table resolve phase of mysql - if (!(((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) || + if ( !isPushdownHand && + !(((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) || ((gwi.thd->lex)->sql_command == SQLCOM_DELETE ) || ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) || ((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI ) ) && gwi.thd->derived_tables_processing) @@ -5913,7 +6060,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i else { // check foreign engine tables - bool infiniDB = (table_ptr->table ? isInfiniDB(table_ptr->table) : true); + bool infiniDB = (table_ptr->table ? isMCSTable(table_ptr->table) : true); // trigger system catalog cache if (infiniDB) @@ -5965,6 +6112,10 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i bool unionSel = false; + // UNION master unit check + // Existed pushdown handlers won't get in this scope + // except UNION pushdown that is to come. + // is_unit_op() give a segv for derived_handler's SELECT_LEX if (!isUnion && select_lex.master_unit()->is_unit_op()) { gwi.thd->infinidb_vtable.isUnion = true; @@ -7212,16 +7363,29 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { if ((*(ordercol->item))->type() == Item::WINDOW_FUNC_ITEM) gwi.hasWindowFunc = true; + // MCOL-2166 Looking for this sorting item in GROUP_BY items list. + if(isPushdownHand + && !sortItemIsInGrouping(*ordercol->item, select_lex.group_list.first)) + { + std::ostringstream ostream; + std::ostringstream& osr = ostream; + getColNameFromItem(osr, *ordercol->item); + Message::Args args; + args.add(ostream.str()); + string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); + gwi.parseErrorText = emsg; + setError(gwi.thd, ER_INTERNAL_ERROR, emsg, gwi); + return ERR_NOT_GROUPBY_EXPRESSION; + } } // re-visit the first of ordercol list ordercol = reinterpret_cast(order_list.first); - // for subquery, order+limit by will be supported in infinidb. build order by columns - // @todo union order by and limit support - if (gwi.hasWindowFunc - || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT - || ( isUnion && ordercol )) + // for subquery or pushdown query, order+limit by will be supported in CS + // union order by and limit are supported + if (gwi.hasWindowFunc || isPushdownHand || ( isUnion && ordercol ) + || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT ) { for (; ordercol; ordercol = ordercol->next) { @@ -7654,12 +7818,13 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i gwi.returnedCols.push_back(minSc); } - if (!isUnion && !gwi.hasWindowFunc && gwi.subSelectType == CalpontSelectExecutionPlan::MAIN_SELECT) + // ORDER BY translation part + if (!isUnion && !gwi.hasWindowFunc + && gwi.subSelectType == CalpontSelectExecutionPlan::MAIN_SELECT ) { std::ostringstream vtb; vtb << "infinidb_vtable.$vtable_" << gwi.thd->thread_id; - //vtb << "$vtable_" << gwi.thd->thread_id; // re-construct the select query and redo phase 1 if (redo) { @@ -7927,11 +8092,20 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { gwi.thd->infinidb_vtable.has_order_by = true; csep->hasOrderBy(true); + // To activate LimitedOrderBy + if(isPushdownHand) + { + csep->specHandlerProcessed(true); + } ord_cols = " order by " + ord_cols; select_query += ord_cols; } } + // LIMIT processing part + uint64_t limitNum = std::numeric_limits::max(); + + // non-MAIN union branch if (unionSel || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT) { if (select_lex.master_unit()->global_parameters()->explicit_limit) @@ -7958,6 +8132,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i } } } + // union with explicit select at the top level else if (isUnion && select_lex.explicit_limit) { if (select_lex.braces) @@ -7969,10 +8144,10 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i csep->limitNum(((Item_int*)select_lex.select_limit)->val_int()); } } + // other types of queries that have explicit LIMIT else if (select_lex.explicit_limit) { uint32_t limitOffset = 0; - uint32_t limitNum = std::numeric_limits::max(); if (join) { @@ -8027,6 +8202,12 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i csep->limitStart(limitOffset); csep->limitNum(limitNum); } + // Pushdown queries w ORDER BY and LIMIT + else if (isPushdownHand && csep->hasOrderBy()) + { + csep->limitStart(limitOffset); + csep->limitNum(limitNum); + } else { ostringstream limit; @@ -8034,6 +8215,12 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i select_query += limit.str(); } } + // Pushdown queries with ORDER BY w/o explicit limit + else if (isPushdownHand && csep->hasOrderBy()) + { + // We must set this to activate LimitedOrderBy in ExeMgr + csep->limitNum((uint64_t) - 2); + } gwi.thd->infinidb_vtable.select_vtable_query.free(); gwi.thd->infinidb_vtable.select_vtable_query.append(select_query.c_str(), select_query.length()); @@ -8047,9 +8234,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); return ER_CHECK_NOT_IMPLEMENTED; } - } + } // LIMIT processing finishes here - if (/*join->select_options*/select_lex.options & SELECT_DISTINCT) + if (select_lex.options & SELECT_DISTINCT) csep->distinct(true); // add the smallest column to count(*) parm. @@ -8283,6 +8470,49 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi) return 0; } +int cs_get_derived_plan(derived_handler* handler, THD* thd, SCSEP& csep) +{ + SELECT_LEX select_lex = *handler->select; + gp_walk_info gwi; + gwi.thd = thd; + int status = getSelectPlan(gwi, select_lex, csep, false, true); + + if (status > 0) + return ER_INTERNAL_ERROR; + else if (status < 0) + return status; + +#ifdef DEBUG_WALK_COND + cerr << "---------------- cp_get_derived_plan EXECUTION PLAN ----------------" << endl; + cerr << *csep << endl ; + cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; +#endif + + return 0; +} + +int cs_get_select_plan(select_handler* handler, THD* thd, SCSEP& csep) +{ + SELECT_LEX select_lex = *handler->select; + gp_walk_info gwi; + gwi.thd = thd; + int status = getSelectPlan(gwi, select_lex, csep, false, true); + + if (status > 0) + return ER_INTERNAL_ERROR; + else if (status < 0) + return status; + +#ifdef DEBUG_WALK_COND + cerr << "---------------- cp_get_select_plan EXECUTION PLAN ----------------" << endl; + cerr << *csep << endl ; + cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; +#endif + + return 0; +} + + /*@brief buildConstColFromFilter- change SimpleColumn into ConstColumn*/ /*********************************************************** * DESCRIPTION: @@ -8290,6 +8520,8 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi) * filter predicate is used, e.g. * field = 'AIR', field IN ('AIR'). This utility function tries to * replace such fields with ConstantColumns using cond_pushed filters. + * TBD Take into account that originalSC SimpleColumn could be: + * SimpleColumn, ArithmeticColumn, FunctionColumn. * PARAMETERS: * originalSC SimpleColumn* removed field * gwi main strucutre @@ -8470,7 +8702,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro else { // check foreign engine tables - bool infiniDB = (table_ptr->table ? isInfiniDB(table_ptr->table) : true); + bool infiniDB = (table_ptr->table ? isMCSTable(table_ptr->table) : true); // trigger system catalog cache if (infiniDB) @@ -9652,6 +9884,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro } // GROUP processing ends here + // ORDER BY processing starts here if (gwi.thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) { ORDER* ordercol = reinterpret_cast(gi.groupByOrder); @@ -9752,26 +9985,9 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro // this ORDER BY item. if ( iter == gwi.groupByCols.end() ) { - Item_ident* iip = reinterpret_cast(ord_item); std::ostringstream ostream; - ostream << "'"; - - if (iip->db_name) - ostream << iip->db_name << '.'; - else - ostream << "unknown db" << '.'; - - if (iip->table_name) - ostream << iip->table_name << '.'; - else - ostream << "unknown table" << '.'; - - if (iip->field_name.length) - ostream << iip->field_name.str; - else - ostream << "unknown field"; - - ostream << "'"; + std::ostringstream& osr = ostream; + getColNameFromItem(osr, *ordercol->item); Message::Args args; args.add(ostream.str()); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 1d3c98003..3c53c518a 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -80,6 +80,7 @@ using namespace execplan; using namespace dataconvert; #include "sm.h" +#include "ha_mcs_pushdown.h" #include "bytestream.h" #include "messagequeue.h" @@ -3019,7 +3020,7 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) return rc; } -int ha_calpont_impl_rnd_end(TABLE* table) +int ha_calpont_impl_rnd_end(TABLE* table, bool is_pushdown_hand) { int rc = 0; THD* thd = current_thd; @@ -3047,6 +3048,12 @@ int ha_calpont_impl_rnd_end(TABLE* table) thd->infinidb_vtable.isNewQuery = true; + // Workaround because CS doesn't reset isUnion in a normal way. + if (is_pushdown_hand) + { + thd->infinidb_vtable.isUnion = false; + } + if (get_fe_conn_info_ptr() != NULL) ci = reinterpret_cast(get_fe_conn_info_ptr()); if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) @@ -5275,4 +5282,552 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* return rc; } + +/*@brief Initiate the query for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Execute the query and saves derived table query. + * There is an extra handler argument so I ended up with a + * new init function. The code is a copy of + * ha_calpont_impl_rnd_init() mostly. We should come up with + * a semi-universal structure that allows to save any + * extra data. + * PARAMETERS: + * void* handler either select_ or derived_handler + * TABLE* table - table where to save the results + * RETURN: + * rc as int + ***********************************************************/ +int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) +{ +#ifdef DEBUG_SETENV + string home(getenv("HOME")); + + if (!getenv("CALPONT_HOME")) + { + string calpontHome(home + "/Calpont/etc/"); + setenv("CALPONT_HOME", calpontHome.c_str(), 1); + } + + if (!getenv("CALPONT_CONFIG_FILE")) + { + string calpontConfigFile(home + "/Calpont/etc/Columnstore.xml"); + setenv("CALPONT_CONFIG_FILE", calpontConfigFile.c_str(), 1); + } + + if (!getenv("CALPONT_CSC_IDENT")) + setenv("CALPONT_CSC_IDENT", "dm", 1); + +#endif + + IDEBUG( cout << "pushdown_init for table " << endl ); + THD* thd = current_thd; + + //check whether the system is ready to process statement. +#ifndef _MSC_VER + static DBRM dbrm(true); + bool bSystemQueryReady = dbrm.getSystemQueryReady(); + + if (bSystemQueryReady == 0) + { + // Still not ready + setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + else if (bSystemQueryReady < 0) + { + // Still not ready + setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + +#endif + // prevent "create table as select" from running on slave + thd->infinidb_vtable.hasInfiniDBTable = true; + + /* If this node is the slave, ignore DML to IDB tables */ + if (thd->slave_thread && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + + // return error is error status is already set + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) + return ER_INTERNAL_ERROR; + + // @bug 2232. Basic SP support. Error out non support sp cases. + // @bug 3939. Only error out for sp with select. Let pass for alter table in sp. + if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE) + { + setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + + // mysql reads table twice for order by + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1 || + thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY) + return 0; + + if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) + return 0; + + //Update and delete code + if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) + return doUpdateDelete(thd); + + uint32_t sessionID = tid2sid(thd->thread_id); + boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + csc->identity(CalpontSystemCatalog::FE); + + if (!thd->infinidb_vtable.cal_conn_info) + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + idbassert(ci != 0); + + // MySQL sometimes calls rnd_init multiple times, plan should only be + // generated and sent once. + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE && + !thd->infinidb_vtable.isNewQuery) + return 0; + + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) + { + if (ci->cal_conn_hndl) + { + // send ExeMgr a signal before closing the connection + ByteStream msg; + ByteStream::quadbyte qb = 0; + msg << qb; + + try + { + ci->cal_conn_hndl->exeMgr->write(msg); + } + catch (...) + { + // canceling query. ignore connection failure. + } + + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return 0; + } + + sm::tableid_t tableid = 0; + cal_table_info ti; + sm::cpsm_conhdl_t* hndl; + SCSEP csep; + // Declare handlers ptrs in this scope for future use. + select_handler* sh = NULL; + derived_handler* dh = NULL; + + // update traceFlags according to the autoswitch state. replication query + // on slave are in table mode (create table as...) + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE || + (thd->slave_thread && thd->infinidb_vtable.vtable_state == THD::INFINIDB_INIT)) + { + ci->traceFlags |= CalpontSelectExecutionPlan::TRACE_TUPLE_OFF; + thd->infinidb_vtable.vtable_state = THD::INFINIDB_DISABLE_VTABLE; + } + else + { + ci->traceFlags = (ci->traceFlags | CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)^ + CalpontSelectExecutionPlan::TRACE_TUPLE_OFF; + } + + bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false); + + { + //if (!ci->cal_conn_hndl || thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) + if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) + { + ci->stats.reset(); // reset query stats + ci->stats.setStartTime(); + ci->stats.fUser = thd->main_security_ctx.user; + + if (thd->main_security_ctx.host) + ci->stats.fHost = thd->main_security_ctx.host; + else if (thd->main_security_ctx.host_or_ip) + ci->stats.fHost = thd->main_security_ctx.host_or_ip; + else + ci->stats.fHost = "unknown"; + + try + { + ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + ci->warningMsg = msg; + } + + // if the previous query has error, re-establish the connection + if (ci->queryState != 0) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + } + + sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery); + idbassert(ci->cal_conn_hndl != 0); + ci->cal_conn_hndl->csc = csc; + idbassert(ci->cal_conn_hndl->exeMgr != 0); + + try + { + ci->cal_conn_hndl->connect(); + } + catch (...) + { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR)); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto error; + } + + hndl = ci->cal_conn_hndl; + + if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE) + { + if (!csep) + csep.reset(new CalpontSelectExecutionPlan()); + + SessionManager sm; + BRM::TxnID txnID; + txnID = sm.getTxnID(sessionID); + + if (!txnID.valid) + { + txnID.id = 0; + txnID.valid = true; + } + + QueryContext verID; + verID = sm.verID(); + + csep->txnID(txnID.id); + csep->verID(verID); + csep->sessionID(sessionID); + + if (thd->db.length) + csep->schemaName(thd->db.str); + + csep->traceFlags(ci->traceFlags); + + if (thd->infinidb_vtable.isInsertSelect) + csep->queryType(CalpontSelectExecutionPlan::INSERT_SELECT); + + // cast the handler and get a plan. + int status = 42; + if (handler_info->hndl_type == mcs_handler_types_t::SELECT) + { + sh = reinterpret_cast(handler_info->hndl_ptr); + status = cs_get_select_plan(sh, thd, csep); + } + else if (handler_info->hndl_type == DERIVED) + { + dh = reinterpret_cast(handler_info->hndl_ptr); + status = cs_get_derived_plan(dh, thd, csep); + } + + // WIP MCOL-2121 Find a way to return an actual error + // It either ends up with 42 or other error status + if (status > 0) + goto internal_error; + else if (status < 0) + return 0; + + // @bug 2547. don't need to send the plan if it's impossible where for all unions. + if (thd->infinidb_vtable.impossibleWhereOnUnion) + return 0; + + string query; + query.assign(idb_mysql_query_str(thd)); + //query.assign(thd->infinidb_vtable.original_query.ptr(), + // thd->infinidb_vtable.original_query.length()); + csep->data(query); + + try + { + csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser)); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + +#ifdef PLAN_HEX_FILE + // plan serialization + ifstream ifs("/tmp/li1-plan.hex"); + ByteStream bs1; + ifs >> bs1; + ifs.close(); + csep->unserialize(bs1); +#endif + + if (ci->traceFlags & 1) + { + cerr << "---------------- EXECUTION PLAN ----------------" << endl; + cerr << *csep << endl ; + cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; + } + else + { + IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl ); + IDEBUG( cerr << *csep << endl ); + IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl ); + } + } + }// end of execution plan generation + + if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE) + { + ByteStream msg; + ByteStream emsgBs; + + while (true) + { + try + { + ByteStream::quadbyte qb = 4; + msg << qb; + hndl->exeMgr->write(msg); + msg.restart(); + csep->rmParms(rmParms); + + //send plan + csep->serialize(msg); + hndl->exeMgr->write(msg); + + //get ExeMgr status back to indicate a vtable joblist success or not + msg.restart(); + emsgBs.restart(); + msg = hndl->exeMgr->read(); + emsgBs = hndl->exeMgr->read(); + string emsg; + + if (msg.length() == 0 || emsgBs.length() == 0) + { + emsg = "Lost connection to ExeMgr. Please contact your administrator"; + setError(thd, ER_INTERNAL_ERROR, emsg); + return ER_INTERNAL_ERROR; + } + + string emsgStr; + emsgBs >> emsgStr; + bool err = false; + + if (msg.length() == 4) + { + msg >> qb; + + if (qb != 0) + { + err = true; + // for makejoblist error, stats contains only error code and insert from here + // because table fetch is not started + ci->stats.setEndTime(); + ci->stats.fQuery = csep->data(); + ci->stats.fQueryType = csep->queryType(); + ci->stats.fErrorNo = qb; + + try + { + ci->stats.insert(); + } + catch (std::exception& e) + { + string msg = string("Columnstore Query Stats - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + } + } + else + { + err = true; + } + + if (err) + { + setError(thd, ER_INTERNAL_ERROR, emsgStr); + return ER_INTERNAL_ERROR; + } + + rmParms.clear(); + + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + { + ci->tableMap[table] = ti; + } + else + { + ci->queryState = 1; + } + + break; + } + catch (...) + { + sm::sm_cleanup(hndl); + hndl = 0; + + sm::sm_init(sessionID, &hndl, localQuery); + idbassert(hndl != 0); + hndl->csc = csc; + + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + ti.conn_hndl = hndl; + else + ci->cal_conn_hndl = hndl; + + try + { + hndl->connect(); + } + catch (...) + { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR)); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto error; + } + + msg.restart(); + } + } + } + + // set query state to be in_process. Sometimes mysql calls rnd_init multiple + // times, this makes sure plan only being generated and sent once. It will be + // reset when query finishes in sm::end_query + thd->infinidb_vtable.isNewQuery = false; + + // common path for both vtable select phase and table mode -- open scan handle + ti = ci->tableMap[table]; + // This is the server's temp table for the result. + if(sh) + { + ti.msTablePtr = sh->table; + } + else + { + ti.msTablePtr = dh->table; + } + + { + if (ti.tpl_ctx == 0) + { + ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); + } + + // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql + // call rnd_init for a table more than once. + ti.tpl_scan_ctx->rowGroup = NULL; + + try + { + tableid = execplan::IDB_VTABLE_ID; + } + catch (...) + { + string emsg = "No table ID found for table " + string(table->s->table_name.str); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + try + { + sm::tpl_open(tableid, ti.tpl_ctx, hndl); + sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl); + } + catch (std::exception& e) + { + string emsg = "table can not be opened: " + string(e.what()); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + catch (...) + { + string emsg = "table can not be opened"; + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + ti.tpl_scan_ctx->traceFlags = ci->traceFlags; + + if ((ti.tpl_scan_ctx->ctp).size() == 0) + { + uint32_t num_attr = table->s->fields; + + for (uint32_t i = 0; i < num_attr; i++) + { + CalpontSystemCatalog::ColType ctype; + ti.tpl_scan_ctx->ctp.push_back(ctype); + } + + // populate coltypes here for table mode because tableband gives treeoid for dictionary column + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + { + CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str), true); + + if (oidlist.size() != num_attr) + { + string emsg = "Size mismatch probably caused by front end out of sync"; + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + for (unsigned int j = 0; j < oidlist.size(); j++) + { + CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum); + ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype; + ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1; + } + } + } + } + + ci->tableMap[table] = ti; + return 0; + +error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + // do we need to close all connection handle of the table map? + return ER_INTERNAL_ERROR; + +internal_error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return ER_INTERNAL_ERROR; +} // vim:sw=4 ts=4: diff --git a/dbcon/mysql/ha_calpont_impl.h b/dbcon/mysql/ha_calpont_impl.h index bdc0e0eef..7e8de5ce5 100644 --- a/dbcon/mysql/ha_calpont_impl.h +++ b/dbcon/mysql/ha_calpont_impl.h @@ -20,6 +20,7 @@ #define HA_CALPONT_IMPL_H__ #include "idb_mysql.h" +#include "ha_mcs_pushdown.h" #ifdef NEED_CALPONT_EXTERNS extern int ha_calpont_impl_discover_existence(const char* schema, const char* name); @@ -29,7 +30,7 @@ extern int ha_calpont_impl_open(const char* name, int mode, uint32_t test_if_loc extern int ha_calpont_impl_close(void); extern int ha_calpont_impl_rnd_init(TABLE* table); extern int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table); -extern int ha_calpont_impl_rnd_end(TABLE* table); +extern int ha_calpont_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_calpont_impl_write_row(uchar* buf, TABLE* table); extern void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table); extern int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table); @@ -45,6 +46,7 @@ extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); +extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info , TABLE* table); #endif @@ -52,6 +54,7 @@ extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, #include "ha_calpont_impl_if.h" #include "calpontsystemcatalog.h" #include "ha_calpont.h" +#include "ha_mcs_pushdown.h" extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci); extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted); extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci); @@ -71,6 +74,7 @@ extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_ extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); +extern int ha_cs_impl_derived_next(TABLE* table); #endif #endif diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 6d8945ad8..820f8d430 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -337,14 +337,16 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su int cp_get_plan(THD* thd, execplan::SCSEP& csep); int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti); int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); -int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false); +int cs_get_derived_plan(derived_handler* handler, THD* thd, SCSEP& csep); +int cs_get_select_plan(select_handler* handler, THD* thd, SCSEP& csep); +int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isPushdownHand = false); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg); void gp_walk(const Item* item, void* arg); void parse_item (Item* item, std::vector& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL); const std::string bestTableName(const Item_field* ifp); -bool isInfiniDB(TABLE* table_ptr); +bool isMCSTable(TABLE* table_ptr); // execution plan util functions prototypes execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand = false); diff --git a/dbcon/mysql/ha_mcs_pushdown.cpp b/dbcon/mysql/ha_mcs_pushdown.cpp new file mode 100644 index 000000000..bc4673a11 --- /dev/null +++ b/dbcon/mysql/ha_mcs_pushdown.cpp @@ -0,0 +1,661 @@ +/* + Copyright (c) 2019 MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +// ha_calpont.cpp includes this file. + +/*@brief check_walk - It traverses filter conditions*/ +/************************************************************ + * DESCRIPTION: + * It traverses filter predicates looking for unsupported + * JOIN types: non-equi JOIN, e.g t1.c1 > t2.c2; + * logical OR. + * PARAMETERS: + * thd - THD pointer. + * derived - TABLE_LIST* to work with. + * RETURN: + * derived_handler if possible + * NULL in other case + ***********************************************************/ +void check_walk(const Item* item, void* arg) +{ + bool* unsupported_feature = static_cast(arg); + if ( *unsupported_feature ) + return; + switch (item->type()) + { + case Item::FUNC_ITEM: + { + const Item_func* ifp = static_cast(item); + + if ( ifp->functype() != Item_func::EQ_FUNC ) // NON-equi JOIN + { + if ( ifp->argument_count() == 2 && + ifp->arguments()[0]->type() == Item::FIELD_ITEM && + ifp->arguments()[1]->type() == Item::FIELD_ITEM ) + { + Item_field* left= static_cast(ifp->arguments()[0]); + Item_field* right= static_cast(ifp->arguments()[1]); + + if ( left->field->table != right->field->table ) + { + *unsupported_feature = true; + return; + } + } + else // IN + correlated subquery + { + if ( ifp->functype() == Item_func::NOT_FUNC + && ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM ) + { + check_walk(ifp->arguments()[0], arg); + } + } + } + break; + } + + case Item::EXPR_CACHE_ITEM: // IN + correlated subquery + { + const Item_cache_wrapper* icw = static_cast(item); + if ( icw->get_orig_item()->type() == Item::FUNC_ITEM ) + { + const Item_func *ifp = static_cast(icw->get_orig_item()); + if ( ifp->argument_count() == 2 && + ( ifp->arguments()[0]->type() == Item::Item::SUBSELECT_ITEM + || ifp->arguments()[1]->type() == Item::Item::SUBSELECT_ITEM )) + { + *unsupported_feature = true; + return; + } + } + break; + } + + case Item::COND_ITEM: // OR in JOIN conds is unsupported yet + { + Item_cond* icp = (Item_cond*)item; + if ( is_cond_or(icp) ) + { + *unsupported_feature = true; + } + break; + } + default: + { + break; + } + } +} + +/*@brief create_calpont_group_by_handler- Creates handler*/ +/*********************************************************** + * DESCRIPTION: + * Creates a group_by pushdown handler if there is no: + * non-equi JOIN, e.g * t1.c1 > t2.c2 + * logical OR in the filter predicates + * Impossible WHERE + * Impossible HAVING + * and there is either GROUP BY or aggregation function + * exists at the top level. + * Valid queries with the last two crashes the server if + * processed. + * Details are in server/sql/group_by_handler.h + * PARAMETERS: + * thd - THD pointer + * query - Query structure LFM in group_by_handler.h + * RETURN: + * group_by_handler if success + * NULL in other case + ***********************************************************/ +static group_by_handler* +create_calpont_group_by_handler(THD* thd, Query* query) +{ + ha_calpont_group_by_handler* handler = NULL; + // same as thd->lex->current_select + SELECT_LEX *select_lex = query->from->select_lex; + + // Create a handler if query is valid. See comments for details. + if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE + && ( thd->variables.infinidb_vtable_mode == 0 + || thd->variables.infinidb_vtable_mode == 2 ) + && ( query->group_by || select_lex->with_sum_func ) ) + { + bool unsupported_feature = false; + // revisit SELECT_LEX for all units + for(TABLE_LIST* tl = query->from; !unsupported_feature && tl; tl = tl->next_global) + { + select_lex = tl->select_lex; + // Correlation subquery. Comming soon so fail on this yet. + unsupported_feature = select_lex->is_correlated; + + // Impossible HAVING or WHERE + if ( ( !unsupported_feature && query->having && select_lex->having_value == Item::COND_FALSE ) + || ( select_lex->cond_count > 0 + && select_lex->cond_value == Item::COND_FALSE ) ) + { + unsupported_feature = true; + } + + // Unsupported JOIN conditions + if ( !unsupported_feature ) + { + JOIN *join = select_lex->join; + Item_cond *icp = 0; + + if (join != 0) + icp = reinterpret_cast(join->conds); + + if ( unsupported_feature == false + && icp ) + { + icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + } + + // Optimizer could move some join conditions into where + if (select_lex->where != 0) + icp = reinterpret_cast(select_lex->where); + + if ( unsupported_feature == false + && icp ) + { + icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + } + + } + } // unsupported features check ends here + + if ( !unsupported_feature ) + { + handler = new ha_calpont_group_by_handler(thd, query); + + // Notify the server, that CS handles GROUP BY, ORDER BY and HAVING clauses. + query->group_by = NULL; + query->order_by = NULL; + query->having = NULL; + } + } + + return handler; +} + +/*@brief create_columnstore_derived_handler- Creates handler*/ +/************************************************************ + * DESCRIPTION: + * Creates a derived handler if there is no non-equi JOIN, e.g + * t1.c1 > t2.c2 and logical OR in the filter predicates. + * More details in server/sql/derived_handler.h + * PARAMETERS: + * thd - THD pointer. + * derived - TABLE_LIST* to work with. + * RETURN: + * derived_handler if possible + * NULL in other case + ***********************************************************/ +static derived_handler* +create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived) +{ + ha_columnstore_derived_handler* handler = NULL; + handlerton *ht= 0; + + SELECT_LEX_UNIT *unit= derived->derived; + + if ( thd->infinidb_vtable.vtable_state != THD::INFINIDB_DISABLE_VTABLE + && thd->variables.infinidb_vtable_mode != 0 ) + { + return 0; + } + + for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select()) + { + if (!(sl->join)) + return 0; + for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local) + { + if (!tbl->table) + return 0; + // Same handlerton type check. + if (!ht) + ht= tbl->table->file->partition_ht(); + else if (ht != tbl->table->file->partition_ht()) + return 0; + } + } + + bool unsupported_feature = false; + { + SELECT_LEX select_lex = *unit->first_select(); + JOIN* join = select_lex.join; + Item_cond* icp = 0; + + if (join != 0) + icp = reinterpret_cast(join->conds); + + if (!join) + { + icp = reinterpret_cast(select_lex.where); + } + + if ( icp ) + { + icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + } + } + + if ( !unsupported_feature ) + handler= new ha_columnstore_derived_handler(thd, derived); + + return handler; +} + +/*********************************************************** + * DESCRIPTION: + * derived_handler constructor + * PARAMETERS: + * thd - THD pointer. + * tbl - tables involved. + ***********************************************************/ +ha_columnstore_derived_handler::ha_columnstore_derived_handler(THD *thd, + TABLE_LIST *dt) + : derived_handler(thd, calpont_hton) +{ + derived = dt; +} + +/*********************************************************** + * DESCRIPTION: + * derived_handler destructor + ***********************************************************/ +ha_columnstore_derived_handler::~ha_columnstore_derived_handler() +{} + +/*@brief Initiate the query for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Execute the query and saves derived table query. + * ATM this function sets vtable_state and restores it afterwards + * since it reuses existed vtable code internally. + * PARAMETERS: + * + * RETURN: + * rc as int + ***********************************************************/ +int ha_columnstore_derived_handler::init_scan() +{ + char query_buff[4096]; + + DBUG_ENTER("ha_columnstore_derived_handler::init_scan"); + + // Save query for logging + String derived_query(query_buff, sizeof(query_buff), thd->charset()); + derived_query.length(0); + derived->derived->print(&derived_query, QT_ORDINARY); + + // Save vtable_state to restore the after we inited. + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; + + mcs_handler_info mhi = mcs_handler_info(static_cast(this), DERIVED); + // this::table is the place for the result set + int rc = ha_cs_impl_pushdown_init(&mhi, table); + + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +/*@brief Fetch next row for derived_handler */ +/*********************************************************** + * DESCRIPTION: + * Fetches next row and saves it in the temp table + * ATM this function sets vtable_state and restores it + * afterwards since it reuses existed vtable code internally. + * PARAMETERS: + * + * RETURN: + * rc as int + * + ***********************************************************/ +int ha_columnstore_derived_handler::next_row() +{ + DBUG_ENTER("ha_columnstore_derived_handler::next_row"); + + // Save vtable_state to restore the after we inited. + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + + thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; + + int rc = ha_calpont_impl_rnd_next(table->record[0], table); + + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +/*@brief Finishes the scan and clean it up */ +/*********************************************************** + * DESCRIPTION: + * Finishes the scan for derived handler + * ATM this function sets vtable_state and restores it + * afterwards since it reuses existed vtable code internally. + * PARAMETERS: + * + * RETURN: + * rc as int + * + ***********************************************************/ +int ha_columnstore_derived_handler::end_scan() +{ + DBUG_ENTER("ha_columnstore_derived_handler::end_scan"); + + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; + + int rc = ha_calpont_impl_rnd_end(table, true); + + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +void ha_columnstore_derived_handler::print_error(int, unsigned long) +{ +} + +/*********************************************************** + * DESCRIPTION: + * GROUP BY handler constructor + * PARAMETERS: + * thd - THD pointer. + * query - Query describing structure + ***********************************************************/ +ha_calpont_group_by_handler::ha_calpont_group_by_handler(THD* thd_arg, Query* query) + : group_by_handler(thd_arg, calpont_hton), + select(query->select), + table_list(query->from), + distinct(query->distinct), + where(query->where), + group_by(query->group_by), + order_by(query->order_by), + having(query->having) +{ +} + +/*********************************************************** + * DESCRIPTION: + * GROUP BY destructor + ***********************************************************/ +ha_calpont_group_by_handler::~ha_calpont_group_by_handler() +{ +} + +/*********************************************************** + * DESCRIPTION: + * Makes the plan and prepares the data + * RETURN: + * int rc + ***********************************************************/ +int ha_calpont_group_by_handler::init_scan() +{ + DBUG_ENTER("ha_calpont_group_by_handler::init_scan"); + + // Save vtable_state to restore the after we inited. + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + // MCOL-1052 Should be removed after cleaning the code up. + thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; + int rc = ha_calpont_impl_group_by_init(this, table); + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +/*********************************************************** + * DESCRIPTION: + * Fetches a row and saves it to a temporary table. + * RETURN: + * int rc + ***********************************************************/ +int ha_calpont_group_by_handler::next_row() +{ + DBUG_ENTER("ha_calpont_group_by_handler::next_row"); + int rc = ha_calpont_impl_group_by_next(this, table); + + DBUG_RETURN(rc); +} + +/*********************************************************** + * DESCRIPTION: + * Shuts the scan down. + * RETURN: + * int rc + ***********************************************************/ +int ha_calpont_group_by_handler::end_scan() +{ + DBUG_ENTER("ha_calpont_group_by_handler::end_scan"); + + int rc = ha_calpont_impl_group_by_end(this, table); + + DBUG_RETURN(rc); +} + +/*@brief create_columnstore_select_handler- Creates handler*/ +/************************************************************ + * DESCRIPTION: + * Creates a select handler if there is no non-equi JOIN, e.g + * t1.c1 > t2.c2 and logical OR in the filter predicates. + * More details in server/sql/select_handler.h + * PARAMETERS: + * thd - THD pointer. + * sel - SELECT_LEX* that describes the query. + * RETURN: + * select_handler if possible + * NULL in other case + ***********************************************************/ +static select_handler* +create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex) +{ + ha_columnstore_select_handler* handler = NULL; + handlerton *ht= 0; + + // Return if vtable enabled. + if ( thd->infinidb_vtable.vtable_state != THD::INFINIDB_DISABLE_VTABLE + && thd->variables.infinidb_vtable_mode != 0 ) + { + return 0; + } + for (SELECT_LEX* sl = select_lex;sl; sl= sl->next_select()) + { + if (!(sl->join)) + return 0; + for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local) + { + if (!tbl->table) + return 0; + // Same handlerton type check. + if (!ht) + ht= tbl->table->file->partition_ht(); + else if (ht != tbl->table->file->partition_ht()) + return 0; + } + } + + bool unsupported_feature = false; + // Impossible HAVING or WHERE + if ( ( select_lex->having && select_lex->having_value == Item::COND_FALSE ) + || ( select_lex->cond_count > 0 + && select_lex->cond_value == Item::COND_FALSE ) ) + { + unsupported_feature = true; + } + + // Unsupported query check. + if ( !unsupported_feature ) + { + // JOIN expression from WHERE, ON expressions + JOIN* join = select_lex->join; + Item_cond* where_icp = 0; + Item_cond* on_icp = 0; + + if (join != 0) + { + where_icp = reinterpret_cast(join->conds); + } + + if ( where_icp ) + { + where_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + } + + // Looking for JOIN with ON expression through + // TABLE_LIST in FROM until CS meets unsupported feature + TABLE_LIST* table_ptr = select_lex->get_table_list(); + for (; !unsupported_feature && table_ptr; table_ptr = table_ptr->next_global) + { + if(table_ptr->on_expr) + { + on_icp = reinterpret_cast(table_ptr->on_expr); + on_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX); + } + } + + // CROSS JOIN w/o conditions isn't supported until MCOL-301 + // is ready. + if (join && join->table_count >= 2 && ( !where_icp && !on_icp )) + { + unsupported_feature = true; + } + } + + if (!unsupported_feature) + { + handler = new ha_columnstore_select_handler(thd, select_lex); + } + + return handler; +} + +/*********************************************************** + * DESCRIPTION: + * select_handler constructor + * PARAMETERS: + * thd - THD pointer. + * select_lex - sematic tree for the query. + ***********************************************************/ +ha_columnstore_select_handler::ha_columnstore_select_handler(THD *thd, + SELECT_LEX* select_lex) + : select_handler(thd, calpont_hton) +{ + select = select_lex; +} + +/*********************************************************** + * DESCRIPTION: + * select_handler constructor + ***********************************************************/ +ha_columnstore_select_handler::~ha_columnstore_select_handler() +{} + +/*@brief Initiate the query for select_handler */ +/*********************************************************** + * DESCRIPTION: + * Execute the query and saves select table query. + * ATM this function sets vtable_state and restores it afterwards + * since it reuses existed vtable code internally. + * PARAMETERS: + * + * RETURN: + * rc as int + ***********************************************************/ +int ha_columnstore_select_handler::init_scan() +{ + char query_buff[4096]; + + DBUG_ENTER("ha_columnstore_select_handler::init_scan"); + + // Save query for logging + String select_query(query_buff, sizeof(query_buff), thd->charset()); + select_query.length(0); + select->print(thd, &select_query, QT_ORDINARY); + + // Save vtable_state to restore the after we inited. + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; + + mcs_handler_info mhi = mcs_handler_info(static_cast(this), SELECT); + // this::table is the place for the result set + int rc = ha_cs_impl_pushdown_init(&mhi, table); + + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +/*@brief Fetch next row for select_handler */ +/*********************************************************** + * DESCRIPTION: + * Fetches next row and saves it in the temp table + * ATM this function sets vtable_state and restores it + * afterwards since it reuses existed vtable code internally. + * PARAMETERS: + * + * RETURN: + * rc as int + * + ***********************************************************/ +int ha_columnstore_select_handler::next_row() +{ + DBUG_ENTER("ha_columnstore_select_handler::next_row"); + + // Save vtable_state to restore the after we inited. + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + + thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; + + int rc = ha_calpont_impl_rnd_next(table->record[0], table); + + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +/*@brief Finishes the scan and clean it up */ +/*********************************************************** + * DESCRIPTION: + * Finishes the scan for select handler + * ATM this function sets vtable_state and restores it + * afterwards since it reuses existed vtable code internally. + * PARAMETERS: + * + * RETURN: + * rc as int + * + ***********************************************************/ +int ha_columnstore_select_handler::end_scan() +{ + DBUG_ENTER("ha_columnstore_select_handler::end_scan"); + + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; + + int rc = ha_calpont_impl_rnd_end(table, true); + + thd->infinidb_vtable.vtable_state = oldState; + + DBUG_RETURN(rc); +} + +void ha_columnstore_select_handler::print_error(int, unsigned long) +{} diff --git a/dbcon/mysql/ha_mcs_pushdown.h b/dbcon/mysql/ha_mcs_pushdown.h new file mode 100644 index 000000000..2849cb530 --- /dev/null +++ b/dbcon/mysql/ha_mcs_pushdown.h @@ -0,0 +1,142 @@ +/* + Copyright (c) 2019 MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef HA_MCS_PUSH +#define HA_MCS_PUSH + +#include "idb_mysql.h" +#include "ha_calpont.h" + +enum mcs_handler_types_t +{ + SELECT, + DERIVED, + GROUP_BY, + LEGACY +}; + +struct mcs_handler_info +{ + mcs_handler_info() : hndl_ptr(NULL), hndl_type(LEGACY) { }; + mcs_handler_info(mcs_handler_types_t type) : hndl_ptr(NULL), hndl_type(type) { }; + mcs_handler_info(void* ptr, mcs_handler_types_t type) : hndl_ptr(ptr), hndl_type(type) { }; + ~mcs_handler_info() { }; + void* hndl_ptr; + mcs_handler_types_t hndl_type; +}; + +/*@brief group_by_handler class*/ +/*********************************************************** + * DESCRIPTION: + * Provides server with group_by_handler API methods. + * One should read comments in server/sql/group_by_handler.h + * Attributes: + * select - attribute contains all GROUP BY, HAVING, ORDER items and calls it + * an extended SELECT list according to comments in + * server/sql/group_handler.cc. + * So the temporary table for + * select count(*) from b group by a having a > 3 order by a + * will have 4 columns not 1. + * However server ignores all NULLs used in + * GROUP BY, HAVING, ORDER. + * select_list_descr - contains Item description returned by Item->print() + * that is used in lookup for corresponding columns in + * extended SELECT list. + * table_list - contains all tables involved. Must be CS tables only. + * distinct - looks like a useless thing for now. Couldn't get it set by server. + * where - where items. + * group_by - group by ORDER items. + * order_by - order by ORDER items. + * having - having Item. + * Methods: + * init_scan - get plan and send it to ExeMgr. Get the execution result. + * next_row - get a row back from sm. + * end_scan - finish and clean the things up. + ***********************************************************/ +class ha_calpont_group_by_handler: public group_by_handler +{ +public: + ha_calpont_group_by_handler(THD* thd_arg, Query* query); + ~ha_calpont_group_by_handler(); + int init_scan(); + int next_row(); + int end_scan(); + + List* select; + TABLE_LIST* table_list; + bool distinct; + Item* where; + ORDER* group_by; + ORDER* order_by; + Item* having; +}; + +/*@brief derived_handler class*/ +/*********************************************************** + * DESCRIPTION: + * derived_handler API methods. Could be used by the server + * tp process sub-queries. + * More details in server/sql/dervied_handler.h + * INFINIDB_SHARE* hton share + * tbl in the constructor is the list of the tables involved. + * Methods: + * init_scan - get plan and send it to ExeMgr. Get the execution result. + * next_row - get a row back from sm. + * end_scan - finish and clean the things up. + ***********************************************************/ +class ha_columnstore_derived_handler: public derived_handler +{ +private: + INFINIDB_SHARE *share; + +public: + ha_columnstore_derived_handler(THD* thd_arg, TABLE_LIST *tbl); + ~ha_columnstore_derived_handler(); + int init_scan(); + int next_row(); + int end_scan(); + void print_error(int, unsigned long); +}; + +/*@brief select_handler class*/ +/*********************************************************** + * DESCRIPTION: + * select_handler API methods. Could be used by the server + * tp pushdown the whole query described by SELECT_LEX. + * More details in server/sql/select_handler.h + * INFINIDB_SHARE* hton share + * sel in the constructor is the semantic tree for the query. + * Methods: + * init_scan - get plan and send it to ExeMgr. Get the execution result. + * next_row - get a row back from sm. + * end_scan - finish and clean the things up. + ***********************************************************/ +class ha_columnstore_select_handler: public select_handler +{ +private: + INFINIDB_SHARE *share; + +public: + ha_columnstore_select_handler(THD* thd_arg, SELECT_LEX* sel); + ~ha_columnstore_select_handler(); + int init_scan(); + int next_row(); + int end_scan(); + void print_error(int, unsigned long); +}; + +#endif diff --git a/dbcon/mysql/ha_view.cpp b/dbcon/mysql/ha_view.cpp index 764c2c5c5..3bc49b2a8 100644 --- a/dbcon/mysql/ha_view.cpp +++ b/dbcon/mysql/ha_view.cpp @@ -117,7 +117,7 @@ void View::transform() else { // check foreign engine tables - bool infiniDB = (table_ptr->table ? isInfiniDB(table_ptr->table) : true); + bool infiniDB = (table_ptr->table ? isMCSTable(table_ptr->table) : true); // trigger system catalog cache if (infiniDB) diff --git a/dbcon/mysql/idb_mysql.h b/dbcon/mysql/idb_mysql.h index 25f3effcf..cb842d740 100644 --- a/dbcon/mysql/idb_mysql.h +++ b/dbcon/mysql/idb_mysql.h @@ -70,6 +70,8 @@ template bool isnan(T); #include "item_windowfunc.h" #include "sql_cte.h" #include "tztime.h" +#include "derived_handler.h" +#include "select_handler.h" // Now clean up the pollution as best we can... #undef min