1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1101 Move client UDFs into a separate file.

Remove rmParms from ha_calpont_impl.cpp
This commit is contained in:
Roman Nozdrin
2019-01-01 12:33:47 +03:00
parent ca0240037a
commit d22183e195
7 changed files with 882 additions and 945 deletions

View File

@ -5,6 +5,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
SET ( libcalmysql_SRCS SET ( libcalmysql_SRCS
mcs_sysvars.cpp mcs_sysvars.cpp
mcs_client_udfs.cpp
ha_calpont.cpp ha_calpont.cpp
ha_calpont_impl.cpp ha_calpont_impl.cpp
ha_calpont_dml.cpp ha_calpont_dml.cpp

View File

@ -16,97 +16,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */ MA 02110-1301, USA. */
// $Id: ha_calpont.cpp 9642 2013-06-24 14:57:42Z rdempsey $
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
@file ha_example.cc
@brief
The ha_example engine is a stubbed storage engine for example purposes only;
it does nothing at this point. Its purpose is to provide a source
code illustration of how to begin writing new storage engines; see also
/storage/example/ha_example.h.
@details
ha_example will let you create/open/delete tables, but
nothing further (for example, indexes are not supported nor can data
be stored in the table). Use this example as a template for
implementing the same functionality in your own storage engine. You
can enable the example storage engine in your build by doing the
following during your build process:<br> ./configure
--with-example-storage-engine
Once this is done, MySQL will let you create tables with:<br>
CREATE TABLE <table name> (...) ENGINE=EXAMPLE;
The example storage engine is set up to use table locks. It
implements an example "SHARE" that is inserted into a hash by table
name. You can use this to store information of state that any
example handler object will be able to see when it is using that
table.
Please read the object definition in ha_example.h before reading the rest
of this file.
@note
When you create an EXAMPLE table, the MySQL Server creates a table .frm
(format) file in the database directory, using the table name as the file
name as is customary with MySQL. No other files are created. To get an idea
of what occurs, here is an example select that would do a scan of an entire
table:
@code
ha_example::store_lock
ha_example::external_lock
ha_example::info
ha_example::rnd_init
ha_example::extra
ENUM HA_EXTRA_CACHE Cache record in HA_rrnd()
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::rnd_next
ha_example::extra
ENUM HA_EXTRA_NO_CACHE End caching of records (def)
ha_example::external_lock
ha_example::extra
ENUM HA_EXTRA_RESET Reset database to after open
@endcode
Here you see that the example storage engine has 9 rows called before
rnd_next signals that it has reached the end of its data. Also note that
the table in question was already opened; had it not been open, a call to
ha_example::open() would also have been necessary. Calls to
ha_example::extra() are hints as to what will be occuring to the request.
A Longer Example can be found called the "Skeleton Engine" which can be
found on TangentOrg. It has both an engine and a full build environment
for building a pluggable storage engine.
Happy coding!<br>
-Brian
*/
#include "ha_calpont.h" #include "ha_calpont.h"
#include "columnstoreversion.h" #include "columnstoreversion.h"

View File

@ -169,7 +169,7 @@ static const string interval_names[] =
const unsigned NONSUPPORTED_ERR_THRESH = 2000; const unsigned NONSUPPORTED_ERR_THRESH = 2000;
//TODO: make this session-safe (put in connMap?) //TODO: make this session-safe (put in connMap?)
vector<RMParam> rmParms; //vector<RMParam> rmParms;
ResourceManager* rm = ResourceManager::instance(); ResourceManager* rm = ResourceManager::instance();
bool useHdfs = rm->useHdfs(); bool useHdfs = rm->useHdfs();
@ -1593,7 +1593,7 @@ uint32_t doUpdateDelete(THD* thd)
ByteStream bytestream, bytestream1; ByteStream bytestream, bytestream1;
bytestream << sessionID; bytestream << sessionID;
boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan(); boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan();
updateCP->rmParms(rmParms); updateCP->rmParms(ci->rmParms);
updateCP->serialize(*plan); updateCP->serialize(*plan);
// recover original vtable state // recover original vtable state
thd->infinidb_vtable.vtable_state = origState; thd->infinidb_vtable.vtable_state = origState;
@ -1908,835 +1908,6 @@ uint32_t doUpdateDelete(THD* thd)
} //anon namespace } //anon namespace
extern "C"
{
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
unsigned long l = ci->queryStats.size();
if (l == 0)
{
*is_null = 1;
return 0;
}
if (l > 255) l = 255;
memcpy(result, ci->queryStats.c_str(), l);
*length = l;
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETSTATS() takes no arguments");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetstats_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long calsettrace(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
long long oldTrace = ci->traceFlags;
ci->traceFlags = (uint32_t)(*((long long*)args->args[0]));
// keep the vtablemode bit
ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH);
return oldTrace;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT)
{
strcpy(message, "CALSETTRACE() requires one INTEGER argument");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calsettrace_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return 1 if system is ready for reads or 0 if not.
long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
Oam oam;
DBRM dbrm(true);
SystemStatus systemstatus;
try
{
oam.getSystemStatus(systemstatus);
if (systemstatus.SystemOpState == ACTIVE
&& dbrm.getSystemReady()
&& dbrm.getSystemQueryReady())
{
return 1;
}
}
catch (...)
{
*error = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemready_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return non-zero if system is read only; 0 if writeable
long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
DBRM dbrm(true);
try
{
if (dbrm.getSystemSuspended())
{
rtn = 1;
}
if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only
{
rtn = 2;
}
}
catch (...)
{
*error = 1;
rtn = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemreadonly_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return non-zero if this is the primary UM; 0 if not primary
long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
Oam oam;
string PrimaryUMModuleName;
string localModule;
oamModuleInfo_t st;
try
{
st = oam.getModuleInfo();
localModule = boost::get<0>(st);
PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName");
if (boost::iequals(localModule, PrimaryUMModuleName))
rtn = 1;
if (PrimaryUMModuleName == "unassigned")
rtn = 1;
}
catch (runtime_error& e)
{
// It's difficult to return an error message from a numerical UDF
//string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what();
*error = 1;
}
catch (...)
{
*error = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemprimary_deinit(UDF_INIT* initid)
{
}
#define MAXSTRINGLENGTH 50
const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside";
const char* SetParmsPrelude = "Updated ";
const char* SetParmsError = "Invalid parameter: ";
const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than ";
const size_t Plen = strlen(SetParmsPrelude);
const size_t Elen = strlen(SetParmsError);
const char* invalidParmSizeMessage(uint64_t size, size_t& len)
{
static char str[sizeof(InvalidParmSize) + 12] = {0};
ostringstream os;
os << InvalidParmSize << size;
len = os.str().length();
strcpy(str, os.str().c_str());
return str;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
char parameter[MAXSTRINGLENGTH];
char valuestr[MAXSTRINGLENGTH];
size_t plen = args->lengths[0];
size_t vlen = args->lengths[1];
memcpy(parameter, args->args[0], plen);
memcpy(valuestr, args->args[1], vlen);
parameter[plen] = '\0';
valuestr[vlen] = '\0';
uint64_t value = Config::uFromText(valuestr);
THD* thd = current_thd;
uint32_t sessionID = tid2sid(thd->thread_id);
const char* msg = SetParmsError;
size_t mlen = Elen;
bool includeInput = true;
string pstr(parameter);
boost::algorithm::to_lower(pstr);
if (pstr == PmSmallSideMaxMemory)
{
joblist::ResourceManager* rm = joblist::ResourceManager::instance();
if (rm->getHjTotalUmMaxMemorySmallSide() >= value)
{
rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value));
msg = SetParmsPrelude;
mlen = Plen;
}
else
{
msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen);
includeInput = false;
}
}
memcpy(result, msg, mlen);
if (includeInput)
{
memcpy(result + mlen, parameter, plen);
mlen += plen;
memcpy(result + mlen++, " ", 1);
memcpy(result + mlen, valuestr, vlen);
*length = mlen + vlen;
}
else
*length = mlen;
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)
{
strcpy(message, "CALSETPARMS() requires two string arguments");
return 1;
}
initid->max_length = MAXSTRINGLENGTH;
char valuestr[MAXSTRINGLENGTH];
size_t vlen = args->lengths[1];
memcpy(valuestr, args->args[1], vlen--);
for (size_t i = 0; i < vlen; ++i)
if (!isdigit(valuestr[i]))
{
strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K");
return 1;
}
if (!isdigit(valuestr[vlen]))
{
switch (valuestr[vlen])
{
case 'G':
case 'g':
case 'M':
case 'm':
case 'K':
case 'k':
case '\0':
break;
default:
strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K");
return 1;
}
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calsetparms_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT))
{
strcpy(message, "CALVIEWTABLELOCK() requires two string arguments");
return 1;
}
else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) )
{
strcpy(message, "CALVIEWTABLELOCK() requires one string argument");
return 1;
}
else if (args->arg_count > 2 )
{
strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only");
return 1;
}
else if (args->arg_count == 0 )
{
strcpy(message, "CALVIEWTABLELOCK() requires at least one argument");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
CalpontSystemCatalog::TableName tableName;
if ( args->arg_count == 2 )
{
tableName.schema = args->args[0];
tableName.table = args->args[1];
}
else if ( args->arg_count == 1 )
{
tableName.table = args->args[0];
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
string msg("No schema information provided");
memcpy(result, msg.c_str(), msg.length());
*length = msg.length();
return result;
}
}
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName);
memcpy(result, lockinfo.c_str(), lockinfo.length());
*length = lockinfo.length();
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calviewtablelock_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT))
{
strcpy(message,
"CALCLEARTABLELOCK() requires one integer argument (the lockID)");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(
get_fe_conn_info_ptr());
long long lockID = *reinterpret_cast<long long*>(args->args[0]);
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
unsigned long long uLockID = lockID;
string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID);
memcpy(result, lockinfo.c_str(), lockinfo.length());
*length = lockinfo.length();
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calcleartablelock_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT))
{
strcpy(message, "CALLASTINSRTID() requires two string arguments");
return 1;
}
else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) )
{
strcpy(message, "CALLASTINSERTID() requires one string argument");
return 1;
}
else if (args->arg_count > 2 )
{
strcpy(message, "CALLASTINSERTID() takes one or two arguments only");
return 1;
}
else if (args->arg_count == 0 )
{
strcpy(message, "CALLASTINSERTID() requires at least one argument");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
THD* thd = current_thd;
CalpontSystemCatalog::TableName tableName;
uint64_t nextVal = 0;
if ( args->arg_count == 2 )
{
tableName.schema = args->args[0];
tableName.table = args->args[1];
}
else if ( args->arg_count == 1 )
{
tableName.table = args->args[0];
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
return -1;
}
}
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(tid2sid(thd->thread_id));
csc->identity(execplan::CalpontSystemCatalog::FE);
try
{
nextVal = csc->nextAutoIncrValue(tableName);
}
catch (std::exception&)
{
string msg("No such table found during autincrement");
setError(thd, ER_INTERNAL_ERROR, msg);
return nextVal;
}
if (nextVal == AUTOINCR_SATURATED)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT));
return nextVal;
}
//@Bug 3559. Return a message for table without autoincrement column.
if (nextVal == 0)
{
string msg("Autoincrement does not exist for this table.");
setError(thd, ER_INTERNAL_ERROR, msg);
return nextVal;
}
return (nextVal - 1);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void callastinsertid_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calflushcache_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long calflushcache(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
return static_cast<long long>(cacheutils::flushPrimProcCache());
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALFLUSHCACHE() takes no arguments");
return 1;
}
return 0;
}
static const unsigned long TraceSize = 16 * 1024;
//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init()
// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
const string* msgp;
int flags = 0;
if (args->arg_count > 0)
{
if (args->arg_type[0] == INT_RESULT)
{
flags = *reinterpret_cast<int*>(args->args[0]);
}
}
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (flags > 0)
//msgp = &connMap[sessionID].extendedStats;
msgp = &ci->extendedStats;
else
//msgp = &connMap[sessionID].miniStats;
msgp = &ci->miniStats;
unsigned long l = msgp->size();
if (l == 0)
{
*is_null = 1;
return 0;
}
if (l > TraceSize) l = TraceSize;
*length = l;
return msgp->c_str();
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
#if 0
if (args->arg_count != 0)
{
strcpy(message, "CALGETTRACE() takes no arguments");
return 1;
}
#endif
initid->maybe_null = 1;
initid->max_length = TraceSize;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgettrace_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
string version(columnstore_version);
*length = version.size();
memcpy(result, version.c_str(), *length);
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETVERSION() takes no arguments");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetversion_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
MessageQueueClient* mqc = 0;
mqc = new MessageQueueClient("ExeMgr1");
ByteStream msg;
ByteStream::quadbyte runningSql, waitingSql;
ByteStream::quadbyte qb = 5;
msg << qb;
mqc->write(msg);
//get ExeMgr response
msg.restart();
msg = mqc->read();
if (msg.length() == 0)
{
memcpy(result, "Lost connection to ExeMgr", *length);
return result;
}
msg >> runningSql;
msg >> waitingSql;
delete mqc;
char ans[128];
sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql);
*length = strlen(ans);
memcpy(result, ans, *length);
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETSQLCOUNT() takes no arguments");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetsqlcount_deinit(UDF_INIT* initid)
{
}
} //extern "C"
int ha_calpont_impl_open(const char* name, int mode, uint32_t test_if_locked) int ha_calpont_impl_open(const char* name, int mode, uint32_t test_if_locked)
{ {
IDEBUG ( cout << "ha_calpont_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl ); IDEBUG ( cout << "ha_calpont_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl );
@ -3147,7 +2318,7 @@ int ha_calpont_impl_rnd_init(TABLE* table)
msg << qb; msg << qb;
hndl->exeMgr->write(msg); hndl->exeMgr->write(msg);
msg.restart(); msg.restart();
csep->rmParms(rmParms); csep->rmParms(ci->rmParms);
//send plan //send plan
csep->serialize(msg); csep->serialize(msg);
@ -3207,7 +2378,7 @@ int ha_calpont_impl_rnd_init(TABLE* table)
return ER_INTERNAL_ERROR; return ER_INTERNAL_ERROR;
} }
rmParms.clear(); ci->rmParms.clear();
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
{ {
@ -5330,7 +4501,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
msg << qb; msg << qb;
hndl->exeMgr->write(msg); hndl->exeMgr->write(msg);
msg.restart(); msg.restart();
csep->rmParms(rmParms); csep->rmParms(ci->rmParms);
//send plan //send plan
csep->serialize(msg); csep->serialize(msg);
@ -5390,7 +4561,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
return ER_INTERNAL_ERROR; return ER_INTERNAL_ERROR;
} }
rmParms.clear(); ci->rmParms.clear();
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
{ {

View File

@ -319,6 +319,8 @@ struct cal_connection_info
char delimiter; char delimiter;
char enclosed_by; char enclosed_by;
std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes; std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes;
// MCOL-1101 remove compilation unit variable rmParms
std::vector <execplan::RMParam> rmParms;
}; };
const std::string infinidb_err_msg = "\nThe query includes syntax that is not supported by MariaDB Columnstore. Use 'show warnings;' to get more information. Review the MariaDB Columnstore Syntax guide for additional information on supported distributed syntax or consider changing the MariaDB Columnstore Operating Mode (infinidb_vtable_mode)."; const std::string infinidb_err_msg = "\nThe query includes syntax that is not supported by MariaDB Columnstore. Use 'show warnings;' to get more information. Review the MariaDB Columnstore Syntax guide for additional information on supported distributed syntax or consider changing the MariaDB Columnstore Operating Mode (infinidb_vtable_mode).";

View File

@ -0,0 +1,871 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#define NEED_CALPONT_INTERFACE
#include "ha_calpont_impl.h"
#include "ha_calpont_impl_if.h"
using namespace cal_impl_if;
#include "configcpp.h"
using namespace config;
#include "brmtypes.h"
using namespace BRM;
#include "bytestream.h"
using namespace messageqcpp;
#include "liboamcpp.h"
using namespace oam;
#include "cacheutils.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;
//#include "resourcemanager.h"
#include "columnstoreversion.h"
#include "mcs_sysvars.h"
extern "C"
{
#define MAXSTRINGLENGTH 50
const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside";
const char* SetParmsPrelude = "Updated ";
const char* SetParmsError = "Invalid parameter: ";
const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than ";
const size_t Plen = strlen(SetParmsPrelude);
const size_t Elen = strlen(SetParmsError);
const char* invalidParmSizeMessage(uint64_t size, size_t& len)
{
static char str[sizeof(InvalidParmSize) + 12] = {0};
ostringstream os;
os << InvalidParmSize << size;
len = os.str().length();
strcpy(str, os.str().c_str());
return str;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
char parameter[MAXSTRINGLENGTH];
char valuestr[MAXSTRINGLENGTH];
size_t plen = args->lengths[0];
size_t vlen = args->lengths[1];
memcpy(parameter, args->args[0], plen);
memcpy(valuestr, args->args[1], vlen);
parameter[plen] = '\0';
valuestr[vlen] = '\0';
uint64_t value = Config::uFromText(valuestr);
THD* thd = current_thd;
uint32_t sessionID = CalpontSystemCatalog::idb_tid2sid(thd->thread_id);
const char* msg = SetParmsError;
size_t mlen = Elen;
bool includeInput = true;
string pstr(parameter);
boost::algorithm::to_lower(pstr);
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
if (pstr == PmSmallSideMaxMemory)
{
joblist::ResourceManager* rm = joblist::ResourceManager::instance();
if (rm->getHjTotalUmMaxMemorySmallSide() >= value)
{
ci->rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value));
msg = SetParmsPrelude;
mlen = Plen;
}
else
{
msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen);
includeInput = false;
}
}
memcpy(result, msg, mlen);
if (includeInput)
{
memcpy(result + mlen, parameter, plen);
mlen += plen;
memcpy(result + mlen++, " ", 1);
memcpy(result + mlen, valuestr, vlen);
*length = mlen + vlen;
}
else
*length = mlen;
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)
{
strcpy(message, "CALSETPARMS() requires two string arguments");
return 1;
}
initid->max_length = MAXSTRINGLENGTH;
char valuestr[MAXSTRINGLENGTH];
size_t vlen = args->lengths[1];
memcpy(valuestr, args->args[1], vlen--);
for (size_t i = 0; i < vlen; ++i)
if (!isdigit(valuestr[i]))
{
strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K");
return 1;
}
if (!isdigit(valuestr[vlen]))
{
switch (valuestr[vlen])
{
case 'G':
case 'g':
case 'M':
case 'm':
case 'K':
case 'k':
case '\0':
break;
default:
strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K");
return 1;
}
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calsetparms_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
unsigned long l = ci->queryStats.size();
if (l == 0)
{
*is_null = 1;
return 0;
}
if (l > 255) l = 255;
memcpy(result, ci->queryStats.c_str(), l);
*length = l;
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETSTATS() takes no arguments");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetstats_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long calsettrace(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
long long oldTrace = ci->traceFlags;
ci->traceFlags = (uint32_t)(*((long long*)args->args[0]));
// keep the vtablemode bit
ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH);
return oldTrace;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT)
{
strcpy(message, "CALSETTRACE() requires one INTEGER argument");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calsettrace_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return 1 if system is ready for reads or 0 if not.
long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
Oam oam;
DBRM dbrm(true);
SystemStatus systemstatus;
try
{
oam.getSystemStatus(systemstatus);
if (systemstatus.SystemOpState == ACTIVE
&& dbrm.getSystemReady()
&& dbrm.getSystemQueryReady())
{
return 1;
}
}
catch (...)
{
*error = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemready_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return non-zero if system is read only; 0 if writeable
long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
DBRM dbrm(true);
try
{
if (dbrm.getSystemSuspended())
{
rtn = 1;
}
if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only
{
rtn = 2;
}
}
catch (...)
{
*error = 1;
rtn = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemreadonly_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
// Return non-zero if this is the primary UM; 0 if not primary
long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
long long rtn = 0;
Oam oam;
string PrimaryUMModuleName;
string localModule;
oamModuleInfo_t st;
try
{
st = oam.getModuleInfo();
localModule = boost::get<0>(st);
PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName");
if (boost::iequals(localModule, PrimaryUMModuleName))
rtn = 1;
if (PrimaryUMModuleName == "unassigned")
rtn = 1;
}
catch (runtime_error& e)
{
// It's difficult to return an error message from a numerical UDF
//string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what();
*error = 1;
}
catch (...)
{
*error = 1;
}
return rtn;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void mcssystemprimary_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT))
{
strcpy(message, "CALVIEWTABLELOCK() requires two string arguments");
return 1;
}
else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) )
{
strcpy(message, "CALVIEWTABLELOCK() requires one string argument");
return 1;
}
else if (args->arg_count > 2 )
{
strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only");
return 1;
}
else if (args->arg_count == 0 )
{
strcpy(message, "CALVIEWTABLELOCK() requires at least one argument");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
CalpontSystemCatalog::TableName tableName;
if ( args->arg_count == 2 )
{
tableName.schema = args->args[0];
tableName.table = args->args[1];
}
else if ( args->arg_count == 1 )
{
tableName.table = args->args[0];
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
string msg("No schema information provided");
memcpy(result, msg.c_str(), msg.length());
*length = msg.length();
return result;
}
}
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName);
memcpy(result, lockinfo.c_str(), lockinfo.length());
*length = lockinfo.length();
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calviewtablelock_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT))
{
strcpy(message,
"CALCLEARTABLELOCK() requires one integer argument (the lockID)");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(
get_fe_conn_info_ptr());
long long lockID = *reinterpret_cast<long long*>(args->args[0]);
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
unsigned long long uLockID = lockID;
string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID);
memcpy(result, lockinfo.c_str(), lockinfo.length());
*length = lockinfo.length();
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calcleartablelock_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT))
{
strcpy(message, "CALLASTINSRTID() requires two string arguments");
return 1;
}
else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) )
{
strcpy(message, "CALLASTINSERTID() requires one string argument");
return 1;
}
else if (args->arg_count > 2 )
{
strcpy(message, "CALLASTINSERTID() takes one or two arguments only");
return 1;
}
else if (args->arg_count == 0 )
{
strcpy(message, "CALLASTINSERTID() requires at least one argument");
return 1;
}
initid->maybe_null = 1;
initid->max_length = 255;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
THD* thd = current_thd;
CalpontSystemCatalog::TableName tableName;
uint64_t nextVal = 0;
if ( args->arg_count == 2 )
{
tableName.schema = args->args[0];
tableName.table = args->args[1];
}
else if ( args->arg_count == 1 )
{
tableName.table = args->args[0];
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
return -1;
}
}
boost::shared_ptr<CalpontSystemCatalog> csc =
CalpontSystemCatalog::makeCalpontSystemCatalog(
CalpontSystemCatalog::idb_tid2sid(thd->thread_id));
csc->identity(execplan::CalpontSystemCatalog::FE);
try
{
nextVal = csc->nextAutoIncrValue(tableName);
}
catch (std::exception&)
{
string msg("No such table found during autincrement");
setError(thd, ER_INTERNAL_ERROR, msg);
return nextVal;
}
if (nextVal == AUTOINCR_SATURATED)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT));
return nextVal;
}
//@Bug 3559. Return a message for table without autoincrement column.
if (nextVal == 0)
{
string msg("Autoincrement does not exist for this table.");
setError(thd, ER_INTERNAL_ERROR, msg);
return nextVal;
}
return (nextVal - 1);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void callastinsertid_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calflushcache_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long calflushcache(UDF_INIT* initid, UDF_ARGS* args,
char* is_null, char* error)
{
return static_cast<long long>(cacheutils::flushPrimProcCache());
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALFLUSHCACHE() takes no arguments");
return 1;
}
return 0;
}
static const unsigned long TraceSize = 16 * 1024;
//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init()
// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
const string* msgp;
int flags = 0;
if (args->arg_count > 0)
{
if (args->arg_type[0] == INT_RESULT)
{
flags = *reinterpret_cast<int*>(args->args[0]);
}
}
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (flags > 0)
//msgp = &connMap[sessionID].extendedStats;
msgp = &ci->extendedStats;
else
//msgp = &connMap[sessionID].miniStats;
msgp = &ci->miniStats;
unsigned long l = msgp->size();
if (l == 0)
{
*is_null = 1;
return 0;
}
if (l > TraceSize) l = TraceSize;
*length = l;
return msgp->c_str();
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
#if 0
if (args->arg_count != 0)
{
strcpy(message, "CALGETTRACE() takes no arguments");
return 1;
}
#endif
initid->maybe_null = 1;
initid->max_length = TraceSize;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgettrace_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
string version(columnstore_version);
*length = version.size();
memcpy(result, version.c_str(), *length);
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETVERSION() takes no arguments");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetversion_deinit(UDF_INIT* initid)
{
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args,
char* result, unsigned long* length,
char* is_null, char* error)
{
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
MessageQueueClient* mqc = 0;
mqc = new MessageQueueClient("ExeMgr1");
ByteStream msg;
ByteStream::quadbyte runningSql, waitingSql;
ByteStream::quadbyte qb = 5;
msg << qb;
mqc->write(msg);
//get ExeMgr response
msg.restart();
msg = mqc->read();
if (msg.length() == 0)
{
memcpy(result, "Lost connection to ExeMgr", *length);
return result;
}
msg >> runningSql;
msg >> waitingSql;
delete mqc;
char ans[128];
sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql);
*length = strlen(ans);
memcpy(result, ans, *length);
return result;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 0)
{
strcpy(message, "CALGETSQLCOUNT() takes no arguments");
return 1;
}
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void calgetsqlcount_deinit(UDF_INIT* initid)
{
}
} //extern "C"

