1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #698 from drrtuy/MCOL-1101

MCOL-1101 Replace system variables with plugin variables where possible.
This commit is contained in:
Andrew Hutchings
2019-02-19 10:48:56 +00:00
committed by GitHub
13 changed files with 1641 additions and 1142 deletions

View File

@@ -4,6 +4,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
SET ( libcalmysql_SRCS
ha_mcs_sysvars.cpp
ha_mcs_client_udfs.cpp
ha_calpont.cpp
ha_calpont_impl.cpp
ha_calpont_dml.cpp

View File

@@ -16,97 +16,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: ha_calpont.cpp 9642 2013-06-24 14:57:42Z rdempsey $
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
@file ha_example.cc
@brief
The ha_example engine is a stubbed storage engine for example purposes only;
it does nothing at this point. Its purpose is to provide a source
code illustration of how to begin writing new storage engines; see also
/storage/example/ha_example.h.
@details
ha_example will let you create/open/delete tables, but
nothing further (for example, indexes are not supported nor can data
be stored in the table). Use this example as a template for
implementing the same functionality in your own storage engine. You
can enable the example storage engine in your build by doing the
following during your build process:<br> ./configure
--with-example-storage-engine
Once this is done, MySQL will let you create tables with:<br>
CREATE TABLE <table name> (...) ENGINE=EXAMPLE;
The example storage engine is set up to use table locks. It
implements an example "SHARE" that is inserted into a hash by table
name. You can use this to store information of state that any
example handler object will be able to see when it is using that
table.
Please read the object definition in ha_example.h before reading the rest
of this file.
@note
When you create an EXAMPLE table, the MySQL Server creates a table .frm
(format) file in the database directory, using the table name as the file
name as is customary with MySQL. No other files are created. To get an idea
of what occurs, here is an example select that would do a scan of an entire
table:
@code
ha_example::store_lock
ha_example::external_lock
ha_example::info
ha_example::rnd_init
ha_example::extra
ENUM HA_EXTRA_CACHE Cache record in HA_rrnd()
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::extra
ENUM HA_EXTRA_NO_CACHE End caching of records (def)
ha_example::external_lock
ha_example::extra
ENUM HA_EXTRA_RESET Reset database to after open
@endcode
Here you see that the example storage engine has 9 rows called before
rnd_next signals that it has reached the end of its data. Also note that
the table in question was already opened; had it not been open, a call to
ha_example::open() would also have been necessary. Calls to
ha_example::extra() are hints as to what will be occuring to the request.
A Longer Example can be found called the "Skeleton Engine" which can be
found on TangentOrg. It has both an engine and a full build environment
for building a pluggable storage engine.
Happy coding!<br>
-Brian
*/
#include "ha_calpont.h"
#include "columnstoreversion.h"
@@ -1101,44 +1010,6 @@ struct st_mysql_storage_engine columnstore_storage_engine =
struct st_mysql_storage_engine infinidb_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
#if 0
static ulong srv_enum_var = 0;
static ulong srv_ulong_var = 0;
const char* enum_var_names[] =
{
"e1", "e2", NullS
};
TYPELIB enum_var_typelib =
{
array_elements(enum_var_names) - 1, "enum_var_typelib",
enum_var_names, NULL
};
static MYSQL_SYSVAR_ENUM(
enum_var, // name
srv_enum_var, // varname
PLUGIN_VAR_RQCMDARG, // opt
"Sample ENUM system variable.", // comment
NULL, // check
NULL, // update
0, // def
&enum_var_typelib); // typelib
static MYSQL_SYSVAR_ULONG(
ulong_var,
srv_ulong_var,
PLUGIN_VAR_RQCMDARG,
"0..1000",
NULL,
NULL,
8,
0,
1000,
0);
#endif
/*@brief check_walk - It traverses filter conditions*/
/************************************************************
* DESCRIPTION:
@@ -1368,14 +1239,6 @@ int ha_calpont_group_by_handler::end_scan()
DBUG_RETURN(rc);
}
static struct st_mysql_sys_var* calpont_system_variables[] =
{
// MYSQL_SYSVAR(enum_var),
// MYSQL_SYSVAR(ulong_var),
NULL
};
mysql_declare_plugin(columnstore)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
@@ -1388,7 +1251,7 @@ mysql_declare_plugin(columnstore)
columnstore_done_func, /* Plugin Deinit */
0x0100 /* 1.0 */,
NULL, /* status variables */
calpont_system_variables, /* system variables */
mcs_system_variables, /* system variables */
NULL, /* reserved */
0 /* config flags */
},
@@ -1403,7 +1266,7 @@ mysql_declare_plugin(columnstore)
infinidb_done_func, /* Plugin Deinit */
0x0100 /* 1.0 */,
NULL, /* status variables */
calpont_system_variables, /* system variables */
mcs_system_variables, /* system variables */
NULL, /* reserved */
0 /* config flags */
}
@@ -1419,9 +1282,9 @@ maria_declare_plugin(columnstore)
columnstore_init_func,
columnstore_done_func,
0x0100, /* 1.0 */
NULL, /* status variables */
calpont_system_variables, /* system variables */
"1.0", /* string version */
NULL, /* status variables */
mcs_system_variables, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
},
{
@@ -1434,10 +1297,10 @@ maria_declare_plugin(columnstore)
infinidb_init_func,
infinidb_done_func,
0x0100, /* 1.0 */
NULL, /* status variables */
calpont_system_variables, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
NULL, /* status variables */
mcs_system_variables, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;

View File

@@ -15,35 +15,16 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/** @file ha_example.h
@brief
The ha_example engine is a stubbed storage engine for example purposes only;
it does nothing at this point. Its purpose is to provide a source
code illustration of how to begin writing new storage engines; see also
/storage/example/ha_example.cc.
@note
Please read ha_example.cc before reading this file.
Reminder: The example storage engine implements all methods that are *required*
to be implemented. For a full list of all methods that you can implement, see
handler.h.
@see
/sql/handler.h and /storage/example/ha_example.cc
*/
// $Id: ha_calpont.h 9210 2013-01-21 14:10:42Z rdempsey $
#ifndef HA_CALPONT_H__
#define HA_CALPONT_H__
#include <my_config.h>
#include "idb_mysql.h"
#include "ha_mcs_sysvars.h"
extern handlerton* calpont_hton;
/** @brief
EXAMPLE_SHARE is a structure that will be shared among all open handlers.
This structure will be shared among all open handlers.
This example implements the minimum of what you will probably need.
*/
typedef struct st_calpont_share

View File

@@ -49,6 +49,7 @@ using namespace std;
#include <boost/tokenizer.hpp>
using namespace boost;
#include "ha_mcs_sysvars.h"
#include "idb_mysql.h"
#include "ha_calpont_impl_if.h"
@@ -679,10 +680,10 @@ int ProcessDDLStatement(string& ddlStatement, string& schema, const string& tabl
IDBCompressInterface idbCompress;
parser.Parse(ddlStatement.c_str());
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (parser.Good())
{
@@ -2144,7 +2145,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
return 1;
}
int compressiontype = thd->variables.infinidb_compression_type;
int compressiontype = get_compression_type(thd);
if (compressiontype == 1) compressiontype = 2;
@@ -2156,7 +2157,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
}
if ( compressiontype == MAX_INT )
compressiontype = thd->variables.infinidb_compression_type;
compressiontype = get_compression_type(thd);
else if ( compressiontype < 0 )
{
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_INVALID_COMPRESSION_TYPE);
@@ -2489,12 +2490,12 @@ extern "C"
if ( thd->db.length )
db = thd->db.str;
int compressiontype = thd->variables.infinidb_compression_type;
int compressiontype = get_compression_type(thd);
if (compressiontype == 1) compressiontype = 2;
if ( compressiontype == MAX_INT )
compressiontype = thd->variables.infinidb_compression_type;
compressiontype = get_compression_type(thd);
//hdfs
if ((compressiontype == 0) && (useHdfs))

View File

@@ -58,6 +58,7 @@ using namespace logging;
#include "idb_mysql.h"
#include "ha_calpont_impl_if.h"
#include "ha_mcs_sysvars.h"
#include "ha_subquery.h"
//#include "ha_view.h"
using namespace cal_impl_if;
@@ -1442,10 +1443,10 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip)
}
}
if (!(gwip->thd->infinidb_vtable.cal_conn_info))
gwip->thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwip->thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (ifp->functype() == Item_func::BETWEEN)
@@ -2466,10 +2467,10 @@ void setError(THD* thd, uint32_t errcode, string errmsg)
thd->infinidb_vtable.override_largeside_estimate = false;
// reset expressionID
if (!(thd->infinidb_vtable.cal_conn_info))
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
ci->expressionId = 0;
}
@@ -3104,10 +3105,10 @@ ArithmeticColumn* buildArithmeticColumn(
bool& nonSupport,
bool pushdownHand)
{
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
ArithmeticColumn* ac = new ArithmeticColumn();
Item** sfitempp = item->arguments();
@@ -3275,7 +3276,7 @@ ArithmeticColumn* buildArithmeticColumn(
//idbassert(pt->left() && pt->right() && pt->left()->data() && pt->right()->data());
CalpontSystemCatalog::ColType mysql_type = colType_MysqlToIDB(item);
if (current_thd->variables.infinidb_double_for_decimal_math == 1)
if (get_double_for_decimal_math(current_thd) == true)
aop->adjustResultType(mysql_type);
else
aop->resultType(mysql_type);
@@ -3329,10 +3330,10 @@ ReturnedColumn* buildFunctionColumn(
bool& nonSupport,
bool pushdownHand)
{
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
string funcName = ifp->func_name();
FuncExp* funcExp = FuncExp::instance();
@@ -3801,10 +3802,10 @@ ReturnedColumn* buildFunctionColumn(
FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonSupport)
{
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
FunctionColumn* fc = new FunctionColumn();
FunctionParm funcParms;
@@ -4194,10 +4195,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
vector<SRCP> selCols;
vector<SRCP> orderCols;
bool bIsConst = false;
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
Item_sum* isp = reinterpret_cast<Item_sum*>(item);
Item** sfitempp = isp->get_orig_args();
@@ -5775,7 +5776,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
return ER_CHECK_NOT_IMPLEMENTED;
}
gwi.internalDecimalScale = (gwi.thd->variables.infinidb_use_decimal_scale ? gwi.thd->variables.infinidb_decimal_scale : -1);
gwi.internalDecimalScale = (get_use_decimal_scale(gwi.thd) ? get_decimal_scale(gwi.thd) : -1);
gwi.subSelectType = csep->subType();
JOIN* join = select_lex.join;
@@ -5808,25 +5810,25 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
// @bug 2123. Override large table estimate if infinidb_ordered hint was used.
// @bug 2404. Always override if the infinidb_ordered_only variable is turned on.
if (gwi.thd->infinidb_vtable.override_largeside_estimate || gwi.thd->variables.infinidb_ordered_only)
if (gwi.thd->infinidb_vtable.override_largeside_estimate || get_ordered_only(gwi.thd))
csep->overrideLargeSideEstimate(true);
// @bug 5741. Set a flag when in Local PM only query mode
csep->localQuery(gwi.thd->variables.infinidb_local_query);
csep->localQuery(get_local_query(gwi.thd));
// @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering
csep->stringScanThreshold(gwi.thd->variables.infinidb_string_scan_threshold);
csep->stringScanThreshold(get_string_scan_threshold(gwi.thd));
csep->stringTableThreshold(gwi.thd->variables.infinidb_stringtable_threshold);
csep->stringTableThreshold(get_stringtable_threshold(gwi.thd));
csep->djsSmallSideLimit(gwi.thd->variables.infinidb_diskjoin_smallsidelimit * 1024ULL * 1024);
csep->djsLargeSideLimit(gwi.thd->variables.infinidb_diskjoin_largesidelimit * 1024ULL * 1024);
csep->djsPartitionSize(gwi.thd->variables.infinidb_diskjoin_bucketsize * 1024ULL * 1024);
csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024);
if (gwi.thd->variables.infinidb_um_mem_limit == 0)
if (get_um_mem_limit(gwi.thd) == 0)
csep->umMemLimit(numeric_limits<int64_t>::max());
else
csep->umMemLimit(gwi.thd->variables.infinidb_um_mem_limit * 1024ULL * 1024);
csep->umMemLimit(get_um_mem_limit(gwi.thd) * 1024ULL * 1024);
// populate table map and trigger syscolumn cache for all the tables (@bug 1637).
// all tables on FROM list must have at least one col in colmap
@@ -8241,7 +8243,7 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti)
csep->tableList(tblist);
// @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering
csep->stringScanThreshold(gwi->thd->variables.infinidb_string_scan_threshold);
csep->stringScanThreshold(get_string_scan_threshold(gwi->thd));
return 0;
}
@@ -8339,7 +8341,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
return ER_CHECK_NOT_IMPLEMENTED;
}
gwi.internalDecimalScale = (gwi.thd->variables.infinidb_use_decimal_scale ? gwi.thd->variables.infinidb_decimal_scale : -1);
gwi.internalDecimalScale = (get_use_decimal_scale(gwi.thd) ? get_decimal_scale(gwi.thd) : -1);
gwi.subSelectType = csep->subType();
JOIN* join = select_lex.join;
@@ -8356,25 +8358,25 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
// @bug 2123. Override large table estimate if infinidb_ordered hint was used.
// @bug 2404. Always override if the infinidb_ordered_only variable is turned on.
if (gwi.thd->infinidb_vtable.override_largeside_estimate || gwi.thd->variables.infinidb_ordered_only)
if (gwi.thd->infinidb_vtable.override_largeside_estimate || get_ordered_only(gwi.thd))
csep->overrideLargeSideEstimate(true);
// @bug 5741. Set a flag when in Local PM only query mode
csep->localQuery(gwi.thd->variables.infinidb_local_query);
csep->localQuery(get_local_query(gwi.thd));
// @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering
csep->stringScanThreshold(gwi.thd->variables.infinidb_string_scan_threshold);
csep->stringScanThreshold(get_string_scan_threshold(gwi.thd));
csep->stringTableThreshold(gwi.thd->variables.infinidb_stringtable_threshold);
csep->stringTableThreshold(get_stringtable_threshold(gwi.thd));
csep->djsSmallSideLimit(gwi.thd->variables.infinidb_diskjoin_smallsidelimit * 1024ULL * 1024);
csep->djsLargeSideLimit(gwi.thd->variables.infinidb_diskjoin_largesidelimit * 1024ULL * 1024);
csep->djsPartitionSize(gwi.thd->variables.infinidb_diskjoin_bucketsize * 1024ULL * 1024);
csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024);
if (gwi.thd->variables.infinidb_um_mem_limit == 0)
if (get_um_mem_limit(gwi.thd) == 0)
csep->umMemLimit(numeric_limits<int64_t>::max());
else
csep->umMemLimit(gwi.thd->variables.infinidb_um_mem_limit * 1024ULL * 1024);
csep->umMemLimit(get_um_mem_limit(gwi.thd) * 1024ULL * 1024);
// populate table map and trigger syscolumn cache for all the tables (@bug 1637).
// all tables on FROM list must have at least one col in colmap

