diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index b9af67200..7744e50b4 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} SET ( libcalmysql_SRCS mcs_sysvars.cpp + mcs_client_udfs.cpp ha_calpont.cpp ha_calpont_impl.cpp ha_calpont_dml.cpp diff --git a/dbcon/mysql/ha_calpont.cpp b/dbcon/mysql/ha_calpont.cpp index 5caf10997..3c15dd146 100644 --- a/dbcon/mysql/ha_calpont.cpp +++ b/dbcon/mysql/ha_calpont.cpp @@ -16,97 +16,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -// $Id: ha_calpont.cpp 9642 2013-06-24 14:57:42Z rdempsey $ - -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -/** - @file ha_example.cc - - @brief - The ha_example engine is a stubbed storage engine for example purposes only; - it does nothing at this point. Its purpose is to provide a source - code illustration of how to begin writing new storage engines; see also - /storage/example/ha_example.h. - - @details - ha_example will let you create/open/delete tables, but - nothing further (for example, indexes are not supported nor can data - be stored in the table). Use this example as a template for - implementing the same functionality in your own storage engine. You - can enable the example storage engine in your build by doing the - following during your build process:
./configure - --with-example-storage-engine - - Once this is done, MySQL will let you create tables with:
- CREATE TABLE (...) ENGINE=EXAMPLE; - - The example storage engine is set up to use table locks. It - implements an example "SHARE" that is inserted into a hash by table - name. You can use this to store information of state that any - example handler object will be able to see when it is using that - table. - - Please read the object definition in ha_example.h before reading the rest - of this file. - - @note - When you create an EXAMPLE table, the MySQL Server creates a table .frm - (format) file in the database directory, using the table name as the file - name as is customary with MySQL. No other files are created. To get an idea - of what occurs, here is an example select that would do a scan of an entire - table: - - @code - ha_example::store_lock - ha_example::external_lock - ha_example::info - ha_example::rnd_init - ha_example::extra - ENUM HA_EXTRA_CACHE Cache record in HA_rrnd() - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::rnd_next - ha_example::extra - ENUM HA_EXTRA_NO_CACHE End caching of records (def) - ha_example::external_lock - ha_example::extra - ENUM HA_EXTRA_RESET Reset database to after open - @endcode - - Here you see that the example storage engine has 9 rows called before - rnd_next signals that it has reached the end of its data. Also note that - the table in question was already opened; had it not been open, a call to - ha_example::open() would also have been necessary. Calls to - ha_example::extra() are hints as to what will be occuring to the request. - - A Longer Example can be found called the "Skeleton Engine" which can be - found on TangentOrg. It has both an engine and a full build environment - for building a pluggable storage engine. - - Happy coding!
- -Brian -*/ - #include "ha_calpont.h" #include "columnstoreversion.h" diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index c15b0f98b..43a2fd628 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -169,7 +169,7 @@ static const string interval_names[] = const unsigned NONSUPPORTED_ERR_THRESH = 2000; //TODO: make this session-safe (put in connMap?) -vector rmParms; +//vector rmParms; ResourceManager* rm = ResourceManager::instance(); bool useHdfs = rm->useHdfs(); @@ -1593,7 +1593,7 @@ uint32_t doUpdateDelete(THD* thd) ByteStream bytestream, bytestream1; bytestream << sessionID; boost::shared_ptr plan = pDMLPackage->get_ExecutionPlan(); - updateCP->rmParms(rmParms); + updateCP->rmParms(ci->rmParms); updateCP->serialize(*plan); // recover original vtable state thd->infinidb_vtable.vtable_state = origState; @@ -1908,835 +1908,6 @@ uint32_t doUpdateDelete(THD* thd) } //anon namespace -extern "C" -{ -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (get_fe_conn_info_ptr() == NULL) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - - unsigned long l = ci->queryStats.size(); - - if (l == 0) - { - *is_null = 1; - return 0; - } - - if (l > 255) l = 255; - - memcpy(result, ci->queryStats.c_str(), l); - *length = l; - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALGETSTATS() takes no arguments"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgetstats_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long calsettrace(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (get_fe_conn_info_ptr() == NULL) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - - long long oldTrace = ci->traceFlags; - ci->traceFlags = (uint32_t)(*((long long*)args->args[0])); - // keep the vtablemode bit - ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); - ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH); - return oldTrace; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT) - { - strcpy(message, "CALSETTRACE() requires one INTEGER argument"); - return 1; - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calsettrace_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif -// Return 1 if system is ready for reads or 0 if not. - long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - long long rtn = 0; - Oam oam; - DBRM dbrm(true); - SystemStatus systemstatus; - - try - { - oam.getSystemStatus(systemstatus); - - if (systemstatus.SystemOpState == ACTIVE - && dbrm.getSystemReady() - && dbrm.getSystemQueryReady()) - { - return 1; - } - } - catch (...) - { - *error = 1; - } - - return rtn; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void mcssystemready_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif -// Return non-zero if system is read only; 0 if writeable - long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - long long rtn = 0; - DBRM dbrm(true); - - try - { - if (dbrm.getSystemSuspended()) - { - rtn = 1; - } - - if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only - { - rtn = 2; - } - } - catch (...) - { - *error = 1; - rtn = 1; - } - - return rtn; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void mcssystemreadonly_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif -// Return non-zero if this is the primary UM; 0 if not primary - long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - long long rtn = 0; - Oam oam; - string PrimaryUMModuleName; - string localModule; - oamModuleInfo_t st; - - try - { - st = oam.getModuleInfo(); - localModule = boost::get<0>(st); - PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName"); - - if (boost::iequals(localModule, PrimaryUMModuleName)) - rtn = 1; - if (PrimaryUMModuleName == "unassigned") - rtn = 1; - } - catch (runtime_error& e) - { - // It's difficult to return an error message from a numerical UDF - //string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what(); - *error = 1; - } - catch (...) - { - *error = 1; - } - return rtn; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void mcssystemprimary_deinit(UDF_INIT* initid) - { - } - -#define MAXSTRINGLENGTH 50 - - const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside"; - - const char* SetParmsPrelude = "Updated "; - const char* SetParmsError = "Invalid parameter: "; - const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than "; - - const size_t Plen = strlen(SetParmsPrelude); - const size_t Elen = strlen(SetParmsError); - - const char* invalidParmSizeMessage(uint64_t size, size_t& len) - { - static char str[sizeof(InvalidParmSize) + 12] = {0}; - ostringstream os; - os << InvalidParmSize << size; - len = os.str().length(); - strcpy(str, os.str().c_str()); - return str; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - char parameter[MAXSTRINGLENGTH]; - char valuestr[MAXSTRINGLENGTH]; - size_t plen = args->lengths[0]; - size_t vlen = args->lengths[1]; - - memcpy(parameter, args->args[0], plen); - memcpy(valuestr, args->args[1], vlen); - - parameter[plen] = '\0'; - valuestr[vlen] = '\0'; - - uint64_t value = Config::uFromText(valuestr); - - THD* thd = current_thd; - uint32_t sessionID = tid2sid(thd->thread_id); - - const char* msg = SetParmsError; - size_t mlen = Elen; - bool includeInput = true; - - string pstr(parameter); - boost::algorithm::to_lower(pstr); - - if (pstr == PmSmallSideMaxMemory) - { - joblist::ResourceManager* rm = joblist::ResourceManager::instance(); - - if (rm->getHjTotalUmMaxMemorySmallSide() >= value) - { - rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value)); - - msg = SetParmsPrelude; - mlen = Plen; - } - else - { - msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen); - includeInput = false; - } - } - - memcpy(result, msg, mlen); - - if (includeInput) - { - memcpy(result + mlen, parameter, plen); - mlen += plen; - memcpy(result + mlen++, " ", 1); - memcpy(result + mlen, valuestr, vlen); - *length = mlen + vlen; - } - else - *length = mlen; - - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT) - { - strcpy(message, "CALSETPARMS() requires two string arguments"); - return 1; - } - - initid->max_length = MAXSTRINGLENGTH; - - char valuestr[MAXSTRINGLENGTH]; - size_t vlen = args->lengths[1]; - - memcpy(valuestr, args->args[1], vlen--); - - for (size_t i = 0; i < vlen; ++i) - if (!isdigit(valuestr[i])) - { - strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); - return 1; - } - - if (!isdigit(valuestr[vlen])) - { - switch (valuestr[vlen]) - { - case 'G': - case 'g': - case 'M': - case 'm': - case 'K': - case 'k': - case '\0': - break; - - default: - strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); - return 1; - } - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calsetparms_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) - { - strcpy(message, "CALVIEWTABLELOCK() requires two string arguments"); - return 1; - } - else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) - { - strcpy(message, "CALVIEWTABLELOCK() requires one string argument"); - return 1; - } - else if (args->arg_count > 2 ) - { - strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only"); - return 1; - } - else if (args->arg_count == 0 ) - { - strcpy(message, "CALVIEWTABLELOCK() requires at least one argument"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (get_fe_conn_info_ptr() == NULL) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - CalpontSystemCatalog::TableName tableName; - - if ( args->arg_count == 2 ) - { - tableName.schema = args->args[0]; - tableName.table = args->args[1]; - } - else if ( args->arg_count == 1 ) - { - tableName.table = args->args[0]; - - if (thd->db.length) - tableName.schema = thd->db.str; - else - { - string msg("No schema information provided"); - memcpy(result, msg.c_str(), msg.length()); - *length = msg.length(); - return result; - } - } - - if ( !ci->dmlProc ) - { - ci->dmlProc = new MessageQueueClient("DMLProc"); - //cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; - } - - string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName); - - memcpy(result, lockinfo.c_str(), lockinfo.length()); - *length = lockinfo.length(); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calviewtablelock_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT)) - { - strcpy(message, - "CALCLEARTABLELOCK() requires one integer argument (the lockID)"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (get_fe_conn_info_ptr() == NULL) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast( - get_fe_conn_info_ptr()); - long long lockID = *reinterpret_cast(args->args[0]); - - if ( !ci->dmlProc ) - { - ci->dmlProc = new MessageQueueClient("DMLProc"); - //cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; - } - - unsigned long long uLockID = lockID; - string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID); - - memcpy(result, lockinfo.c_str(), lockinfo.length()); - *length = lockinfo.length(); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calcleartablelock_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) - { - strcpy(message, "CALLASTINSRTID() requires two string arguments"); - return 1; - } - else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) - { - strcpy(message, "CALLASTINSERTID() requires one string argument"); - return 1; - } - else if (args->arg_count > 2 ) - { - strcpy(message, "CALLASTINSERTID() takes one or two arguments only"); - return 1; - } - else if (args->arg_count == 0 ) - { - strcpy(message, "CALLASTINSERTID() requires at least one argument"); - return 1; - } - - initid->maybe_null = 1; - initid->max_length = 255; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - THD* thd = current_thd; - - CalpontSystemCatalog::TableName tableName; - uint64_t nextVal = 0; - - if ( args->arg_count == 2 ) - { - tableName.schema = args->args[0]; - tableName.table = args->args[1]; - } - else if ( args->arg_count == 1 ) - { - tableName.table = args->args[0]; - - if (thd->db.length) - tableName.schema = thd->db.str; - else - { - return -1; - } - } - - boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(tid2sid(thd->thread_id)); - csc->identity(execplan::CalpontSystemCatalog::FE); - - try - { - nextVal = csc->nextAutoIncrValue(tableName); - } - catch (std::exception&) - { - string msg("No such table found during autincrement"); - setError(thd, ER_INTERNAL_ERROR, msg); - return nextVal; - } - - if (nextVal == AUTOINCR_SATURATED) - { - setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT)); - return nextVal; - } - - //@Bug 3559. Return a message for table without autoincrement column. - if (nextVal == 0) - { - string msg("Autoincrement does not exist for this table."); - setError(thd, ER_INTERNAL_ERROR, msg); - return nextVal; - } - - return (nextVal - 1); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void callastinsertid_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calflushcache_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long calflushcache(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, char* error) - { - return static_cast(cacheutils::flushPrimProcCache()); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALFLUSHCACHE() takes no arguments"); - return 1; - } - - return 0; - } - - static const unsigned long TraceSize = 16 * 1024; - -//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init() -// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - const string* msgp; - int flags = 0; - - if (args->arg_count > 0) - { - if (args->arg_type[0] == INT_RESULT) - { - flags = *reinterpret_cast(args->args[0]); - } - } - - if (get_fe_conn_info_ptr() == NULL) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - - if (flags > 0) - //msgp = &connMap[sessionID].extendedStats; - msgp = &ci->extendedStats; - else - //msgp = &connMap[sessionID].miniStats; - msgp = &ci->miniStats; - - unsigned long l = msgp->size(); - - if (l == 0) - { - *is_null = 1; - return 0; - } - - if (l > TraceSize) l = TraceSize; - - *length = l; - return msgp->c_str(); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { -#if 0 - - if (args->arg_count != 0) - { - strcpy(message, "CALGETTRACE() takes no arguments"); - return 1; - } - -#endif - initid->maybe_null = 1; - initid->max_length = TraceSize; - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgettrace_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - string version(columnstore_version); - *length = version.size(); - memcpy(result, version.c_str(), *length); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALGETVERSION() takes no arguments"); - return 1; - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgetversion_deinit(UDF_INIT* initid) - { - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args, - char* result, unsigned long* length, - char* is_null, char* error) - { - THD* thd = current_thd; - - if (get_fe_conn_info_ptr() == NULL) - set_fe_conn_info_ptr((void*)new cal_connection_info()); - - cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - idbassert(ci != 0); - - MessageQueueClient* mqc = 0; - mqc = new MessageQueueClient("ExeMgr1"); - - ByteStream msg; - ByteStream::quadbyte runningSql, waitingSql; - ByteStream::quadbyte qb = 5; - msg << qb; - mqc->write(msg); - - //get ExeMgr response - msg.restart(); - msg = mqc->read(); - - if (msg.length() == 0) - { - memcpy(result, "Lost connection to ExeMgr", *length); - return result; - } - - msg >> runningSql; - msg >> waitingSql; - delete mqc; - - char ans[128]; - sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql); - *length = strlen(ans); - memcpy(result, ans, *length); - return result; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - if (args->arg_count != 0) - { - strcpy(message, "CALGETSQLCOUNT() takes no arguments"); - return 1; - } - - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void calgetsqlcount_deinit(UDF_INIT* initid) - { - } - - -} //extern "C" - int ha_calpont_impl_open(const char* name, int mode, uint32_t test_if_locked) { IDEBUG ( cout << "ha_calpont_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl ); @@ -3147,7 +2318,7 @@ int ha_calpont_impl_rnd_init(TABLE* table) msg << qb; hndl->exeMgr->write(msg); msg.restart(); - csep->rmParms(rmParms); + csep->rmParms(ci->rmParms); //send plan csep->serialize(msg); @@ -3207,7 +2378,7 @@ int ha_calpont_impl_rnd_init(TABLE* table) return ER_INTERNAL_ERROR; } - rmParms.clear(); + ci->rmParms.clear(); if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) { @@ -5330,7 +4501,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE msg << qb; hndl->exeMgr->write(msg); msg.restart(); - csep->rmParms(rmParms); + csep->rmParms(ci->rmParms); //send plan csep->serialize(msg); @@ -5390,7 +4561,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE return ER_INTERNAL_ERROR; } - rmParms.clear(); + ci->rmParms.clear(); if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) { diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 23d9461c2..4f0046745 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -319,6 +319,8 @@ struct cal_connection_info char delimiter; char enclosed_by; std::vector columnTypes; + // MCOL-1101 remove compilation unit variable rmParms + std::vector rmParms; }; const std::string infinidb_err_msg = "\nThe query includes syntax that is not supported by MariaDB Columnstore. Use 'show warnings;' to get more information. Review the MariaDB Columnstore Syntax guide for additional information on supported distributed syntax or consider changing the MariaDB Columnstore Operating Mode (infinidb_vtable_mode)."; diff --git a/dbcon/mysql/mcs_client_udfs.cpp b/dbcon/mysql/mcs_client_udfs.cpp new file mode 100644 index 000000000..605e62392 --- /dev/null +++ b/dbcon/mysql/mcs_client_udfs.cpp @@ -0,0 +1,871 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2016 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#define NEED_CALPONT_INTERFACE +#include "ha_calpont_impl.h" + +#include "ha_calpont_impl_if.h" +using namespace cal_impl_if; + +#include "configcpp.h" +using namespace config; +#include "brmtypes.h" +using namespace BRM; +#include "bytestream.h" +using namespace messageqcpp; +#include "liboamcpp.h" +using namespace oam; +#include "cacheutils.h" + +#include "errorcodes.h" +#include "idberrorinfo.h" +#include "errorids.h" +using namespace logging; + +//#include "resourcemanager.h" + +#include "columnstoreversion.h" +#include "mcs_sysvars.h" + +extern "C" +{ +#define MAXSTRINGLENGTH 50 + + const char* PmSmallSideMaxMemory = "pmmaxmemorysmallside"; + + const char* SetParmsPrelude = "Updated "; + const char* SetParmsError = "Invalid parameter: "; + const char* InvalidParmSize = "Invalid parameter size: Input value cannot be larger than "; + + const size_t Plen = strlen(SetParmsPrelude); + const size_t Elen = strlen(SetParmsError); + + const char* invalidParmSizeMessage(uint64_t size, size_t& len) + { + static char str[sizeof(InvalidParmSize) + 12] = {0}; + ostringstream os; + os << InvalidParmSize << size; + len = os.str().length(); + strcpy(str, os.str().c_str()); + return str; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calsetparms(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + char parameter[MAXSTRINGLENGTH]; + char valuestr[MAXSTRINGLENGTH]; + size_t plen = args->lengths[0]; + size_t vlen = args->lengths[1]; + + memcpy(parameter, args->args[0], plen); + memcpy(valuestr, args->args[1], vlen); + + parameter[plen] = '\0'; + valuestr[vlen] = '\0'; + + uint64_t value = Config::uFromText(valuestr); + + THD* thd = current_thd; + uint32_t sessionID = CalpontSystemCatalog::idb_tid2sid(thd->thread_id); + + const char* msg = SetParmsError; + size_t mlen = Elen; + bool includeInput = true; + + string pstr(parameter); + boost::algorithm::to_lower(pstr); + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + idbassert(ci != 0); + + if (pstr == PmSmallSideMaxMemory) + { + joblist::ResourceManager* rm = joblist::ResourceManager::instance(); + + if (rm->getHjTotalUmMaxMemorySmallSide() >= value) + { + ci->rmParms.push_back(RMParam(sessionID, execplan::PMSMALLSIDEMEMORY, value)); + + msg = SetParmsPrelude; + mlen = Plen; + } + else + { + msg = invalidParmSizeMessage(rm->getHjTotalUmMaxMemorySmallSide(), mlen); + includeInput = false; + } + } + + memcpy(result, msg, mlen); + + if (includeInput) + { + memcpy(result + mlen, parameter, plen); + mlen += plen; + memcpy(result + mlen++, " ", 1); + memcpy(result + mlen, valuestr, vlen); + *length = mlen + vlen; + } + else + *length = mlen; + + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calsetparms_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT) + { + strcpy(message, "CALSETPARMS() requires two string arguments"); + return 1; + } + + initid->max_length = MAXSTRINGLENGTH; + + char valuestr[MAXSTRINGLENGTH]; + size_t vlen = args->lengths[1]; + + memcpy(valuestr, args->args[1], vlen--); + + for (size_t i = 0; i < vlen; ++i) + if (!isdigit(valuestr[i])) + { + strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); + return 1; + } + + if (!isdigit(valuestr[vlen])) + { + switch (valuestr[vlen]) + { + case 'G': + case 'g': + case 'M': + case 'm': + case 'K': + case 'k': + case '\0': + break; + + default: + strcpy(message, "CALSETPARMS() second argument must be numeric or end in G, M or K"); + return 1; + } + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calsetparms_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgetstats(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + unsigned long l = ci->queryStats.size(); + + if (l == 0) + { + *is_null = 1; + return 0; + } + + if (l > 255) l = 255; + + memcpy(result, ci->queryStats.c_str(), l); + *length = l; + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgetstats_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALGETSTATS() takes no arguments"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgetstats_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long calsettrace(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + long long oldTrace = ci->traceFlags; + ci->traceFlags = (uint32_t)(*((long long*)args->args[0])); + // keep the vtablemode bit + ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_OFF); + ci->traceFlags |= (oldTrace & CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH); + return oldTrace; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calsettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 1 || args->arg_type[0] != INT_RESULT) + { + strcpy(message, "CALSETTRACE() requires one INTEGER argument"); + return 1; + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calsettrace_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif +// Return 1 if system is ready for reads or 0 if not. + long long mcssystemready(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + long long rtn = 0; + Oam oam; + DBRM dbrm(true); + SystemStatus systemstatus; + + try + { + oam.getSystemStatus(systemstatus); + + if (systemstatus.SystemOpState == ACTIVE + && dbrm.getSystemReady() + && dbrm.getSystemQueryReady()) + { + return 1; + } + } + catch (...) + { + *error = 1; + } + + return rtn; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool mcssystemready_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void mcssystemready_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif +// Return non-zero if system is read only; 0 if writeable + long long mcssystemreadonly(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + long long rtn = 0; + DBRM dbrm(true); + + try + { + if (dbrm.getSystemSuspended()) + { + rtn = 1; + } + + if (dbrm.isReadWrite() > 0) // Returns 0 for writable, 5 for read only + { + rtn = 2; + } + } + catch (...) + { + *error = 1; + rtn = 1; + } + + return rtn; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool mcssystemreadonly_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void mcssystemreadonly_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif +// Return non-zero if this is the primary UM; 0 if not primary + long long mcssystemprimary(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + long long rtn = 0; + Oam oam; + string PrimaryUMModuleName; + string localModule; + oamModuleInfo_t st; + + try + { + st = oam.getModuleInfo(); + localModule = boost::get<0>(st); + PrimaryUMModuleName = config::Config::makeConfig()->getConfig("SystemConfig", "PrimaryUMModuleName"); + + if (boost::iequals(localModule, PrimaryUMModuleName)) + rtn = 1; + if (PrimaryUMModuleName == "unassigned") + rtn = 1; + } + catch (runtime_error& e) + { + // It's difficult to return an error message from a numerical UDF + //string msg = string("ERROR: Problem getting Primary UM Module Name. ") + e.what(); + *error = 1; + } + catch (...) + { + *error = 1; + } + return rtn; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool mcssystemprimary_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void mcssystemprimary_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calviewtablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) + { + strcpy(message, "CALVIEWTABLELOCK() requires two string arguments"); + return 1; + } + else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) + { + strcpy(message, "CALVIEWTABLELOCK() requires one string argument"); + return 1; + } + else if (args->arg_count > 2 ) + { + strcpy(message, "CALVIEWTABLELOCK() takes one or two arguments only"); + return 1; + } + else if (args->arg_count == 0 ) + { + strcpy(message, "CALVIEWTABLELOCK() requires at least one argument"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calviewtablelock(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + THD* thd = current_thd; + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + CalpontSystemCatalog::TableName tableName; + + if ( args->arg_count == 2 ) + { + tableName.schema = args->args[0]; + tableName.table = args->args[1]; + } + else if ( args->arg_count == 1 ) + { + tableName.table = args->args[0]; + + if (thd->db.length) + tableName.schema = thd->db.str; + else + { + string msg("No schema information provided"); + memcpy(result, msg.c_str(), msg.length()); + *length = msg.length(); + return result; + } + } + + if ( !ci->dmlProc ) + { + ci->dmlProc = new MessageQueueClient("DMLProc"); + //cout << "viewtablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; + } + + string lockinfo = ha_calpont_impl_viewtablelock(*ci, tableName); + + memcpy(result, lockinfo.c_str(), lockinfo.length()); + *length = lockinfo.length(); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calviewtablelock_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calcleartablelock_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if ((args->arg_count != 1) || (args->arg_type[0] != INT_RESULT)) + { + strcpy(message, + "CALCLEARTABLELOCK() requires one integer argument (the lockID)"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calcleartablelock(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast( + get_fe_conn_info_ptr()); + long long lockID = *reinterpret_cast(args->args[0]); + + if ( !ci->dmlProc ) + { + ci->dmlProc = new MessageQueueClient("DMLProc"); + //cout << "cleartablelock starts a new client " << ci->dmlProc << " for session " << thd->thread_id << endl; + } + + unsigned long long uLockID = lockID; + string lockinfo = ha_calpont_impl_cleartablelock(*ci, uLockID); + + memcpy(result, lockinfo.c_str(), lockinfo.length()); + *length = lockinfo.length(); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calcleartablelock_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool callastinsertid_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count == 2 && (args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)) + { + strcpy(message, "CALLASTINSRTID() requires two string arguments"); + return 1; + } + else if ((args->arg_count == 1) && (args->arg_type[0] != STRING_RESULT ) ) + { + strcpy(message, "CALLASTINSERTID() requires one string argument"); + return 1; + } + else if (args->arg_count > 2 ) + { + strcpy(message, "CALLASTINSERTID() takes one or two arguments only"); + return 1; + } + else if (args->arg_count == 0 ) + { + strcpy(message, "CALLASTINSERTID() requires at least one argument"); + return 1; + } + + initid->maybe_null = 1; + initid->max_length = 255; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long callastinsertid(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + THD* thd = current_thd; + + CalpontSystemCatalog::TableName tableName; + uint64_t nextVal = 0; + + if ( args->arg_count == 2 ) + { + tableName.schema = args->args[0]; + tableName.table = args->args[1]; + } + else if ( args->arg_count == 1 ) + { + tableName.table = args->args[0]; + + if (thd->db.length) + tableName.schema = thd->db.str; + else + { + return -1; + } + } + + boost::shared_ptr csc = + CalpontSystemCatalog::makeCalpontSystemCatalog( + CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); + csc->identity(execplan::CalpontSystemCatalog::FE); + + try + { + nextVal = csc->nextAutoIncrValue(tableName); + } + catch (std::exception&) + { + string msg("No such table found during autincrement"); + setError(thd, ER_INTERNAL_ERROR, msg); + return nextVal; + } + + if (nextVal == AUTOINCR_SATURATED) + { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT)); + return nextVal; + } + + //@Bug 3559. Return a message for table without autoincrement column. + if (nextVal == 0) + { + string msg("Autoincrement does not exist for this table."); + setError(thd, ER_INTERNAL_ERROR, msg); + return nextVal; + } + + return (nextVal - 1); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void callastinsertid_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calflushcache_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long calflushcache(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, char* error) + { + return static_cast(cacheutils::flushPrimProcCache()); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calflushcache_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALFLUSHCACHE() takes no arguments"); + return 1; + } + + return 0; + } + + static const unsigned long TraceSize = 16 * 1024; + +//mysqld will call this with only 766 bytes available in result no matter what we asked for in calgettrace_init() +// if we return a pointer that is not result, mysqld will take our pointer and use it, freeing up result +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgettrace(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + const string* msgp; + int flags = 0; + + if (args->arg_count > 0) + { + if (args->arg_type[0] == INT_RESULT) + { + flags = *reinterpret_cast(args->args[0]); + } + } + + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + + if (flags > 0) + //msgp = &connMap[sessionID].extendedStats; + msgp = &ci->extendedStats; + else + //msgp = &connMap[sessionID].miniStats; + msgp = &ci->miniStats; + + unsigned long l = msgp->size(); + + if (l == 0) + { + *is_null = 1; + return 0; + } + + if (l > TraceSize) l = TraceSize; + + *length = l; + return msgp->c_str(); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgettrace_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { +#if 0 + + if (args->arg_count != 0) + { + strcpy(message, "CALGETTRACE() takes no arguments"); + return 1; + } + +#endif + initid->maybe_null = 1; + initid->max_length = TraceSize; + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgettrace_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgetversion(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + string version(columnstore_version); + *length = version.size(); + memcpy(result, version.c_str(), *length); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgetversion_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALGETVERSION() takes no arguments"); + return 1; + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgetversion_deinit(UDF_INIT* initid) + { + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + const char* calgetsqlcount(UDF_INIT* initid, UDF_ARGS* args, + char* result, unsigned long* length, + char* is_null, char* error) + { + if (get_fe_conn_info_ptr() == NULL) + set_fe_conn_info_ptr((void*)new cal_connection_info()); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + idbassert(ci != 0); + + MessageQueueClient* mqc = 0; + mqc = new MessageQueueClient("ExeMgr1"); + + ByteStream msg; + ByteStream::quadbyte runningSql, waitingSql; + ByteStream::quadbyte qb = 5; + msg << qb; + mqc->write(msg); + + //get ExeMgr response + msg.restart(); + msg = mqc->read(); + + if (msg.length() == 0) + { + memcpy(result, "Lost connection to ExeMgr", *length); + return result; + } + + msg >> runningSql; + msg >> waitingSql; + delete mqc; + + char ans[128]; + sprintf(ans, "Running SQL statements %d, Waiting SQL statments %d", runningSql, waitingSql); + *length = strlen(ans); + memcpy(result, ans, *length); + return result; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool calgetsqlcount_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 0) + { + strcpy(message, "CALGETSQLCOUNT() takes no arguments"); + return 1; + } + + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void calgetsqlcount_deinit(UDF_INIT* initid) + { + } + + +} //extern "C" diff --git a/dbcon/mysql/mcs_sysvars.cpp b/dbcon/mysql/mcs_sysvars.cpp index 97069eb8e..106559a0a 100644 --- a/dbcon/mysql/mcs_sysvars.cpp +++ b/dbcon/mysql/mcs_sysvars.cpp @@ -101,24 +101,6 @@ void set_fe_conn_info_ptr(void* ptr) THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr); } -/*ha_calpont* get_legacy_handler(mcs_handler_info mcs_hndl_ptr) -{ - //MCOL-1101 Add handler type check - //hndl_ptr.hndl_type == LEGACY ) - ha_calpont* hndl; - if ( mcs_hndl_ptr.hndl_ptr != NULL ) - { - hndl = (ha_calpont*)(mcs_hndl_ptr.hndl_ptr); - } - else - { - hndl = new ha_calpont(); - hndl->fe_conn_info = (void*)THDVAR(current_thd, fe_conn_info_ptr); - } - - return hndl; -}*/ - mcs_compression_type_t get_compression_type(THD* thd) { return (mcs_compression_type_t) THDVAR(thd, compression_type); } diff --git a/dbcon/mysql/mcs_sysvars.h b/dbcon/mysql/mcs_sysvars.h index c1506615b..8e2436db6 100644 --- a/dbcon/mysql/mcs_sysvars.h +++ b/dbcon/mysql/mcs_sysvars.h @@ -24,6 +24,7 @@ extern st_mysql_sys_var* mcs_system_variables[]; +/* MCOL-1101 Remove before release enum mcs_handler_types_t { SELECT, @@ -40,7 +41,7 @@ struct mcs_handler_info void* hndl_ptr; mcs_handler_types_t hndl_type; }; - +*/ // compression_type enum mcs_compression_type_t { NO_COMPRESSION = 0,