View File

@ -101,24 +101,6 @@ void set_fe_conn_info_ptr(void* ptr)
THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr); THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr);
} }
/*ha_calpont* get_legacy_handler(mcs_handler_info mcs_hndl_ptr)
{
//MCOL-1101 Add handler type check
//hndl_ptr.hndl_type == LEGACY )
ha_calpont* hndl;
if ( mcs_hndl_ptr.hndl_ptr != NULL )
{
hndl = (ha_calpont*)(mcs_hndl_ptr.hndl_ptr);
}
else
{
hndl = new ha_calpont();
hndl->fe_conn_info = (void*)THDVAR(current_thd, fe_conn_info_ptr);
}
return hndl;
}*/
mcs_compression_type_t get_compression_type(THD* thd) { mcs_compression_type_t get_compression_type(THD* thd) {
return (mcs_compression_type_t) THDVAR(thd, compression_type); return (mcs_compression_type_t) THDVAR(thd, compression_type);
} }

View File

@ -24,6 +24,7 @@
extern st_mysql_sys_var* mcs_system_variables[]; extern st_mysql_sys_var* mcs_system_variables[];
/* MCOL-1101 Remove before release
enum mcs_handler_types_t enum mcs_handler_types_t
{ {
SELECT, SELECT,
@ -40,7 +41,7 @@ struct mcs_handler_info
void* hndl_ptr; void* hndl_ptr;
mcs_handler_types_t hndl_type; mcs_handler_types_t hndl_type;
}; };
*/
// compression_type // compression_type
enum mcs_compression_type_t { enum mcs_compression_type_t {
NO_COMPRESSION = 0, NO_COMPRESSION = 0,