File diff suppressed because it is too large Load Diff

View File

@@ -16,12 +16,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*
* $Id: ha_calpont_impl.h 9413 2013-04-22 22:03:42Z zzhu $
*/
/** @file */
#ifndef HA_CALPONT_IMPL_H__
#define HA_CALPONT_IMPL_H__

View File

@@ -319,10 +319,10 @@ struct cal_connection_info
char delimiter;
char enclosed_by;
std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes;
// MCOL-1101 remove compilation unit variable rmParms
std::vector <execplan::RMParam> rmParms;
};
typedef std::tr1::unordered_map<int, cal_connection_info> CalConnMap;
const std::string infinidb_err_msg = "\nThe query includes syntax that is not supported by MariaDB Columnstore. Use 'show warnings;' to get more information. Review the MariaDB Columnstore Syntax guide for additional information on supported distributed syntax or consider changing the MariaDB Columnstore Operating Mode (infinidb_vtable_mode).";
int cp_get_plan(THD* thd, execplan::SCSEP& csep);

View File

@@ -0,0 +1,871 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#define NEED_CALPONT_INTERFACE
#include "ha_calpont_impl.h"
#include "ha_calpont_impl_if.h"
using namespace cal_impl_if;
#include "configcpp.h"
using namespace config;
#include "brmtypes.h"
using namespace BRM;
#include "bytestream.h"
using namespace messageqcpp;
#include "liboamcpp.h"
using namespace oam;
#include "cacheutils.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;
//#include "resourcemanager.h"
#include "columnstoreversion.h"
#include "ha_mcs_sysvars.h"
extern "C"
{
#define MAXSTRINGLENGTH 50
const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside";
const char* SetParmsPrelude = "Updated ";
const char* SetParmsError = "Invalid parameter: ";
const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than ";
const size_t Plen = strlen(SetParmsPrelude);
const size_t Elen = strlen(SetParmsError);
const char* invalidParmSizeMessage(uint64_t size, size_t& len)
{
static char str[sizeof(InvalidParmSize) + 12] = {0};
ostringstream os;
os << InvalidParmSize << size;
len = os.str().length();
strcpy(str, os.str().c_str());
return str;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
char parameter[MAXSTRINGLENGTH];
char valuestr[MAXSTRINGLENGTH];
size_t plen = args->lengths[0];
size_t vlen = args->lengths[1];
memcpy(parameter, args->args[0], plen);
memcpy(valuestr, args->args[1], vlen);
parameter[plen] = '\0';
valuestr[vlen] = '\0';
uint64_t value = Config::uFromText(valuestr);
THD* thd = current_thd;
uint32_t sessionID = CalpontSystemCatalog::idb_tid2sid(thd->thread_id);
const char* msg = SetParmsError;
size_t mlen = Elen;
bool includeInput = true;
string pstr(parameter);
boost::algorithm::to_lower(pstr);
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
if (pstr == PmSmallSideMaxMemory)
{
joblist::ResourceManager* rm = joblist::ResourceManager::instance();
if (rm->getHjTotalUmMaxMemorySmallSide() >= value)
{
ci->rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value));
msg = SetParmsPrelude;
mlen = Plen;
}
else
{
msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen);
includeInput = false;
}
}
memcpy(result, msg, mlen);
if (includeInput)
{
memcpy(result + mlen, parameter, plen);
mlen += plen;
memcpy(result + mlen++, " ", 1);
memcpy(result + mlen, valuestr, vlen);
*length = mlen + vlen;
}
else
*length = mlen;
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)
{
strcpy(message, "CALSETPARMS() requires two string arguments");
return 1;
}
initid->max_length = MAXSTRINGLENGTH;
char valuestr[MAXSTRINGLENGTH];
size_t vlen = args->lengths[1];
memcpy(valuestr, args->args[1], vlen--);
for (size_t i = 0; i < vlen; ++i)
if (!isdigit(valuestr[i]))
{
strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K");
return 1;
}
if (!isdigit(valuestr[vlen]))
{
switch (valuestr[vlen])
{
case 'G':
case 'g':
case 'M':
case 'm':
case 'K':
case 'k':
case '\0':
break;
default:
strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K");
return 1;
}
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calsetparms_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
unsigned long l = ci->queryStats.size();
if (l == 0)
{
*is_null = 1;
return 0;
}
if (l > 255) l = 255;
memcpy(result, ci->queryStats.c_str(), l);
*length = l;
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETSTATS() takes no arguments");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetstats_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long calsettrace(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
long long oldTrace = ci->traceFlags;
ci->traceFlags = (uint32_t)(*((long long*)args->args[0]));
// keep the vtablemode bit
ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH);
return oldTrace;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT)
{
strcpy(message, "CALSETTRACE() requires one INTEGER argument");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calsettrace_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return 1 if system is ready for reads or 0 if not.
long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
Oam oam;
DBRM dbrm(true);
SystemStatus systemstatus;
try
{
oam.getSystemStatus(systemstatus);
if (systemstatus.SystemOpState == ACTIVE
&& dbrm.getSystemReady()
&& dbrm.getSystemQueryReady())
{
return 1;
}
}
catch (...)
{
*error = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemready_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return non-zero if system is read only; 0 if writeable
long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
DBRM dbrm(true);
try
{
if (dbrm.getSystemSuspended())
{
rtn = 1;
}
if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only
{
rtn = 2;
}
}
catch (...)
{
*error = 1;
rtn = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemreadonly_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return non-zero if this is the primary UM; 0 if not primary
long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
Oam oam;
string PrimaryUMModuleName;
string localModule;
oamModuleInfo_t st;
try
{
st = oam.getModuleInfo();
localModule = boost::get<0>(st);
PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName");
if (boost::iequals(localModule, PrimaryUMModuleName))
rtn = 1;
if (PrimaryUMModuleName == "unassigned")
rtn = 1;
}
catch (runtime_error& e)
{
// It's difficult to return an error message from a numerical UDF
//string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what();
*error = 1;
}
catch (...)
{
*error = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemprimary_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT))
{
strcpy(message, "CALVIEWTABLELOCK() requires two string arguments");
return 1;
}
else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) )
{
strcpy(message, "CALVIEWTABLELOCK() requires one string argument");
return 1;
}
else if (args->arg_count > 2 )
{
strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only");
return 1;
}
else if (args->arg_count == 0 )
{
strcpy(message, "CALVIEWTABLELOCK() requires at least one argument");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
CalpontSystemCatalog::TableName tableName;
if ( args->arg_count == 2 )
{
tableName.schema = args->args[0];
tableName.table = args->args[1];
}
else if ( args->arg_count == 1 )
{
tableName.table = args->args[0];
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
string msg("No schema information provided");
memcpy(result, msg.c_str(), msg.length());
*length = msg.length();
return result;
}
}
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName);
memcpy(result, lockinfo.c_str(), lockinfo.length());
*length = lockinfo.length();
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calviewtablelock_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT))
{
strcpy(message,
"CALCLEARTABLELOCK() requires one integer argument (the lockID)");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(
get_fe_conn_info_ptr());
long long lockID = *reinterpret_cast<long long*>(args->args[0]);
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
unsigned long long uLockID = lockID;
string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID);
memcpy(result, lockinfo.c_str(), lockinfo.length());
*length = lockinfo.length();
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calcleartablelock_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT))
{
strcpy(message, "CALLASTINSRTID() requires two string arguments");
return 1;
}
else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) )
{
strcpy(message, "CALLASTINSERTID() requires one string argument");
return 1;
}
else if (args->arg_count > 2 )
{
strcpy(message, "CALLASTINSERTID() takes one or two arguments only");
return 1;
}
else if (args->arg_count == 0 )
{
strcpy(message, "CALLASTINSERTID() requires at least one argument");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
THD* thd = current_thd;
CalpontSystemCatalog::TableName tableName;
uint64_t nextVal = 0;
if ( args->arg_count == 2 )
{
tableName.schema = args->args[0];
tableName.table = args->args[1];
}
else if ( args->arg_count == 1 )
{
tableName.table = args->args[0];
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
return -1;
}
}
boost::shared_ptr<CalpontSystemCatalog> csc =
CalpontSystemCatalog::makeCalpontSystemCatalog(
CalpontSystemCatalog::idb_tid2sid(thd->thread_id));
csc->identity(execplan::CalpontSystemCatalog::FE);
try
{
nextVal = csc->nextAutoIncrValue(tableName);
}
catch (std::exception&)
{
string msg("No such table found during autincrement");
setError(thd, ER_INTERNAL_ERROR, msg);
return nextVal;
}
if (nextVal == AUTOINCR_SATURATED)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT));
return nextVal;
}
//@Bug 3559. Return a message for table without autoincrement column.
if (nextVal == 0)
{
string msg("Autoincrement does not exist for this table.");
setError(thd, ER_INTERNAL_ERROR, msg);
return nextVal;
}
return (nextVal - 1);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void callastinsertid_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calflushcache_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long calflushcache(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
return static_cast<long long>(cacheutils::flushPrimProcCache());
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALFLUSHCACHE() takes no arguments");
return 1;
}
return 0;
}
static const unsigned long TraceSize = 16 * 1024;
//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init()
// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
const string* msgp;
int flags = 0;
if (args->arg_count > 0)
{
if (args->arg_type[0] == INT_RESULT)
{
flags = *reinterpret_cast<int*>(args->args[0]);
}
}
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (flags > 0)
//msgp = &connMap[sessionID].extendedStats;
msgp = &ci->extendedStats;
else
//msgp = &connMap[sessionID].miniStats;
msgp = &ci->miniStats;
unsigned long l = msgp->size();
if (l == 0)
{
*is_null = 1;
return 0;
}
if (l > TraceSize) l = TraceSize;
*length = l;
return msgp->c_str();
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
#if 0
if (args->arg_count != 0)
{
strcpy(message, "CALGETTRACE() takes no arguments");
return 1;
}
#endif
initid->maybe_null = 1;
initid->max_length = TraceSize;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgettrace_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
string version(columnstore_version);
*length = version.size();
memcpy(result, version.c_str(), *length);
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETVERSION() takes no arguments");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetversion_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
MessageQueueClient* mqc = 0;
mqc = new MessageQueueClient("ExeMgr1");
ByteStream msg;
ByteStream::quadbyte runningSql, waitingSql;
ByteStream::quadbyte qb = 5;
msg << qb;
mqc->write(msg);
//get ExeMgr response
msg.restart();
msg = mqc->read();
if (msg.length() == 0)
{
memcpy(result, "Lost connection to ExeMgr", *length);
return result;
}
msg >> runningSql;
msg >> waitingSql;
delete mqc;
char ans[128];
sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql);
*length = strlen(ans);
memcpy(result, ans, *length);
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETSQLCOUNT() takes no arguments");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetsqlcount_deinit(UDF_INIT* initid)
{
}
} //extern "C"

