From fa4067b6f082faab4e18a1bf2019dcaa215d0d02 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 27 Mar 2018 18:37:00 +0300 Subject: [PATCH] MCOL-1052 Generate execution plan for a aggregated function query call. --- dbcon/mysql/ha_calpont.cpp | 9 +- dbcon/mysql/ha_calpont.h | 4 - dbcon/mysql/ha_calpont_execplan.cpp | 130 +++- dbcon/mysql/ha_calpont_impl.cpp | 1024 ++++++++++++++++++++++++++- dbcon/mysql/ha_calpont_impl.h | 10 +- dbcon/mysql/ha_calpont_impl_if.h | 5 +- 6 files changed, 1159 insertions(+), 23 deletions(-) diff --git a/dbcon/mysql/ha_calpont.cpp b/dbcon/mysql/ha_calpont.cpp index 3c5f9a41e..e0e835c4b 100644 --- a/dbcon/mysql/ha_calpont.cpp +++ b/dbcon/mysql/ha_calpont.cpp @@ -1160,7 +1160,7 @@ create_calpont_group_by_handler(THD *thd, Query *query) Item *item; List_iterator_fast it(*query->select); - if ( thd->infinidb_vtable.vtable_state != THD::INFINIDB_DISABLE_VTABLE ) + if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE ) { handler = new ha_calpont_group_by_handler(thd, query); } @@ -1172,10 +1172,9 @@ int ha_calpont_group_by_handler::init_scan() { DBUG_ENTER("ha_calpont_group_by_handler::init_scan"); - int rc = ha_calpont_impl_group_by_init(query, table); + int rc = ha_calpont_impl_group_by_init(this, table); DBUG_RETURN(rc); -// return 0; } int ha_calpont_group_by_handler::next_row() @@ -1183,7 +1182,7 @@ int ha_calpont_group_by_handler::next_row() // if (!first_row) // return(HA_ERR_END_OF_FILE); DBUG_ENTER("ha_calpont_group_by_handler::next_row"); - int rc = ha_calpont_impl_group_by_next(query, table); + int rc = ha_calpont_impl_group_by_next(this, table); DBUG_RETURN(rc); /* @@ -1199,7 +1198,7 @@ int ha_calpont_group_by_handler::end_scan() { DBUG_ENTER("ha_calpont_group_by_handler::end_scan"); - int rc = ha_calpont_impl_group_by_end(query, table); + int rc = ha_calpont_impl_group_by_end(this, table); DBUG_RETURN(rc); // return 0; diff --git a/dbcon/mysql/ha_calpont.h b/dbcon/mysql/ha_calpont.h index 
660af8d0a..c3f2573e2 100644 --- a/dbcon/mysql/ha_calpont.h +++ b/dbcon/mysql/ha_calpont.h @@ -260,14 +260,10 @@ class ha_calpont_group_by_handler: public group_by_handler int init_scan(); int next_row(); int end_scan(); - - private: List *fields; TABLE_LIST *table_list; bool first_row; Query *query; - - }; #endif //HA_CALPONT_H__ diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index c4a37a77d..8c0934f4c 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4440,7 +4440,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) } else if (ac->constCol()) { - gwi.count_asterisk_list.push_back(ac); + gwi.count_asterisk_list.push_back(ac); } // For UDAF, populate the context and call the UDAF init() function. @@ -7831,6 +7831,10 @@ int cp_get_plan(THD* thd, SCSEP& csep) else if (status < 0) return status; + cerr << "---------------- cp_get_plan EXECUTION PLAN ----------------" << endl; + cerr << *csep << endl ; + cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; + // Derived table projection and filter optimization. 
derivedTableOptimization(csep); @@ -7932,4 +7936,128 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti) return 0; } + +int cp_get_group_plan(THD* thd, SCSEP& csep, cal_table_info& ti ) +{ + gp_walk_info* gwi = ti.condInfo; + List_iterator_fast<Item> it(*ti.groupByFields); + Item* item; + + if (!gwi) + gwi = new gp_walk_info(); + + gwi->thd = thd; + LEX* lex = thd->lex; + idbassert(lex != 0); + uint32_t sessionID = csep->sessionID(); + gwi->sessionid = sessionID; + TABLE* table = ti.msTablePtr; + boost::shared_ptr<CalpontSystemCatalog> csc = + CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + csc->identity(CalpontSystemCatalog::FE); + + // get all columns that mysql needs to fetch + MY_BITMAP* read_set = table->read_set; + Field** f_ptr, *field; + gwi->columnMap.clear(); + + const CalpontSystemCatalog::TableAliasName tblAliasName= + make_aliastable(table->s->db.str, table->s->table_name.str, table->alias.c_ptr()); + + gwi->tbList.push_back(tblAliasName); + + while ((item = it++)) + { + Item *arg0; + Field *field; + ReturnedColumn* rc = buildAggregateColumn(item, *gwi); + //string alias(table->alias.c_ptr()); + //rc->tableAlias(lower(alias)); + assert (rc); + boost::shared_ptr<ReturnedColumn> sprc(rc); + gwi->returnedCols.push_back(sprc); + arg0=((Item_sum*) item)->get_arg(0); + field=((Item_field*) arg0)->field; + gwi->columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(field->field_name), sprc)); + //arg0=((Item_sum*) item)->get_arg(0); + //field=((Item_field*) arg0)->field; + } + +/* + for (f_ptr = table->field ; (field = *f_ptr) ; f_ptr++) + { + if (bitmap_is_set(read_set, field->field_index)) + { + SimpleColumn* sc = new SimpleColumn(table->s->db.str, table->s->table_name.str, field->field_name, sessionID); + string alias(table->alias.c_ptr()); + sc->tableAlias(lower(alias)); + assert (sc); + boost::shared_ptr<SimpleColumn> spsc(sc); + gwi->returnedCols.push_back(spsc); + gwi->columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(field->field_name),
spsc)); + } + } +*/ + if (gwi->columnMap.empty()) + { + CalpontSystemCatalog::TableName tn = make_table(table->s->db.str, table->s->table_name.str); + CalpontSystemCatalog::TableAliasName tan = make_aliastable(table->s->db.str, table->s->table_name.str, table->alias.c_ptr()); + SimpleColumn* sc = getSmallestColumn(csc, tn, tan, table, *gwi); + SRCP srcp(sc); + gwi->columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(sc->columnName(), srcp)); + gwi->returnedCols.push_back(srcp); + } + + + // get filter + if (ti.condInfo) + { + gp_walk_info* gwi = ti.condInfo; + ParseTree* filters = 0; + ParseTree* ptp = 0; + ParseTree* rhs = 0; + + while (!gwi->ptWorkStack.empty()) + { + filters = gwi->ptWorkStack.top(); + gwi->ptWorkStack.pop(); + SimpleFilter* sf = dynamic_cast<SimpleFilter*>(filters->data()); + + if (sf && sf->op()->data() == "noop") + { + delete filters; + filters = 0; + + if (gwi->ptWorkStack.empty()) + break; + + continue; + } + + if (gwi->ptWorkStack.empty()) + break; + + ptp = new ParseTree(new LogicOperator("and")); + ptp->left(filters); + rhs = gwi->ptWorkStack.top(); + gwi->ptWorkStack.pop(); + ptp->right(rhs); + gwi->ptWorkStack.push(ptp); + } + + csep->filters(filters); + } + + csep->returnedCols(gwi->returnedCols); + csep->columnMap(gwi->columnMap); + CalpontSelectExecutionPlan::TableList tblist; + tblist.push_back(tblAliasName); + csep->tableList(tblist); + + // @bug 3321.
Set max number of blocks in a dictionary file to be scanned for filtering + csep->stringScanThreshold(gwi->thd->variables.infinidb_string_scan_threshold); + + return 0; +} + } diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 6662b8394..5d8e91e08 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -171,6 +171,7 @@ const unsigned NONSUPPORTED_ERR_THRESH = 2000; vector rmParms; ResourceManager* rm = ResourceManager::instance(); bool useHdfs = rm->useHdfs(); + //convenience fcn inline uint32_t tid2sid(const uint32_t tid) { @@ -760,6 +761,438 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci) return rc; } +int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci) +{ + int rc = HA_ERR_END_OF_FILE; + int num_attr = ti.msTablePtr->s->fields; + sm::status_t sm_stat; + + try + { + if (ti.conn_hndl) + { + sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl); + } + else if (ci->cal_conn_hndl) + { + sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(&current_thd->killed)); + } + else + throw runtime_error("internal error"); + } + catch (std::exception& ex) + { +// @bug 2244. Always log this msg for now, as we try to track down when/why we are +// losing socket connection with ExeMgr +//#ifdef INFINIDB_DEBUG + tpl_scan_fetch_LogException( ti, ci, &ex); +//#endif + sm_stat = sm::CALPONT_INTERNAL_ERROR; + } + catch (...) + { +// @bug 2244.
Always log this msg for now, as we try to track down when/why we are +// losing socket connection with ExeMgr +//#ifdef INFINIDB_DEBUG + tpl_scan_fetch_LogException( ti, ci, 0 ); +//#endif + sm_stat = sm::CALPONT_INTERNAL_ERROR; + } + + if (sm_stat == sm::STATUS_OK) + { + Field** f; + f = ti.msTablePtr->field; + //set all fields to null in null col bitmap + //memset(buf, -1, ti.msTablePtr->s->null_bytes); + std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp; + int64_t intColVal = 0; + uint64_t uintColVal = 0; + char tmp[256]; + + RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup; + + // table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup + // set coltype.position to be the position in rowgroup. only set once. + if (ti.tpl_scan_ctx->rowsreturned == 0 && + (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)) + { + for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++) + { + int oid = rowGroup->getOIDs()[i]; + int j = 0; + + for (; j < num_attr; j++) + { + // mysql should haved eliminated duplicate projection columns + if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID) + { + colTypes[j].colPosition = i; + break; + } + } + } + } + + rowgroup::Row row; + rowGroup->initRow(&row); + rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row); + int s; + + for (int p = 0; p < num_attr; p++, f++) + { + //This col is going to be written + bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index); + + // get coltype if not there yet + if (colTypes[0].colWidth == 0) + { + for (short c = 0; c < num_attr; c++) + { + colTypes[c].colPosition = c; + colTypes[c].colWidth = rowGroup->getColumnWidth(c); + colTypes[c].colDataType = rowGroup->getColTypes()[c]; + colTypes[c].columnOID = rowGroup->getOIDs()[c]; + colTypes[c].scale = rowGroup->getScale()[c]; + colTypes[c].precision = rowGroup->getPrecision()[c]; + } + } + + CalpontSystemCatalog::ColType colType(colTypes[p]); + + // table mode
handling + if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF) + { + if (colType.colPosition == -1) // not projected by tuplejoblist + continue; + else + s = colType.colPosition; + } + else + { + s = p; + } + + // precision == -16 is borrowed as skip null check indicator for bit ops. + if (row.isNullValue(s) && colType.precision != -16) + { + // @2835. Handle empty string and null confusion. store empty string for string column + if (colType.colDataType == CalpontSystemCatalog::CHAR || + colType.colDataType == CalpontSystemCatalog::VARCHAR || + colType.colDataType == CalpontSystemCatalog::VARBINARY) + { + Field_varstring* f2 = (Field_varstring*)*f; + f2->store(tmp, 0, f2->charset()); + } + + continue; + } + + // fetch and store data + switch (colType.colDataType) + { + case CalpontSystemCatalog::DATE: + { + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + intColVal = row.getUintField<4>(s); + DataConvert::dateToString(intColVal, tmp, 255); + Field_varstring* f2 = (Field_varstring*)*f; + f2->store(tmp, strlen(tmp), f2->charset()); + break; + } + + case CalpontSystemCatalog::DATETIME: + { + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + intColVal = row.getUintField<8>(s); + DataConvert::datetimeToString(intColVal, tmp, 255); + + /* setting the field_length is a sort-of hack. The length + * at this point can be long enough to include mseconds. + * ColumnStore doesn't fully support mseconds yet so if + * they are requested, trim them off. + * At a later date we should set this more intelligently + * based on the result set. + */ + /* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! 
*/ + if (((*f)->field_length > 19) && ((*f)->field_length != 57)) + (*f)->field_length = strlen(tmp); + + Field_varstring* f2 = (Field_varstring*)*f; + f2->store(tmp, strlen(tmp), f2->charset()); + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::VARCHAR: + { + Field_varstring* f2 = (Field_varstring*)*f; + + switch (colType.colWidth) + { + case 1: + intColVal = row.getUintField<1>(s); + f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset()); + break; + + case 2: + intColVal = row.getUintField<2>(s); + f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset()); + break; + + case 4: + intColVal = row.getUintField<4>(s); + f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset()); + break; + + case 8: + //make sure we don't send strlen off into the weeds... + intColVal = row.getUintField<8>(s); + memcpy(tmp, &intColVal, 8); + tmp[8] = 0; + f2->store(tmp, strlen(tmp), f2->charset()); + break; + + default: + f2->store((const char*)row.getStringPointer(s), row.getStringLength(s), f2->charset()); + } + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + } + + case CalpontSystemCatalog::VARBINARY: + { + Field_varstring* f2 = (Field_varstring*)*f; + + if (current_thd->variables.infinidb_varbin_always_hex) + { + uint32_t l; + const uint8_t* p = row.getVarBinaryField(l, s); + uint32_t ll = l * 2; + boost::scoped_array<char> sca(new char[ll]); + vbin2hex(p, l, sca.get()); + f2->store(sca.get(), ll, f2->charset()); + } + else + f2->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), f2->charset()); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + } + + case CalpontSystemCatalog::BIGINT: + { + intColVal = row.getIntField<8>(s); + storeNumericField(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::UBIGINT: + { + uintColVal = row.getUintField<8>(s); + storeNumericField(f, uintColVal, colType); + break; + } + + case
CalpontSystemCatalog::INT: + { + intColVal = row.getIntField<4>(s); + storeNumericField(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::UINT: + { + uintColVal = row.getUintField<4>(s); + storeNumericField(f, uintColVal, colType); + break; + } + + case CalpontSystemCatalog::SMALLINT: + { + intColVal = row.getIntField<2>(s); + storeNumericField(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::USMALLINT: + { + uintColVal = row.getUintField<2>(s); + storeNumericField(f, uintColVal, colType); + break; + } + + case CalpontSystemCatalog::TINYINT: + { + intColVal = row.getIntField<1>(s); + storeNumericField(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::UTINYINT: + { + uintColVal = row.getUintField<1>(s); + storeNumericField(f, uintColVal, colType); + break; + } + + //In this case, we're trying to load a double output column with float data. This is the + // case when you do sum(floatcol), e.g. + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + { + float dl = row.getFloatField(s); + + if (dl == std::numeric_limits<float>::infinity()) + continue; + + //int64_t* icvp = (int64_t*)&dl; + //intColVal = *icvp; + Field_float* f2 = (Field_float*)*f; + // bug 3485, reserve enough space for the longest float value + // -3.402823466E+38 to -1.175494351E-38, 0, and + // 1.175494351E-38 to 3.402823466E+38.
+ (*f)->field_length = 40; + + //float float_val = *(float*)(&value); + //f2->store(float_val); + if (f2->decimals() < (uint32_t)row.getScale(s)) + f2->dec = (uint32_t)row.getScale(s); + + f2->store(dl); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + + //storeNumericField(f, intColVal, colType); + //break; + } + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + double dl = row.getDoubleField(s); + + if (dl == std::numeric_limits<double>::infinity()) + continue; + + Field_double* f2 = (Field_double*)*f; + // bug 3483, reserve enough space for the longest double value + // -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and + // 2.2250738585072014E-308 to 1.7976931348623157E+308. + (*f)->field_length = 310; + + //double double_val = *(double*)(&value); + //f2->store(double_val); + if (f2->decimals() < (uint32_t)row.getScale(s)) + f2->dec = (uint32_t)row.getScale(s); + + f2->store(dl); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + + + //int64_t* icvp = (int64_t*)&dl; + //intColVal = *icvp; + //storeNumericField(f, intColVal, colType); + //break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + intColVal = row.getIntField(s); + storeNumericField(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::BLOB: + case CalpontSystemCatalog::TEXT: + { + Field_blob* f2 = (Field_blob*)*f; + f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s)); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + } + + default: // treat as int64 + { + intColVal = row.getUintField<8>(s); + storeNumericField(f, intColVal, colType); + break; + } + } + } + + ti.tpl_scan_ctx->rowsreturned++; + ti.c++; +#ifdef INFINIDB_DEBUG + + if ((ti.c % 1000000) == 0) + cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl; + +#endif + ti.moreRows = true; + rc = 0; + } + else if
(sm_stat == sm::SQL_NOT_FOUND) + { + IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl ); + ti.c = 0; + ti.moreRows = false; + rc = HA_ERR_END_OF_FILE; + } + else if (sm_stat == sm::CALPONT_INTERNAL_ERROR) + { + ti.moreRows = false; + rc = ER_INTERNAL_ERROR; + ci->rc = rc; + } + else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR) + { + ti.moreRows = false; + rc = logging::ERR_LOST_CONN_EXEMGR; + sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl, + current_thd->variables.infinidb_local_query); + idbassert(ci->cal_conn_hndl != 0); + ci->rc = rc; + } + else if (sm_stat == sm::SQL_KILLED) + { + // query was aborted by the user. treat it the same as limit query. close + // connection after rnd_close. + ti.c = 0; + ti.moreRows = false; + rc = HA_ERR_END_OF_FILE; + ci->rc = rc; + } + else + { + ti.moreRows = false; + rc = sm_stat; + ci->rc = rc; + } + + return rc; +} + void makeUpdateScalarJoin(const ParseTree* n, void* obj) { TreeNode* tn = n->data(); @@ -4962,19 +5395,593 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos) */ /*********************************************************** * DESCRIPTION: - * Prepare data for group_by_handler::next_row() calls. + * Prepares data for group_by_handler::next_row() calls. * PARAMETERS: - * query - Query structure, that describes the pushdown query. + * group_hand - group by handler, that preserves initial table and items lists. . * table - TABLE pointer The table to save the result set in. 
* RETURN: * 0 if success * others if something went wrong whilst getting the result set ***********************************************************/ -int ha_calpont_impl_group_by_init(Query* query, TABLE* table) +int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table) { //first_row= true; + string tableName = group_hand->table_list->table->s->table_name.str; + IDEBUG( cout << "group_by_init for table " << + group_hand->table_list->table->s->table_name.str << endl ); + THD* thd = current_thd; - return(0); + //check whether the system is ready to process statement. +#ifndef _MSC_VER + static DBRM dbrm(true); + bool bSystemQueryReady = dbrm.getSystemQueryReady(); + + if (bSystemQueryReady == 0) + { + // Still not ready + setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + else if (bSystemQueryReady < 0) + { + // Still not ready + setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + +#endif + // prevent "create table as select" from running on slave + thd->infinidb_vtable.hasInfiniDBTable = true; + + /* If this node is the slave, ignore DML to IDB tables */ + /*if (thd->slave_thread && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + */ + + // @bug 3005. if the table is not $vtable, then this could be a UDF defined on the connector. 
+ // watch this for other complications + //if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE && + // string(table->s->table_name.str).find("$vtable") != 0) + // return 0; + + // return error if error status has been already set + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) + return ER_INTERNAL_ERROR; + + // by pass the extra union trips. return 0 + //if (thd->infinidb_vtable.isUnion && thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) + // return 0; + + // @bug 2232. Basic SP support. Error out non support sp cases. + // @bug 3939. Only error out for sp with select. Let pass for alter table in sp. + /*if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE) + { + setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version"); + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; + return ER_INTERNAL_ERROR; + } + + // mysql reads table twice for order by + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1 || + thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY) + return 0; + + if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) + return 0; + + + //Update and delete code + if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) + return doUpdateDelete(thd); + */ + + uint32_t sessionID = tid2sid(thd->thread_id); + boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + csc->identity(CalpontSystemCatalog::FE); + + if (!thd->infinidb_vtable.cal_conn_info) + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info); + + idbassert(ci != 0); + + /* + MySQL sometimes calls rnd_init multiple times, plan should only be + generated and sent once.
+ TO DO Check this statement. + */ + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE && + !thd->infinidb_vtable.isNewQuery) + return 0; + + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) + { + if (ci->cal_conn_hndl) + { + // send ExeMgr a signal before closing the connection + ByteStream msg; + ByteStream::quadbyte qb = 0; + msg << qb; + + try + { + ci->cal_conn_hndl->exeMgr->write(msg); + } + catch (...) + { + // canceling query. ignore connection failure. + } + + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return 0; + } + + sm::tableid_t tableid = 0; + cal_table_info ti; + sm::cpsm_conhdl_t* hndl; + SCSEP csep; + + // update traceFlags according to the autoswitch state. replication query + // on slave are in table mode (create table as...) + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + { + ci->traceFlags |= CalpontSelectExecutionPlan::TRACE_TUPLE_OFF; + } + + // MCOL-1052 TO DO Remove this + ci->traceFlags = CalpontSelectExecutionPlan::TRACE_LOG; + + bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false); + + // table mode + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + { + ti = ci->tableMap[group_hand->table_list->table]; + + // get connection handle for this table handler + // re-establish table handle + if (ti.conn_hndl) + { + sm::sm_cleanup(ti.conn_hndl); + ti.conn_hndl = 0; + } + + sm::sm_init(sessionID, &ti.conn_hndl, localQuery); + ti.conn_hndl->csc = csc; + hndl = ti.conn_hndl; + + try + { + ti.conn_hndl->connect(); + } + catch (...) 
+ { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR)); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto error; + } + + // get filter plan for table + if (ti.csep.get() == 0) + { + ti.csep.reset(new CalpontSelectExecutionPlan()); + + SessionManager sm; + BRM::TxnID txnID; + txnID = sm.getTxnID(sessionID); + + if (!txnID.valid) + { + txnID.id = 0; + txnID.valid = true; + } + + QueryContext verID; + verID = sm.verID(); + + ti.csep->txnID(txnID.id); + ti.csep->verID(verID); + ti.csep->sessionID(sessionID); + + if (group_hand->table_list->db_length) + ti.csep->schemaName(group_hand->table_list->db); + + ti.csep->traceFlags(ci->traceFlags); + ti.msTablePtr = group_hand->table_list->table; + ti.groupByFields = group_hand->fields; + + // send plan whenever group_init is called + cp_get_group_plan(thd, ti.csep, ti); + } + + IDEBUG( cerr << tableName << " send plan:" << endl ); + IDEBUG( cerr << *ti.csep << endl ); + csep = ti.csep; + + // for ExeMgr logging sqltext. 
only log once for the query although multi plans may be sent + if (ci->tableMap.size() == 1) + ti.csep->data(idb_mysql_query_str(thd)); + } + // vtable mode + else + { + //if (!ci->cal_conn_hndl || thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) + if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) + { + ci->stats.reset(); // reset query stats + ci->stats.setStartTime(); + ci->stats.fUser = thd->main_security_ctx.user; + + if (thd->main_security_ctx.host) + ci->stats.fHost = thd->main_security_ctx.host; + else if (thd->main_security_ctx.host_or_ip) + ci->stats.fHost = thd->main_security_ctx.host_or_ip; + else + ci->stats.fHost = "unknown"; + + try + { + ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + ci->warningMsg = msg; + } + + // if the previous query has error, re-establish the connection + if (ci->queryState != 0) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + } + + sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery); + idbassert(ci->cal_conn_hndl != 0); + ci->cal_conn_hndl->csc = csc; + idbassert(ci->cal_conn_hndl->exeMgr != 0); + + try + { + ci->cal_conn_hndl->connect(); + } + catch (...) 
+ { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR)); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto error; + } + + hndl = ci->cal_conn_hndl; + + if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE) + { + //CalpontSelectExecutionPlan csep; + if (!csep) + csep.reset(new CalpontSelectExecutionPlan()); + + SessionManager sm; + BRM::TxnID txnID; + txnID = sm.getTxnID(sessionID); + + if (!txnID.valid) + { + txnID.id = 0; + txnID.valid = true; + } + + QueryContext verID; + verID = sm.verID(); + + csep->txnID(txnID.id); + csep->verID(verID); + csep->sessionID(sessionID); + + if (thd->db) + csep->schemaName(thd->db); + + csep->traceFlags(ci->traceFlags); + + if (thd->infinidb_vtable.isInsertSelect) + csep->queryType(CalpontSelectExecutionPlan::INSERT_SELECT); + + //get plan + int status = cp_get_plan(thd, csep); + + //if (cp_get_plan(thd, csep) != 0) + if (status > 0) + goto internal_error; + else if (status < 0) + return 0; + + // @bug 2547. don't need to send the plan if it's impossible where for all unions. 
+ if (thd->infinidb_vtable.impossibleWhereOnUnion) + return 0; + + string query; + query.assign(thd->infinidb_vtable.original_query.ptr(), + thd->infinidb_vtable.original_query.length()); + csep->data(query); + + try + { + csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser)); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + +#ifdef PLAN_HEX_FILE + // plan serialization + ifstream ifs("/tmp/li1-plan.hex"); + ByteStream bs1; + ifs >> bs1; + ifs.close(); + csep->unserialize(bs1); +#endif + + if (ci->traceFlags & 1) + { + cerr << "---------------- EXECUTION PLAN ----------------" << endl; + cerr << *csep << endl ; + cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; + } + else + { + IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl ); + IDEBUG( cerr << *csep << endl ); + IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl ); + } + } + }// end of execution plan generation + + if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE) + { + ByteStream msg; + ByteStream emsgBs; + + while (true) + { + try + { + ByteStream::quadbyte qb = 4; + msg << qb; + hndl->exeMgr->write(msg); + msg.restart(); + csep->rmParms(rmParms); + + //send plan + csep->serialize(msg); + hndl->exeMgr->write(msg); + + //get ExeMgr status back to indicate a vtable joblist success or not + msg.restart(); + emsgBs.restart(); + msg = hndl->exeMgr->read(); + emsgBs = hndl->exeMgr->read(); + string emsg; + + if (msg.length() == 0 || emsgBs.length() == 0) + { + emsg = "Lost connection to ExeMgr. 
Please contact your administrator"; + setError(thd, ER_INTERNAL_ERROR, emsg); + return ER_INTERNAL_ERROR; + } + + string emsgStr; + emsgBs >> emsgStr; + bool err = false; + + if (msg.length() == 4) + { + msg >> qb; + + if (qb != 0) + { + err = true; + // for makejoblist error, stats contains only error code and insert from here + // because table fetch is not started + ci->stats.setEndTime(); + ci->stats.fQuery = csep->data(); + ci->stats.fQueryType = csep->queryType(); + ci->stats.fErrorNo = qb; + + try + { + ci->stats.insert(); + } + catch (std::exception& e) + { + string msg = string("Columnstore Query Stats - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + } + } + else + { + err = true; + } + + if (err) + { + setError(thd, ER_INTERNAL_ERROR, emsgStr); + return ER_INTERNAL_ERROR; + } + + rmParms.clear(); + + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + { + ci->tableMap[table] = ti; + } + else + { + ci->queryState = 1; + } + + break; + } + catch (...) + { + sm::sm_cleanup(hndl); + hndl = 0; + + sm::sm_init(sessionID, &hndl, localQuery); + idbassert(hndl != 0); + hndl->csc = csc; + + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + ti.conn_hndl = hndl; + else + ci->cal_conn_hndl = hndl; + + try + { + hndl->connect(); + } + catch (...) + { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR)); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto error; + } + + msg.restart(); + } + } + } + + // set query state to be in_process. Sometimes mysql calls rnd_init multiple + // times, this makes sure plan only being generated and sent once. 
It will be + // reset when query finishes in sm::end_query + thd->infinidb_vtable.isNewQuery = false; + + // common path for both vtable select phase and table mode -- open scan handle + ti = ci->tableMap[table]; + ti.msTablePtr = table; + + if ((thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE) || + (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) || + (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY)) + { + if (ti.tpl_ctx == 0) + { + ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); + } + + // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql + // call rnd_init for a table more than once. + ti.tpl_scan_ctx->rowGroup = NULL; + + try + { + tableid = execplan::IDB_VTABLE_ID; + } + catch (...) + { + string emsg = "No table ID found for table " + string(table->s->table_name.str); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + try + { + sm::tpl_open(tableid, ti.tpl_ctx, hndl); + sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl); + } + catch (std::exception& e) + { + string emsg = "table can not be opened: " + string(e.what()); + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + catch (...) 
+ { + string emsg = "table can not be opened"; + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + ti.tpl_scan_ctx->traceFlags = ci->traceFlags; + + if ((ti.tpl_scan_ctx->ctp).size() == 0) + { + uint32_t num_attr = table->s->fields; + + for (uint32_t i = 0; i < num_attr; i++) + { + CalpontSystemCatalog::ColType ctype; + ti.tpl_scan_ctx->ctp.push_back(ctype); + } + + // populate coltypes here for table mode because tableband gives treeoid for dictionary column + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + { + CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str), true); + + if (oidlist.size() != num_attr) + { + string emsg = "Size mismatch probably caused by front end out of sync"; + setError(thd, ER_INTERNAL_ERROR, emsg); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + goto internal_error; + } + + for (unsigned int j = 0; j < oidlist.size(); j++) + { + CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum); + ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype; + ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1; + } + } + } + } + + ci->tableMap[table] = ti; + return 0; + +error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + // do we need to close all connection handle of the table map? + return ER_INTERNAL_ERROR; + +internal_error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return ER_INTERNAL_ERROR; + //return(0); } /*@brief ha_calpont_impl_group_by_next - Return result set for MariaDB group_by @@ -4984,14 +5991,14 @@ int ha_calpont_impl_group_by_init(Query* query, TABLE* table) * DESCRIPTION: * Return a result record for each group_by_handler::next_row() call. * PARAMETERS: - * query - Query structure, that describes the pushdown query. 
+ * group_hand - group by handler that preserves the initial table and item lists. * table - TABLE pointer The table to save the result set in. * RETURN: * 0 if success * HA_ERR_END_OF_FILE if the record set has come to an end * others if something went wrong whilst getting the result set ***********************************************************/ -int ha_calpont_impl_group_by_next(Query* query, TABLE* table) +int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table) { //if (!first_row) //return(HA_ERR_END_OF_FILE); @@ -5088,7 +6095,7 @@ int ha_calpont_impl_group_by_next(Query* query, TABLE* table) try { - //rc = fetchNextRow(buf, ti, ci); + rc = fetchNextRowGrHand(ti, ci); rc = 0; } catch (std::exception& e) @@ -5124,11 +6131,10 @@ int ha_calpont_impl_group_by_next(Query* query, TABLE* table) return rc; } -int ha_calpont_impl_group_by_end(Query* query, TABLE* table) +int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table) { return 0; } - // vim:sw=4 ts=4: diff --git a/dbcon/mysql/ha_calpont_impl.h b/dbcon/mysql/ha_calpont_impl.h index f6bea072c..7f7c43aae 100644 --- a/dbcon/mysql/ha_calpont_impl.h +++ b/dbcon/mysql/ha_calpont_impl.h @@ -48,15 +48,16 @@ extern int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_calpont_impl_update_row(); extern int ha_calpont_impl_delete_row(); extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos); -extern int ha_calpont_impl_group_by_init(Query* query, TABLE* table); -extern int ha_calpont_impl_group_by_next(Query* query, TABLE* table); -extern int ha_calpont_impl_group_by_end(Query* query, TABLE* table); +extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table); +extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); +extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); #endif #ifdef
NEED_CALPONT_INTERFACE #include "ha_calpont_impl_if.h" #include "calpontsystemcatalog.h" +#include "ha_calpont.h" extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci); extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted); extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci); @@ -73,6 +74,9 @@ extern std::string ha_calpont_impl_droppartition_ (execplan::CalpontSystemCatal extern std::string ha_calpont_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename); extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_info& ci, uint64_t tableLockID); +extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table); +extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); +extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); #endif #endif diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 709e1b051..71fd0b35f 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -178,7 +178,8 @@ struct cal_table_info msTablePtr(0), conn_hndl(0), condInfo(0), - moreRows(false) + moreRows(false), + groupByFields(0) { } ~cal_table_info() {} sm::cpsm_tplh_t* tpl_ctx; @@ -189,6 +190,7 @@ struct cal_table_info gp_walk_info* condInfo; execplan::SCSEP csep; bool moreRows; //are there more rows to consume (b/c of limit) + List *groupByFields; // MCOL-1052 For CSEP generation }; typedef std::tr1::unordered_map CalTableMap; @@ -297,6 +299,7 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su int cp_get_plan(THD* thd, execplan::SCSEP& csep); int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& 
ti); +int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti); int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg);