From 4fe399e5c0e17eadfd2b0adce07e95a5c439cc1d Mon Sep 17 00:00:00 2001 From: david hill Date: Wed, 30 May 2018 15:46:58 -0500 Subject: [PATCH 01/13] MCOL-1370 - auto-failure, dont switch ebs when detahc fails --- oam/oamcpp/liboamcpp.cpp | 79 ++++++++++++++++++++++++++++++++++++++++ oam/oamcpp/liboamcpp.h | 2 + procmgr/main.cpp | 20 +++++++++- 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 649d86f13..0536fbacb 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -5477,6 +5477,21 @@ namespace oam exceptionControl("autoMovePmDbroot", API_INVALID_PARAMETER); } + //detach first to make sure DBS can be detach before trying to move to another pm + try + { + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonDetach(dbrootlist); + } + catch (exception& ) + { + writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR ); + exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE); + } + //get dbroot id for other PMs systemStorageInfo_t t; DeviceDBRootList moduledbrootlist; @@ -9644,6 +9659,69 @@ namespace oam } /*************************************************************************** + * + * Function: amazonDetach + * + * Purpose: Amazon EC2 volume deattach needed + * + ****************************************************************************/ + + void Oam::amazonDetach(dbrootList dbrootConfigList) + { + //if amazon cloud with external volumes, do the detach/attach moves + string cloud; + string DBRootStorageType; + try { + getSystemConfig("Cloud", cloud); + getSystemConfig("DBRootStorageType", DBRootStorageType); + } + catch(...) {} + + if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && + DBRootStorageType == "external" ) + { + writeLog("amazonDetach function started ", LOG_TYPE_DEBUG ); + + dbrootList::iterator pt3 = dbrootConfigList.begin(); + for( ; pt3 != dbrootConfigList.end() ; pt3++) + { + string dbrootid = *pt3; + string volumeNameID = "PMVolumeName" + dbrootid; + string volumeName = oam::UnassignedName; + string deviceNameID = "PMVolumeDeviceName" + dbrootid; + string deviceName = oam::UnassignedName; + try { + getSystemConfig( volumeNameID, volumeName); + getSystemConfig( deviceNameID, deviceName); + } + catch(...) + {} + + if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName ) + { + cout << " ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName << endl; + writeLog("ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR ); + exceptionControl("amazonDetach", API_INVALID_PARAMETER); + } + + //send msg to to-pm to umount volume + int returnStatus = sendMsgToProcMgr(UNMOUNT, dbrootid, FORCEFUL, ACK_YES); + if (returnStatus != API_SUCCESS) { + writeLog("ERROR: amazonDetach, umount failed on " + dbrootid, LOG_TYPE_ERROR ); + } + + if (!detachEC2Volume(volumeName)) { + cout << " ERROR: amazonDetach, detachEC2Volume failed on " + volumeName << endl; + writeLog("ERROR: amazonDetach, detachEC2Volume failed on " + volumeName , LOG_TYPE_ERROR ); + exceptionControl("amazonDetach", API_FAILURE); + } + + writeLog("amazonDetach, detachEC2Volume passed on " + volumeName , LOG_TYPE_DEBUG ); + } + } + } + + /*************************************************************************** * * Function: amazonReattach * @@ -9736,6 +9814,7 @@ namespace oam } } + /*************************************************************************** * * Function: mountDBRoot diff --git a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index 51c1f773c..fdfa7fe40 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -229,6 +229,7 @@ namespace oam API_CONN_REFUSED, API_CANCELLED, API_STILL_WORKING, + API_DETACH_FAILURE, API_MAX }; @@ -2432,6 +2433,7 @@ namespace oam void amazonReattach(std::string toPM, dbrootList dbrootConfigList, bool attach = false); void mountDBRoot(dbrootList dbrootConfigList, bool mount = true); + void amazonDetach(dbrootList dbrootConfigList); /** *@brief gluster control diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 118ac0d73..bf9ff8d67 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1922,7 +1922,7 @@ void pingDeviceThread() if ( PrimaryUMModuleName == moduleName ) downPrimaryUM = true; - // if not disabled and amazon, skip + // if disabled, skip if (opState != oam::AUTO_DISABLED ) { //Log failure, issue alarm, set moduleOpState @@ -1968,6 +1968,7 @@ void pingDeviceThread() if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) || ( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) || ( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) { + string error; try { log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG); oam.autoMovePmDbroot(moduleName); @@ -1984,6 +1985,23 @@ void pingDeviceThread() { log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR); } + + if ( error == OAM::API_DETACH_FAILURE ) + { + processManager.setModuleState(moduleName, oam::AUTO_DISABLED); + + // resume the dbrm + oam.dbrmctl("resume"); + log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); + + //enable query stats + dbrm.setSystemQueryReady(true); + + //set query system state ready + processManager.setQuerySystemState(true); + + break; + } } } From b2314f152d88534fd5f999350e9c60e106c7d408 Mon Sep 17 00:00:00 2001 From: david hill Date: Wed, 30 May 2018 16:16:42 -0500 Subject: [PATCH 02/13] MCOL-1370 --- oam/oamcpp/liboamcpp.cpp | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 0536fbacb..b6fb8dc2d 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -5478,18 +5478,24 @@ namespace oam } //detach first to make sure DBS can be detach before trying to move to another pm - try + DBRootConfigList::iterator pt3 = residedbrootConfigList.begin(); + for( ; pt3 != residedbrootConfigList.end() ; ) { - typedef std::vector dbrootList; - dbrootList dbrootlist; - dbrootlist.push_back(itoa(dbrootID)); + int dbrootID = *pt3; - amazonDetach(dbrootlist); - } - catch (exception& ) - { - writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR ); - exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE); + try + { + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonDetach(dbrootlist); + } + catch (exception& ) + { + writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR ); + exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE); + } } //get dbroot id for other PMs From 5226833dd4579238fd8cdf0bf2f4461e8c072547 Mon Sep 17 00:00:00 2001 From: david hill Date: Wed, 30 May 2018 16:27:33 -0500 Subject: [PATCH 03/13] MCOL-1370 --- procmgr/main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/procmgr/main.cpp b/procmgr/main.cpp index bf9ff8d67..49443de8d 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1986,7 +1986,7 @@ void pingDeviceThread() log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR); } - if ( error == OAM::API_DETACH_FAILURE ) + if ( error == oam.itoa(oam::API_DETACH_FAILURE) ) { processManager.setModuleState(moduleName, oam::AUTO_DISABLED); From ed8e774dcd54e471a2858da8734fa6013d307565 Mon Sep 17 00:00:00 2001 From: david hill Date: Fri, 1 Jun 2018 16:33:48 -0500 Subject: [PATCH 04/13] MCOL-1370 --- oam/cloud/MCSVolumeCmds.sh | 4 +- oam/oamcpp/liboamcpp.cpp | 94 ++++++++++++++++++++++++++++++++++++-- oam/oamcpp/liboamcpp.h | 1 + procmgr/main.cpp | 4 +- procmgr/processmanager.cpp | 15 +++--- procmgr/processmanager.h | 2 +- procmon/main.cpp | 6 +-- procmon/processmonitor.cpp | 16 +++---- 8 files changed, 114 insertions(+), 28 deletions(-) diff --git a/oam/cloud/MCSVolumeCmds.sh b/oam/cloud/MCSVolumeCmds.sh index 291d27e44..c7a231261 100755 --- a/oam/cloud/MCSVolumeCmds.sh +++ b/oam/cloud/MCSVolumeCmds.sh @@ -202,7 +202,7 @@ detachvolume() { checkInfostatus if [ $STATUS == "detaching" ]; then retries=1 - while [ $retries -ne 60 ]; do + while [ $retries -ne 10 ]; do #retry until it's attached $AWSCLI detach-volume --volume-id $volumeName --region $Region > /tmp/volumeInfo_$volumeName 2>&1 @@ -239,7 +239,7 @@ attachvolume() { checkInfostatus if [ $STATUS == "attaching" -o $STATUS == "already-attached" ]; then retries=1 - while [ $retries -ne 60 ]; do + while [ $retries -ne 10 ]; do #check status until it's attached describevolume if [ $STATUS == "attached" ]; then diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index b6fb8dc2d..9a405e978 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -5479,7 +5479,7 @@ namespace oam //detach first to make sure DBS can be detach before trying to move to another pm DBRootConfigList::iterator pt3 = residedbrootConfigList.begin(); - for( ; pt3 != residedbrootConfigList.end() ; ) + for( ; pt3 != residedbrootConfigList.end() ; pt3++ ) { int dbrootID = *pt3; @@ -5494,6 +5494,14 @@ namespace oam catch (exception& ) { writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR ); + + //reattach + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonAttach(residePM, dbrootlist); + exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE); } } @@ -5972,9 +5980,8 @@ namespace oam } if (!found) { - writeLog("ERROR: no dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_ERROR ); - cout << "ERROR: no dbroots found in " << fileName << endl; - exceptionControl("autoUnMovePmDbroot", API_FAILURE); + writeLog("No dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_DEBUG ); + cout << "No dbroots found in " << fileName << endl; } oldFile.close(); @@ -7269,7 +7276,7 @@ namespace oam else return; - // check if mysql-Capont is installed + // check if mysql-Columnstore is installed string mysqlscript = InstallDir + "/mysql/mysql-Columnstore"; if (access(mysqlscript.c_str(), X_OK) != 0) return; @@ -9727,6 +9734,83 @@ namespace oam } } + /*************************************************************************** + * + * Function: amazonAttach + * + * Purpose: Amazon EC2 volume Attach needed + * + ****************************************************************************/ + + void Oam::amazonAttach(std::string toPM, dbrootList dbrootConfigList) + { + //if amazon cloud with external volumes, do the detach/attach moves + string cloud; + string DBRootStorageType; + try { + getSystemConfig("Cloud", cloud); + getSystemConfig("DBRootStorageType", DBRootStorageType); + } + catch(...) {} + + if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && + DBRootStorageType == "external" ) + { + writeLog("amazonAttach function started ", LOG_TYPE_DEBUG ); + + //get Instance Name for to-pm + string toInstanceName = oam::UnassignedName; + try + { + ModuleConfig moduleconfig; + getSystemConfig(toPM, moduleconfig); + HostConfigList::iterator pt1 = moduleconfig.hostConfigList.begin(); + toInstanceName = (*pt1).HostName; + } + catch(...) + {} + + if ( toInstanceName == oam::UnassignedName || toInstanceName.empty() ) + { + cout << " ERROR: amazonAttach, invalid Instance Name for " << toPM << endl; + writeLog("ERROR: amazonAttach, invalid Instance Name " + toPM, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_INVALID_PARAMETER); + } + + dbrootList::iterator pt3 = dbrootConfigList.begin(); + for( ; pt3 != dbrootConfigList.end() ; pt3++) + { + string dbrootid = *pt3; + string volumeNameID = "PMVolumeName" + dbrootid; + string volumeName = oam::UnassignedName; + string deviceNameID = "PMVolumeDeviceName" + dbrootid; + string deviceName = oam::UnassignedName; + try { + getSystemConfig( volumeNameID, volumeName); + getSystemConfig( deviceNameID, deviceName); + } + catch(...) + {} + + if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName ) + { + cout << " ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName << endl; + writeLog("ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_INVALID_PARAMETER); + } + + if (!attachEC2Volume(volumeName, deviceName, toInstanceName)) { + cout << " ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName << endl; + writeLog("ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_FAILURE); + } + + writeLog("amazonAttach, attachEC2Volume passed on " + volumeName + ":" + toPM, LOG_TYPE_DEBUG ); + } + } + } + + /*************************************************************************** * * Function: amazonReattach diff --git a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index fdfa7fe40..e5011407c 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -2434,6 +2434,7 @@ namespace oam void amazonReattach(std::string toPM, dbrootList dbrootConfigList, bool attach = false); void mountDBRoot(dbrootList dbrootConfigList, bool mount = true); void amazonDetach(dbrootList dbrootConfigList); + void amazonAttach(std::string toPM, dbrootList dbrootConfigList); /** *@brief gluster control diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 49443de8d..2747fda16 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1553,7 +1553,7 @@ void pingDeviceThread() processManager.restartProcessType("WriteEngineServer", moduleName); //set module to enable state - processManager.enableModule(moduleName, oam::AUTO_OFFLINE); + processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true); downActiveOAMModule = false; int retry; @@ -1647,7 +1647,7 @@ void pingDeviceThread() } else //set module to enable state - processManager.enableModule(moduleName, oam::AUTO_OFFLINE); + processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true); //restart module processes int retry = 0; diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 36893e050..8b01179d2 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -3438,7 +3438,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) restartProcessType("ExeMgr"); sleep(1); - restartProcessType("mysql"); + restartProcessType("mysqld"); restartProcessType("WriteEngineServer"); sleep(1); @@ -3457,7 +3457,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) * purpose: Clear the Disable State on a specified module * ******************************************************************************************/ -int ProcessManager::enableModule(string target, int state) +int ProcessManager::enableModule(string target, int state, bool failover) { Oam oam; ModuleConfig moduleconfig; @@ -3496,7 +3496,8 @@ int ProcessManager::enableModule(string target, int state) setStandbyModule(newStandbyModule); //set recycle process - recycleProcess(target); + if (!failover) + recycleProcess(target); log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG); @@ -4256,7 +4257,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski PMwithUM = "n"; } - // If mysql is the processName, then send to modules were ExeMgr is running + // If mysqld is the processName, then send to modules were ExeMgr is running try { oam.getProcessStatus(systemprocessstatus); @@ -4267,7 +4268,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski if ( systemprocessstatus.processstatus[i].Module == skipModule ) continue; - if ( processName == "mysql" ) { + if ( processName == "mysqld" ) { if ( systemprocessstatus.processstatus[i].ProcessName == "ExeMgr") { ProcessStatus procstat; oam.getProcessStatus("mysqld", systemprocessstatus.processstatus[i].Module, procstat); @@ -8985,7 +8986,7 @@ int ProcessManager::OAMParentModuleChange() if (systemstatus.SystemOpState == ACTIVE) { log.writeLog(__LINE__, "System Active, restart needed processes", LOG_TYPE_DEBUG); - processManager.restartProcessType("mysql"); + processManager.restartProcessType("mysqld"); processManager.restartProcessType("ExeMgr"); processManager.restartProcessType("WriteEngineServer"); processManager.reinitProcessType("DBRMWorkerNode"); @@ -10099,7 +10100,7 @@ void ProcessManager::stopProcessTypes(bool manualFlag) log.writeLog(__LINE__, "stopProcessTypes Called"); //front-end first - processManager.stopProcessType("mysql", manualFlag); + processManager.stopProcessType("mysqld", manualFlag); processManager.stopProcessType("DMLProc", manualFlag); processManager.stopProcessType("DDLProc", manualFlag); processManager.stopProcessType("ExeMgr", manualFlag); diff --git a/procmgr/processmanager.h b/procmgr/processmanager.h index 55dad53cb..863ad9121 100644 --- a/procmgr/processmanager.h +++ b/procmgr/processmanager.h @@ -307,7 +307,7 @@ public: /** *@brief Enable a specified module */ - int enableModule(std::string target, int state); + int enableModule(std::string target, int state, bool failover = false); /** *@brief Enable a specified module diff --git a/procmon/main.cpp b/procmon/main.cpp index b010b3d74..2f98bc1e7 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -695,8 +695,8 @@ int main(int argc, char **argv) if ( ret != 0 ) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); - //mysql status monitor thread - if ( ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM ) || + //mysqld status monitor thread + if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || (PMwithUM == "y") ) { @@ -1127,7 +1127,7 @@ static void mysqlMonitorThread(MonitorConfig config) catch(...) {} - sleep(10); + sleep(5); } } diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index aa10f2666..91f78e640 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -457,7 +457,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "MSG RECEIVED: Stop process request on " + processName); int requestStatus = API_SUCCESS; - // check for mysql + // check for mysqld if ( processName == "mysqld" ) { try { oam.actionMysqlCalpont(MYSQL_STOP); @@ -520,7 +520,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO msg >> manualFlag; log.writeLog(__LINE__, "MSG RECEIVED: Start process request on: " + processName); - // check for mysql + // check for mysqld if ( processName == "mysqld" ) { try { oam.actionMysqlCalpont(MYSQL_START); @@ -640,7 +640,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "MSG RECEIVED: Restart process request on " + processName); int requestStatus = API_SUCCESS; - // check for mysql restart + // check for mysqld restart if ( processName == "mysqld" ) { try { oam.actionMysqlCalpont(MYSQL_RESTART); @@ -869,7 +869,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "Error running DBRM clearShm", LOG_TYPE_ERROR); } - //stop the mysql daemon + //stop the mysqld daemon try { oam.actionMysqlCalpont(MYSQL_STOP); log.writeLog(__LINE__, "Stop MySQL Process", LOG_TYPE_DEBUG); @@ -995,12 +995,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO system(cmd.c_str()); - //start the mysql daemon + //start the mysqld daemon try { oam.actionMysqlCalpont(MYSQL_START); } catch(...) - { // mysql didn't start, return with error + { // mysqld didn't start, return with error log.writeLog(__LINE__, "STARTALL: MySQL failed to start, start-module failure", LOG_TYPE_CRITICAL); ackMsg << (ByteStream::byte) ACK; @@ -1265,7 +1265,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO //send down notification oam.sendDeviceNotification(config.moduleName(), MODULE_DOWN); - //stop the mysql daemon and then columnstore + //stop the mysqld daemon and then columnstore try { oam.actionMysqlCalpont(MYSQL_STOP); } @@ -1444,7 +1444,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO } } - // install mysql rpms if being reconfigured as a um + // install mysqld rpms if being reconfigured as a um if ( reconfigureModuleName.find("um") != string::npos ) { string cmd = startup::StartUp::installDir() + "/bin/post-mysqld-install >> /tmp/rpminstall"; system(cmd.c_str()); From 05f1752dd05b5f1483f2f1fa3db453cf01a401a3 Mon Sep 17 00:00:00 2001 From: david hill Date: Tue, 5 Jun 2018 15:47:38 -0500 Subject: [PATCH 05/13] MCOL-1405 - fix launch of mysql monitor thread on seperate module install --- procmon/main.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/procmon/main.cpp b/procmon/main.cpp index 2f98bc1e7..b4e23a6e1 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -696,10 +696,10 @@ int main(int argc, char **argv) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); //mysqld status monitor thread - if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || - (PMwithUM == "y") ) + if ( config.moduleType() == "um" || + ( config.moduleType() == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || + ( config.moduleType() == "pm" && PMwithUM == "y") ) { - pthread_t mysqlThread; ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*)) &mysqlMonitorThread, NULL); if ( ret != 0 ) From 2bbb70f61b9a1642c9c1f5ce715b07005cc234ff Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Wed, 6 Jun 2018 16:18:54 +0100 Subject: [PATCH 06/13] MCOL-1408 Multiple API HWM boundary fixes Fixes the following: * Generate error if calculateRowId fails * No data written when first extent is completely full on a write, all data going to second extent. * 0 byte valArray malloc * valArray free() on no malloc * Column touched but no data written if all data going to second extent * Wrong colWidth used on second extent calculateRowId * Out of bounds memory write (crash) when no data for first extent * Extent not committed if all data going to second extent --- writeengine/wrapper/writeengine.cpp | 244 ++++++++++++++-------------- 1 file changed, 126 insertions(+), 118 deletions(-) diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 7cb3ca85e..afea06fee 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -2008,7 +2008,6 @@ timer.stop("tokenize"); if (it != aColExtsInfo.end()) //update hwm info { oldHwm = it->hwm; - } // save hwm for the old extent colWidth = colStructList[i].colWidth; @@ -2032,6 +2031,7 @@ timer.stop("tokenize"); else return ERR_INVALID_PARAM; + } //update hwm for the new extent if (newExtent) { @@ -2043,6 +2043,7 @@ timer.stop("tokenize"); break; it++; } + colWidth = newColStructList[i].colWidth; succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio); if (succFlag) { @@ -2107,6 +2108,9 @@ timer.start("writeColumnRec"); curFbo)); } } + else + return ERR_INVALID_PARAM; + } } // If we create a new extent for this batch for (unsigned i = 0; i < newColStructList.size(); i++) @@ -2123,7 +2127,8 @@ timer.start("writeColumnRec"); curFbo)); } } - } + else + return ERR_INVALID_PARAM; } if (lbids.size() > 0) @@ -4604,7 +4609,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, bool versioning) { int rc = 0; - void* valArray; + void* valArray = NULL; string segFile; Column curCol; ColStructList::size_type totalColumn; @@ -4629,132 +4634,135 @@ StopWatch timer; totalRow2 = 0; } - valArray = malloc(sizeof(uint64_t) * totalRow1); - - if (totalRow1 == 0) + // It is possible totalRow1 is zero but totalRow2 has values + if ((totalRow1 == 0) && (totalRow2 == 0)) return rc; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); - for (i = 0; i < totalColumn; i++) + if (totalRow1) { - //@Bug 2205 Check if all rows go to the new extent - //Write the first batch - RID * firstPart = rowIdArray; - ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; - - // set params - colOp->initColumn(curCol); - // need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, colStructList[i].colWidth, - colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, - colStructList[i].fCompressionType, colStructList[i].fColDbRoot, - colStructList[i].fColPartition, colStructList[i].fColSegment); - - ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); - ColExtsInfo::iterator it = aColExtsInfo.begin(); - while (it != aColExtsInfo.end()) + valArray = malloc(sizeof(uint64_t) * totalRow1); + for (i = 0; i < totalColumn; i++) { - if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) - break; - it++; - } + //@Bug 2205 Check if all rows go to the new extent + //Write the first batch + RID * firstPart = rowIdArray; + ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; - if (it == aColExtsInfo.end()) //add this one to the list - { - ColExtInfo aExt; - aExt.dbRoot =colStructList[i].fColDbRoot; - aExt.partNum = colStructList[i].fColPartition; - aExt.segNum = colStructList[i].fColSegment; - aExt.compType = colStructList[i].fCompressionType; - aColExtsInfo.push_back(aExt); - aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); - } + // set params + colOp->initColumn(curCol); + // need to pass real dbRoot, partition, and segment to setColParam + colOp->setColParam(curCol, 0, colStructList[i].colWidth, + colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, + colStructList[i].fCompressionType, colStructList[i].fColDbRoot, + colStructList[i].fColPartition, colStructList[i].fColSegment); - rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file - if (rc != NO_ERROR) - break; - - // handling versioning - vector rangeList; - if (versioning) - { - rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], - colStructList[i].colWidth, totalRow1, firstPart, rangeList); - if (rc != NO_ERROR) { - if (colStructList[i].fCompressionType == 0) - { - curCol.dataFile.pFile->flush(); - } - - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - break; - } - } - - //totalRow1 -= totalRow2; - // have to init the size here - // nullArray = (bool*) malloc(sizeof(bool) * totalRow); - uint8_t tmp8; - uint16_t tmp16; - uint32_t tmp32; - for (size_t j = 0; j < totalRow1; j++) - { - uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j]; - switch (colStructList[i].colType) + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) { - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR: - case WriteEngine::WR_BLOB: - case WriteEngine::WR_TEXT: - ((uint64_t*)valArray)[j] = curValue; - break; - case WriteEngine::WR_INT: - case WriteEngine::WR_UINT: - case WriteEngine::WR_FLOAT: - tmp32 = curValue; - ((uint32_t*)valArray)[j] = tmp32; - break; - case WriteEngine::WR_ULONGLONG: - case WriteEngine::WR_LONGLONG: - case WriteEngine::WR_DOUBLE: - case WriteEngine::WR_TOKEN: - ((uint64_t*)valArray)[j] = curValue; - break; - case WriteEngine::WR_BYTE: - case WriteEngine::WR_UBYTE: - tmp8 = curValue; - ((uint8_t*)valArray)[j] = tmp8; - break; - case WriteEngine::WR_SHORT: - case WriteEngine::WR_USHORT: - tmp16 = curValue; - ((uint16_t*)valArray)[j] = tmp16; + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) break; + it++; } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot =colStructList[i].fColDbRoot; + aExt.partNum = colStructList[i].fColPartition; + aExt.segNum = colStructList[i].fColSegment; + aExt.compType = colStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + if (rc != NO_ERROR) + break; + + // handling versioning + vector rangeList; + if (versioning) + { + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], + colStructList[i].colWidth, totalRow1, firstPart, rangeList); + if (rc != NO_ERROR) { + if (colStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; + } + } + + //totalRow1 -= totalRow2; + // have to init the size here + // nullArray = (bool*) malloc(sizeof(bool) * totalRow); + uint8_t tmp8; + uint16_t tmp16; + uint32_t tmp32; + for (size_t j = 0; j < totalRow1; j++) + { + uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j]; + switch (colStructList[i].colType) + { + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + ((uint64_t*)valArray)[j] = curValue; + break; + case WriteEngine::WR_INT: + case WriteEngine::WR_UINT: + case WriteEngine::WR_FLOAT: + tmp32 = curValue; + ((uint32_t*)valArray)[j] = tmp32; + break; + case WriteEngine::WR_ULONGLONG: + case WriteEngine::WR_LONGLONG: + case WriteEngine::WR_DOUBLE: + case WriteEngine::WR_TOKEN: + ((uint64_t*)valArray)[j] = curValue; + break; + case WriteEngine::WR_BYTE: + case WriteEngine::WR_UBYTE: + tmp8 = curValue; + ((uint8_t*)valArray)[j] = tmp8; + break; + case WriteEngine::WR_SHORT: + case WriteEngine::WR_USHORT: + tmp16 = curValue; + ((uint16_t*)valArray)[j] = tmp16; + break; + } + } + + +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + colOp->closeColumnFile(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + // check error + if (rc != NO_ERROR) + break; + + } // end of for (i = 0 + if (valArray != NULL) + { + free(valArray); + valArray = NULL; } - - -#ifdef PROFILE -timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); -#ifdef PROFILE -timer.stop("writeRow "); -#endif - colOp->closeColumnFile(curCol); - - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - // check error - if (rc != NO_ERROR) - break; - - } // end of for (i = 0 - if (valArray != NULL) - { - free(valArray); - valArray = NULL; } // MCOL-1176 - Write second extent From 250d90a9bc695de73c485e79a27b7e7895ca5c02 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Thu, 14 Jun 2018 14:43:37 +0100 Subject: [PATCH 07/13] MCOL-1474 Catch errors in PriorityThreadPool PriorityThreadPool errors cause crashes in PrimProc. This patch catches the errors and causes the thread to end cleanly. --- utils/threadpool/prioritythreadpool.cpp | 151 ++++++++++++++++-------- 1 file changed, 100 insertions(+), 51 deletions(-) diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 461069e18..a0fc6a347 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -111,64 +111,113 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() uint32_t rescheduleCount; uint32_t queueSize; - while (!_stop) { + try + { + while (!_stop) { - mutex::scoped_lock lk(mutex); + mutex::scoped_lock lk(mutex); - queue = pickAQueue(preferredQueue); - if (jobQueues[queue].empty()) { - newJob.wait(lk); - continue; - } + queue = pickAQueue(preferredQueue); + if (jobQueues[queue].empty()) { + newJob.wait(lk); + continue; + } - queueSize = jobQueues[queue].size(); - weight = 0; - // 3 conditions stop this thread from grabbing all jobs in the queue - // - // 1: The weight limit has been exceeded - // 2: The queue is empty - // 3: It has grabbed more than half of the jobs available & - // should leave some to the other threads + queueSize = jobQueues[queue].size(); + weight = 0; + // 3 conditions stop this thread from grabbing all jobs in the queue + // + // 1: The weight limit has been exceeded + // 2: The queue is empty + // 3: It has grabbed more than half of the jobs available & + // should leave some to the other threads - while ((weight < weightPerRun) && (!jobQueues[queue].empty()) - && (runList.size() <= queueSize/2)) { - runList.push_back(jobQueues[queue].front()); - jobQueues[queue].pop_front(); - weight += runList.back().weight; - } - lk.unlock(); + while ((weight < weightPerRun) && (!jobQueues[queue].empty()) + && (runList.size() <= queueSize/2)) { + runList.push_back(jobQueues[queue].front()); + jobQueues[queue].pop_front(); + weight += runList.back().weight; + } + lk.unlock(); - reschedule.resize(runList.size()); - rescheduleCount = 0; - for (i = 0; i < runList.size() && !_stop; i++) { - try { - reschedule[i] = false; - reschedule[i] = (*(runList[i].functor))(); - if (reschedule[i]) - rescheduleCount++; - } - catch (std::exception &e) { - cerr << e.what() << endl; - } - } + reschedule.resize(runList.size()); + rescheduleCount = 0; + for (i = 0; i < runList.size() && !_stop; i++) { + try { + reschedule[i] = false; + reschedule[i] = (*(runList[i].functor))(); + if (reschedule[i]) + rescheduleCount++; + } + catch (std::exception &e) { + cerr << e.what() << endl; + } + } - // no real work was done, prevent intensive busy waiting - if (rescheduleCount == runList.size()) - usleep(1000); + // no real work was done, prevent intensive busy waiting + if (rescheduleCount == runList.size()) + usleep(1000); - if (rescheduleCount > 0) { - lk.lock(); - for (i = 0; i < runList.size(); i++) - if (reschedule[i]) - addJob(runList[i], false); - if (rescheduleCount > 1) - newJob.notify_all(); - else - newJob.notify_one(); - lk.unlock(); - } - runList.clear(); - } + if (rescheduleCount > 0) { + lk.lock(); + for (i = 0; i < runList.size(); i++) + if (reschedule[i]) + addJob(runList[i], false); + if (rescheduleCount > 1) + newJob.notify_all(); + else + newJob.notify_one(); + lk.unlock(); + } + runList.clear(); + } + } + catch (std::exception &ex) + { + // Log the exception and exit this thread + try + { +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("threadFcn: Caught exception: "); + args.add(ex.what()); + + message.format( args ); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage( message ); +#endif + } + catch (...) + { + } + } + catch (...) + { + + // Log the exception and exit this thread + try + { +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(6); + args.add("threadFcn: Caught unknown exception!"); + + message.format( args ); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage( message ); +#endif + } + catch (...) + { + } + } } void PriorityThreadPool::stop() From 40405c792af7e66129ae5c2e6f14bfc88b316d20 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Thu, 14 Jun 2018 16:28:06 +0100 Subject: [PATCH 08/13] MCOL-1474 Add error handling to PTP PriorityThreadPool didn't have very good error handling. If something failed it would just ignore whatever was being processed. This could lead to a query continuing without retreiving all of the required data. This patch adds error handling, sending a message back to the client and a log message. It also destroys and recreates the pool thread. --- primitives/primproc/primitiveserver.cpp | 46 +++++++++++++++++++++ utils/threadpool/prioritythreadpool.cpp | 54 +++++++++++++++++++++---- utils/threadpool/prioritythreadpool.h | 6 +++ 3 files changed, 98 insertions(+), 8 deletions(-) diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 7989a17d4..f56f1ef6c 100755 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1818,12 +1818,22 @@ struct ReadThread switch(ismHdr->Command) { case DICT_CREATE_EQUALITY_FILTER: { PriorityThreadPool::Job job; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); OOBPool->addJob(job); break; } case DICT_DESTROY_EQUALITY_FILTER: { PriorityThreadPool::Job job; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); OOBPool->addJob(job); break; @@ -1851,6 +1861,11 @@ struct ReadThread job.id = hdr->Hdr.UniqueID; job.weight = LOGICAL_BLOCK_RIDS; job.priority = hdr->Hdr.Priority; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; if (hdr->flags & IS_SYSCAT) { //boost::thread t(DictScanJob(outIos, bs, writeLock)); // using already-existing threads may cut latency @@ -1889,6 +1904,12 @@ struct ReadThread job.id = bpps->getID(); job.weight = ismHdr->Size; job.priority = bpps->priority(); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; + if (bpps->isSysCat()) { //boost::thread t(*bpps); // using already-existing threads may cut latency @@ -1904,6 +1925,11 @@ struct ReadThread case BATCH_PRIMITIVE_CREATE: { PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->createBPP(*bs); break; @@ -1912,6 +1938,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->addJoinerToBPP(*bs); break; @@ -1923,6 +1954,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); break; } @@ -1932,6 +1968,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Destroy(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->destroyBPP(*bs); break; @@ -1946,6 +1987,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); break; } diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index a0fc6a347..4d19df91e 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -33,6 +33,8 @@ using namespace logging; #include "prioritythreadpool.h" using namespace boost; +#include "dbcon/joblist/primitivemsg.h" + namespace threadpool { @@ -48,9 +50,9 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads threads.create_thread(ThreadHelper(this, LOW)); cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; - threadCounts[HIGH] = highThreads; - threadCounts[MEDIUM] = midThreads; - threadCounts[LOW] = lowThreads; + defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; + defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads; + defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads; } PriorityThreadPool::~PriorityThreadPool() @@ -65,6 +67,23 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock) if (useLock) lk.lock(); + // Create any missing threads + if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) + { + threads.create_thread(ThreadHelper(this, HIGH)); + threadCounts[HIGH]++; + } + if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) + { + threads.create_thread(ThreadHelper(this, MEDIUM)); + threadCounts[MEDIUM]++; + } + if (defaultThreadCounts[LOW] != threadCounts[LOW]) + { + threads.create_thread(ThreadHelper(this, LOW)); + threadCounts[LOW]++; + } + if (job.priority > 66) jobQueues[HIGH].push_back(job); else if (job.priority > 33) @@ -110,6 +129,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() vector reschedule; uint32_t rescheduleCount; uint32_t queueSize; + bool running = false; try { @@ -143,15 +163,12 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() reschedule.resize(runList.size()); rescheduleCount = 0; for (i = 0; i < runList.size() && !_stop; i++) { - try { reschedule[i] = false; + running = true; reschedule[i] = (*(runList[i].functor))(); + running = false; if (reschedule[i]) rescheduleCount++; - } - catch (std::exception &e) { - cerr << e.what() << endl; - } } // no real work was done, prevent intensive busy waiting @@ -177,6 +194,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() // Log the exception and exit this thread try { + threadCounts[queue]--; #ifndef NOLOGGING logging::Message::Args args; logging::Message message(5); @@ -190,6 +208,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() ml.logErrorMessage( message ); #endif + if (running) + sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); } catch (...) { @@ -201,6 +221,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() // Log the exception and exit this thread try { + threadCounts[queue]--; #ifndef NOLOGGING logging::Message::Args args; logging::Message message(6); @@ -213,6 +234,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() ml.logErrorMessage( message ); #endif + if (running) + sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); } catch (...) { @@ -220,6 +243,21 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() } } +void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) +{ + ISMPacketHeader ism; + PrimitiveHeader ph = {0}; + + ism.Status = logging::primitiveServerErr; + ph.UniqueID = id; + ph.StepID = step; + ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + msg.append((uint8_t *) &ism, sizeof(ism)); + msg.append((uint8_t *) &ph, sizeof(ph)); + + sock->write(msg); +} + void PriorityThreadPool::stop() { _stop = true; diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 516c0df2f..649913d95 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -36,6 +36,7 @@ #include #include #include "../winport/winport.h" +#include "primitives/primproc/umsocketselector.h" namespace threadpool { @@ -60,6 +61,9 @@ public: uint32_t weight; uint32_t priority; uint32_t id; + uint32_t uniqueID; + uint32_t stepID; + primitiveprocessor::SP_UM_IOSOCK sock; }; enum Priority { @@ -105,9 +109,11 @@ private: Priority pickAQueue(Priority preference); void threadFcn(const Priority preferredQueue) throw(); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); std::list jobQueues[3]; // higher indexes = higher priority uint32_t threadCounts[3]; + uint32_t defaultThreadCounts[3]; boost::mutex mutex; boost::condition newJob; boost::thread_group threads; From d6cb205dfc6190b22dcd41f43af74839a77def25 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Thu, 14 Jun 2018 18:37:52 +0100 Subject: [PATCH 09/13] MCOL-1475 Improve cross engine error handling Now shows MariaDB error code and message where possible. --- dbcon/joblist/crossenginestep.cpp | 15 ++++++++++----- dbcon/joblist/crossenginestep.h | 2 ++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 7f9f3199f..b91e83150 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -773,11 +773,16 @@ string CrossEngineStep::makeQuery() void CrossEngineStep::handleMySqlError(const char* errStr, unsigned int errCode) { ostringstream oss; - oss << errStr << "(" << errCode << ")"; - if (errCode == (unsigned int) -1) - oss << "(null pointer)"; - else - oss << "(" << errCode << ")"; + if (mysql->getErrno()) + { + oss << errStr << " (" << mysql->getErrno() << ")"; + oss << " (" << mysql->getErrorMsg() << ")"; + } + else + { + oss << errStr << " (" << errCode << ")"; + oss << " (unknown)"; + } throw IDBExcept(oss.str(), ERR_CROSS_ENGINE_CONNECT); diff --git a/dbcon/joblist/crossenginestep.h b/dbcon/joblist/crossenginestep.h index 30b715c25..ad731b384 100644 --- a/dbcon/joblist/crossenginestep.h +++ b/dbcon/joblist/crossenginestep.h @@ -70,6 +70,8 @@ public: long getFieldLength(int field) { return fieldLengths[field]; } MYSQL_FIELD* getField(int field) { return &fFields[field]; } const std::string& getError() { return fErrStr; } + unsigned int getErrno() { return mysql_errno(fCon); } + const char* getErrorMsg() { return mysql_error(fCon); } private: MYSQL* fCon; From 337bd9ba8c19630df02f2a2477180c4edefd5068 Mon Sep 17 00:00:00 2001 From: david hill Date: Mon, 18 Jun 2018 15:16:47 -0500 Subject: [PATCH 10/13] Update README --- README | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README b/README index 21bce2dd9..90421faa8 100644 --- a/README +++ b/README @@ -1,9 +1,9 @@ -This is MariaDB ColumnStore 1.1.4 GA -MariaDB ColumnStore 1.1.4 GA is the development version of MariaDB ColumnStore. +This is MariaDB ColumnStore 1.1 GA +MariaDB ColumnStore 1.1 GA is the development version of MariaDB ColumnStore. It is built by porting InfiniDB 4.6.7 on MariaDB 10.2 and adding entirely new features not found anywhere else. -MariaDB ColumnStore 1.1.4 is a GA release. This is the first MariaDB +MariaDB ColumnStore 1.1 is a GA release. This is the first MariaDB ColumnStore release, not all features planned for the MariaDB ColumnStore 1.0 series are included in this release. From 9f6df3b0ce87a5579133156305282a9c81cc801e Mon Sep 17 00:00:00 2001 From: david hill Date: Mon, 18 Jun 2018 15:17:58 -0500 Subject: [PATCH 11/13] Update VERSION --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index aef96eefc..8a397bd3a 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ COLUMNSTORE_VERSION_MAJOR=1 COLUMNSTORE_VERSION_MINOR=1 -COLUMNSTORE_VERSION_PATCH=5 +COLUMNSTORE_VERSION_PATCH=6 COLUMNSTORE_VERSION_RELEASE=1 From 1f8083e59b8370d027a11d24b3e664e2f6d2eaf4 Mon Sep 17 00:00:00 2001 From: david hill Date: Mon, 18 Jun 2018 15:18:39 -0500 Subject: [PATCH 12/13] Update README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bd3717aab..2a5a19e09 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ -#MariaDB ColumnStore Storage/Execution engine 1.1.4 GA -MariaDB ColumnStore 1.1.4 GA is the development version of MariaDB ColumnStore. +#MariaDB ColumnStore Storage/Execution engine 1.1 GA +MariaDB ColumnStore 1.1 GA is the development version of MariaDB ColumnStore. It is built by porting InfiniDB 4.6.7 on MariaDB 10.2.9 and adding entirely new features not found anywhere else. -#MariaDB ColumnStore 1.1.4 is an GA release. +#MariaDB ColumnStore 1.1 is an GA release. #Building This repository is not meant to be built independently outside of the server. This repository is integrated into http://mariadb-corporation/mariadb-columnstore-server (ie, the *server*) as a git submodule. As such, you can find complete build instructions on *the server* page. From d3d322ed7bb8a2c7c7bcfb484ae96469d18f9442 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Tue, 19 Jun 2018 15:12:44 +0100 Subject: [PATCH 13/13] MCOL-1484 I_S condition pushdowns Add condition pushdowns to the information_schema tables to give a performance improvement when a relevant WHERE condition is provided. In addition there is a new table_usage() stored procedure designed to use the pushdowns. --- dbcon/mysql/columnstore_info.sql | 65 +++--- dbcon/mysql/is_columnstore_columns.cpp | 71 ++++++- dbcon/mysql/is_columnstore_extents.cpp | 283 +++++++++++++++---------- dbcon/mysql/is_columnstore_files.cpp | 231 +++++++++++++------- dbcon/mysql/is_columnstore_tables.cpp | 69 ++++++ 5 files changed, 505 insertions(+), 214 deletions(-) diff --git a/dbcon/mysql/columnstore_info.sql b/dbcon/mysql/columnstore_info.sql index 563052a11..d0433a0d9 100644 --- a/dbcon/mysql/columnstore_info.sql +++ b/dbcon/mysql/columnstore_info.sql @@ -37,43 +37,56 @@ DROP PROCEDURE IF EXISTS `table_usage` // CREATE PROCEDURE table_usage (IN t_schema char(64), IN t_name char(64)) `table_usage`: BEGIN + DECLARE done INTEGER DEFAULT 0; + DECLARE dbname VARCHAR(64); + DECLARE tbname VARCHAR(64); + DECLARE object_ids TEXT; + DECLARE dictionary_object_ids TEXT; DECLARE `locker` TINYINT UNSIGNED DEFAULT IS_USED_LOCK('table_usage'); - + DECLARE columns_list CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_name = t_name and table_schema = t_schema GROUP BY table_schema, table_name; + DECLARE columns_list_sc CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_schema = t_schema GROUP BY table_schema, table_name; + DECLARE columns_list_all CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS GROUP BY table_schema, table_name; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; IF `locker` IS NOT NULL THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Error acquiring table_usage lock'; LEAVE `table_usage`; END IF; DO GET_LOCK('table_usage', 0); - DROP TABLE IF EXISTS columnstore_info.columnstore_columns; DROP TABLE IF EXISTS columnstore_info.columnstore_files; - CREATE TABLE columnstore_info.columnstore_columns engine=myisam as (select * from information_schema.columnstore_columns); - ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `object_id` (`object_id`); - ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `dictionary_object_id` (`dictionary_object_id`); - CREATE TABLE columnstore_info.columnstore_files engine=myisam as (select * from information_schema.columnstore_files); - ALTER TABLE columnstore_info.columnstore_files ADD INDEX `object_id` (`object_id`); + CREATE TEMPORARY TABLE columnstore_info.columnstore_files (TABLE_SCHEMA VARCHAR(64), TABLE_NAME VARCHAR(64), DATA BIGINT, DICT BIGINT); + IF t_name IS NOT NULL THEN -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics where table_name = t_name and (table_schema = t_schema or t_schema IS NULL) -group by table_schema, table_name -) q; + OPEN columns_list; ELSEIF t_schema IS NOT NULL THEN -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics where table_schema = t_schema -group by table_schema, table_name -) q; + OPEN columns_list_sc; ELSE -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics -group by table_schema, table_name -) q; + OPEN columns_list_all; END IF; - DROP TABLE IF EXISTS columnstore_info.columnstore_columns; + + files_table: LOOP + IF t_name IS NOT NULL THEN + FETCH columns_list INTO dbname, tbname, object_ids, dictionary_object_ids; + ELSEIF t_schema IS NOT NULL THEN + FETCH columns_list_sc INTO dbname, tbname, object_ids, dictionary_object_ids; + ELSE + FETCH columns_list_all INTO dbname, tbname, object_ids, dictionary_object_ids; + END IF; + IF done = 1 THEN LEAVE files_table; + END IF; + INSERT INTO columnstore_info.columnstore_files (SELECT dbname, tbname, sum(file_size), 0 FROM information_schema.columnstore_files WHERE find_in_set(object_id, object_ids)); + IF dictionary_object_ids IS NOT NULL THEN + UPDATE columnstore_info.columnstore_files SET DICT = (SELECT sum(file_size) FROM information_schema.columnstore_files WHERE find_in_set(object_id, dictionary_object_ids)) WHERE TABLE_SCHEMA = dbname AND TABLE_NAME = tbname; + END IF; + END LOOP; + IF t_name IS NOT NULL THEN + CLOSE columns_list; + ELSEIF t_schema IS NOT NULL THEN + CLOSE columns_list_sc; + ELSE + CLOSE columns_list_all; + END IF; + SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(DATA) as DATA_DISK_USAGE, columnstore_info.format_filesize(DICT) as DICT_DATA_USAGE, columnstore_info.format_filesize(DATA + COALESCE(DICT, 0)) as TOTAL_USAGE FROM columnstore_info.columnstore_files; + DROP TABLE IF EXISTS columnstore_info.columnstore_files; DO RELEASE_LOCK('table_usage'); END // diff --git a/dbcon/mysql/is_columnstore_columns.cpp b/dbcon/mysql/is_columnstore_columns.cpp index b446eed4e..e2f338782 100644 --- a/dbcon/mysql/is_columnstore_columns.cpp +++ b/dbcon/mysql/is_columnstore_columns.cpp @@ -56,10 +56,59 @@ ST_FIELD_INFO is_columnstore_columns_fields[] = }; +static void get_cond_item(Item_func *item, String **table, String **db) +{ + char tmp_char[MAX_FIELD_WIDTH]; + Item_field *item_field = (Item_field*) item->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "table_name") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *table = item->arguments()[1]->val_str(&str_buf); + return; + } + else if (strcasecmp(item_field->field_name, "table_schema") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *db = item->arguments()[1]->val_str(&str_buf); + return; + } +} + +static void get_cond_items(COND *cond, String **table, String **db) +{ + if (cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + get_cond_item(fitem, table, db); + } + } + else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC)) + { + List_iterator li(*((Item_cond*) cond)->argument_list()); + Item *item; + while ((item= li++)) + { + if (item->type() == Item::FUNC_ITEM) + { + get_cond_item((Item_func*)item, table, db); + } + else + { + get_cond_items(item, table, db); + } + } + } +} + static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) { CHARSET_INFO *cs = system_charset_info; TABLE *table = tables->table; + String *table_name = NULL; + String *db_name = NULL; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); @@ -69,9 +118,29 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + if (cond) + { + get_cond_items(cond, &table_name, &db_name); + } + for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { + if (db_name) + { + if ((*it).second.schema.compare(db_name->ptr()) != 0) + { + continue; + } + } + if (table_name) + { + if ((*it).second.table.compare(table_name->ptr()) != 0) + { + continue; + } + } + execplan::CalpontSystemCatalog::RIDList column_rid_list; // Note a table may get dropped as you iterate over the list of tables. // So simply ignore the dropped table. @@ -168,8 +237,6 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) } } - - return 0; } diff --git a/dbcon/mysql/is_columnstore_extents.cpp b/dbcon/mysql/is_columnstore_extents.cpp index 4ee4cbce6..27eceeafc 100644 --- a/dbcon/mysql/is_columnstore_extents.cpp +++ b/dbcon/mysql/is_columnstore_extents.cpp @@ -52,131 +52,200 @@ ST_FIELD_INFO is_columnstore_extents_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; -static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) +static int generate_result(BRM::OID_t oid, BRM::DBRM *emp, TABLE *table, THD *thd) { CHARSET_INFO *cs = system_charset_info; - TABLE *table = tables->table; std::vector entries; std::vector::iterator iter; std::vector::iterator end; + emp->getExtents(oid, entries, false, false, true); + if (entries.size() == 0) + return 0; + + iter = entries.begin(); + end = entries.end(); + while (iter != end) + { + table->field[0]->store(oid); + if (iter->colWid > 0) + { + table->field[1]->store("Column", strlen("Column"), cs); + if (iter->partition.cprange.lo_val == std::numeric_limits::max() || + iter->partition.cprange.lo_val <= (std::numeric_limits::min() + 2)) + { + table->field[4]->set_null(); + } + else + { + table->field[4]->set_notnull(); + table->field[4]->store(iter->partition.cprange.lo_val); + } + if (iter->partition.cprange.hi_val == std::numeric_limits::max() || + iter->partition.cprange.hi_val <= (std::numeric_limits::min() + 2)) + { + table->field[5]->set_null(); + } + else + { + table->field[5]->set_notnull(); + table->field[5]->store(iter->partition.cprange.hi_val); + } + table->field[6]->store(iter->colWid); + + } + else + { + table->field[1]->store("Dictionary", strlen("Dictionary"), cs); + table->field[4]->set_null(); + table->field[5]->set_null(); + table->field[6]->store(8192); + } + table->field[2]->store(iter->range.start); + table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1); + + table->field[7]->store(iter->dbRoot); + table->field[8]->store(iter->partitionNum); + table->field[9]->store(iter->segmentNum); + table->field[10]->store(iter->blockOffset); + table->field[11]->store(iter->range.size * 1024); + table->field[12]->store(iter->HWM); + + switch (iter->partition.cprange.isValid) + { + case 0: + table->field[13]->store("Invalid", strlen("Invalid"), cs); + break; + case 1: + table->field[13]->store("Updating", strlen("Updating"), cs); + break; + case 2: + table->field[13]->store("Valid", strlen("Valid"), cs); + break; + default: + table->field[13]->store("Unknown", strlen("Unknown"), cs); + break; + } + switch (iter->status) + { + case BRM::EXTENTAVAILABLE: + table->field[14]->store("Available", strlen("Available"), cs); + break; + case BRM::EXTENTUNAVAILABLE: + table->field[14]->store("Unavailable", strlen("Unavailable"), cs); + break; + case BRM::EXTENTOUTOFSERVICE: + table->field[14]->store("Out of service", strlen("Out of service"), cs); + break; + default: + table->field[14]->store("Unknown", strlen("Unknown"), cs); + } + // MCOL-1016: on multiple segments HWM is set to 0 on the lower + // segments, we don't want these to show as 8KB. The down side is + // if the column has less than 1 block it will show as 0 bytes. + // We have no lookahead without it getting messy so this is the + // best compromise. + if (iter->HWM == 0) + { + table->field[15]->store(0); + } + else + { + table->field[15]->store((iter->HWM + 1) * 8192); + } + + if (schema_table_store_record(thd, table)) + { + delete emp; + return 1; + } + + iter++; + + } + return 0; +} + +static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::OID_t cond_oid = 0; + TABLE *table = tables->table; + BRM::DBRM *emp = new BRM::DBRM(); if (!emp || !emp->isDBRMReady()) { return 1; } + if (cond && cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2)) + { + if(fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + // WHERE object_id = value + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[1]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[0]->const_item()) + { + // WHERE value = object_id + Item_field *item_field = (Item_field*) fitem->arguments()[1]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[0]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + } + else if (fitem->functype() == Item_func::IN_FUNC) + { + // WHERE object_id in (value1, value2) + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + for (unsigned int i=1; i < fitem->argument_count(); i++) + { + cond_oid = fitem->arguments()[i]->val_int(); + int result = generate_result(cond_oid, emp, table, thd); + if (result) + return 1; + } + } + } + else if (fitem->functype() == Item_func::UNKNOWN_FUNC && + strcasecmp(fitem->func_name(), "find_in_set") == 0) + { + // WHERE FIND_IN_SET(object_id, values) + String *tmp_var = fitem->arguments()[1]->val_str(); + std::stringstream ss(tmp_var->ptr()); + while (ss >> cond_oid) + { + int ret = generate_result(cond_oid, emp, table, thd); + if (ret) + return 1; + if (ss.peek() == ',') + ss.ignore(); + } + } + } + execplan::ObjectIDManager oidm; BRM::OID_t MaxOID = oidm.size(); for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) { - emp->getExtents(oid, entries, false, false, true); - if (entries.size() == 0) - continue; - - iter = entries.begin(); - end = entries.end(); - while (iter != end) - { - table->field[0]->store(oid); - if (iter->colWid > 0) - { - table->field[1]->store("Column", strlen("Column"), cs); - if (iter->partition.cprange.lo_val == std::numeric_limits::max() || - iter->partition.cprange.lo_val <= (std::numeric_limits::min() + 2)) - { - table->field[4]->set_null(); - } - else - { - table->field[4]->set_notnull(); - table->field[4]->store(iter->partition.cprange.lo_val); - } - if (iter->partition.cprange.hi_val == std::numeric_limits::max() || - iter->partition.cprange.hi_val <= (std::numeric_limits::min() + 2)) - { - table->field[5]->set_null(); - } - else - { - table->field[5]->set_notnull(); - table->field[5]->store(iter->partition.cprange.hi_val); - } - table->field[6]->store(iter->colWid); - - } - else - { - table->field[1]->store("Dictionary", strlen("Dictionary"), cs); - table->field[4]->set_null(); - table->field[5]->set_null(); - table->field[6]->store(8192); - } - table->field[2]->store(iter->range.start); - table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1); - - table->field[7]->store(iter->dbRoot); - table->field[8]->store(iter->partitionNum); - table->field[9]->store(iter->segmentNum); - table->field[10]->store(iter->blockOffset); - table->field[11]->store(iter->range.size * 1024); - table->field[12]->store(iter->HWM); - - switch (iter->partition.cprange.isValid) - { - case 0: - table->field[13]->store("Invalid", strlen("Invalid"), cs); - break; - case 1: - table->field[13]->store("Updating", strlen("Updating"), cs); - break; - case 2: - table->field[13]->store("Valid", strlen("Valid"), cs); - break; - default: - table->field[13]->store("Unknown", strlen("Unknown"), cs); - break; - } - switch (iter->status) - { - case BRM::EXTENTAVAILABLE: - table->field[14]->store("Available", strlen("Available"), cs); - break; - case BRM::EXTENTUNAVAILABLE: - table->field[14]->store("Unavailable", strlen("Unavailable"), cs); - break; - case BRM::EXTENTOUTOFSERVICE: - table->field[14]->store("Out of service", strlen("Out of service"), cs); - break; - default: - table->field[14]->store("Unknown", strlen("Unknown"), cs); - } - // MCOL-1016: on multiple segments HWM is set to 0 on the lower - // segments, we don't want these to show as 8KB. The down side is - // if the column has less than 1 block it will show as 0 bytes. - // We have no lookahead without it getting messy so this is the - // best compromise. - if (iter->HWM == 0) - { - table->field[15]->store(0); - } - else - { - table->field[15]->store((iter->HWM + 1) * 8192); - } - - if (schema_table_store_record(thd, table)) - { - delete emp; - return 1; - } - - iter++; - - } + int result = generate_result(oid, emp, table, thd); + if (result) + return 1; } - delete emp; return 0; } diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index ce00b8aae..1a5fdad1e 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -82,12 +82,10 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient *msgQueueClient, cons } } -static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) +static int generate_result(BRM::OID_t oid, BRM::DBRM *emp, TABLE *table, THD *thd) { - BRM::DBRM *emp = new BRM::DBRM(); std::vector entries; CHARSET_INFO *cs = system_charset_info; - TABLE *table = tables->table; char oidDirName[WriteEngine::FILE_NAME_SIZE]; char fullFileName[WriteEngine::FILE_NAME_SIZE]; @@ -101,93 +99,168 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) oam::Oam oam_instance; int pmId = 0; + emp->getExtents(oid, entries, false, false, true); + if (entries.size() == 0) + return 0; + + std::vector::const_iterator iter = entries.begin(); + while ( iter != entries.end() ) //organize extents into files + { + // Don't include files more than once at different block offsets + if (iter->blockOffset > 0) + { + iter++; + return 0; + } + + try + { + oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + } + catch (std::runtime_error) + { + // MCOL-1116: If we are here a DBRoot is offline/missing + iter++; + return 0; + } + table->field[0]->store(oid); + table->field[1]->store(iter->segmentNum); + table->field[2]->store(iter->partitionNum); + + WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); + std::stringstream DbRootName; + DbRootName << "DBRoot" << iter->dbRoot; + std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); + fileSize = compressedFileSize = 0; + snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); + + std::ostringstream oss; + oss << "pm" << pmId << "_WriteEngineServer"; + std::string client = oss.str(); + msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); + + if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) + { + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + delete emp; + return 1; + } + table->field[3]->store(fullFileName, strlen(fullFileName), cs); + + if (fileSize > 0) + { + table->field[4]->set_notnull(); + table->field[4]->store(fileSize); + if (compressedFileSize > 0) + { + table->field[5]->set_notnull(); + table->field[5]->store(compressedFileSize); + } + else + { + table->field[5]->set_null(); + } + } + else + { + table->field[4]->set_null(); + table->field[5]->set_null(); + } + + if (schema_table_store_record(thd, table)) + { + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + delete emp; + return 1; + } + iter++; + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + msgQueueClient = NULL; + } + return 0; +} + +static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::DBRM *emp = new BRM::DBRM(); + BRM::OID_t cond_oid = 0; + TABLE *table = tables->table; + if (!emp || !emp->isDBRMReady()) { return 1; } + if (cond && cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2)) + { + if(fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + // WHERE object_id = value + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[1]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[0]->const_item()) + { + // WHERE value = object_id + Item_field *item_field = (Item_field*) fitem->arguments()[1]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[0]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + } + else if (fitem->functype() == Item_func::IN_FUNC) + { + // WHERE object_id in (value1, value2) + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + for (unsigned int i=1; i < fitem->argument_count(); i++) + { + cond_oid = fitem->arguments()[i]->val_int(); + int result = generate_result(cond_oid, emp, table, thd); + if (result) + return 1; + } + } + } + else if (fitem->functype() == Item_func::UNKNOWN_FUNC && + strcasecmp(fitem->func_name(), "find_in_set") == 0) + { + // WHERE FIND_IN_SET(object_id, values) + String *tmp_var = fitem->arguments()[1]->val_str(); + std::stringstream ss(tmp_var->ptr()); + while (ss >> cond_oid) + { + int ret = generate_result(cond_oid, emp, table, thd); + if (ret) + return 1; + if (ss.peek() == ',') + ss.ignore(); + } + } + } + execplan::ObjectIDManager oidm; BRM::OID_t MaxOID = oidm.size(); - for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) + if (!cond_oid) { - emp->getExtents(oid, entries, false, false, true); - if (entries.size() == 0) - continue; - - std::vector::const_iterator iter = entries.begin(); - while ( iter != entries.end() ) //organize extents into files + for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) { - // Don't include files more than once at different block offsets - if (iter->blockOffset > 0) - { - iter++; - continue; - } - - try - { - oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); - } - catch (std::runtime_error) - { - // MCOL-1116: If we are here a DBRoot is offline/missing - iter++; - continue; - } - table->field[0]->store(oid); - table->field[1]->store(iter->segmentNum); - table->field[2]->store(iter->partitionNum); - - WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); - std::stringstream DbRootName; - DbRootName << "DBRoot" << iter->dbRoot; - std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); - fileSize = compressedFileSize = 0; - snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); - - std::ostringstream oss; - oss << "pm" << pmId << "_WriteEngineServer"; - std::string client = oss.str(); - msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); - - if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) - { - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - delete emp; + int result = generate_result(oid, emp, table, thd); + if (result) return 1; - } - table->field[3]->store(fullFileName, strlen(fullFileName), cs); - - if (fileSize > 0) - { - table->field[4]->set_notnull(); - table->field[4]->store(fileSize); - if (compressedFileSize > 0) - { - table->field[5]->set_notnull(); - table->field[5]->store(compressedFileSize); - } - else - { - table->field[5]->set_null(); - } - } - else - { - table->field[4]->set_null(); - table->field[5]->set_null(); - } - - if (schema_table_store_record(thd, table)) - { - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - delete emp; - return 1; - } - iter++; - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - msgQueueClient = NULL; } } delete emp; diff --git a/dbcon/mysql/is_columnstore_tables.cpp b/dbcon/mysql/is_columnstore_tables.cpp index 47ce4970c..baa894487 100644 --- a/dbcon/mysql/is_columnstore_tables.cpp +++ b/dbcon/mysql/is_columnstore_tables.cpp @@ -42,22 +42,91 @@ ST_FIELD_INFO is_columnstore_tables_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; +static void get_cond_item(Item_func *item, String **table, String **db) +{ + char tmp_char[MAX_FIELD_WIDTH]; + Item_field *item_field = (Item_field*) item->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "table_name") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *table = item->arguments()[1]->val_str(&str_buf); + return; + } + else if (strcasecmp(item_field->field_name, "table_schema") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *db = item->arguments()[1]->val_str(&str_buf); + return; + } +} + +static void get_cond_items(COND *cond, String **table, String **db) +{ + if (cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + get_cond_item(fitem, table, db); + } + } + else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC)) + { + List_iterator li(*((Item_cond*) cond)->argument_list()); + Item *item; + while ((item= li++)) + { + if (item->type() == Item::FUNC_ITEM) + { + get_cond_item((Item_func*)item, table, db); + } + else + { + get_cond_items(item, table, db); + } + } + } +} + static int is_columnstore_tables_fill(THD *thd, TABLE_LIST *tables, COND *cond) { CHARSET_INFO *cs = system_charset_info; TABLE *table = tables->table; + String *table_name = NULL; + String *db_name = NULL; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + if (cond) + { + get_cond_items(cond, &table_name, &db_name); + } + const std::vector< std::pair > catalog_tables = systemCatalogPtr->getTables(); for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { + if (db_name) + { + if ((*it).second.schema.compare(db_name->ptr()) != 0) + { + continue; + } + } + if (table_name) + { + if ((*it).second.table.compare(table_name->ptr()) != 0) + { + continue; + } + } + execplan::CalpontSystemCatalog::TableInfo tb_info = systemCatalogPtr->tableInfo((*it).second); std::string create_date = dataconvert::DataConvert::dateToString((*it).second.create_date); table->field[0]->store((*it).second.schema.c_str(), (*it).second.schema.length(), cs);