View File

@@ -0,0 +1,520 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <my_config.h>
#include "idb_mysql.h"
#include "ha_mcs_sysvars.h"
const char* mcs_compression_type_names[] = {
"NO_COMPRESSION",
"SNAPPY",
NullS
};
static TYPELIB mcs_compression_type_names_lib = {
array_elements(mcs_compression_type_names) - 1,
"mcs_compression_type_names",
mcs_compression_type_names,
NULL
};
// compression type
static MYSQL_THDVAR_ENUM(
compression_type,
PLUGIN_VAR_RQCMDARG,
"Controls compression algorithm for create tables. Possible values are: "
"NO_COMPRESSION segment files aren't compressed; "
"SNAPPY segment files are Snappy compressed (default);",
NULL, // check
NULL, // update
1, //default
&mcs_compression_type_names_lib); // values lib
// fe_conn_info pointer
static MYSQL_THDVAR_ULONGLONG(
fe_conn_info_ptr,
PLUGIN_VAR_NOSYSVAR | PLUGIN_VAR_NOCMDOPT,
"FrontEnd connection structure pointer. For internal usage.",
NULL,
NULL,
0,
0,
~0U,
1
);
// legacy system variables
static MYSQL_THDVAR_ULONG(
decimal_scale,
PLUGIN_VAR_RQCMDARG,
"The default decimal precision for calculated column sub-operations ",
NULL,
NULL,
8,
0,
18,
1
);
static MYSQL_THDVAR_BOOL(
varbin_always_hex,
PLUGIN_VAR_NOCMDARG,
"Always display/process varbinary columns as if they have been hexified.",
NULL,
NULL,
0
);
static MYSQL_THDVAR_BOOL(
use_decimal_scale,
PLUGIN_VAR_NOCMDARG,
"Enable/disable the MCS decimal scale to be used internally",
NULL,
NULL,
0
);
static MYSQL_THDVAR_BOOL(
double_for_decimal_math,
PLUGIN_VAR_NOCMDARG,
"Enable/disable the InfiniDB to replace DECIMAL with DOUBLE in arithmetic operation.",
NULL,
NULL,
0
);
static MYSQL_THDVAR_BOOL(
ordered_only,
PLUGIN_VAR_NOCMDARG,
"Always use the first table in the from clause as the large side "
"table for joins",
NULL,
NULL,
0
);
static MYSQL_THDVAR_ULONG(
string_scan_threshold,
PLUGIN_VAR_RQCMDARG,
"Max number of blocks in a dictionary file to be scanned for filtering",
NULL,
NULL,
10,
1,
~0U,
1
);
static MYSQL_THDVAR_ULONG(
stringtable_threshold,
PLUGIN_VAR_RQCMDARG,
"The minimum width of a string column to be stored in a string table",
NULL,
NULL,
20,
9,
~0U,
1
);
static MYSQL_THDVAR_ULONG(
diskjoin_smallsidelimit,
PLUGIN_VAR_RQCMDARG,
"The maximum amount of disk space in MB to use per query for storing "
"'small side' tables for a disk-based join. (0 = unlimited)",
NULL,
NULL,
0,
0,
~0U,
1
);
static MYSQL_THDVAR_ULONG(
diskjoin_largesidelimit,
PLUGIN_VAR_RQCMDARG,
"The maximum amount of disk space in MB to use per join for storing "
"'large side' table data for a disk-based join. (0 = unlimited)",
NULL,
NULL,
0,
0,
~0U,
1
);
static MYSQL_THDVAR_ULONG(
diskjoin_bucketsize,
PLUGIN_VAR_RQCMDARG,
"The maximum size in MB of each 'small side' table in memory.",
NULL,
NULL,
100,
1,
~0U,
1
);
static MYSQL_THDVAR_ULONG(
um_mem_limit,
PLUGIN_VAR_RQCMDARG,
"Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached",
NULL,
NULL,
0,
0,
~0U,
1
);
static MYSQL_THDVAR_ULONG(
local_query,
PLUGIN_VAR_RQCMDARG,
"Enable/disable the Infinidb local PM query only feature.",
NULL,
NULL,
0,
0,
2,
1
);
static MYSQL_THDVAR_ULONG(
import_for_batchinsert_delimiter,
PLUGIN_VAR_RQCMDARG,
"ASCII value of the delimiter used by LDI and INSERT..SELECT",
NULL, // check
NULL, // update
7, // default
0, // min
127, // max
1 // block size
);
static MYSQL_THDVAR_ULONG(
import_for_batchinsert_enclosed_by,
PLUGIN_VAR_RQCMDARG,
"ASCII value of the quote symbol used by batch data ingestion",
NULL, // check
NULL, // update
17, // default
17, // min
127, // max
1 // block size
);
static MYSQL_THDVAR_BOOL(
use_import_for_batchinsert,
PLUGIN_VAR_NOCMDARG,
"LOAD DATA INFILE and INSERT..SELECT will use cpimport internally",
NULL, // check
NULL, // update
1 // default
);
static MYSQL_THDVAR_BOOL(
use_legacy_sysvars,
PLUGIN_VAR_NOCMDARG,
"Control CS behavior using legacy * sysvars",
NULL, // check
NULL, // update
0 // default
);
st_mysql_sys_var* mcs_system_variables[] =
{
MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(fe_conn_info_ptr),
MYSQL_SYSVAR(decimal_scale),
MYSQL_SYSVAR(use_decimal_scale),
MYSQL_SYSVAR(ordered_only),
MYSQL_SYSVAR(string_scan_threshold),
MYSQL_SYSVAR(stringtable_threshold),
MYSQL_SYSVAR(diskjoin_smallsidelimit),
MYSQL_SYSVAR(diskjoin_largesidelimit),
MYSQL_SYSVAR(diskjoin_bucketsize),
MYSQL_SYSVAR(um_mem_limit),
MYSQL_SYSVAR(double_for_decimal_math),
MYSQL_SYSVAR(local_query),
MYSQL_SYSVAR(use_import_for_batchinsert),
MYSQL_SYSVAR(import_for_batchinsert_delimiter),
MYSQL_SYSVAR(import_for_batchinsert_enclosed_by),
MYSQL_SYSVAR(use_legacy_sysvars),
MYSQL_SYSVAR(varbin_always_hex),
NULL
};
void* get_fe_conn_info_ptr(THD* thd)
{
return ( current_thd == NULL && thd == NULL ) ? NULL :
(void*)THDVAR(current_thd, fe_conn_info_ptr);
}
void set_fe_conn_info_ptr(void* ptr, THD* thd)
{
if ( current_thd == NULL && thd == NULL)
{
return;
}
THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr);
}
bool get_use_legacy_sysvars(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, use_legacy_sysvars);
}
void set_use_legacy_sysvars(THD* thd, bool value)
{
THDVAR(thd, use_legacy_sysvars) = value;
}
void set_compression_type(THD* thd, ulong value)
{
THDVAR(thd, compression_type) = value;
}
mcs_compression_type_t get_compression_type(THD* thd) {
return (mcs_compression_type_t) THDVAR(thd, compression_type);
}
bool get_use_decimal_scale(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? false : thd->variables.infinidb_use_decimal_scale;
else
return ( thd == NULL ) ? false : THDVAR(thd, use_decimal_scale);
}
void set_use_decimal_scale(THD* thd, bool value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_use_decimal_scale = value;
else
THDVAR(thd, use_decimal_scale) = value;
}
ulong get_decimal_scale(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_decimal_scale;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, decimal_scale);
}
void set_decimal_scale(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_decimal_scale = value;
else
THDVAR(thd, decimal_scale) = value;
}
bool get_ordered_only(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? false : thd->variables.infinidb_ordered_only;
else
return ( thd == NULL ) ? false : THDVAR(thd, ordered_only);
}
void set_ordered_only(THD* thd, bool value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_ordered_only = value;
else
THDVAR(thd, ordered_only) = value;
}
ulong get_string_scan_threshold(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_string_scan_threshold;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, string_scan_threshold);
}
void set_string_scan_threshold(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_string_scan_threshold = value;
else
THDVAR(thd, string_scan_threshold) = value;
}
ulong get_stringtable_threshold(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_stringtable_threshold;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, stringtable_threshold);
}
void set_stringtable_threshold(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_stringtable_threshold = value;
else
THDVAR(thd, stringtable_threshold) = value;
}
ulong get_diskjoin_smallsidelimit(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_smallsidelimit;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_smallsidelimit);
}
void set_diskjoin_smallsidelimit(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_diskjoin_smallsidelimit = value;
else
THDVAR(thd, diskjoin_smallsidelimit) = value;
}
ulong get_diskjoin_largesidelimit(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_largesidelimit;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_largesidelimit);
}
void set_diskjoin_largesidelimit(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_diskjoin_largesidelimit = value;
else
THDVAR(thd, diskjoin_largesidelimit) = value;
}
ulong get_diskjoin_bucketsize(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_bucketsize;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_bucketsize);
}
void set_diskjoin_bucketsize(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_diskjoin_bucketsize = value;
else
THDVAR(thd, diskjoin_bucketsize) = value;
}
ulong get_um_mem_limit(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_um_mem_limit;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, um_mem_limit);
}
void set_um_mem_limit(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_um_mem_limit = value;
else
THDVAR(thd, um_mem_limit) = value;
}
bool get_varbin_always_hex(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? false : thd->variables.infinidb_varbin_always_hex;
else
return ( thd == NULL ) ? false : THDVAR(thd, varbin_always_hex);
}
void set_varbin_always_hex(THD* thd, bool value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_varbin_always_hex = value;
else
THDVAR(thd, varbin_always_hex) = value;
}
bool get_double_for_decimal_math(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? false : thd->variables.infinidb_double_for_decimal_math;
else
return ( thd == NULL ) ? false : THDVAR(thd, double_for_decimal_math);
}
void set_double_for_decimal_math(THD* thd, bool value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_double_for_decimal_math = value;
else
THDVAR(thd, double_for_decimal_math) = value;
}
ulong get_local_query(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_local_query;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, local_query);
}
void set_local_query(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_local_query = value;
else
THDVAR(thd, local_query) = value;
}
bool get_use_import_for_batchinsert(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? false : thd->variables.infinidb_use_import_for_batchinsert;
else
return ( thd == NULL ) ? false : THDVAR(thd, use_import_for_batchinsert);
}
void set_use_import_for_batchinsert(THD* thd, bool value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_use_import_for_batchinsert = value;
else
THDVAR(thd, use_import_for_batchinsert) = value;
}
ulong get_import_for_batchinsert_delimiter(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_import_for_batchinsert_delimiter;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_delimiter);
}
void set_import_for_batchinsert_delimiter(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_import_for_batchinsert_delimiter = value;
else
THDVAR(thd, import_for_batchinsert_delimiter) = value;
}
ulong get_import_for_batchinsert_enclosed_by(THD* thd)
{
if(get_use_legacy_sysvars(thd))
return ( thd == NULL ) ? 0 : thd->variables.infinidb_import_for_batchinsert_enclosed_by;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_enclosed_by);
}
void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value)
{
if(get_use_legacy_sysvars(thd))
thd->variables.infinidb_import_for_batchinsert_enclosed_by = value;
else
THDVAR(thd, import_for_batchinsert_enclosed_by) = value;
}

