From cff504c8bfbde147981e34199fdc60a7dbb1a365 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Thu, 22 Mar 2018 17:12:56 +0300 Subject: [PATCH] MCOL-1052. init_scan() initial implementation. --- dbcon/mysql/ha_calpont.cpp | 59 +++++++++-- dbcon/mysql/ha_calpont.h | 29 +++--- dbcon/mysql/ha_calpont_impl.cpp | 173 ++++++++++++++++++++++++++++++++ dbcon/mysql/ha_calpont_impl.h | 4 + 4 files changed, 246 insertions(+), 19 deletions(-) diff --git a/dbcon/mysql/ha_calpont.cpp b/dbcon/mysql/ha_calpont.cpp index 533d99e76..3c5f9a41e 100644 --- a/dbcon/mysql/ha_calpont.cpp +++ b/dbcon/mysql/ha_calpont.cpp @@ -1139,28 +1139,73 @@ static MYSQL_SYSVAR_ULONG( 0); #endif +/*@brief ha_calpont_impl_group_by_init - Get data for MariaDB group_by + pushdown handler +*/ +/*********************************************************** + * DESCRIPTION: + * Creates a group_by pushdown handler. + * Details are in server/sql/group_by_handler.h + * PARAMETERS: + * thd - THD pointer. + * query - Query structure, that describes the pushdown query. + * RETURN: + * group_by_handler if success + * NULL in other case + ***********************************************************/ static group_by_handler * create_calpont_group_by_handler(THD *thd, Query *query) { - ha_calpont_group_by_handler *handler; - Item *item; - List_iterator_fast it(*query->select); + ha_calpont_group_by_handler *handler = NULL; + Item *item; + List_iterator_fast it(*query->select); - handler= new ha_calpont_group_by_handler(thd, query->select, query->from); - return handler; + if ( thd->infinidb_vtable.vtable_state != THD::INFINIDB_DISABLE_VTABLE ) + { + handler = new ha_calpont_group_by_handler(thd, query); + } + + return handler; +} + +int ha_calpont_group_by_handler::init_scan() +{ + DBUG_ENTER("ha_calpont_group_by_handler::init_scan"); + + int rc = ha_calpont_impl_group_by_init(query, table); + + DBUG_RETURN(rc); +// return 0; } int ha_calpont_group_by_handler::next_row() { - if (!first_row) - return(HA_ERR_END_OF_FILE); +// if (!first_row) +// return(HA_ERR_END_OF_FILE); + DBUG_ENTER("ha_calpont_group_by_handler::next_row"); + int rc = ha_calpont_impl_group_by_next(query, table); + + DBUG_RETURN(rc); +/* first_row= 0; Field *field = *(table->field); field->store(5LL, 1); field->set_notnull(); return(0); +*/ } +int ha_calpont_group_by_handler::end_scan() +{ + DBUG_ENTER("ha_calpont_group_by_handler::end_scan"); + + int rc = ha_calpont_impl_group_by_end(query, table); + + DBUG_RETURN(rc); +// return 0; +} + + static struct st_mysql_sys_var* calpont_system_variables[] = { // MYSQL_SYSVAR(enum_var), diff --git a/dbcon/mysql/ha_calpont.h b/dbcon/mysql/ha_calpont.h index b187c03d4..660af8d0a 100644 --- a/dbcon/mysql/ha_calpont.h +++ b/dbcon/mysql/ha_calpont.h @@ -250,19 +250,24 @@ public: class ha_calpont_group_by_handler: public group_by_handler { - List *fields; - TABLE_LIST *table_list; - bool first_row; + public: +// ha_calpont_group_by_handler(THD *thd_arg, List *fields_arg, +// TABLE_LIST *table_list_arg, Query *query) + ha_calpont_group_by_handler(THD *thd_arg, Query *query) + : group_by_handler(thd_arg, calpont_hton), fields(query->select), + table_list(query->from), query(query) {} + ~ha_calpont_group_by_handler() {} + int init_scan(); + int next_row(); + int end_scan(); + + private: + List *fields; + TABLE_LIST *table_list; + bool first_row; + Query *query; + -public: - ha_calpont_group_by_handler(THD *thd_arg, List *fields_arg, -TABLE_LIST *table_list_arg) - : group_by_handler(thd_arg, calpont_hton), fields(fields_arg), - table_list(table_list_arg) {} - ~ha_calpont_group_by_handler() {} - int init_scan() { first_row= true ; return 0; } - int next_row(); - int end_scan() { return 0; } }; #endif //HA_CALPONT_H__ diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 8078d452e..6662b8394 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -4957,5 +4957,178 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos) return ER_INTERNAL_ERROR; } +/*@brief ha_calpont_impl_group_by_init - Get data for MariaDB group_by + pushdown handler +*/ +/*********************************************************** + * DESCRIPTION: + * Prepare data for group_by_handler::next_row() calls. + * PARAMETERS: + * query - Query structure, that describes the pushdown query. + * table - TABLE pointer The table to save the result set in. + * RETURN: + * 0 if success + * others if something went wrong whilst getting the result set + ***********************************************************/ +int ha_calpont_impl_group_by_init(Query* query, TABLE* table) +{ + //first_row= true; + + return(0); +} + +/*@brief ha_calpont_impl_group_by_next - Return result set for MariaDB group_by + pushdown handler +*/ +/*********************************************************** + * DESCRIPTION: + * Return a result record for each group_by_handler::next_row() call. + * PARAMETERS: + * query - Query structure, that describes the pushdown query. + * table - TABLE pointer The table to save the result set in. + * RETURN: + * 0 if success + * HA_ERR_END_OF_FILE if the record set has come to an end + * others if something went wrong whilst getting the result set + ***********************************************************/ +int ha_calpont_impl_group_by_next(Query* query, TABLE* table) +{ + //if (!first_row) + //return(HA_ERR_END_OF_FILE); + + //first_row = 0; + //Field *field = *(table->field); + //field->store(5LL, 1); + //field->set_notnull(); + //return(0); + THD* thd = current_thd; + + /* If this node is the slave, ignore DML to IDB tables */ + if (thd->slave_thread && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + + + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) + return ER_INTERNAL_ERROR; + + // @bug 3005 + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE && + string(table->s->table_name.str).find("$vtable") != 0) + return HA_ERR_END_OF_FILE; + + if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || + ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) + return HA_ERR_END_OF_FILE; + + // @bug 2547 + if (thd->infinidb_vtable.impossibleWhereOnUnion) + return HA_ERR_END_OF_FILE; + + // @bug 2232. Basic SP support + // @bug 3939. Only error out for sp with select. Let pass for alter table in sp. + if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE) + { + setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + + if (!thd->infinidb_vtable.cal_conn_info) + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + // @bug 3078 + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) + { + if (ci->cal_conn_hndl) + { + // send ExeMgr a signal before cloing the connection + ByteStream msg; + ByteStream::quadbyte qb = 0; + msg << qb; + + try + { + ci->cal_conn_hndl->exeMgr->write(msg); + } + catch (...) + { + // cancel query. ignore connection failure. + } + + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return 0; + } + + if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE; + + cal_table_info ti; + ti = ci->tableMap[table]; + int rc = HA_ERR_END_OF_FILE; + + if (!ti.tpl_ctx || !ti.tpl_scan_ctx) + { + CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id)); + return ER_INTERNAL_ERROR; + } + + idbassert(ti.msTablePtr == table); + + try + { + //rc = fetchNextRow(buf, ti, ci); + rc = 0; + } + catch (std::exception& e) + { + string emsg = string("Error while fetching from ExeMgr: ") + e.what(); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id)); + return ER_INTERNAL_ERROR; + } + + ci->tableMap[table] = ti; + + if (rc != 0 && rc != HA_ERR_END_OF_FILE) + { + string emsg; + + // remove this check when all error handling migrated to the new framework. + if (rc >= 1000) + emsg = ti.tpl_scan_ctx->errMsg; + else + { + logging::ErrorCodes errorcodes; + emsg = errorcodes.errorString(rc); + } + + setError(thd, ER_INTERNAL_ERROR, emsg); + //setError(thd, ER_INTERNAL_ERROR, "testing"); + ci->stats.fErrorNo = rc; + CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id)); + rc = ER_INTERNAL_ERROR; + } + + return rc; +} + +int ha_calpont_impl_group_by_end(Query* query, TABLE* table) +{ + return 0; +} + + // vim:sw=4 ts=4: diff --git a/dbcon/mysql/ha_calpont_impl.h b/dbcon/mysql/ha_calpont_impl.h index 1e7f7d5df..f6bea072c 100644 --- a/dbcon/mysql/ha_calpont_impl.h +++ b/dbcon/mysql/ha_calpont_impl.h @@ -48,6 +48,10 @@ extern int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_calpont_impl_update_row(); extern int ha_calpont_impl_delete_row(); extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos); +extern int ha_calpont_impl_group_by_init(Query* query, TABLE* table); +extern int ha_calpont_impl_group_by_next(Query* query, TABLE* table); +extern int ha_calpont_impl_group_by_end(Query* query, TABLE* table); + #endif #ifdef NEED_CALPONT_INTERFACE