diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index ae8f30622..fa2df6eaa 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -4,6 +4,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES} SET ( libcalmysql_SRCS + ha_mcs_sysvars.cpp + ha_mcs_client_udfs.cpp ha_calpont.cpp ha_calpont_impl.cpp ha_calpont_dml.cpp diff --git a/dbcon/mysql/ha_calpont.cpp b/dbcon/mysql/ha_calpont.cpp index 6ea5f85a6..8dc96942f 100644 --- a/dbcon/mysql/ha_calpont.cpp +++ b/dbcon/mysql/ha_calpont.cpp @@ -16,97 +16,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -// $Id: ha_calpont.cpp 9642 2013-06-24 14:57:42Z rdempsey $ - -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -/** - @file ha_example.cc - - @brief - The ha_example engine is a stubbed storage engine for example purposes only; - it does nothing at this point. Its purpose is to provide a source - code illustration of how to begin writing new storage engines; see also - /storage/example/ha_example.h. - - @details - ha_example will let you create/open/delete tables, but - nothing further (for example, indexes are not supported nor can data - be stored in the table). Use this example as a template for - implementing the same functionality in your own storage engine. You - can enable the example storage engine in your build by doing the - following during your build process:
./configure - --with-example-storage-engine - - Once this is done, MySQL will let you create tables with:
- CREATE TABLE (...) ENGINE=EXAMPLE; - - The example storage engine is set up to use table locks. It - implements an example "SHARE" that is inserted into a hash by table - name. You can use this to store information of state that any - example handler object will be able to see when it is using that - table. - - Please read the object definition in ha_example.h before reading the rest - of this file. - - @note - When you create an EXAMPLE table, the MySQL Server creates a table .frm - (format) file in the database directory, using the table name as the file - name as is customary with MySQL. No other files are created. To get an idea - of what occurs, here is an example select that would do a scan of an entire - table: - - @code - ha_example::store_lock - ha_example::external_lock - ha_example::info - ha_example::rnd_init - ha_example::extra - ENUM HA_EXTRA_CACHE Cache record in HA_rrnd() - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::extra - ENUM HA_EXTRA_NO_CACHE End caching of records (def) - ha_example::external_lock - ha_example::extra - ENUM HA_EXTRA_RESET Reset database to after open - @endcode - - Here you see that the example storage engine has 9 rows called before - rnd_next signals that it has reached the end of its data. Also note that - the table in question was already opened; had it not been open, a call to - ha_example::open() would also have been necessary. Calls to - ha_example::extra() are hints as to what will be occuring to the request. - - A Longer Example can be found called the "Skeleton Engine" which can be - found on TangentOrg. It has both an engine and a full build environment - for building a pluggable storage engine. - - Happy coding!
- -Brian -*/ - #include "ha_calpont.h" #include "columnstoreversion.h" @@ -1101,44 +1010,6 @@ struct st_mysql_storage_engine columnstore_storage_engine = struct st_mysql_storage_engine infinidb_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; -#if 0 -static ulong srv_enum_var = 0; -static ulong srv_ulong_var = 0; - -const char* enum_var_names[] = -{ - "e1", "e2", NullS -}; - -TYPELIB enum_var_typelib = -{ - array_elements(enum_var_names) - 1, "enum_var_typelib", - enum_var_names, NULL -}; - -static MYSQL_SYSVAR_ENUM( - enum_var, // name - srv_enum_var, // varname - PLUGIN_VAR_RQCMDARG, // opt - "Sample ENUM system variable.", // comment - NULL, // check - NULL, // update - 0, // def - &enum_var_typelib); // typelib - -static MYSQL_SYSVAR_ULONG( - ulong_var, - srv_ulong_var, - PLUGIN_VAR_RQCMDARG, - "0..1000", - NULL, - NULL, - 8, - 0, - 1000, - 0); -#endif - /*@brief check_walk - It traverses filter conditions*/ /************************************************************ * DESCRIPTION: @@ -1368,14 +1239,6 @@ int ha_calpont_group_by_handler::end_scan() DBUG_RETURN(rc); } - -static struct st_mysql_sys_var* calpont_system_variables[] = -{ -// MYSQL_SYSVAR(enum_var), -// MYSQL_SYSVAR(ulong_var), - NULL -}; - mysql_declare_plugin(columnstore) { MYSQL_STORAGE_ENGINE_PLUGIN, @@ -1388,7 +1251,7 @@ mysql_declare_plugin(columnstore) columnstore_done_func, /* Plugin Deinit */ 0x0100 /* 1.0 */, NULL, /* status variables */ - calpont_system_variables, /* system variables */ + mcs_system_variables, /* system variables */ NULL, /* reserved */ 0 /* config flags */ }, @@ -1403,7 +1266,7 @@ mysql_declare_plugin(columnstore) infinidb_done_func, /* Plugin Deinit */ 0x0100 /* 1.0 */, NULL, /* status variables */ - calpont_system_variables, /* system variables */ + mcs_system_variables, /* system variables */ NULL, /* reserved */ 0 /* config flags */ } @@ -1419,9 +1282,9 @@ maria_declare_plugin(columnstore) columnstore_init_func, columnstore_done_func, 0x0100, /* 1.0 */ - NULL, /* status variables */ - calpont_system_variables, /* system variables */ - "1.0", /* string version */ + NULL, /* status variables */ + mcs_system_variables, /* system variables */ + "1.0", /* string version */ MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ }, { @@ -1434,10 +1297,10 @@ maria_declare_plugin(columnstore) infinidb_init_func, infinidb_done_func, 0x0100, /* 1.0 */ - NULL, /* status variables */ - calpont_system_variables, /* system variables */ - "1.0", /* string version */ - MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ + NULL, /* status variables */ + mcs_system_variables, /* system variables */ + "1.0", /* string version */ + MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ } maria_declare_plugin_end; diff --git a/dbcon/mysql/ha_calpont.h b/dbcon/mysql/ha_calpont.h index e618ed4f0..bab756851 100644 --- a/dbcon/mysql/ha_calpont.h +++ b/dbcon/mysql/ha_calpont.h @@ -15,35 +15,16 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -/** @file ha_example.h - - @brief - The ha_example engine is a stubbed storage engine for example purposes only; - it does nothing at this point. Its purpose is to provide a source - code illustration of how to begin writing new storage engines; see also - /storage/example/ha_example.cc. - - @note - Please read ha_example.cc before reading this file. - Reminder: The example storage engine implements all methods that are *required* - to be implemented. For a full list of all methods that you can implement, see - handler.h. - - @see - /sql/handler.h and /storage/example/ha_example.cc -*/ - -// $Id: ha_calpont.h 9210 2013-01-21 14:10:42Z rdempsey $ - #ifndef HA_CALPONT_H__ #define HA_CALPONT_H__ #include #include "idb_mysql.h" +#include "ha_mcs_sysvars.h" extern handlerton* calpont_hton; /** @brief - EXAMPLE_SHARE is a structure that will be shared among all open handlers. + This structure will be shared among all open handlers. This example implements the minimum of what you will probably need. */ typedef struct st_calpont_share diff --git a/dbcon/mysql/ha_calpont_ddl.cpp b/dbcon/mysql/ha_calpont_ddl.cpp index dd6206869..62e372635 100644 --- a/dbcon/mysql/ha_calpont_ddl.cpp +++ b/dbcon/mysql/ha_calpont_ddl.cpp @@ -49,6 +49,7 @@ using namespace std; #include using namespace boost; +#include "ha_mcs_sysvars.h" #include "idb_mysql.h" #include "ha_calpont_impl_if.h" @@ -679,10 +680,10 @@ int ProcessDDLStatement(string& ddlStatement, string& schema, const string& tabl IDBCompressInterface idbCompress; parser.Parse(ddlStatement.c_str()); - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (parser.Good()) { @@ -2144,7 +2145,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* return 1; } - int compressiontype = thd->variables.infinidb_compression_type; + int compressiontype = get_compression_type(thd); if (compressiontype == 1) compressiontype = 2; @@ -2156,7 +2157,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* } if ( compressiontype == MAX_INT ) - compressiontype = thd->variables.infinidb_compression_type; + compressiontype = get_compression_type(thd); else if ( compressiontype < 0 ) { string emsg = IDBErrorInfo::instance()->errorMsg(ERR_INVALID_COMPRESSION_TYPE); @@ -2489,12 +2490,12 @@ extern "C" if ( thd->db.length ) db = thd->db.str; - int compressiontype = thd->variables.infinidb_compression_type; + int compressiontype = get_compression_type(thd); if (compressiontype == 1) compressiontype = 2; if ( compressiontype == MAX_INT ) - compressiontype = thd->variables.infinidb_compression_type; + compressiontype = get_compression_type(thd); //hdfs if ((compressiontype == 0) && (useHdfs)) diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 914743a25..6e9755707 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -58,6 +58,7 @@ using namespace logging; #include "idb_mysql.h" #include "ha_calpont_impl_if.h" +#include "ha_mcs_sysvars.h" #include "ha_subquery.h" //#include "ha_view.h" using namespace cal_impl_if; @@ -1442,10 +1443,10 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) } } - if (!(gwip->thd->infinidb_vtable.cal_conn_info)) - gwip->thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwip->thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (ifp->functype() == Item_func::BETWEEN) @@ -2466,10 +2467,10 @@ void setError(THD* thd, uint32_t errcode, string errmsg) thd->infinidb_vtable.override_largeside_estimate = false; // reset expressionID - if (!(thd->infinidb_vtable.cal_conn_info)) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); ci->expressionId = 0; } @@ -3104,10 +3105,10 @@ ArithmeticColumn* buildArithmeticColumn( bool& nonSupport, bool pushdownHand) { - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); ArithmeticColumn* ac = new ArithmeticColumn(); Item** sfitempp = item->arguments(); @@ -3275,7 +3276,7 @@ ArithmeticColumn* buildArithmeticColumn( //idbassert(pt->left() && pt->right() && pt->left()->data() && pt->right()->data()); CalpontSystemCatalog::ColType mysql_type = colType_MysqlToIDB(item); - if (current_thd->variables.infinidb_double_for_decimal_math == 1) + if (get_double_for_decimal_math(current_thd) == true) aop->adjustResultType(mysql_type); else aop->resultType(mysql_type); @@ -3329,10 +3330,10 @@ ReturnedColumn* buildFunctionColumn( bool& nonSupport, bool pushdownHand) { - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); string funcName = ifp->func_name(); FuncExp* funcExp = FuncExp::instance(); @@ -3801,10 +3802,10 @@ ReturnedColumn* buildFunctionColumn( FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonSupport) { - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); FunctionColumn* fc = new FunctionColumn(); FunctionParm funcParms; @@ -4194,10 +4195,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) vector selCols; vector orderCols; bool bIsConst = false; - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); Item_sum* isp = reinterpret_cast(item); Item** sfitempp = isp->get_orig_args(); @@ -5775,7 +5776,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i return ER_CHECK_NOT_IMPLEMENTED; } - gwi.internalDecimalScale = (gwi.thd->variables.infinidb_use_decimal_scale ? gwi.thd->variables.infinidb_decimal_scale : -1); + gwi.internalDecimalScale = (get_use_decimal_scale(gwi.thd) ? get_decimal_scale(gwi.thd) : -1); + gwi.subSelectType = csep->subType(); JOIN* join = select_lex.join; @@ -5808,25 +5810,25 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // @bug 2123. Override large table estimate if infinidb_ordered hint was used. // @bug 2404. Always override if the infinidb_ordered_only variable is turned on. - if (gwi.thd->infinidb_vtable.override_largeside_estimate || gwi.thd->variables.infinidb_ordered_only) + if (gwi.thd->infinidb_vtable.override_largeside_estimate || get_ordered_only(gwi.thd)) csep->overrideLargeSideEstimate(true); // @bug 5741. Set a flag when in Local PM only query mode - csep->localQuery(gwi.thd->variables.infinidb_local_query); + csep->localQuery(get_local_query(gwi.thd)); // @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering - csep->stringScanThreshold(gwi.thd->variables.infinidb_string_scan_threshold); + csep->stringScanThreshold(get_string_scan_threshold(gwi.thd)); - csep->stringTableThreshold(gwi.thd->variables.infinidb_stringtable_threshold); + csep->stringTableThreshold(get_stringtable_threshold(gwi.thd)); - csep->djsSmallSideLimit(gwi.thd->variables.infinidb_diskjoin_smallsidelimit * 1024ULL * 1024); - csep->djsLargeSideLimit(gwi.thd->variables.infinidb_diskjoin_largesidelimit * 1024ULL * 1024); - csep->djsPartitionSize(gwi.thd->variables.infinidb_diskjoin_bucketsize * 1024ULL * 1024); + csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024); + csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024); + csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024); - if (gwi.thd->variables.infinidb_um_mem_limit == 0) + if (get_um_mem_limit(gwi.thd) == 0) csep->umMemLimit(numeric_limits::max()); else - csep->umMemLimit(gwi.thd->variables.infinidb_um_mem_limit * 1024ULL * 1024); + csep->umMemLimit(get_um_mem_limit(gwi.thd) * 1024ULL * 1024); // populate table map and trigger syscolumn cache for all the tables (@bug 1637). // all tables on FROM list must have at least one col in colmap @@ -8241,7 +8243,7 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti) csep->tableList(tblist); // @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering - csep->stringScanThreshold(gwi->thd->variables.infinidb_string_scan_threshold); + csep->stringScanThreshold(get_string_scan_threshold(gwi->thd)); return 0; } @@ -8339,7 +8341,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro return ER_CHECK_NOT_IMPLEMENTED; } - gwi.internalDecimalScale = (gwi.thd->variables.infinidb_use_decimal_scale ? gwi.thd->variables.infinidb_decimal_scale : -1); + gwi.internalDecimalScale = (get_use_decimal_scale(gwi.thd) ? get_decimal_scale(gwi.thd) : -1); gwi.subSelectType = csep->subType(); JOIN* join = select_lex.join; @@ -8356,25 +8358,25 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro // @bug 2123. Override large table estimate if infinidb_ordered hint was used. // @bug 2404. Always override if the infinidb_ordered_only variable is turned on. - if (gwi.thd->infinidb_vtable.override_largeside_estimate || gwi.thd->variables.infinidb_ordered_only) + if (gwi.thd->infinidb_vtable.override_largeside_estimate || get_ordered_only(gwi.thd)) csep->overrideLargeSideEstimate(true); // @bug 5741. Set a flag when in Local PM only query mode - csep->localQuery(gwi.thd->variables.infinidb_local_query); + csep->localQuery(get_local_query(gwi.thd)); // @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering - csep->stringScanThreshold(gwi.thd->variables.infinidb_string_scan_threshold); + csep->stringScanThreshold(get_string_scan_threshold(gwi.thd)); - csep->stringTableThreshold(gwi.thd->variables.infinidb_stringtable_threshold); + csep->stringTableThreshold(get_stringtable_threshold(gwi.thd)); - csep->djsSmallSideLimit(gwi.thd->variables.infinidb_diskjoin_smallsidelimit * 1024ULL * 1024); - csep->djsLargeSideLimit(gwi.thd->variables.infinidb_diskjoin_largesidelimit * 1024ULL * 1024); - csep->djsPartitionSize(gwi.thd->variables.infinidb_diskjoin_bucketsize * 1024ULL * 1024); + csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024); + csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024); + csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024); - if (gwi.thd->variables.infinidb_um_mem_limit == 0) + if (get_um_mem_limit(gwi.thd) == 0) csep->umMemLimit(numeric_limits::max()); else - csep->umMemLimit(gwi.thd->variables.infinidb_um_mem_limit * 1024ULL * 1024); + csep->umMemLimit(get_um_mem_limit(gwi.thd) * 1024ULL * 1024); // populate table map and trigger syscolumn cache for all the tables (@bug 1637). // all tables on FROM list must have at least one col in colmap diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index c3a4cfd02..b0ef4109c 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -142,6 +142,7 @@ using namespace funcexp; #include "installdir.h" #include "columnstoreversion.h" +#include "ha_mcs_sysvars.h" namespace cal_impl_if { @@ -168,7 +169,7 @@ static const string interval_names[] = const unsigned NONSUPPORTED_ERR_THRESH = 2000; //TODO: make this session-safe (put in connMap?) -vector rmParms; +//vector rmParms; ResourceManager* rm = ResourceManager::instance(); bool useHdfs = rm->useHdfs(); @@ -567,7 +568,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h { Field_varstring* f2 = (Field_varstring*)*f; - if (current_thd->variables.infinidb_varbin_always_hex) + if (get_varbin_always_hex(current_thd)) { uint32_t l; const uint8_t* p = row.getVarBinaryField(l, s); @@ -771,7 +772,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h ti.moreRows = false; rc = logging::ERR_LOST_CONN_EXEMGR; sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl, - current_thd->variables.infinidb_local_query); + get_local_query(current_thd)); idbassert(ci->cal_conn_hndl != 0); ci->rc = rc; } @@ -875,10 +876,10 @@ void makeUpdateSemiJoin(const ParseTree* n, void* obj) uint32_t doUpdateDelete(THD* thd) { - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && !thd->slave_thread) @@ -1602,7 +1603,7 @@ uint32_t doUpdateDelete(THD* thd) ByteStream bytestream, bytestream1; bytestream << sessionID; boost::shared_ptr plan = pDMLPackage->get_ExecutionPlan(); - updateCP->rmParms(rmParms); + updateCP->rmParms(ci->rmParms); updateCP->serialize(*plan); // recover original vtable state thd->infinidb_vtable.vtable_state = origState; @@ -1917,835 +1918,6 @@ uint32_t doUpdateDelete(THD* thd) } //anon namespace -extern "C" -{ -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - - unsigned long l = ci->queryStats.size(); - - if (l == 0) - { - *is_null = 1; - return 0; - } - - if (l > 255) l = 255; - - memcpy(result, ci->queryStats.c_str(), l); - *length = l; - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALGETSTATS() takes no arguments"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgetstats_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long calsettrace(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - - long long oldTrace = ci->traceFlags; - ci->traceFlags = (uint32_t)(*((long long*)args->args[0])); - // keep the vtablemode bit - ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); - ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH); - return oldTrace; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT) - { - strcpy(message, "CALSETTRACE() requires one INTEGER argument"); - return 1; - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calsettrace_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif -// Return 1 if system is ready for reads or 0 if not. - long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - long long rtn = 0; - Oam oam; - DBRM dbrm(true); - SystemStatus systemstatus; - - try - { - oam.getSystemStatus(systemstatus); - - if (systemstatus.SystemOpState == ACTIVE - && dbrm.getSystemReady() - && dbrm.getSystemQueryReady()) - { - return 1; - } - } - catch (...) - { - *error = 1; - } - - return rtn; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void mcssystemready_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif -// Return non-zero if system is read only; 0 if writeable - long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - long long rtn = 0; - DBRM dbrm(true); - - try - { - if (dbrm.getSystemSuspended()) - { - rtn = 1; - } - - if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only - { - rtn = 2; - } - } - catch (...) - { - *error = 1; - rtn = 1; - } - - return rtn; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void mcssystemreadonly_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif -// Return non-zero if this is the primary UM; 0 if not primary - long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - long long rtn = 0; - Oam oam; - string PrimaryUMModuleName; - string localModule; - oamModuleInfo_t st; - - try - { - st = oam.getModuleInfo(); - localModule = boost::get<0>(st); - PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName"); - - if (boost::iequals(localModule, PrimaryUMModuleName)) - rtn = 1; - if (PrimaryUMModuleName == "unassigned") - rtn = 1; - } - catch (runtime_error& e) - { - // It's difficult to return an error message from a numerical UDF - //string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what(); - *error = 1; - } - catch (...) - { - *error = 1; - } - return rtn; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void mcssystemprimary_deinit(UDF_INIT* initid) - { - } - -#define MAXSTRINGLENGTH 50 - - const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside"; - - const char* SetParmsPrelude = "Updated "; - const char* SetParmsError = "Invalid parameter: "; - const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than "; - - const size_t Plen = strlen(SetParmsPrelude); - const size_t Elen = strlen(SetParmsError); - - const char* invalidParmSizeMessage(uint64_t size, size_t& len) - { - static char str[sizeof(InvalidParmSize) + 12] = {0}; - ostringstream os; - os << InvalidParmSize << size; - len = os.str().length(); - strcpy(str, os.str().c_str()); - return str; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - char parameter[MAXSTRINGLENGTH]; - char valuestr[MAXSTRINGLENGTH]; - size_t plen = args->lengths[0]; - size_t vlen = args->lengths[1]; - - memcpy(parameter, args->args[0], plen); - memcpy(valuestr, args->args[1], vlen); - - parameter[plen] = '\0'; - valuestr[vlen] = '\0'; - - uint64_t value = Config::uFromText(valuestr); - - THD* thd = current_thd; - uint32_t sessionID = tid2sid(thd->thread_id); - - const char* msg = SetParmsError; - size_t mlen = Elen; - bool includeInput = true; - - string pstr(parameter); - boost::algorithm::to_lower(pstr); - - if (pstr == PmSmallSideMaxMemory) - { - joblist::ResourceManager* rm = joblist::ResourceManager::instance(); - - if (rm->getHjTotalUmMaxMemorySmallSide() >= value) - { - rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value)); - - msg = SetParmsPrelude; - mlen = Plen; - } - else - { - msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen); - includeInput = false; - } - } - - memcpy(result, msg, mlen); - - if (includeInput) - { - memcpy(result + mlen, parameter, plen); - mlen += plen; - memcpy(result + mlen++, " ", 1); - memcpy(result + mlen, valuestr, vlen); - *length = mlen + vlen; - } - else - *length = mlen; - - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT) - { - strcpy(message, "CALSETPARMS() requires two string arguments"); - return 1; - } - - initid->max_length = MAXSTRINGLENGTH; - - char valuestr[MAXSTRINGLENGTH]; - size_t vlen = args->lengths[1]; - - memcpy(valuestr, args->args[1], vlen--); - - for (size_t i = 0; i < vlen; ++i) - if (!isdigit(valuestr[i])) - { - strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); - return 1; - } - - if (!isdigit(valuestr[vlen])) - { - switch (valuestr[vlen]) - { - case 'G': - case 'g': - case 'M': - case 'm': - case 'K': - case 'k': - case '\0': - break; - - default: - strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); - return 1; - } - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calsetparms_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) - { - strcpy(message, "CALVIEWTABLELOCK() requires two string arguments"); - return 1; - } - else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) - { - strcpy(message, "CALVIEWTABLELOCK() requires one string argument"); - return 1; - } - else if (args->arg_count > 2 ) - { - strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only"); - return 1; - } - else if (args->arg_count == 0 ) - { - strcpy(message, "CALVIEWTABLELOCK() requires at least one argument"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - CalpontSystemCatalog::TableName tableName; - - if ( args->arg_count == 2 ) - { - tableName.schema = args->args[0]; - tableName.table = args->args[1]; - } - else if ( args->arg_count == 1 ) - { - tableName.table = args->args[0]; - - if (thd->db.length) - tableName.schema = thd->db.str; - else - { - string msg("No schema information provided"); - memcpy(result, msg.c_str(), msg.length()); - *length = msg.length(); - return result; - } - } - - if ( !ci->dmlProc ) - { - ci->dmlProc = new MessageQueueClient("DMLProc"); - //cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; - } - - string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName); - - memcpy(result, lockinfo.c_str(), lockinfo.length()); - *length = lockinfo.length(); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calviewtablelock_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT)) - { - strcpy(message, - "CALCLEARTABLELOCK() requires one integer argument (the lockID)"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast( - thd->infinidb_vtable.cal_conn_info); - long long lockID = *reinterpret_cast(args->args[0]); - - if ( !ci->dmlProc ) - { - ci->dmlProc = new MessageQueueClient("DMLProc"); - //cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; - } - - unsigned long long uLockID = lockID; - string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID); - - memcpy(result, lockinfo.c_str(), lockinfo.length()); - *length = lockinfo.length(); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calcleartablelock_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) - { - strcpy(message, "CALLASTINSRTID() requires two string arguments"); - return 1; - } - else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) - { - strcpy(message, "CALLASTINSERTID() requires one string argument"); - return 1; - } - else if (args->arg_count > 2 ) - { - strcpy(message, "CALLASTINSERTID() takes one or two arguments only"); - return 1; - } - else if (args->arg_count == 0 ) - { - strcpy(message, "CALLASTINSERTID() requires at least one argument"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - THD* thd = current_thd; - - CalpontSystemCatalog::TableName tableName; - uint64_t nextVal = 0; - - if ( args->arg_count == 2 ) - { - tableName.schema = args->args[0]; - tableName.table = args->args[1]; - } - else if ( args->arg_count == 1 ) - { - tableName.table = args->args[0]; - - if (thd->db.length) - tableName.schema = thd->db.str; - else - { - return -1; - } - } - - boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(tid2sid(thd->thread_id)); - csc->identity(execplan::CalpontSystemCatalog::FE); - - try - { - nextVal = csc->nextAutoIncrValue(tableName); - } - catch (std::exception&) - { - string msg("No such table found during autincrement"); - setError(thd, ER_INTERNAL_ERROR, msg); - return nextVal; - } - - if (nextVal == AUTOINCR_SATURATED) - { - setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT)); - return nextVal; - } - - //@Bug 3559. Return a message for table without autoincrement column. - if (nextVal == 0) - { - string msg("Autoincrement does not exist for this table."); - setError(thd, ER_INTERNAL_ERROR, msg); - return nextVal; - } - - return (nextVal - 1); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void callastinsertid_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calflushcache_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long calflushcache(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - return static_cast(cacheutils::flushPrimProcCache()); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALFLUSHCACHE() takes no arguments"); - return 1; - } - - return 0; - } - - static const unsigned long TraceSize = 16 * 1024; - -//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init() -// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - const string* msgp; - int flags = 0; - - if (args->arg_count > 0) - { - if (args->arg_type[0] == INT_RESULT) - { - flags = *reinterpret_cast(args->args[0]); - } - } - - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - - if (flags > 0) - //msgp = &connMap[sessionID].extendedStats; - msgp = &ci->extendedStats; - else - //msgp = &connMap[sessionID].miniStats; - msgp = &ci->miniStats; - - unsigned long l = msgp->size(); - - if (l == 0) - { - *is_null = 1; - return 0; - } - - if (l > TraceSize) l = TraceSize; - - *length = l; - return msgp->c_str(); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { -#if 0 - - if (args->arg_count != 0) - { - strcpy(message, "CALGETTRACE() takes no arguments"); - return 1; - } - -#endif - initid->maybe_null = 1; - initid->max_length = TraceSize; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgettrace_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - string version(columnstore_version); - *length = version.size(); - memcpy(result, version.c_str(), *length); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALGETVERSION() takes no arguments"); - return 1; - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgetversion_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); - idbassert(ci != 0); - - MessageQueueClient* mqc = 0; - mqc = new MessageQueueClient("ExeMgr1"); - - ByteStream msg; - ByteStream::quadbyte runningSql, waitingSql; - ByteStream::quadbyte qb = 5; - msg << qb; - mqc->write(msg); - - //get ExeMgr response - msg.restart(); - msg = mqc->read(); - - if (msg.length() == 0) - { - memcpy(result, "Lost connection to ExeMgr", *length); - return result; - } - - msg >> runningSql; - msg >> waitingSql; - delete mqc; - - char ans[128]; - sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql); - *length = strlen(ans); - memcpy(result, ans, *length); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALGETSQLCOUNT() takes no arguments"); - return 1; - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgetsqlcount_deinit(UDF_INIT* initid) - { - } - - -} //extern "C" - int ha_calpont_impl_open(const char* name, int mode, uint32_t test_if_locked) { IDEBUG ( cout << "ha_calpont_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl ); @@ -2877,10 +2049,10 @@ int ha_calpont_impl_rnd_init(TABLE* table) boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); csc->identity(CalpontSystemCatalog::FE); - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); idbassert(ci != 0); @@ -2934,7 +2106,7 @@ int ha_calpont_impl_rnd_init(TABLE* table) CalpontSelectExecutionPlan::TRACE_TUPLE_OFF; } - bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false); + bool localQuery = get_local_query(thd); // table mode if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) @@ -3155,7 +2327,7 @@ int ha_calpont_impl_rnd_init(TABLE* table) msg << qb; hndl->exeMgr->write(msg); msg.restart(); - csep->rmParms(rmParms); + csep->rmParms(ci->rmParms); //send plan csep->serialize(msg); @@ -3215,7 +2387,7 @@ int ha_calpont_impl_rnd_init(TABLE* table) return ER_INTERNAL_ERROR; } - rmParms.clear(); + ci->rmParms.clear(); if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) { @@ -3415,10 +2587,10 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) return ER_INTERNAL_ERROR; } - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) @@ -3516,8 +2688,8 @@ int ha_calpont_impl_rnd_end(TABLE* table) thd->infinidb_vtable.isNewQuery = true; - if (thd->infinidb_vtable.cal_conn_info) - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + if (get_fe_conn_info_ptr() != NULL) + ci = reinterpret_cast(get_fe_conn_info_ptr()); if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) { @@ -3565,8 +2737,8 @@ int ha_calpont_impl_rnd_end(TABLE* table) if (!ci) { - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + set_fe_conn_info_ptr((void*)new cal_connection_info()); + ci = reinterpret_cast(get_fe_conn_info_ptr()); } // @bug 3078. Also session limit variable works the same as ctrl+c @@ -3686,10 +2858,10 @@ int ha_calpont_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* c { THD* thd = current_thd; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); // @bug1940 Do nothing for select query. Support of set default engine to IDB. if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE || @@ -3735,10 +2907,10 @@ int ha_calpont_impl_delete_table(const char* name) if (string(name).find("@0024vtable") != string::npos) return 0; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (!thd) return 0; @@ -3811,10 +2983,10 @@ int ha_calpont_impl_write_row(uchar* buf, TABLE* table) } } - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (thd->slave_thread) return 0; @@ -3857,10 +3029,10 @@ int ha_calpont_impl_update_row() //@Bug 2540. Return the correct error code. THD* thd = current_thd; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); int rc = ci->rc; if ( rc != 0 ) @@ -3874,10 +3046,10 @@ int ha_calpont_impl_delete_row() //@Bug 2540. Return the correct error code. THD* thd = current_thd; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); int rc = ci->rc; if ( rc != 0 ) @@ -3890,10 +3062,10 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) { THD* thd = current_thd; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); // clear rows variable ci->rowsHaveInserted = 0; @@ -3958,7 +3130,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) { - ci->useCpimport = thd->variables.infinidb_use_import_for_batchinsert; + ci->useCpimport = get_use_import_for_batchinsert(thd); if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = 0; @@ -4031,14 +3203,14 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) ci->mysqld_pid = getpid(); //get delimiter - if (char(thd->variables.infinidb_import_for_batchinsert_delimiter) != '\007') - ci->delimiter = char(thd->variables.infinidb_import_for_batchinsert_delimiter); + if (char(get_import_for_batchinsert_delimiter(thd)) != '\007') + ci->delimiter = char(get_import_for_batchinsert_delimiter(thd)); else ci->delimiter = '\007'; //get enclosed by - if (char(thd->variables.infinidb_import_for_batchinsert_enclosed_by) != 8) - ci->enclosed_by = char(thd->variables.infinidb_import_for_batchinsert_enclosed_by); + if (char(get_import_for_batchinsert_enclosed_by(thd)) != 8) + ci->enclosed_by = char(get_import_for_batchinsert_enclosed_by(thd)); else ci->enclosed_by = 8; @@ -4054,7 +3226,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (ci->enclosed_by == 34) // Double quotes strcat(escapechar, "\\"); - if (thd->variables.infinidb_local_query > 0 ) + if (get_local_query(thd)) { OamCache* oamcache = OamCache::makeOamCache(); int localModuleId = oamcache->getLocalPMId(); @@ -4458,10 +3630,10 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) std::string aTmpDir(startup::StartUp::tmpDir()); - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (thd->slave_thread) return 0; @@ -4716,10 +3888,10 @@ int ha_calpont_impl_commit (handlerton* hton, THD* thd, bool all) thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1) return 0; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (ci->isAlter) return 0; @@ -4757,10 +3929,10 @@ int ha_calpont_impl_rollback (handlerton* hton, THD* thd, bool all) // return 0; //} - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if ( !ci->dmlProc ) { @@ -4787,10 +3959,10 @@ int ha_calpont_impl_close_connection (handlerton* hton, THD* thd) execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id)); - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (!ci) return 0; @@ -4817,10 +3989,10 @@ int ha_calpont_impl_rename_table(const char* from, const char* to) IDEBUG( cout << "ha_calpont_impl_rename_table: " << from << " => " << to << endl ); THD* thd = current_thd; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); //@Bug 1948. Alter table call rename table twice if ( ci->alterTableState == cal_connection_info::ALTER_FIRST_RENAME ) @@ -4868,10 +4040,10 @@ COND* ha_calpont_impl_cond_push(COND* cond, TABLE* table) alias.assign(table->alias.ptr(), table->alias.length()); IDEBUG( cout << "ha_calpont_impl_cond_push: " << alias << endl ); - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); cal_table_info ti = ci->tableMap[table]; @@ -4949,10 +4121,10 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type) if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_INIT ) return 0; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -5114,10 +4286,10 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); csc->identity(CalpontSystemCatalog::FE); - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); idbassert(ci != 0); @@ -5159,7 +4331,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE sm::cpsm_conhdl_t* hndl; SCSEP csep; - bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false); + bool localQuery = get_local_query(thd); { ci->stats.reset(); // reset query stats @@ -5338,7 +4510,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE msg << qb; hndl->exeMgr->write(msg); msg.restart(); - csep->rmParms(rmParms); + csep->rmParms(ci->rmParms); //send plan csep->serialize(msg); @@ -5398,7 +4570,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE return ER_INTERNAL_ERROR; } - rmParms.clear(); + ci->rmParms.clear(); if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) { @@ -5616,10 +4788,10 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE return ER_INTERNAL_ERROR; } */ - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) @@ -5720,8 +4892,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* thd->infinidb_vtable.isNewQuery = true; thd->infinidb_vtable.isUnion = false; - if (thd->infinidb_vtable.cal_conn_info) - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + if (get_fe_conn_info_ptr() != NULL) + ci = reinterpret_cast(get_fe_conn_info_ptr()); // MCOL-1052 //if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) @@ -5758,8 +4930,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* if (!ci) { - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); - ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + set_fe_conn_info_ptr((void*)new cal_connection_info()); + ci = reinterpret_cast(get_fe_conn_info_ptr()); } // @bug 3078. Also session limit variable works the same as ctrl+c diff --git a/dbcon/mysql/ha_calpont_impl.h b/dbcon/mysql/ha_calpont_impl.h index 7f7c43aae..bdc0e0eef 100644 --- a/dbcon/mysql/ha_calpont_impl.h +++ b/dbcon/mysql/ha_calpont_impl.h @@ -16,12 +16,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -/* - * $Id: ha_calpont_impl.h 9413 2013-04-22 22:03:42Z zzhu $ - */ - -/** @file */ - #ifndef HA_CALPONT_IMPL_H__ #define HA_CALPONT_IMPL_H__ diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 72579111b..4f0046745 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -319,10 +319,10 @@ struct cal_connection_info char delimiter; char enclosed_by; std::vector columnTypes; + // MCOL-1101 remove compilation unit variable rmParms + std::vector rmParms; }; -typedef std::tr1::unordered_map CalConnMap; - const std::string infinidb_err_msg = "\nThe query includes syntax that is not supported by MariaDB Columnstore. Use 'show warnings;' to get more information. Review the MariaDB Columnstore Syntax guide for additional information on supported distributed syntax or consider changing the MariaDB Columnstore Operating Mode (infinidb_vtable_mode)."; int cp_get_plan(THD* thd, execplan::SCSEP& csep); diff --git a/dbcon/mysql/ha_mcs_client_udfs.cpp b/dbcon/mysql/ha_mcs_client_udfs.cpp new file mode 100644 index 000000000..1766bdf1c --- /dev/null +++ b/dbcon/mysql/ha_mcs_client_udfs.cpp @@ -0,0 +1,871 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2016 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#define NEED_CALPONT_INTERFACE +#include "ha_calpont_impl.h" + +#include "ha_calpont_impl_if.h" +using namespace cal_impl_if; + +#include "configcpp.h" +using namespace config; +#include "brmtypes.h" +using namespace BRM; +#include "bytestream.h" +using namespace messageqcpp; +#include "liboamcpp.h" +using namespace oam; +#include "cacheutils.h" + +#include "errorcodes.h" +#include "idberrorinfo.h" +#include "errorids.h" +using namespace logging; + +//#include "resourcemanager.h" + +#include "columnstoreversion.h" +#include "ha_mcs_sysvars.h" + +extern "C" +{ +#define MAXSTRINGLENGTH 50 + + const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside"; + + const char* SetParmsPrelude = "Updated "; + const char* SetParmsError = "Invalid parameter: "; + const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than "; + + const size_t Plen = strlen(SetParmsPrelude); + const size_t Elen = strlen(SetParmsError); + + const char* invalidParmSizeMessage(uint64_t size, size_t& len) + { + static char str[sizeof(InvalidParmSize) + 12] = {0}; + ostringstream os; + os << InvalidParmSize << size; + len = os.str().length(); + strcpy(str, os.str().c_str()); + return str; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + char parameter[MAXSTRINGLENGTH]; + char valuestr[MAXSTRINGLENGTH]; + size_t plen = args->lengths[0]; + size_t vlen = args->lengths[1]; + + memcpy(parameter, args->args[0], plen); + memcpy(valuestr, args->args[1], vlen); + + parameter[plen] = '\0'; + valuestr[vlen] = '\0'; + + uint64_t value = Config::uFromText(valuestr); + + THD* thd = current_thd; + uint32_t sessionID = CalpontSystemCatalog::idb_tid2sid(thd->thread_id); + + const char* msg = SetParmsError; + size_t mlen = Elen; + bool includeInput = true; + + string pstr(parameter); + boost::algorithm::to_lower(pstr); + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + idbassert(ci != 0); + + if (pstr == PmSmallSideMaxMemory) + { + joblist::ResourceManager* rm = joblist::ResourceManager::instance(); + + if (rm->getHjTotalUmMaxMemorySmallSide() >= value) + { + ci->rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value)); + + msg = SetParmsPrelude; + mlen = Plen; + } + else + { + msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen); + includeInput = false; + } + } + + memcpy(result, msg, mlen); + + if (includeInput) + { + memcpy(result + mlen, parameter, plen); + mlen += plen; + memcpy(result + mlen++, " ", 1); + memcpy(result + mlen, valuestr, vlen); + *length = mlen + vlen; + } + else + *length = mlen; + + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT) + { + strcpy(message, "CALSETPARMS() requires two string arguments"); + return 1; + } + + initid->max_length = MAXSTRINGLENGTH; + + char valuestr[MAXSTRINGLENGTH]; + size_t vlen = args->lengths[1]; + + memcpy(valuestr, args->args[1], vlen--); + + for (size_t i = 0; i < vlen; ++i) + if (!isdigit(valuestr[i])) + { + strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); + return 1; + } + + if (!isdigit(valuestr[vlen])) + { + switch (valuestr[vlen]) + { + case 'G': + case 'g': + case 'M': + case 'm': + case 'K': + case 'k': + case '\0': + break; + + default: + strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); + return 1; + } + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calsetparms_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + unsigned long l = ci->queryStats.size(); + + if (l == 0) + { + *is_null = 1; + return 0; + } + + if (l > 255) l = 255; + + memcpy(result, ci->queryStats.c_str(), l); + *length = l; + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALGETSTATS() takes no arguments"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgetstats_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long calsettrace(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + long long oldTrace = ci->traceFlags; + ci->traceFlags = (uint32_t)(*((long long*)args->args[0])); + // keep the vtablemode bit + ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); + ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH); + return oldTrace; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT) + { + strcpy(message, "CALSETTRACE() requires one INTEGER argument"); + return 1; + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calsettrace_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif +// Return 1 if system is ready for reads or 0 if not. + long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + long long rtn = 0; + Oam oam; + DBRM dbrm(true); + SystemStatus systemstatus; + + try + { + oam.getSystemStatus(systemstatus); + + if (systemstatus.SystemOpState == ACTIVE + && dbrm.getSystemReady() + && dbrm.getSystemQueryReady()) + { + return 1; + } + } + catch (...) + { + *error = 1; + } + + return rtn; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void mcssystemready_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif +// Return non-zero if system is read only; 0 if writeable + long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + long long rtn = 0; + DBRM dbrm(true); + + try + { + if (dbrm.getSystemSuspended()) + { + rtn = 1; + } + + if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only + { + rtn = 2; + } + } + catch (...) + { + *error = 1; + rtn = 1; + } + + return rtn; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void mcssystemreadonly_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif +// Return non-zero if this is the primary UM; 0 if not primary + long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + long long rtn = 0; + Oam oam; + string PrimaryUMModuleName; + string localModule; + oamModuleInfo_t st; + + try + { + st = oam.getModuleInfo(); + localModule = boost::get<0>(st); + PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName"); + + if (boost::iequals(localModule, PrimaryUMModuleName)) + rtn = 1; + if (PrimaryUMModuleName == "unassigned") + rtn = 1; + } + catch (runtime_error& e) + { + // It's difficult to return an error message from a numerical UDF + //string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what(); + *error = 1; + } + catch (...) + { + *error = 1; + } + return rtn; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void mcssystemprimary_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) + { + strcpy(message, "CALVIEWTABLELOCK() requires two string arguments"); + return 1; + } + else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) + { + strcpy(message, "CALVIEWTABLELOCK() requires one string argument"); + return 1; + } + else if (args->arg_count > 2 ) + { + strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only"); + return 1; + } + else if (args->arg_count == 0 ) + { + strcpy(message, "CALVIEWTABLELOCK() requires at least one argument"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + THD* thd = current_thd; + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + CalpontSystemCatalog::TableName tableName; + + if ( args->arg_count == 2 ) + { + tableName.schema = args->args[0]; + tableName.table = args->args[1]; + } + else if ( args->arg_count == 1 ) + { + tableName.table = args->args[0]; + + if (thd->db.length) + tableName.schema = thd->db.str; + else + { + string msg("No schema information provided"); + memcpy(result, msg.c_str(), msg.length()); + *length = msg.length(); + return result; + } + } + + if ( !ci->dmlProc ) + { + ci->dmlProc = new MessageQueueClient("DMLProc"); + //cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; + } + + string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName); + + memcpy(result, lockinfo.c_str(), lockinfo.length()); + *length = lockinfo.length(); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calviewtablelock_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT)) + { + strcpy(message, + "CALCLEARTABLELOCK() requires one integer argument (the lockID)"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast( + get_fe_conn_info_ptr()); + long long lockID = *reinterpret_cast(args->args[0]); + + if ( !ci->dmlProc ) + { + ci->dmlProc = new MessageQueueClient("DMLProc"); + //cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; + } + + unsigned long long uLockID = lockID; + string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID); + + memcpy(result, lockinfo.c_str(), lockinfo.length()); + *length = lockinfo.length(); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calcleartablelock_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) + { + strcpy(message, "CALLASTINSRTID() requires two string arguments"); + return 1; + } + else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) + { + strcpy(message, "CALLASTINSERTID() requires one string argument"); + return 1; + } + else if (args->arg_count > 2 ) + { + strcpy(message, "CALLASTINSERTID() takes one or two arguments only"); + return 1; + } + else if (args->arg_count == 0 ) + { + strcpy(message, "CALLASTINSERTID() requires at least one argument"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + THD* thd = current_thd; + + CalpontSystemCatalog::TableName tableName; + uint64_t nextVal = 0; + + if ( args->arg_count == 2 ) + { + tableName.schema = args->args[0]; + tableName.table = args->args[1]; + } + else if ( args->arg_count == 1 ) + { + tableName.table = args->args[0]; + + if (thd->db.length) + tableName.schema = thd->db.str; + else + { + return -1; + } + } + + boost::shared_ptr csc = + CalpontSystemCatalog::makeCalpontSystemCatalog( + CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); + csc->identity(execplan::CalpontSystemCatalog::FE); + + try + { + nextVal = csc->nextAutoIncrValue(tableName); + } + catch (std::exception&) + { + string msg("No such table found during autincrement"); + setError(thd, ER_INTERNAL_ERROR, msg); + return nextVal; + } + + if (nextVal == AUTOINCR_SATURATED) + { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT)); + return nextVal; + } + + //@Bug 3559. Return a message for table without autoincrement column. + if (nextVal == 0) + { + string msg("Autoincrement does not exist for this table."); + setError(thd, ER_INTERNAL_ERROR, msg); + return nextVal; + } + + return (nextVal - 1); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void callastinsertid_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calflushcache_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long calflushcache(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + return static_cast(cacheutils::flushPrimProcCache()); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALFLUSHCACHE() takes no arguments"); + return 1; + } + + return 0; + } + + static const unsigned long TraceSize = 16 * 1024; + +//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init() +// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + const string* msgp; + int flags = 0; + + if (args->arg_count > 0) + { + if (args->arg_type[0] == INT_RESULT) + { + flags = *reinterpret_cast(args->args[0]); + } + } + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + if (flags > 0) + //msgp = &connMap[sessionID].extendedStats; + msgp = &ci->extendedStats; + else + //msgp = &connMap[sessionID].miniStats; + msgp = &ci->miniStats; + + unsigned long l = msgp->size(); + + if (l == 0) + { + *is_null = 1; + return 0; + } + + if (l > TraceSize) l = TraceSize; + + *length = l; + return msgp->c_str(); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { +#if 0 + + if (args->arg_count != 0) + { + strcpy(message, "CALGETTRACE() takes no arguments"); + return 1; + } + +#endif + initid->maybe_null = 1; + initid->max_length = TraceSize; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgettrace_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + string version(columnstore_version); + *length = version.size(); + memcpy(result, version.c_str(), *length); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALGETVERSION() takes no arguments"); + return 1; + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgetversion_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + idbassert(ci != 0); + + MessageQueueClient* mqc = 0; + mqc = new MessageQueueClient("ExeMgr1"); + + ByteStream msg; + ByteStream::quadbyte runningSql, waitingSql; + ByteStream::quadbyte qb = 5; + msg << qb; + mqc->write(msg); + + //get ExeMgr response + msg.restart(); + msg = mqc->read(); + + if (msg.length() == 0) + { + memcpy(result, "Lost connection to ExeMgr", *length); + return result; + } + + msg >> runningSql; + msg >> waitingSql; + delete mqc; + + char ans[128]; + sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql); + *length = strlen(ans); + memcpy(result, ans, *length); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALGETSQLCOUNT() takes no arguments"); + return 1; + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgetsqlcount_deinit(UDF_INIT* initid) + { + } + + +} //extern "C" diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp new file mode 100644 index 000000000..ff6943fa4 --- /dev/null +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -0,0 +1,520 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2016 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include "idb_mysql.h" +#include "ha_mcs_sysvars.h" + +const char* mcs_compression_type_names[] = { + "NO_COMPRESSION", + "SNAPPY", + NullS +}; + +static TYPELIB mcs_compression_type_names_lib = { + array_elements(mcs_compression_type_names) - 1, + "mcs_compression_type_names", + mcs_compression_type_names, + NULL +}; + +// compression type +static MYSQL_THDVAR_ENUM( + compression_type, + PLUGIN_VAR_RQCMDARG, + "Controls compression algorithm for create tables. Possible values are: " + "NO_COMPRESSION segment files aren't compressed; " + "SNAPPY segment files are Snappy compressed (default);", + NULL, // check + NULL, // update + 1, //default + &mcs_compression_type_names_lib); // values lib + +// fe_conn_info pointer +static MYSQL_THDVAR_ULONGLONG( + fe_conn_info_ptr, + PLUGIN_VAR_NOSYSVAR | PLUGIN_VAR_NOCMDOPT, + "FrontEnd connection structure pointer. For internal usage.", + NULL, + NULL, + 0, + 0, + ~0U, + 1 +); + +// legacy system variables +static MYSQL_THDVAR_ULONG( + decimal_scale, + PLUGIN_VAR_RQCMDARG, + "The default decimal precision for calculated column sub-operations ", + NULL, + NULL, + 8, + 0, + 18, + 1 +); + +static MYSQL_THDVAR_BOOL( + varbin_always_hex, + PLUGIN_VAR_NOCMDARG, + "Always display/process varbinary columns as if they have been hexified.", + NULL, + NULL, + 0 +); + +static MYSQL_THDVAR_BOOL( + use_decimal_scale, + PLUGIN_VAR_NOCMDARG, + "Enable/disable the MCS decimal scale to be used internally", + NULL, + NULL, + 0 +); + +static MYSQL_THDVAR_BOOL( + double_for_decimal_math, + PLUGIN_VAR_NOCMDARG, + "Enable/disable the InfiniDB to replace DECIMAL with DOUBLE in arithmetic operation.", + NULL, + NULL, + 0 +); + +static MYSQL_THDVAR_BOOL( + ordered_only, + PLUGIN_VAR_NOCMDARG, + "Always use the first table in the from clause as the large side " + "table for joins", + NULL, + NULL, + 0 +); + +static MYSQL_THDVAR_ULONG( + string_scan_threshold, + PLUGIN_VAR_RQCMDARG, + "Max number of blocks in a dictionary file to be scanned for filtering", + NULL, + NULL, + 10, + 1, + ~0U, + 1 +); + +static MYSQL_THDVAR_ULONG( + stringtable_threshold, + PLUGIN_VAR_RQCMDARG, + "The minimum width of a string column to be stored in a string table", + NULL, + NULL, + 20, + 9, + ~0U, + 1 +); + +static MYSQL_THDVAR_ULONG( + diskjoin_smallsidelimit, + PLUGIN_VAR_RQCMDARG, + "The maximum amount of disk space in MB to use per query for storing " + "'small side' tables for a disk-based join. (0 = unlimited)", + NULL, + NULL, + 0, + 0, + ~0U, + 1 +); + +static MYSQL_THDVAR_ULONG( + diskjoin_largesidelimit, + PLUGIN_VAR_RQCMDARG, + "The maximum amount of disk space in MB to use per join for storing " + "'large side' table data for a disk-based join. (0 = unlimited)", + NULL, + NULL, + 0, + 0, + ~0U, + 1 +); + +static MYSQL_THDVAR_ULONG( + diskjoin_bucketsize, + PLUGIN_VAR_RQCMDARG, + "The maximum size in MB of each 'small side' table in memory.", + NULL, + NULL, + 100, + 1, + ~0U, + 1 +); + +static MYSQL_THDVAR_ULONG( + um_mem_limit, + PLUGIN_VAR_RQCMDARG, + "Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", + NULL, + NULL, + 0, + 0, + ~0U, + 1 +); + +static MYSQL_THDVAR_ULONG( + local_query, + PLUGIN_VAR_RQCMDARG, + "Enable/disable the Infinidb local PM query only feature.", + NULL, + NULL, + 0, + 0, + 2, + 1 +); + +static MYSQL_THDVAR_ULONG( + import_for_batchinsert_delimiter, + PLUGIN_VAR_RQCMDARG, + "ASCII value of the delimiter used by LDI and INSERT..SELECT", + NULL, // check + NULL, // update + 7, // default + 0, // min + 127, // max + 1 // block size +); + +static MYSQL_THDVAR_ULONG( + import_for_batchinsert_enclosed_by, + PLUGIN_VAR_RQCMDARG, + "ASCII value of the quote symbol used by batch data ingestion", + NULL, // check + NULL, // update + 17, // default + 17, // min + 127, // max + 1 // block size +); + +static MYSQL_THDVAR_BOOL( + use_import_for_batchinsert, + PLUGIN_VAR_NOCMDARG, + "LOAD DATA INFILE and INSERT..SELECT will use cpimport internally", + NULL, // check + NULL, // update + 1 // default +); + +static MYSQL_THDVAR_BOOL( + use_legacy_sysvars, + PLUGIN_VAR_NOCMDARG, + "Control CS behavior using legacy * sysvars", + NULL, // check + NULL, // update + 0 // default +); + +st_mysql_sys_var* mcs_system_variables[] = +{ + MYSQL_SYSVAR(compression_type), + MYSQL_SYSVAR(fe_conn_info_ptr), + MYSQL_SYSVAR(decimal_scale), + MYSQL_SYSVAR(use_decimal_scale), + MYSQL_SYSVAR(ordered_only), + MYSQL_SYSVAR(string_scan_threshold), + MYSQL_SYSVAR(stringtable_threshold), + MYSQL_SYSVAR(diskjoin_smallsidelimit), + MYSQL_SYSVAR(diskjoin_largesidelimit), + MYSQL_SYSVAR(diskjoin_bucketsize), + MYSQL_SYSVAR(um_mem_limit), + MYSQL_SYSVAR(double_for_decimal_math), + MYSQL_SYSVAR(local_query), + MYSQL_SYSVAR(use_import_for_batchinsert), + MYSQL_SYSVAR(import_for_batchinsert_delimiter), + MYSQL_SYSVAR(import_for_batchinsert_enclosed_by), + MYSQL_SYSVAR(use_legacy_sysvars), + MYSQL_SYSVAR(varbin_always_hex), + NULL +}; + +void* get_fe_conn_info_ptr(THD* thd) +{ + return ( current_thd == NULL && thd == NULL ) ? NULL : + (void*)THDVAR(current_thd, fe_conn_info_ptr); +} + +void set_fe_conn_info_ptr(void* ptr, THD* thd) +{ + if ( current_thd == NULL && thd == NULL) + { + return; + } + + THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr); +} + +bool get_use_legacy_sysvars(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, use_legacy_sysvars); +} + +void set_use_legacy_sysvars(THD* thd, bool value) +{ + THDVAR(thd, use_legacy_sysvars) = value; +} + +void set_compression_type(THD* thd, ulong value) +{ + THDVAR(thd, compression_type) = value; +} + +mcs_compression_type_t get_compression_type(THD* thd) { + return (mcs_compression_type_t) THDVAR(thd, compression_type); +} + +bool get_use_decimal_scale(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? false : thd->variables.infinidb_use_decimal_scale; + else + return ( thd == NULL ) ? false : THDVAR(thd, use_decimal_scale); +} +void set_use_decimal_scale(THD* thd, bool value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_use_decimal_scale = value; + else + THDVAR(thd, use_decimal_scale) = value; +} + +ulong get_decimal_scale(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_decimal_scale; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, decimal_scale); +} +void set_decimal_scale(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_decimal_scale = value; + else + THDVAR(thd, decimal_scale) = value; +} + +bool get_ordered_only(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? false : thd->variables.infinidb_ordered_only; + else + return ( thd == NULL ) ? false : THDVAR(thd, ordered_only); +} +void set_ordered_only(THD* thd, bool value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_ordered_only = value; + else + THDVAR(thd, ordered_only) = value; +} + +ulong get_string_scan_threshold(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_string_scan_threshold; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, string_scan_threshold); +} +void set_string_scan_threshold(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_string_scan_threshold = value; + else + THDVAR(thd, string_scan_threshold) = value; +} + +ulong get_stringtable_threshold(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_stringtable_threshold; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, stringtable_threshold); +} +void set_stringtable_threshold(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_stringtable_threshold = value; + else + THDVAR(thd, stringtable_threshold) = value; +} + +ulong get_diskjoin_smallsidelimit(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_smallsidelimit; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_smallsidelimit); +} +void set_diskjoin_smallsidelimit(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_diskjoin_smallsidelimit = value; + else + THDVAR(thd, diskjoin_smallsidelimit) = value; +} + +ulong get_diskjoin_largesidelimit(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_largesidelimit; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_largesidelimit); +} +void set_diskjoin_largesidelimit(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_diskjoin_largesidelimit = value; + else + THDVAR(thd, diskjoin_largesidelimit) = value; +} + +ulong get_diskjoin_bucketsize(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_bucketsize; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_bucketsize); +} +void set_diskjoin_bucketsize(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_diskjoin_bucketsize = value; + else + THDVAR(thd, diskjoin_bucketsize) = value; +} + +ulong get_um_mem_limit(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_um_mem_limit; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, um_mem_limit); +} +void set_um_mem_limit(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_um_mem_limit = value; + else + THDVAR(thd, um_mem_limit) = value; +} + +bool get_varbin_always_hex(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? false : thd->variables.infinidb_varbin_always_hex; + else + return ( thd == NULL ) ? false : THDVAR(thd, varbin_always_hex); +} +void set_varbin_always_hex(THD* thd, bool value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_varbin_always_hex = value; + else + THDVAR(thd, varbin_always_hex) = value; +} + +bool get_double_for_decimal_math(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? false : thd->variables.infinidb_double_for_decimal_math; + else + return ( thd == NULL ) ? false : THDVAR(thd, double_for_decimal_math); +} +void set_double_for_decimal_math(THD* thd, bool value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_double_for_decimal_math = value; + else + THDVAR(thd, double_for_decimal_math) = value; +} + +ulong get_local_query(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_local_query; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, local_query); +} +void set_local_query(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_local_query = value; + else + THDVAR(thd, local_query) = value; +} + +bool get_use_import_for_batchinsert(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? false : thd->variables.infinidb_use_import_for_batchinsert; + else + return ( thd == NULL ) ? false : THDVAR(thd, use_import_for_batchinsert); +} +void set_use_import_for_batchinsert(THD* thd, bool value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_use_import_for_batchinsert = value; + else + THDVAR(thd, use_import_for_batchinsert) = value; +} + +ulong get_import_for_batchinsert_delimiter(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_import_for_batchinsert_delimiter; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_delimiter); +} +void set_import_for_batchinsert_delimiter(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_import_for_batchinsert_delimiter = value; + else + THDVAR(thd, import_for_batchinsert_delimiter) = value; +} + +ulong get_import_for_batchinsert_enclosed_by(THD* thd) +{ + if(get_use_legacy_sysvars(thd)) + return ( thd == NULL ) ? 0 : thd->variables.infinidb_import_for_batchinsert_enclosed_by; + else + return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_enclosed_by); +} +void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value) +{ + if(get_use_legacy_sysvars(thd)) + thd->variables.infinidb_import_for_batchinsert_enclosed_by = value; + else + THDVAR(thd, import_for_batchinsert_enclosed_by) = value; +} diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h new file mode 100644 index 000000000..eae31b0ea --- /dev/null +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -0,0 +1,91 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2016 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#ifndef MCS_SYSVARS_H__ +#define MCS_SYSVARS_H__ + +#include +#include "idb_mysql.h" + +extern st_mysql_sys_var* mcs_system_variables[]; + +// compression_type +enum mcs_compression_type_t { + NO_COMPRESSION = 0, + SNAPPY = 2 +}; + +// simple setters/getters +const char* get_original_query(THD* thd); +void set_original_query(THD* thd, char* query); + +mcs_compression_type_t get_compression_type(THD* thd); +void set_compression_type(THD* thd, ulong value); + +void* get_fe_conn_info_ptr(THD* thd = NULL); +void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL); + +bool get_use_legacy_sysvars(THD* thd); +void set_use_legacy_sysvars(THD* thd, bool value); + +bool get_use_decimal_scale(THD* thd); +void set_use_decimal_scale(THD* thd, bool value); + +ulong get_decimal_scale(THD* thd); +void set_decimal_scale(THD* thd, ulong value); + +bool get_ordered_only(THD* thd); +void set_ordered_only(THD* thd, bool value); + +ulong get_string_scan_threshold(THD* thd); +void set_string_scan_threshold(THD* thd, ulong value); + +ulong get_stringtable_threshold(THD* thd); +void set_stringtable_threshold(THD* thd, ulong value); + +ulong get_diskjoin_smallsidelimit(THD* thd); +void set_diskjoin_smallsidelimit(THD* thd, ulong value); + +ulong get_diskjoin_largesidelimit(THD* thd); +void set_diskjoin_largesidelimit(THD* thd, ulong value); + +ulong get_diskjoin_bucketsize(THD* thd); +void set_diskjoin_bucketsize(THD* thd, ulong value); + +ulong get_um_mem_limit(THD* thd); +void set_um_mem_limit(THD* thd, ulong value); + +bool get_varbin_always_hex(THD* thd); +void set_varbin_always_hex(THD* thd, bool value); + +bool get_double_for_decimal_math(THD* thd); +void set_double_for_decimal_math(THD* thd, bool value); + +ulong get_local_query(THD* thd); +void set_local_query(THD* thd, ulong value); + +bool get_use_import_for_batchinsert(THD* thd); +void set_use_import_for_batchinsert(THD* thd, bool value); + +ulong get_import_for_batchinsert_delimiter(THD* thd); +void set_import_for_batchinsert_delimiter(THD* thd, ulong value); + +ulong get_import_for_batchinsert_enclosed_by(THD* thd); +void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value); + +#endif diff --git a/dbcon/mysql/ha_pseudocolumn.cpp b/dbcon/mysql/ha_pseudocolumn.cpp index 284130033..c87196e00 100644 --- a/dbcon/mysql/ha_pseudocolumn.cpp +++ b/dbcon/mysql/ha_pseudocolumn.cpp @@ -20,6 +20,7 @@ using namespace execplan; #include "functor_str.h" #include "ha_calpont_impl_if.h" +#include "ha_mcs_sysvars.h" using namespace cal_impl_if; namespace @@ -54,10 +55,10 @@ int64_t idblocalpm() { THD* thd = current_thd; - if (!thd->infinidb_vtable.cal_conn_info) - thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); if (ci->localPm == -1) { @@ -485,10 +486,10 @@ execplan::ReturnedColumn* buildPseudoColumn(Item* item, bool& nonSupport, uint32_t pseudoType) { - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); Item_func* ifp = (Item_func*)item; diff --git a/dbcon/mysql/ha_window_function.cpp b/dbcon/mysql/ha_window_function.cpp index f4a95bbc3..84c082bee 100644 --- a/dbcon/mysql/ha_window_function.cpp +++ b/dbcon/mysql/ha_window_function.cpp @@ -28,6 +28,7 @@ using namespace std; #include "idb_mysql.h" #include "ha_calpont_impl_if.h" +#include "ha_mcs_sysvars.h" #include "arithmeticcolumn.h" #include "arithmeticoperator.h" @@ -93,10 +94,10 @@ WF_FRAME frame(Window_frame_bound::Bound_precedence_type bound, Item* offset) } ReturnedColumn* buildBoundExp(WF_Boundary& bound, SRCP& order, gp_walk_info& gwi) { - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); bool addOp = true; ReturnedColumn* rc = NULL; @@ -337,10 +338,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n //String str; //item->print(&str, QT_INFINIDB_NO_QUOTE); //cout << str.c_ptr() << endl; - if (!(gwi.thd->infinidb_vtable.cal_conn_info)) - gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); - cal_connection_info* ci = reinterpret_cast(gwi.thd->infinidb_vtable.cal_conn_info); + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); gwi.hasWindowFunc = true; Item_window_func* wf = (Item_window_func*)item; @@ -902,7 +903,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n { ac->resultType(colType_MysqlToIDB(item_sum)); // bug5736. Make the result type double for some window functions when - // infinidb_double_for_decimal_math is set. + // plugin variable double_for_decimal_math is set. ac->adjustResultType(); }