View File

@@ -0,0 +1,91 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef MCS_SYSVARS_H__
#define MCS_SYSVARS_H__
#include <my_config.h>
#include "idb_mysql.h"
extern st_mysql_sys_var* mcs_system_variables[];
// compression_type
enum mcs_compression_type_t {
NO_COMPRESSION = 0,
SNAPPY = 2
};
// simple setters/getters
const char* get_original_query(THD* thd);
void set_original_query(THD* thd, char* query);
mcs_compression_type_t get_compression_type(THD* thd);
void set_compression_type(THD* thd, ulong value);
void* get_fe_conn_info_ptr(THD* thd = NULL);
void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL);
bool get_use_legacy_sysvars(THD* thd);
void set_use_legacy_sysvars(THD* thd, bool value);
bool get_use_decimal_scale(THD* thd);
void set_use_decimal_scale(THD* thd, bool value);
ulong get_decimal_scale(THD* thd);
void set_decimal_scale(THD* thd, ulong value);
bool get_ordered_only(THD* thd);
void set_ordered_only(THD* thd, bool value);
ulong get_string_scan_threshold(THD* thd);
void set_string_scan_threshold(THD* thd, ulong value);
ulong get_stringtable_threshold(THD* thd);
void set_stringtable_threshold(THD* thd, ulong value);
ulong get_diskjoin_smallsidelimit(THD* thd);
void set_diskjoin_smallsidelimit(THD* thd, ulong value);
ulong get_diskjoin_largesidelimit(THD* thd);
void set_diskjoin_largesidelimit(THD* thd, ulong value);
ulong get_diskjoin_bucketsize(THD* thd);
void set_diskjoin_bucketsize(THD* thd, ulong value);
ulong get_um_mem_limit(THD* thd);
void set_um_mem_limit(THD* thd, ulong value);
bool get_varbin_always_hex(THD* thd);
void set_varbin_always_hex(THD* thd, bool value);
bool get_double_for_decimal_math(THD* thd);
void set_double_for_decimal_math(THD* thd, bool value);
ulong get_local_query(THD* thd);
void set_local_query(THD* thd, ulong value);
bool get_use_import_for_batchinsert(THD* thd);
void set_use_import_for_batchinsert(THD* thd, bool value);
ulong get_import_for_batchinsert_delimiter(THD* thd);
void set_import_for_batchinsert_delimiter(THD* thd, ulong value);
ulong get_import_for_batchinsert_enclosed_by(THD* thd);
void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value);
#endif

