From d562caecbae4939304432d9d9d3e768047550612 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Sat, 31 Mar 2018 22:38:18 +0300 Subject: [PATCH] MCOL-1052 Generate and send CSEP to and receive set data from ExeMgr. --- dbcon/mysql/ha_calpont.cpp | 19 +- dbcon/mysql/ha_calpont_execplan.cpp | 300 +++++------ dbcon/mysql/ha_calpont_impl.cpp | 762 +++++++++++++++++++++++++++- dbcon/mysql/ha_calpont_impl_if.h | 2 +- 4 files changed, 913 insertions(+), 170 deletions(-) diff --git a/dbcon/mysql/ha_calpont.cpp b/dbcon/mysql/ha_calpont.cpp index e0e835c4b..ae0850109 100644 --- a/dbcon/mysql/ha_calpont.cpp +++ b/dbcon/mysql/ha_calpont.cpp @@ -1172,7 +1172,11 @@ int ha_calpont_group_by_handler::init_scan() { DBUG_ENTER("ha_calpont_group_by_handler::init_scan"); + // MCOL-1052 + THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state; + thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; int rc = ha_calpont_impl_group_by_init(this, table); + thd->infinidb_vtable.vtable_state = oldState; DBUG_RETURN(rc); } @@ -1184,14 +1188,15 @@ int ha_calpont_group_by_handler::next_row() DBUG_ENTER("ha_calpont_group_by_handler::next_row"); int rc = ha_calpont_impl_group_by_next(this, table); + + +// first_row= 0; + //Field *field = *(table->field); + //field->store(5LL, 1); + //field->set_notnull(); + //return(0); + DBUG_RETURN(rc); -/* - first_row= 0; - Field *field = *(table->field); - field->store(5LL, 1); - field->set_notnull(); - return(0); -*/ } int ha_calpont_group_by_handler::end_scan() diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 5bed8e88c..6a8b1ac95 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -221,7 +221,7 @@ void debug_walk(const Item* item, void* arg) case Item::FIELD_ITEM: { Item_field* ifp = (Item_field*)item; - cout << "FIELD_ITEM: " << (ifp->db_name ? ifp->db_name : "") << '.' << bestTableName(ifp) << + cerr << "FIELD_ITEM: " << (ifp->db_name ? ifp->db_name : "") << '.' << bestTableName(ifp) << '.' << ifp->field_name << endl; break; } @@ -229,10 +229,10 @@ void debug_walk(const Item* item, void* arg) case Item::INT_ITEM: { Item_int* iip = (Item_int*)item; - cout << "INT_ITEM: "; + cerr << "INT_ITEM: "; - if (iip->name) cout << iip->name << " (from name string)" << endl; - else cout << iip->val_int() << endl; + if (iip->name) cerr << iip->name << " (from name string)" << endl; + else cerr << iip->val_int() << endl; break; } @@ -243,19 +243,19 @@ void debug_walk(const Item* item, void* arg) String val, *str = isp->val_str(&val); string valStr; valStr.assign(str->ptr(), str->length()); - cout << "STRING_ITEM: >" << valStr << '<' << endl; + cerr << "STRING_ITEM: >" << valStr << '<' << endl; break; } case Item::REAL_ITEM: { - cout << "REAL_ITEM" << endl; + cerr << "REAL_ITEM" << endl; break; } case Item::DECIMAL_ITEM: { - cout << "DECIMAL_ITEM" << endl; + cerr << "DECIMAL_ITEM" << endl; break; } @@ -263,85 +263,85 @@ void debug_walk(const Item* item, void* arg) { Item_func* ifp = (Item_func*)item; Item_func_opt_neg* inp; - cout << "FUNC_ITEM: "; + cerr << "FUNC_ITEM: "; switch (ifp->functype()) { case Item_func::UNKNOWN_FUNC: // 0 - cout << ifp->func_name() << " (" << ifp->functype() << ")" << endl; + cerr << ifp->func_name() << " (" << ifp->functype() << ")" << endl; break; case Item_func::GT_FUNC: // 7 - cout << '>' << " (" << ifp->functype() << ")" << endl; + cerr << '>' << " (" << ifp->functype() << ")" << endl; break; case Item_func::EQ_FUNC: // 1 - cout << '=' << " (" << ifp->functype() << ")" << endl; + cerr << '=' << " (" << ifp->functype() << ")" << endl; break; case Item_func::GE_FUNC: - cout << ">=" << " (" << ifp->functype() << ")" << endl; + cerr << ">=" << " (" << ifp->functype() << ")" << endl; break; case Item_func::LE_FUNC: - cout << "<=" << " (" << ifp->functype() << ")" << endl; + cerr << "<=" << " (" << ifp->functype() << ")" << endl; break; case Item_func::LT_FUNC: - cout << '<' << " (" << ifp->functype() << ")" << endl; + cerr << '<' << " (" << ifp->functype() << ")" << endl; break; case Item_func::NE_FUNC: - cout << "<>" << " (" << ifp->functype() << ")" << endl; + cerr << "<>" << " (" << ifp->functype() << ")" << endl; break; case Item_func::NEG_FUNC: // 45 - cout << "unary minus" << " (" << ifp->functype() << ")" << endl; + cerr << "unary minus" << " (" << ifp->functype() << ")" << endl; break; case Item_func::IN_FUNC: // 16 inp = (Item_func_opt_neg*)ifp; - if (inp->negated) cout << "not "; + if (inp->negated) cerr << "not "; - cout << "in" << " (" << ifp->functype() << ")" << endl; + cerr << "in" << " (" << ifp->functype() << ")" << endl; break; case Item_func::BETWEEN: inp = (Item_func_opt_neg*)ifp; - if (inp->negated) cout << "not "; + if (inp->negated) cerr << "not "; - cout << "between" << " (" << ifp->functype() << ")" << endl; + cerr << "between" << " (" << ifp->functype() << ")" << endl; break; case Item_func::ISNULL_FUNC: // 10 - cout << "is null" << " (" << ifp->functype() << ")" << endl; + cerr << "is null" << " (" << ifp->functype() << ")" << endl; break; case Item_func::ISNOTNULL_FUNC: // 11 - cout << "is not null" << " (" << ifp->functype() << ")" << endl; + cerr << "is not null" << " (" << ifp->functype() << ")" << endl; break; case Item_func::NOT_ALL_FUNC: - cout << "not_all" << " (" << ifp->functype() << ")" << endl; + cerr << "not_all" << " (" << ifp->functype() << ")" << endl; break; case Item_func::NOT_FUNC: - cout << "not_func" << " (" << ifp->functype() << ")" << endl; + cerr << "not_func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::TRIG_COND_FUNC: - cout << "trig_cond_func" << " (" << ifp->functype() << ")" << endl; + cerr << "trig_cond_func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::ISNOTNULLTEST_FUNC: - cout << "isnotnulltest_func" << " (" << ifp->functype() << ")" << endl; + cerr << "isnotnulltest_func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::MULT_EQUAL_FUNC: { - cout << "mult_equal_func:" << " (" << ifp->functype() << ")" << endl; + cerr << "mult_equal_func:" << " (" << ifp->functype() << ")" << endl; Item_equal* item_eq = (Item_equal*)ifp; Item_equal_fields_iterator it(*item_eq); Item* item; @@ -349,142 +349,142 @@ void debug_walk(const Item* item, void* arg) while ((item = it++)) { Field* equal_field = it.get_curr_field(); - cout << equal_field->field_name << endl; + cerr << equal_field->field_name << endl; } break; } case Item_func::EQUAL_FUNC: - cout << "equal func" << " (" << ifp->functype() << ")" << endl; + cerr << "equal func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::FT_FUNC: - cout << "ft func" << " (" << ifp->functype() << ")" << endl; + cerr << "ft func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::LIKE_FUNC: - cout << "like func" << " (" << ifp->functype() << ")" << endl; + cerr << "like func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::COND_AND_FUNC: - cout << "cond and func" << " (" << ifp->functype() << ")" << endl; + cerr << "cond and func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::COND_OR_FUNC: - cout << "cond or func" << " (" << ifp->functype() << ")" << endl; + cerr << "cond or func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::XOR_FUNC: - cout << "xor func" << " (" << ifp->functype() << ")" << endl; + cerr << "xor func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::INTERVAL_FUNC: - cout << "interval func" << " (" << ifp->functype() << ")" << endl; + cerr << "interval func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_EQUALS_FUNC: - cout << "sp equals func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp equals func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_DISJOINT_FUNC: - cout << "sp disjoint func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp disjoint func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_INTERSECTS_FUNC: - cout << "sp intersects func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp intersects func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_TOUCHES_FUNC: - cout << "sp touches func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp touches func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_CROSSES_FUNC: - cout << "sp crosses func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp crosses func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_WITHIN_FUNC: - cout << "sp within func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp within func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_CONTAINS_FUNC: - cout << "sp contains func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp contains func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_OVERLAPS_FUNC: - cout << "sp overlaps func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp overlaps func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_STARTPOINT: - cout << "sp startpoint func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp startpoint func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_ENDPOINT: - cout << "sp endpoint func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp endpoint func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_EXTERIORRING: - cout << "sp exteriorring func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp exteriorring func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_POINTN: - cout << "sp pointn func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp pointn func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_GEOMETRYN: - cout << "sp geometryn func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp geometryn func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_INTERIORRINGN: - cout << "sp exteriorringn func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp exteriorringn func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SP_RELATE_FUNC: - cout << "sp relate func" << " (" << ifp->functype() << ")" << endl; + cerr << "sp relate func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::NOW_FUNC: - cout << "now func" << " (" << ifp->functype() << ")" << endl; + cerr << "now func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::SUSERVAR_FUNC: - cout << "suservar func" << " (" << ifp->functype() << ")" << endl; + cerr << "suservar func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::GUSERVAR_FUNC: - cout << "guservar func" << " (" << ifp->functype() << ")" << endl; + cerr << "guservar func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::COLLATE_FUNC: - cout << "collate func" << " (" << ifp->functype() << ")" << endl; + cerr << "collate func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::EXTRACT_FUNC: - cout << "extract func" << " (" << ifp->functype() << ")" << endl; + cerr << "extract func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::CHAR_TYPECAST_FUNC: - cout << "char typecast func" << " (" << ifp->functype() << ")" << endl; + cerr << "char typecast func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::FUNC_SP: - cout << "func sp func" << " (" << ifp->functype() << ")" << endl; + cerr << "func sp func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::UDF_FUNC: - cout << "udf func" << " (" << ifp->functype() << ")" << endl; + cerr << "udf func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::GSYSVAR_FUNC: - cout << "gsysvar func" << " (" << ifp->functype() << ")" << endl; + cerr << "gsysvar func" << " (" << ifp->functype() << ")" << endl; break; case Item_func::DYNCOL_FUNC: - cout << "dyncol func" << " (" << ifp->functype() << ")" << endl; + cerr << "dyncol func" << " (" << ifp->functype() << ")" << endl; break; default: - cout << "type=" << ifp->functype() << endl; + cerr << "type=" << ifp->functype() << endl; break; } @@ -494,7 +494,7 @@ void debug_walk(const Item* item, void* arg) case Item::COND_ITEM: { Item_cond* icp = (Item_cond*)item; - cout << "COND_ITEM: " << icp->func_name() << endl; + cerr << "COND_ITEM: " << icp->func_name() << endl; break; } @@ -511,39 +511,39 @@ void debug_walk(const Item* item, void* arg) switch (isp->sum_func()) { case Item_sum::SUM_FUNC: - cout << "SUM_FUNC: " << item_name << endl; + cerr << "SUM_FUNC: " << item_name << endl; break; case Item_sum::SUM_DISTINCT_FUNC: - cout << "SUM_DISTINCT_FUNC: " << item_name << endl; + cerr << "SUM_DISTINCT_FUNC: " << item_name << endl; break; case Item_sum::AVG_FUNC: - cout << "AVG_FUNC: " << item_name << endl; + cerr << "AVG_FUNC: " << item_name << endl; break; case Item_sum::COUNT_FUNC: - cout << "COUNT_FUNC: " << item_name << endl; + cerr << "COUNT_FUNC: " << item_name << endl; break; case Item_sum::COUNT_DISTINCT_FUNC: - cout << "COUNT_DISTINCT_FUNC: " << item_name << endl; + cerr << "COUNT_DISTINCT_FUNC: " << item_name << endl; break; case Item_sum::MIN_FUNC: - cout << "MIN_FUNC: " << item_name << endl; + cerr << "MIN_FUNC: " << item_name << endl; break; case Item_sum::MAX_FUNC: - cout << "MAX_FUNC: " << item_name << endl; + cerr << "MAX_FUNC: " << item_name << endl; break; case Item_sum::UDF_SUM_FUNC: - cout << "UDAF_FUNC: " << item_name << endl; + cerr << "UDAF_FUNC: " << item_name << endl; break; default: - cout << "SUM_FUNC_ITEM type=" << isp->sum_func() << endl; + cerr << "SUM_FUNC_ITEM type=" << isp->sum_func() << endl; break; } @@ -553,24 +553,24 @@ void debug_walk(const Item* item, void* arg) case Item::SUBSELECT_ITEM: { Item_subselect* sub = (Item_subselect*)item; - cout << "SUBSELECT Item: "; + cerr << "SUBSELECT Item: "; switch (sub->substype()) { case Item_subselect::EXISTS_SUBS: - cout << "EXISTS"; + cerr << "EXISTS"; break; case Item_subselect::IN_SUBS: - cout << "IN"; + cerr << "IN"; break; default: - cout << sub->substype(); + cerr << sub->substype(); break; } - cout << endl; + cerr << endl; JOIN* join = sub->get_select_lex()->join; if (join) @@ -581,7 +581,7 @@ void debug_walk(const Item* item, void* arg) cond->traverse_cond(debug_walk, arg, Item::POSTFIX); } - cout << "Finish subselect item traversing" << endl; + cerr << "Finish subselect item traversing" << endl; break; } @@ -599,14 +599,14 @@ void debug_walk(const Item* item, void* arg) //ifp->cached_table->select_lex->select_number gives the select level. // could be used on alias. // could also be used to tell correlated join (equal level). - cout << "CACHED REF FIELD_ITEM: " << ifp->db_name << '.' << bestTableName(ifp) << + cerr << "CACHED REF FIELD_ITEM: " << ifp->db_name << '.' << bestTableName(ifp) << '.' << ifp->field_name << endl; break; } else if (field->type() == Item::FUNC_ITEM) { Item_func* ifp = (Item_func*)field; - cout << "CACHED REF FUNC_ITEM " << ifp->func_name() << endl; + cerr << "CACHED REF FUNC_ITEM " << ifp->func_name() << endl; } else if (field->type() == Item::REF_ITEM) { @@ -686,34 +686,34 @@ void debug_walk(const Item* item, void* arg) } } - cout << "CACHED REF_ITEM: ref type " << refType.c_str() << " real type " << realType.c_str() << endl; + cerr << "CACHED REF_ITEM: ref type " << refType.c_str() << " real type " << realType.c_str() << endl; break; } else { - cout << "REF_ITEM with CACHE_ITEM type unknown " << field->type() << endl; + cerr << "REF_ITEM with CACHE_ITEM type unknown " << field->type() << endl; } } else if (ref->real_item()->type() == Item::FIELD_ITEM) { Item_field* ifp = (Item_field*)ref->real_item(); - cout << "REF FIELD_ITEM: " << ifp->db_name << '.' << bestTableName(ifp) << '.' << + cerr << "REF FIELD_ITEM: " << ifp->db_name << '.' << bestTableName(ifp) << '.' << ifp->field_name << endl; break; } else if (ref->real_item()->type() == Item::FUNC_ITEM) { Item_func* ifp = (Item_func*)ref->real_item(); - cout << "REF FUNC_ITEM " << ifp->func_name() << endl; + cerr << "REF FUNC_ITEM " << ifp->func_name() << endl; } else if (ref->real_item()->type() == Item::WINDOW_FUNC_ITEM) { Item_window_func* ifp = (Item_window_func*)ref->real_item(); - cout << "REF WINDOW_FUNC_ITEM " << ifp->window_func()->func_name() << endl; + cerr << "REF WINDOW_FUNC_ITEM " << ifp->window_func()->func_name() << endl; } else { - cout << "UNKNOWN REF ITEM type " << ref->real_item()->type() << endl; + cerr << "UNKNOWN REF ITEM type " << ref->real_item()->type() << endl; } break; @@ -722,7 +722,7 @@ void debug_walk(const Item* item, void* arg) case Item::ROW_ITEM: { Item_row* row = (Item_row*)item; - cout << "ROW_ITEM: " << endl; + cerr << "ROW_ITEM: " << endl; for (uint32_t i = 0; i < row->cols(); i++) debug_walk(row->element_index(i), 0); @@ -732,7 +732,7 @@ void debug_walk(const Item* item, void* arg) case Item::EXPR_CACHE_ITEM: { - cout << "Expr Cache Item" << endl; + cerr << "Expr Cache Item" << endl; ((Item_cache_wrapper*)item)->get_orig_item()->traverse_cond(debug_walk, arg, Item::POSTFIX); break; } @@ -747,27 +747,27 @@ void debug_walk(const Item* item, void* arg) switch (item->result_type()) { case STRING_RESULT: - cout << "CACHE_STRING_ITEM" << endl; + cerr << "CACHE_STRING_ITEM" << endl; break; case REAL_RESULT: - cout << "CACHE_REAL_ITEM " << isp->val_real() << endl; + cerr << "CACHE_REAL_ITEM " << isp->val_real() << endl; break; case INT_RESULT: - cout << "CACHE_INT_ITEM " << isp->val_int() << endl; + cerr << "CACHE_INT_ITEM " << isp->val_int() << endl; break; case ROW_RESULT: - cout << "CACHE_ROW_ITEM" << endl; + cerr << "CACHE_ROW_ITEM" << endl; break; case DECIMAL_RESULT: - cout << "CACHE_DECIMAL_ITEM " << isp->val_decimal() << endl; + cerr << "CACHE_DECIMAL_ITEM " << isp->val_decimal() << endl; break; default: - cout << "CACHE_UNKNOWN_ITEM" << endl; + cerr << "CACHE_UNKNOWN_ITEM" << endl; break; } @@ -780,7 +780,7 @@ void debug_walk(const Item* item, void* arg) //ifp->cached_table->select_lex->select_number gives the select level. // could be used on alias. // could also be used to tell correlated join (equal level). - cout << "CACHED FIELD_ITEM: " << ifp->db_name << '.' << bestTableName(ifp) << + cerr << "CACHED FIELD_ITEM: " << ifp->db_name << '.' << bestTableName(ifp) << '.' << ifp->field_name << endl; break; } @@ -862,18 +862,18 @@ void debug_walk(const Item* item, void* arg) } } - cout << "CACHE_ITEM ref type " << refType.c_str() << " real type " << realType.c_str() << endl; + cerr << "CACHE_ITEM ref type " << refType.c_str() << " real type " << realType.c_str() << endl; break; } else if (field->type() == Item::FUNC_ITEM) { Item_func* ifp = (Item_func*)field; - cout << "CACHE_ITEM FUNC_ITEM " << ifp->func_name() << endl; + cerr << "CACHE_ITEM FUNC_ITEM " << ifp->func_name() << endl; break; } else { - cout << "CACHE_ITEM type unknown " << field->type() << endl; + cerr << "CACHE_ITEM type unknown " << field->type() << endl; } break; @@ -884,12 +884,12 @@ void debug_walk(const Item* item, void* arg) String val, *str = NULL; Item_temporal_literal* itp = (Item_temporal_literal*)item; str = itp->val_str(&val); - cout << "DATE ITEM: "; + cerr << "DATE ITEM: "; if (str) - cout << ": (" << str->ptr() << ')' << endl; + cerr << ": (" << str->ptr() << ')' << endl; else - cout << ": " << endl; + cerr << ": " << endl; break; } @@ -897,13 +897,13 @@ void debug_walk(const Item* item, void* arg) case Item::WINDOW_FUNC_ITEM: { Item_window_func* ifp = (Item_window_func*)item; - cout << "Window Function Item " << ifp->window_func()->func_name() << endl; + cerr << "Window Function Item " << ifp->window_func()->func_name() << endl; break; } default: { - cout << "UNKNOWN_ITEM type " << item->type() << endl; + cerr << "UNKNOWN_ITEM type " << item->type() << endl; break; } } @@ -1015,11 +1015,11 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex) #ifdef DEBUG_WALK_COND if (table_ptr->alias) - cout << table_ptr->alias ; + cerr << table_ptr->alias ; else if (table_ptr->alias) - cout << table_ptr->alias; + cerr << table_ptr->alias; - cout << " outer table expression: " << endl; + cerr << " outer table expression: " << endl; expr->traverse_cond(debug_walk, &gwi_outer, Item::POSTFIX); #endif expr->traverse_cond(gp_walk, &gwi_outer, Item::POSTFIX); @@ -1053,13 +1053,13 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex) Item_cond* expr = reinterpret_cast(table_ptr->embedding->on_expr); #ifdef DEBUG_WALK_COND - cout << "inner tables: " << endl; + cerr << "inner tables: " << endl; set::const_iterator it; for (it = gwi_outer.innerTables.begin(); it != gwi_outer.innerTables.end(); ++it) - cout << (*it) << " "; + cerr << (*it) << " "; - cout << endl; + cerr << endl; expr->traverse_cond(debug_walk, &gwi_outer, Item::POSTFIX); #endif expr->traverse_cond(gp_walk, &gwi_outer, Item::POSTFIX); @@ -1485,7 +1485,7 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) // @bug5811. This filter string is for cross engine to use. // Use real table name. ifp->print(&str, QT_INFINIDB_DERIVED); - IDEBUG(cout << str.ptr() << endl); + IDEBUG(cerr << str.ptr() << endl); if (str.ptr()) cf->data(str.c_ptr()); @@ -1811,7 +1811,7 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) (current_thd->lex->sql_command == SQLCOM_DELETE) || (current_thd->lex->sql_command == SQLCOM_DELETE_MULTI))) { - IDEBUG( cout << "deleted func with 2 const columns" << endl ); + IDEBUG( cerr << "deleted func with 2 const columns" << endl ); delete rhs; delete lhs; return false; @@ -1865,7 +1865,7 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) if (notInner) { lhs->returnAll(true); - IDEBUG( cout << "setting returnAll on " << tan_lhs << endl); + IDEBUG( cerr << "setting returnAll on " << tan_lhs << endl); } } @@ -1882,7 +1882,7 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) if (notInner) { rhs->returnAll(true); - IDEBUG( cout << "setting returnAll on " << tan_rhs << endl ); + IDEBUG( cerr << "setting returnAll on " << tan_rhs << endl ); } } @@ -2591,7 +2591,7 @@ CalpontSystemCatalog::ColType fieldType_MysqlToIDB (const Field* field) break; default: - IDEBUG( cout << "fieldType_MysqlToIDB:: Unknown result type of MySQL " + IDEBUG( cerr << "fieldType_MysqlToIDB:: Unknown result type of MySQL " << field->result_type() << endl ); break; } @@ -2699,7 +2699,7 @@ CalpontSystemCatalog::ColType colType_MysqlToIDB (const Item* item) break; default: - IDEBUG( cout << "colType_MysqlToIDB:: Unknown result type of MySQL " + IDEBUG( cerr << "colType_MysqlToIDB:: Unknown result type of MySQL " << item->result_type() << endl ); break; } @@ -2903,7 +2903,7 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp { // TODO: item is a Item_cache_wrapper printf("EXPR_CACHE_ITEM in buildReturnedColumn\n"); - cout << "EXPR_CACHE_ITEM in buildReturnedColumn" << endl; + cerr << "EXPR_CACHE_ITEM in buildReturnedColumn" << endl; break; } @@ -3979,7 +3979,7 @@ ParseTree* buildParseTree(Item_func* item, gp_walk_info& gwi, bool& nonSupport) Item_cond* icp = (Item_cond*)item; #ifdef DEBUG_WALK_COND // debug - cout << "Build Parsetree: " << endl; + cerr << "Build Parsetree: " << endl; icp->traverse_cond(debug_walk, &gwi, Item::POSTFIX); #endif //@bug5044. PPSTFIX walking should always be treated as WHERE clause filter @@ -4826,7 +4826,7 @@ void gp_walk(const Item* item, void* arg) gwip->rcWorkStack.push(cc); if (str) - IDEBUG( cout << "Const F&E " << item->full_name() << " evaluate: " << valStr << endl ); + IDEBUG( cerr << "Const F&E " << item->full_name() << " evaluate: " << valStr << endl ); break; } @@ -5386,7 +5386,7 @@ void parse_item (Item* item, vector& field_vec, bool& hasNonSupport } else { - cout << "UNKNOWN REF Item" << endl; + cerr << "UNKNOWN REF Item" << endl; break; } } @@ -5458,7 +5458,7 @@ bool isInfiniDB(TABLE* table_ptr) int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion) { #ifdef DEBUG_WALK_COND - cout << "getSelectPlan()" << endl; + cerr << "getSelectPlan()" << endl; #endif // by pass the derived table resolve phase of mysql @@ -5545,7 +5545,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i while ((sj_nest = sj_list_it++)) { - cout << sj_nest->db << "." << sj_nest->table_name << endl; + cerr << sj_nest->db << "." << sj_nest->table_name << endl; } #endif @@ -5629,7 +5629,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i CalpontSystemCatalog::TableAliasName tan = make_aliastable(table_ptr->db, table_name, table_ptr->alias, infiniDB); gwi.tableMap[tan] = make_pair(0, table_ptr); #ifdef DEBUG_WALK_COND - cout << tn << endl; + cerr << tn << endl; #endif } } @@ -5709,15 +5709,15 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i distUnionNum = unionVec.size(); /*#ifdef DEBUG_WALK_COND - IDEBUG( cout << ">>>> UNION DEBUG" << endl ); + IDEBUG( cerr << ">>>> UNION DEBUG" << endl ); JOIN* join = sl->join; Item_cond* icp = 0; if (join != 0) icp = reinterpret_cast(join->conds); if (icp) icp->traverse_cond(debug_walk, &gwi, Item::POSTFIX); - IDEBUG ( cout << *plan << endl ); - IDEBUG ( cout << "<<<traverse_cond(debug_walk, &gwi, Item::POSTFIX); - cout << "------------------------------------------------\n" << endl; + cerr << "------------------------------------------------\n" << endl; #endif icp->traverse_cond(gp_walk, &gwi, Item::POSTFIX); @@ -5794,16 +5794,16 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i TABLE_LIST* curr = *tbl; if (curr->table_name) - cout << curr->table_name << " "; + cerr << curr->table_name << " "; else - cout << curr->alias << endl; + cerr << curr->alias << endl; if (curr->outer_join) - cout << " is inner table" << endl; + cerr << " is inner table" << endl; else if (curr->straight) - cout << "straight_join" << endl; + cerr << "straight_join" << endl; else - cout << "join" << endl; + cerr << "join" << endl; if (curr->nested_join) { @@ -5819,7 +5819,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i for (TABLE_LIST** tb = inner; tb < end1; tb++) { TABLE_LIST* curr1 = *tb; - cout << curr1->alias << endl; + cerr << curr1->alias << endl; if (curr1->sj_on_expr) { @@ -5894,7 +5894,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i gwi.clauseType = SELECT; #ifdef DEBUG_WALK_COND { - cout << "------------------- SELECT --------------------" << endl; + cerr << "------------------- SELECT --------------------" << endl; List_iterator_fast it(select_lex.item_list); Item* item; @@ -5903,7 +5903,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i debug_walk(item, 0); } - cout << "-----------------------------------------------\n" << endl; + cerr << "-----------------------------------------------\n" << endl; } #endif @@ -6359,7 +6359,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i } #ifdef DEBUG_WALK_COND - cout << "SELECT clause SUBSELECT Item: " << sub->substype() << endl; + cerr << "SELECT clause SUBSELECT Item: " << sub->substype() << endl; JOIN* join = sub->get_select_lex()->join; if (join) @@ -6370,7 +6370,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i cond->traverse_cond(debug_walk, &gwi, Item::POSTFIX); } - cout << "Finish SELECT clause subselect item traversing" << endl; + cerr << "Finish SELECT clause subselect item traversing" << endl; #endif SelectSubQuery* selectSub = new SelectSubQuery(gwi, sub); //selectSub->gwip(&gwi); @@ -6489,9 +6489,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { Item_cond* having = reinterpret_cast(select_lex.having); #ifdef DEBUG_WALK_COND - cout << "------------------- HAVING ---------------------" << endl; + cerr << "------------------- HAVING ---------------------" << endl; having->traverse_cond(debug_walk, &gwi, Item::POSTFIX); - cout << "------------------------------------------------\n" << endl; + cerr << "------------------------------------------------\n" << endl; #endif having->traverse_cond(gp_walk, &gwi, Item::POSTFIX); @@ -7937,7 +7937,8 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti) return 0; } -int cp_get_group_plan(THD* thd, SCSEP& csep, cal_table_info& ti, cal_group_info& gi ) +// ti is useless +int cp_get_group_plan(THD* thd, SCSEP& csep, cal_group_info& gi ) { LEX* lex = thd->lex; idbassert(lex != 0); @@ -7945,7 +7946,6 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_table_info& ti, cal_group_info& SELECT_LEX select_lex = lex->select_lex; gp_walk_info gwi; gwi.thd = thd; - gwi.thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE; int status = getGroupPlan(gwi, select_lex, csep, gi); if (status > 0) @@ -8369,15 +8369,15 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro distUnionNum = unionVec.size(); /*#ifdef DEBUG_WALK_COND - IDEBUG( cout << ">>>> UNION DEBUG" << endl ); + IDEBUG( cerr << ">>>> UNION DEBUG" << endl ); JOIN* join = sl->join; Item_cond* icp = 0; if (join != 0) icp = reinterpret_cast(join->conds); if (icp) icp->traverse_cond(debug_walk, &gwi, Item::POSTFIX); - IDEBUG ( cout << *plan << endl ); - IDEBUG ( cout << "<<<table_name) - cout << curr->table_name << " "; + cerr << curr->table_name << " "; else - cout << curr->alias << endl; + cerr << curr->alias << endl; if (curr->outer_join) - cout << " is inner table" << endl; + cerr << " is inner table" << endl; else if (curr->straight) - cout << "straight_join" << endl; + cerr << "straight_join" << endl; else - cout << "join" << endl; + cerr << "join" << endl; if (curr->nested_join) { @@ -8480,7 +8480,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro for (TABLE_LIST** tb = inner; tb < end1; tb++) { TABLE_LIST* curr1 = *tb; - cout << curr1->alias << endl; + cerr << curr1->alias << endl; if (curr1->sj_on_expr) { @@ -8564,7 +8564,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro debug_walk(item, 0); } - cout << "-----------------------------------------------\n" << endl; + cerr << "-----------------------------------------------\n" << endl; } #endif diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index c8a02713a..d279d995e 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -276,6 +276,104 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& } } +void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct) +{ + // unset null bit first + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + // For unsigned, use the ColType returned in the row rather than the + // unsigned_flag set by mysql. This is because mysql gets it wrong for SUM() + // Hopefully, in all other cases we get it right. + switch ((*f)->type()) + { + case MYSQL_TYPE_NEWDECIMAL: + { + Field_new_decimal* f2 = (Field_new_decimal*)*f; + + // @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due + // to create vtable limitation. + if (f2->dec < ct.scale) + f2->dec = ct.scale; + + char buf[256]; + dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType); + f2->store(buf, strlen(buf), f2->charset()); + break; + } + + case MYSQL_TYPE_TINY: //TINYINT type + { + Field_tiny* f2 = (Field_tiny*)*f; + longlong int_val = (longlong)value; + f2->store(int_val, f2->unsigned_flag); + break; + } + + case MYSQL_TYPE_SHORT: //SMALLINT type + { + Field_short* f2 = (Field_short*)*f; + longlong int_val = (longlong)value; + f2->store(int_val, f2->unsigned_flag); + break; + } + + case MYSQL_TYPE_LONG: //INT type + { + Field_long* f2 = (Field_long*)*f; + longlong int_val = (longlong)value; + f2->store(int_val, f2->unsigned_flag); + break; + } + + case MYSQL_TYPE_LONGLONG: //BIGINT type + { + Field_longlong* f2 = (Field_longlong*)*f; + longlong int_val = (longlong)value; + f2->store(int_val, f2->unsigned_flag); + break; + } + + case MYSQL_TYPE_FLOAT: // FLOAT type + { + Field_float* f2 = (Field_float*)*f; + float float_val = *(float*)(&value); + f2->store(float_val); + break; + } + + case MYSQL_TYPE_DOUBLE: // DOUBLE type + { + Field_double* f2 = (Field_double*)*f; + double double_val = *(double*)(&value); + f2->store(double_val); + break; + } + + case MYSQL_TYPE_VARCHAR: + { + Field_varstring* f2 = (Field_varstring*)*f; + char tmp[25]; + + if (ct.colDataType == CalpontSystemCatalog::DECIMAL) + dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType); + else + snprintf(tmp, 25, "%ld", value); + + f2->store(tmp, strlen(tmp), f2->charset()); + break; + } + + default: + { + Field_longlong* f2 = (Field_longlong*)*f; + longlong int_val = (longlong)value; + f2->store(int_val, f2->unsigned_flag); + break; + } + } +} + // // @bug 2244. Log exception related to lost connection to ExeMgr. // Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow() @@ -761,7 +859,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci) return rc; } -int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci) +int fetchNextRowGrHandGroupBy(cal_table_info& ti, cal_connection_info* ci) { int rc = HA_ERR_END_OF_FILE; int num_attr = ti.msTablePtr->s->fields; @@ -1193,6 +1291,442 @@ int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci) return rc; } +int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci) +{ + int rc = HA_ERR_END_OF_FILE; + int num_attr = ti.msTablePtr->s->fields; + sm::status_t sm_stat; + + try + { + if (ti.conn_hndl) + { + sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl); + } + else if (ci->cal_conn_hndl) + { + sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(¤t_thd->killed)); + } + else + throw runtime_error("internal error"); + } + catch (std::exception& ex) + { +// @bug 2244. Always log this msg for now, as we try to track down when/why we are +// losing socket connection with ExeMgr +//#ifdef INFINIDB_DEBUG + tpl_scan_fetch_LogException( ti, ci, &ex); +//#endif + sm_stat = sm::CALPONT_INTERNAL_ERROR; + } + catch (...) + { +// @bug 2244. Always log this msg for now, as we try to track down when/why we are +// losing socket connection with ExeMgr +//#ifdef INFINIDB_DEBUG + tpl_scan_fetch_LogException( ti, ci, 0 ); +//#endif + sm_stat = sm::CALPONT_INTERNAL_ERROR; + } + + if (sm_stat == sm::STATUS_OK) + { + Field** f; + f = ti.msTablePtr->field; + //set all fields to null in null col bitmap + //memset(buf, -1, ti.msTablePtr->s->null_bytes); + std::vector& colTypes = ti.tpl_scan_ctx->ctp; + int64_t intColVal = 0; + uint64_t uintColVal = 0; + char tmp[256]; + + RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup; + + // table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup + // set coltype.position to be the position in rowgroup. only set once. + if (ti.tpl_scan_ctx->rowsreturned == 0 && + (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)) + { + for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++) + { + int oid = rowGroup->getOIDs()[i]; + int j = 0; + + for (; j < num_attr; j++) + { + // mysql should haved eliminated duplicate projection columns + if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID) + { + colTypes[j].colPosition = i; + break; + } + } + } + } + + rowgroup::Row row; + rowGroup->initRow(&row); + rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row); + int s; + + for (int p = 0; p < num_attr; p++, f++) + { + //This col is going to be written + bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index); + + // get coltype if not there yet + if (colTypes[0].colWidth == 0) + { + for (short c = 0; c < num_attr; c++) + { + colTypes[c].colPosition = c; + colTypes[c].colWidth = rowGroup->getColumnWidth(c); + colTypes[c].colDataType = rowGroup->getColTypes()[c]; + colTypes[c].columnOID = rowGroup->getOIDs()[c]; + colTypes[c].scale = rowGroup->getScale()[c]; + colTypes[c].precision = rowGroup->getPrecision()[c]; + } + } + + CalpontSystemCatalog::ColType colType(colTypes[p]); + + // table mode handling + if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF) + { + if (colType.colPosition == -1) // not projected by tuplejoblist + continue; + else + s = colType.colPosition; + } + else + { + s = p; + } + + // precision == -16 is borrowed as skip null check indicator for bit ops. + if (row.isNullValue(s) && colType.precision != -16) + { + // @2835. Handle empty string and null confusion. store empty string for string column + if (colType.colDataType == CalpontSystemCatalog::CHAR || + colType.colDataType == CalpontSystemCatalog::VARCHAR || + colType.colDataType == CalpontSystemCatalog::VARBINARY) + { + Field_varstring* f2 = (Field_varstring*)*f; + f2->store(tmp, 0, f2->charset()); + } + + continue; + } + + // fetch and store data + switch (colType.colDataType) + { + case CalpontSystemCatalog::DATE: + { + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + intColVal = row.getUintField<4>(s); + DataConvert::dateToString(intColVal, tmp, 255); + Field_varstring* f2 = (Field_varstring*)*f; + f2->store(tmp, strlen(tmp), f2->charset()); + f2->set_notnull(); + break; + } + + case CalpontSystemCatalog::DATETIME: + { + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + intColVal = row.getUintField<8>(s); + DataConvert::datetimeToString(intColVal, tmp, 255); + + /* setting the field_length is a sort-of hack. The length + * at this point can be long enough to include mseconds. + * ColumnStore doesn't fully support mseconds yet so if + * they are requested, trim them off. + * At a later date we should set this more intelligently + * based on the result set. + */ + /* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! */ + if (((*f)->field_length > 19) && ((*f)->field_length != 57)) + (*f)->field_length = strlen(tmp); + + Field_varstring* f2 = (Field_varstring*)*f; + f2->store(tmp, strlen(tmp), f2->charset()); + f2->set_notnull(); + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::VARCHAR: + { + Field_varstring* f2 = (Field_varstring*)*f; + + switch (colType.colWidth) + { + case 1: + intColVal = row.getUintField<1>(s); + f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset()); + break; + + case 2: + intColVal = row.getUintField<2>(s); + f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset()); + break; + + case 4: + intColVal = row.getUintField<4>(s); + f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset()); + break; + + case 8: + //make sure we don't send strlen off into the weeds... + intColVal = row.getUintField<8>(s); + memcpy(tmp, &intColVal, 8); + tmp[8] = 0; + f2->store(tmp, strlen(tmp), f2->charset()); + break; + + default: + f2->store((const char*)row.getStringPointer(s), row.getStringLength(s), f2->charset()); + } + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + f2->set_notnull(); + break; + } + + case CalpontSystemCatalog::VARBINARY: + { + Field_varstring* f2 = (Field_varstring*)*f; + + if (current_thd->variables.infinidb_varbin_always_hex) + { + uint32_t l; + const uint8_t* p = row.getVarBinaryField(l, s); + uint32_t ll = l * 2; + boost::scoped_array sca(new char[ll]); + vbin2hex(p, l, sca.get()); + f2->store(sca.get(), ll, f2->charset()); + } + else + f2->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), f2->charset()); + f2->set_notnull(); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + } + + case CalpontSystemCatalog::BIGINT: + { + intColVal = row.getIntField<8>(s); + storeNumericFieldGroupBy(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::UBIGINT: + { + uintColVal = row.getUintField<8>(s); + storeNumericFieldGroupBy(f, uintColVal, colType); + break; + } + + case CalpontSystemCatalog::INT: + { + intColVal = row.getIntField<4>(s); + storeNumericFieldGroupBy(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::UINT: + { + uintColVal = row.getUintField<4>(s); + storeNumericFieldGroupBy(f, uintColVal, colType); + break; + } + + case CalpontSystemCatalog::SMALLINT: + { + intColVal = row.getIntField<2>(s); + storeNumericFieldGroupBy(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::USMALLINT: + { + uintColVal = row.getUintField<2>(s); + storeNumericFieldGroupBy(f, uintColVal, colType); + break; + } + + case CalpontSystemCatalog::TINYINT: + { + intColVal = row.getIntField<1>(s); + storeNumericFieldGroupBy(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::UTINYINT: + { + uintColVal = row.getUintField<1>(s); + storeNumericFieldGroupBy(f, uintColVal, colType); + break; + } + + //In this case, we're trying to load a double output column with float data. This is the + // case when you do sum(floatcol), e.g. + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + { + float dl = row.getFloatField(s); + + if (dl == std::numeric_limits::infinity()) + continue; + + //int64_t* icvp = (int64_t*)&dl; + //intColVal = *icvp; + Field_float* f2 = (Field_float*)*f; + // bug 3485, reserve enough space for the longest float value + // -3.402823466E+38 to -1.175494351E-38, 0, and + // 1.175494351E-38 to 3.402823466E+38. + (*f)->field_length = 40; + + //float float_val = *(float*)(&value); + //f2->store(float_val); + if (f2->decimals() < (uint32_t)row.getScale(s)) + f2->dec = (uint32_t)row.getScale(s); + + f2->store(dl); + f2->set_notnull(); + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + + //storeNumericField(f, intColVal, colType); + //break; + } + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + double dl = row.getDoubleField(s); + + if (dl == std::numeric_limits::infinity()) + continue; + + Field_double* f2 = (Field_double*)*f; + // bug 3483, reserve enough space for the longest double value + // -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and + // 2.2250738585072014E-308 to 1.7976931348623157E+308. + (*f)->field_length = 310; + + //double double_val = *(double*)(&value); + //f2->store(double_val); + if (f2->decimals() < (uint32_t)row.getScale(s)) + f2->dec = (uint32_t)row.getScale(s); + + f2->store(dl); + f2->set_notnull(); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + + + //int64_t* icvp = (int64_t*)&dl; + //intColVal = *icvp; + //storeNumericField(f, intColVal, colType); + //break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + intColVal = row.getIntField(s); + storeNumericFieldGroupBy(f, intColVal, colType); + break; + } + + case CalpontSystemCatalog::BLOB: + case CalpontSystemCatalog::TEXT: + { + Field_blob* f2 = (Field_blob*)*f; + f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s)); + + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + + break; + } + + default: // treat as int64 + { + intColVal = row.getUintField<8>(s); + storeNumericFieldGroupBy(f, intColVal, colType); + break; + } + } + } + + ti.tpl_scan_ctx->rowsreturned++; + ti.c++; +#ifdef INFINIDB_DEBUG + + if ((ti.c % 1000000) == 0) + cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl; + +#endif + ti.moreRows = true; + rc = 0; + } + else if (sm_stat == sm::SQL_NOT_FOUND) + { + IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl ); + ti.c = 0; + ti.moreRows = false; + rc = HA_ERR_END_OF_FILE; + } + else if (sm_stat == sm::CALPONT_INTERNAL_ERROR) + { + ti.moreRows = false; + rc = ER_INTERNAL_ERROR; + ci->rc = rc; + } + else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR) + { + ti.moreRows = false; + rc = logging::ERR_LOST_CONN_EXEMGR; + sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl, + current_thd->variables.infinidb_local_query); + idbassert(ci->cal_conn_hndl != 0); + ci->rc = rc; + } + else if (sm_stat == sm::SQL_KILLED) + { + // query was aborted by the user. treat it the same as limit query. close + // connection after rnd_close. + ti.c = 0; + ti.moreRows = false; + rc = HA_ERR_END_OF_FILE; + ci->rc = rc; + } + else + { + ti.moreRows = false; + rc = sm_stat; + ci->rc = rc; + } + + return rc; +} + void makeUpdateScalarJoin(const ParseTree* n, void* obj) { TreeNode* tn = n->data(); @@ -5405,7 +5939,7 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos) ***********************************************************/ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table) { - //first_row= true; + //first_row= true; string tableName = group_hand->table_list->table->s->table_name.str; IDEBUG( cout << "group_by_init for table " << group_hand->table_list->table->s->table_name.str << endl ); @@ -5548,6 +6082,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false); + /* // table mode if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) { @@ -5626,8 +6161,9 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE if (ci->tableMap.size() == 1) ti.csep->data(idb_mysql_query_str(thd)); } + */ // vtable mode - else + //else { //if (!ci->cal_conn_hndl || thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE) @@ -5702,6 +6238,23 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE csep->verID(verID); csep->sessionID(sessionID); + if (group_hand->table_list->db_length) + csep->schemaName(group_hand->table_list->db); + + csep->traceFlags(ci->traceFlags); + //ti.msTablePtr = group_hand->table_list->table; + + gi.groupByTables = group_hand->table_list; + gi.groupByFields = group_hand->select; + gi.groupByWhere = group_hand->where; + gi.groupByGroup = group_hand->group_by; + gi.groupByOrder = group_hand->order_by; + gi.groupByHaving = group_hand->having; + gi.groupByDistinct = group_hand->distinct; + + // send plan whenever group_init is called + int status = cp_get_group_plan(thd, csep, gi); +/* if (thd->db) csep->schemaName(thd->db); @@ -5712,7 +6265,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE //get plan int status = cp_get_plan(thd, csep); - +*/ //if (cp_get_plan(thd, csep) != 0) if (status > 0) goto internal_error; @@ -5887,6 +6440,9 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE ti = ci->tableMap[table]; ti.msTablePtr = table; + // MCOL-1052 + thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; + if ((thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE) || (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) || (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY)) @@ -6037,9 +6593,8 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE return ER_INTERNAL_ERROR; // @bug 3005 - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE && - string(table->s->table_name.str).find("$vtable") != 0) - return HA_ERR_END_OF_FILE; + //if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE) + // return HA_ERR_END_OF_FILE; if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) @@ -6051,13 +6606,13 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE // @bug 2232. Basic SP support // @bug 3939. Only error out for sp with select. Let pass for alter table in sp. - if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE) + /*if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE) { setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version"); thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR; return ER_INTERNAL_ERROR; } - + */ if (!thd->infinidb_vtable.cal_conn_info) thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); @@ -6105,8 +6660,9 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE try { - rc = fetchNextRowGrHand(ti, ci); - rc = 0; + rc = fetchNextRowGrHandGroupBy(ti, ci); + // MCOL-1052 + //rc = 0; } catch (std::exception& e) { @@ -6143,7 +6699,189 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table) { - return 0; + int rc = 0; + THD* thd = current_thd; + cal_connection_info* ci = NULL; + + + if (thd->slave_thread && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + + thd->infinidb_vtable.isNewQuery = true; + + if (thd->infinidb_vtable.cal_conn_info) + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) + { + thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;// flip back to normal state + return rc; + } + +// if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1) +// return rc; +/* + if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE ) + return rc; + + if (((thd->lex)->sql_command == SQLCOM_UPDATE) || + ((thd->lex)->sql_command == SQLCOM_DELETE) || + ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || + ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) + return rc; +*/ + if (((thd->lex)->sql_command == SQLCOM_INSERT) || + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) + { + // @bug 4022. error handling for select part of dml + if (ci->cal_conn_hndl && ci->rc) + { + // send ExeMgr a signal before closing the connection + ByteStream msg; + ByteStream::quadbyte qb = 0; + msg << qb; + + try + { + ci->cal_conn_hndl->exeMgr->write(msg); + } + catch (...) + { + // this is error handling, so ignore connection failure. + } + + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + return rc; + } + } + + if (!ci) + { + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + } + + // @bug 3078. Also session limit variable works the same as ctrl+c + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD || + ((thd->lex)->sql_command != SQLCOM_INSERT && + (thd->lex)->sql_command != SQLCOM_INSERT_SELECT && + thd->variables.select_limit != (uint64_t) - 1)) + { + if (ci->cal_conn_hndl) + { + // send ExeMgr a signal before closing the connection + ByteStream msg; + ByteStream::quadbyte qb = 0; + msg << qb; + + try + { + ci->cal_conn_hndl->exeMgr->write(msg); + } + catch (...) + { + // this is the end of query. Ignore the exception if exemgr connection failed + // for whatever reason. + } + + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + // clear querystats because no query stats available for cancelled query + ci->queryStats = ""; + } + + return 0; + } + + IDEBUG( cerr << "group_by_end for table " << table->s->table_name.str << endl ); + + cal_table_info ti = ci->tableMap[table]; + sm::cpsm_conhdl_t* hndl; + + //if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + // hndl = ti.conn_hndl; + //else + hndl = ci->cal_conn_hndl; + + if (ti.tpl_ctx) + { + if (ti.tpl_scan_ctx.get()) + { + try + { + sm::tpl_scan_close(ti.tpl_scan_ctx); + } + catch (...) + { + rc = ER_INTERNAL_ERROR; + } + } + + ti.tpl_scan_ctx.reset(); + + try + { + sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats); + + // set conn hndl back. could be changed in tpl_close + //if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) + // ti.conn_hndl = hndl; + //else + ci->cal_conn_hndl = hndl; + + ti.tpl_ctx = 0; + } + catch (IDBExcept& e) + { + if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG) + { + string msg = string("Columnstore Query Stats - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + else + { + setError(thd, ER_INTERNAL_ERROR, e.what()); + rc = ER_INTERNAL_ERROR; + } + } + catch (std::exception& e) + { + setError(thd, ER_INTERNAL_ERROR, e.what()); + rc = ER_INTERNAL_ERROR; + } + catch (...) + { + setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in rnd_end"); + rc = ER_INTERNAL_ERROR; + } + } + + ti.tpl_ctx = 0; + + /* + if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE && + thd->infinidb_vtable.has_order_by) + thd->infinidb_vtable.vtable_state = THD::INFINIDB_ORDER_BY; + */ + ci->tableMap[table] = ti; + + // push warnings from CREATE phase + if (!ci->warningMsg.empty()) + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str()); + + ci->warningMsg.clear(); + // reset expressionId just in case + ci->expressionId = 0; + return rc; } // vim:sw=4 ts=4: diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 7c12c8b2c..21560a4a0 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -318,7 +318,7 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su int cp_get_plan(THD* thd, execplan::SCSEP& csep); int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti); -int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti,cal_impl_if::cal_group_info& gi); +int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep,cal_group_info& gi, bool isUnion = false); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);