View File

@@ -20,6 +20,7 @@ using namespace execplan;
#include "functor_str.h"
#include "ha_calpont_impl_if.h"
#include "ha_mcs_sysvars.h"
using namespace cal_impl_if;
namespace
@@ -54,10 +55,10 @@ int64_t idblocalpm()
{
THD* thd = current_thd;
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (ci->localPm == -1)
{
@@ -485,10 +486,10 @@ execplan::ReturnedColumn* buildPseudoColumn(Item* item,
bool& nonSupport,
uint32_t pseudoType)
{
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
Item_func* ifp = (Item_func*)item;

View File

@@ -28,6 +28,7 @@ using namespace std;
#include "idb_mysql.h"
#include "ha_calpont_impl_if.h"
#include "ha_mcs_sysvars.h"
#include "arithmeticcolumn.h"
#include "arithmeticoperator.h"
@@ -93,10 +94,10 @@ WF_FRAME frame(Window_frame_bound::Bound_precedence_type bound, Item* offset)
}
ReturnedColumn* buildBoundExp(WF_Boundary& bound, SRCP& order, gp_walk_info& gwi)
{
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
bool addOp = true;
ReturnedColumn* rc = NULL;
@@ -337,10 +338,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
//String str;
//item->print(&str, QT_INFINIDB_NO_QUOTE);
//cout << str.c_ptr() << endl;
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(gwi.thd->infinidb_vtable.cal_conn_info);
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
gwi.hasWindowFunc = true;
Item_window_func* wf = (Item_window_func*)item;
@@ -902,7 +903,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
{
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
// plugin variable double_for_decimal_math is set.
ac->adjustResultType();
}