From 1bd427486c14c51b1d00d8209f83dad3b3a71722 Mon Sep 17 00:00:00 2001 From: David Hall Date: Thu, 14 Jan 2016 10:27:21 -0600 Subject: [PATCH] Updated with latest InfiniDB Develop (4.6.6) --- dbcon/joblist/joblist.cpp | 45 +++- dbcon/joblist/joblist.h | 8 +- dbcon/joblist/joblistfactory.cpp | 5 + dbcon/mysql/ha_calpont_dml.cpp | 2 +- dbcon/mysql/my.cnf | 1 + exemgr/main.cpp | 2 + oam/install_scripts/myCnf-include-args.text | 15 +- oam/install_scripts/post-mysql-install | 36 +-- oam/install_scripts/user_installer.sh | 6 +- oam/oamcpp/liboamcpp.cpp | 26 +- oam/oamcpp/oamcache.cpp | 11 +- oam/oamcpp/oamcache.h | 1 + oamapps/calpontConsole/calpontConsole.cpp | 33 ++- oamapps/postConfigure/helpers.cpp | 9 +- oamapps/postConfigure/helpers.h | 2 +- oamapps/postConfigure/mycnfUpgrade.cpp | 47 +++- oamapps/postConfigure/postConfigure.cpp | 72 +++--- oamapps/serverMonitor/diskMonitor.cpp | 81 +++--- .../primproc/batchprimitiveprocessor.cpp | 2 + primitives/primproc/primitiveserver.cpp | 10 +- primitives/primproc/primitiveserver.h | 4 +- procmgr/main.cpp | 40 ++- procmgr/processmanager.cpp | 99 +++++--- procmon/main.cpp | 240 +++++++++--------- procmon/processmonitor.cpp | 100 ++++---- tools/configMgt/autoConfigure.cpp | 17 ++ .../we_redistributecontrolthread.cpp | 15 +- 27 files changed, 556 insertions(+), 373 deletions(-) diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index 618ac535c..1aab1bdfb 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -41,11 +41,14 @@ using namespace execplan; #include "tupleunion.h" #include "tupleaggregatestep.h" #include "windowfunctionstep.h" +#include "configcpp.h" +#include "oamcache.h" #include "atomicops.h" namespace joblist { +int JobList::fPmsConfigured = 0; struct JSJoiner { @@ -60,7 +63,7 @@ struct JSJoiner JobList::JobList(bool isEM) : fIsRunning(false), fIsExeMgr(isEM), - fPmConnected(false), + fPmsConnected(0), projectingTableOID(0), fAborted(0), fPriority(50) @@ -119,7 +122,7 @@ JobList::~JobList() int JobList::doQuery() { // Don't start the steps if there is no PrimProc connection. - if (!fPmConnected) + if (fPmsConfigured < 1 || fPmsConnected < fPmsConfigured) return 0; JobStep *js; @@ -206,20 +209,38 @@ int JobList::doQuery() int JobList::putEngineComm(DistributedEngineComm* dec) { int retryCnt = 0; - while (!fPmConnected) + + if (fPmsConfigured == 0) { - // Don't sleep until after the first retry - if (retryCnt > 1) - { - sleep(1); - } - fPmConnected = (dec->connectedPmServers() > 0); + logging::LoggingID lid(05); + logging::MessageLog ml(lid); + logging::Message::Args args; + logging::Message m(0); + // We failed to get a connection + args.add("There are no PMs configured. Can't perform Query"); + args.add(retryCnt); + m.format(args); + ml.logDebugMessage(m); + if (!errInfo) + errInfo.reset(new ErrorInfo); + errInfo->errCode = logging::ERR_NO_PRIMPROC; + errInfo->errMsg = logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_NO_PRIMPROC); + return errInfo->errCode; + } + // Check to be sure all PrimProcs are attached. + fPmsConnected = dec->connectedPmServers(); + while (fPmsConnected < fPmsConfigured) + { + sleep(1); + fPmsConnected = dec->connectedPmServers(); // Give up after 20 seconds. Primproc isn't coming back - if (fPmConnected || retryCnt >= 20) + if (retryCnt >= 20) { break; } ++retryCnt; + oam::OamCache *oamCache = oam::OamCache::makeOamCache(); + oamCache->forceReload(); dec->Setup(); } if (retryCnt > 0) @@ -228,10 +249,10 @@ int JobList::putEngineComm(DistributedEngineComm* dec) logging::MessageLog ml(lid); logging::Message::Args args; logging::Message m(0); - if (!fPmConnected) + if (fPmsConnected < fPmsConfigured) { // We failed to get a connection - args.add("Failed to get a PrimProc connection. Retry count"); + args.add("Failed to get all PrimProc connections. Retry count"); args.add(retryCnt); m.format(args); ml.logDebugMessage(m); diff --git a/dbcon/joblist/joblist.h b/dbcon/joblist/joblist.h index 73cfa0c42..c64f62833 100644 --- a/dbcon/joblist/joblist.h +++ b/dbcon/joblist/joblist.h @@ -123,13 +123,19 @@ public: // @bug4848, enhance and unify limit handling. EXPORT virtual void abortOnLimit(JobStep* js); + static void setPMsConfigured(int pms) {fPmsConfigured = pms;} + protected: //defaults okay //JobList(const JobList& rhs); //JobList& operator=(const JobList& rhs); bool fIsRunning; bool fIsExeMgr; - bool fPmConnected; + int fPmsConnected; + + // Dirty pool kludge. Contains the number of PMs configured in Calpont.xml. + // This kludge reduces the number of calls needed to config.Config, which are expensive. + static int fPmsConfigured; DeliveredTableMap fDeliveredTables; execplan::CalpontSystemCatalog::OID projectingTableOID; //DeliveryWSDLs get a reference to this diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 5889c584c..10f8fbd0a 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -1595,6 +1595,9 @@ SJLP makeJobList_( CalpontSelectExecutionPlan* csep = dynamic_cast(cplan); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID()); + static config::Config* sysConfig = config::Config::makeConfig(); + int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str()); + // We have to go ahead and create JobList now so we can store the joblist's // projectTableOID pointer in JobInfo for use during jobstep creation. SErrorInfo errorInfo(new ErrorInfo()); @@ -1602,6 +1605,7 @@ SJLP makeJobList_( boost::shared_ptr subCount(new int); *subCount = 0; JobList* jl = new TupleJobList(isExeMgr); + jl->setPMsConfigured(pmsConfigured); jl->priority(csep->priority()); jl->errorInfo(errorInfo); rm.setTraceFlags(csep->traceFlags()); @@ -1794,6 +1798,7 @@ SJLP JobListFactory::makeJobList( SJLP ret; string emsg; unsigned errCode = 0; + ret = makeJobList_(cplan, rm, isExeMgr, errCode, emsg); if (!ret) diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index 56d98ae73..cbf0c13f6 100644 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -708,7 +708,7 @@ int ha_calpont_impl_write_batch_row_(uchar *buf, TABLE* table, cal_impl_if::cal_ if (colpos == ci.columnTypes.size()) break; - if (headerByte >= ci.headerLength) + if (ci.headerLength > 0 && headerByte >= ci.headerLength) { // We've used more null bits than allowed. Something is seriously wrong. std::string errormsg = "Null bit header is wrong size"; diff --git a/dbcon/mysql/my.cnf b/dbcon/mysql/my.cnf index 400bbae78..5b03d74d5 100644 --- a/dbcon/mysql/my.cnf +++ b/dbcon/mysql/my.cnf @@ -78,6 +78,7 @@ plugin_dir = /usr/local/Calpont/mysql/lib/mysql/plugin # Replication Master Server (default) # binary logging is required for replication # log-bin=mysql-bin +# binlog_format=ROW # required unique id between 1 and 2^32 - 1 # defaults to 1 if master-host diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 3ab982e1d..e3fcaea3b 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1189,6 +1189,8 @@ void added_a_pm(int) catch (...) {} + oam::OamCache *oamCache = oam::OamCache::makeOamCache(); + oamCache->forceReload(); ec->Setup(); //set ACTIVE state diff --git a/oam/install_scripts/myCnf-include-args.text b/oam/install_scripts/myCnf-include-args.text index 95a8c9c4e..bc2c4c74f 100644 --- a/oam/install_scripts/myCnf-include-args.text +++ b/oam/install_scripts/myCnf-include-args.text @@ -2,4 +2,17 @@ # infinidb_local_query log-bin=mysql-bin -server-id \ No newline at end of file +server-id +max_length_for_sort_data +tmpdir +log-error +general_log_file +slow_query_log_file +general-log +slow-query-log +character-set-server +collation-server +init-connect +binlog_format +secure-auth +port \ No newline at end of file diff --git a/oam/install_scripts/post-mysql-install b/oam/install_scripts/post-mysql-install index 8129fd573..157f16951 100755 --- a/oam/install_scripts/post-mysql-install +++ b/oam/install_scripts/post-mysql-install @@ -6,33 +6,21 @@ # check log for error checkForError() { - grep ERROR /tmp/mysql_install.log > /tmp/error.check + # check for password error + grep "ERROR 1045" /tmp/mysql_install.log > /tmp/error.check if [ `cat /tmp/error.check | wc -c` -ne 0 ]; then - # check for password error - grep "ERROR 1045" /tmp/mysql_install.log > /tmp/error.check - if [ `cat /tmp/error.check | wc -c` -ne 0 ]; then - echo "MySQL Password missing or incorrect, check local file" - password=`$installdir/bin/getMySQLpw` - if [ $? -ne 0 ]; then - rm -f /tmp/error.check - $installdir/mysql/mysql-Calpont stop - sleep 2 - exit 2; - fi + password=`$installdir/bin/getMySQLpw` + if [ $? -ne 0 ]; then + echo "MySQL Password missing or incorrect" rm -f /tmp/error.check - return 1; - else - # ignore 1125 - already exist error - grep "ERROR 1125" /tmp/mysql_install.log > /tmp/error.check - if [ `cat /tmp/error.check | wc -c` -eq 0 ]; then - echo "ERROR: check log file: /tmp/mysql_install.log" - rm -f /tmp/error.check - $installdir/mysql/mysql-Calpont stop - sleep 2 - exit 1; - fi + $installdir/mysql/mysql-Calpont stop + sleep 2 + exit 2; fi + rm -f /tmp/error.check + return 1; fi + rm -f /tmp/error.check return 0; } @@ -100,7 +88,7 @@ if [ -x $installdir/mysql/mysql-Calpont ]; then $installdir/mysql/install_calpont_mysql.sh --password=$password --installdir=$installdir checkForError if [ $? -ne 0 ]; then - echo "ERROR: missing or invalidate passed" + echo "ERROR: missing or invalid password" $installdir/mysql/mysql-Calpont stop sleep 2 exit 1; diff --git a/oam/install_scripts/user_installer.sh b/oam/install_scripts/user_installer.sh index bdb6f6651..267f9d7ae 100644 --- a/oam/install_scripts/user_installer.sh +++ b/oam/install_scripts/user_installer.sh @@ -41,9 +41,9 @@ if { $MYSQLPW == "none" } { } set BASH "/bin/bash " -if { $DEBUG == "1" } { - set BASH "/bin/bash -x " -} +#if { $DEBUG == "1" } { +# set BASH "/bin/bash -x " +#} log_user $DEBUG spawn -noecho /bin/bash diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 9ea5f50eb..354d543ab 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -8191,7 +8191,7 @@ namespace oam bool Oam::disableMySQLRep() { // build and send msg - int returnStatus = sendMsgToProcMgr(DISABLEMYSQLREP); + int returnStatus = sendMsgToProcMgr(DISABLEMYSQLREP, oam::UnassignedName, FORCEFUL, ACK_YES); if (returnStatus != API_SUCCESS) exceptionControl("disableMySQLRep", returnStatus); @@ -8522,17 +8522,17 @@ namespace oam string target; ByteStream::byte status; - // get current requesting process, an error will occur if process is a UI tool (not kept in Status Table) - // this will be used to determine if this is a manually or auto request down within Process-Monitor - bool requestManual; - myProcessStatus_t t; - try { - t = getMyProcessStatus(); - requestManual = false; // set to auto - } - catch (...) { - requestManual = true; // set to manual - } + // get current requesting process, an error will occur if process is a UI tool (not kept in Status Table) + // this will be used to determine if this is a manually or auto request down within Process-Monitor + bool requestManual; + myProcessStatus_t t; + try { + t = getMyProcessStatus(); + requestManual = false; // set to auto + } + catch (...) { + requestManual = true; // set to manual + } // setup message msg << (ByteStream::byte) REQUEST; @@ -9079,6 +9079,8 @@ namespace oam if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && DBRootStorageType == "external" ) { + writeLog("amazonReattach function started ", LOG_TYPE_DEBUG ); + //get Instance Name for to-pm string toInstanceName = oam::UnassignedName; try diff --git a/oam/oamcpp/oamcache.cpp b/oam/oamcpp/oamcache.cpp index 94cc59a94..c4f57cf19 100644 --- a/oam/oamcpp/oamcache.cpp +++ b/oam/oamcpp/oamcache.cpp @@ -97,6 +97,7 @@ void OamCache::checkReload() map pmToConnectionMap; #ifdef _MSC_VER moduleIds.push_back(*it); + pmToConnectionMap[*it] = i++; #else // Restore for Windows when we support multiple PMs while (it != uniquePids.end()) @@ -130,8 +131,14 @@ void OamCache::checkReload() #endif dbRootConnectionMap.reset(new map()); for (i = 0; i < dbroots.size(); i++) - (*dbRootConnectionMap)[dbroots[i]] = pmToConnectionMap[(*dbRootPMMap)[dbroots[i]]]; - + { + map::iterator pmIter = pmToConnectionMap.find((*dbRootPMMap)[dbroots[i]]); + if (pmIter != pmToConnectionMap.end()) + { + (*dbRootConnectionMap)[dbroots[i]] = (*pmIter).second; + } + } + pmDbrootsMap.reset(new OamCache::PMDbrootsMap_t::element_type()); systemStorageInfo_t t; t = oam.getStorageConfig(); diff --git a/oam/oamcpp/oamcache.h b/oam/oamcpp/oamcache.h index af60450b0..44561aa0a 100644 --- a/oam/oamcpp/oamcache.h +++ b/oam/oamcpp/oamcache.h @@ -42,6 +42,7 @@ public: EXPORT virtual ~OamCache(); EXPORT void checkReload(); + EXPORT void forceReload() {mtime=0;} EXPORT dbRootPMMap_t getDBRootToPMMap(); EXPORT dbRootPMMap_t getDBRootToConnectionMap(); diff --git a/oamapps/calpontConsole/calpontConsole.cpp b/oamapps/calpontConsole/calpontConsole.cpp index 58834cca9..1bdc2a3f0 100644 --- a/oamapps/calpontConsole/calpontConsole.cpp +++ b/oamapps/calpontConsole/calpontConsole.cpp @@ -1750,7 +1750,27 @@ int processCommand(string* arguments) break; } - systemStorageInfo_t t; + SystemStatus systemstatus; + try { + oam.getSystemStatus(systemstatus); + + if (systemstatus.SystemOpState != oam::ACTIVE ) { + cout << endl << "**** removeDbroot Failed, System has to be in a ACTIVE state" << endl; + break; + } + } + catch (exception& e) + { + cout << endl << "**** removeDbroot Failed : " << e.what() << endl; + break; + } + catch(...) + { + cout << endl << "**** removeDbroot Failed, Failed return from getSystemStatus API" << endl; + break; + } + + systemStorageInfo_t t; try { t = oam.getStorageConfig(); @@ -2044,6 +2064,11 @@ int processCommand(string* arguments) cout << endl << "ERROR: Stopping InfiniDB Service failure, check /tmp/cc-stop.pdsh. exit..." << endl; } } + else + { + string cmd = startup::StartUp::installDir() + "/bin/infinidb stop > /tmp/status.log"; + system(cmd.c_str()); + } } catch (exception& e) { @@ -4877,7 +4902,7 @@ int processCommand(string* arguments) if ( MySQLPasswordConfig == oam::UnassignedName ) { cout << endl; - string prompt = "Is there a 'MySQL' Password configured in " + HOME + "/.my.cnf (y,n): "; + string prompt = "Is there a 'MySQL' Password configured on the MySQL Front-end Modules in " + HOME + "/.my.cnf (y,n): "; MySQLPasswordConfig = dataPrompt(prompt); } @@ -4892,6 +4917,7 @@ int processCommand(string* arguments) //set flag try { oam.setSystemConfig("MySQLRep", "y"); + sleep(2); } catch(...) {} @@ -5824,7 +5850,7 @@ int processCommand(string* arguments) if ( MySQLPasswordConfig == oam::UnassignedName ) { cout << endl; - string prompt = "Is there a 'MySQL' Password configured in " + HOME + "/.my.cnf (y,n): "; + string prompt = "Is there a 'MySQL' Password configured on the MySQL Front-end Modules in " + HOME + "/.my.cnf (y,n): "; MySQLPasswordConfig = dataPrompt(prompt); } @@ -5839,6 +5865,7 @@ int processCommand(string* arguments) //set flag try { oam.setSystemConfig("MySQLRep", "n"); + sleep(2); } catch(...) {} diff --git a/oamapps/postConfigure/helpers.cpp b/oamapps/postConfigure/helpers.cpp index 2d20b4058..bc23bd316 100644 --- a/oamapps/postConfigure/helpers.cpp +++ b/oamapps/postConfigure/helpers.cpp @@ -403,7 +403,7 @@ int sendUpgradeRequest(int IserverTypeInstall, bool pmwithum) * * ******************************************************************************************/ -int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string port) +int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string port, bool pmwithum) { Oam oam; @@ -490,6 +490,13 @@ int sendReplicationRequest(int IserverTypeInstall, std::string password, std::st } else { // set for slave repl request + // don't do PMs unless PMwithUM flag is set + string moduleType = (*pt).DeviceName.substr(0,MAX_MODULE_TYPE_SIZE); + if ( moduleType == "pm" && !pmwithum ) { + pt++; + continue; + } + ByteStream msg; ByteStream::byte requestID = oam::SLAVEREP; msg << requestID; diff --git a/oamapps/postConfigure/helpers.h b/oamapps/postConfigure/helpers.h index 1bc7c9ee7..c7df2e83e 100644 --- a/oamapps/postConfigure/helpers.h +++ b/oamapps/postConfigure/helpers.h @@ -38,7 +38,7 @@ extern void dbrmDirCheck(); extern void mysqlSetup(); extern int sendMsgProcMon( std::string module, ByteStream msg, int requestID, int timeout ); extern int sendUpgradeRequest(int IserverTypeInstall, bool pmwithum); -extern int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string mysqlPort); +extern int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string mysqlPort, bool pmwithum); extern void checkFilesPerPartion(int DBRootCount, Config* sysConfig); extern void checkMysqlPort( string& mysqlPort, Config* sysConfig); extern bool writeConfig(Config* sysConfig); diff --git a/oamapps/postConfigure/mycnfUpgrade.cpp b/oamapps/postConfigure/mycnfUpgrade.cpp index 7237fc712..ec0701773 100644 --- a/oamapps/postConfigure/mycnfUpgrade.cpp +++ b/oamapps/postConfigure/mycnfUpgrade.cpp @@ -115,22 +115,25 @@ int main(int argc, char *argv[]) oldbuf = line; string::size_type pos = oldbuf.find(includeArg,0); if ( pos != string::npos ) { - //check if this is commented out + //found in my.cnf.rpmsave, check if this is commented out if ( line[0] != '#' ) { - // no, find in my.cnf and replace + // no, check in my.cnf and replace if exist or add if it doesn't ifstream mycnffile (mycnfFile.c_str()); vector lines; char line1[200]; string newbuf; + bool updated = false; while (mycnffile.getline(line1, 200)) { newbuf = line1; string::size_type pos = newbuf.find(includeArg,0); - if ( pos != string::npos ) + if ( pos != string::npos ) { newbuf = oldbuf; - + cout << "Updated argument: " << includeArg << endl; + updated = true; + } //output to temp file lines.push_back(newbuf); } @@ -147,6 +150,39 @@ int main(int argc, char *argv[]) newFile.close(); close(fd); + + if (!updated) + { //not found, so add + ifstream mycnffile (mycnfFile.c_str()); + vector lines; + char line1[200]; + string newbuf; + while (mycnffile.getline(line1, 200)) + { + newbuf = line1; + string::size_type pos = newbuf.find("[mysqld]",0); + if ( pos != string::npos ) { + lines.push_back(newbuf); + newbuf = oldbuf; + cout << "Added argument: " << includeArg << endl; + } + //output to temp file + lines.push_back(newbuf); + } + + //write out a new my.cnf + mycnffile.close(); + unlink (mycnfFile.c_str()); + ofstream newFile (mycnfFile.c_str()); + + //create new file + int fd = open(mycnfFile.c_str(), O_RDWR|O_CREAT, 0666); + + copy(lines.begin(), lines.end(), ostream_iterator(newFile, "\n")); + newFile.close(); + + close(fd); + } } break; @@ -154,6 +190,9 @@ int main(int argc, char *argv[]) } } + string cmd = "chown mysql:mysql " + mycnfFile; + system(cmd.c_str()); + exit (0); } // vim:ts=4 sw=4: diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp index 60c2de096..a94464251 100644 --- a/oamapps/postConfigure/postConfigure.cpp +++ b/oamapps/postConfigure/postConfigure.cpp @@ -431,6 +431,17 @@ int main(int argc, char *argv[]) exit(1); } + // run my.cnf upgrade script + if ( reuseConfig == "y" ) + { + cmd = installDir + "/bin/mycnfUpgrade > /tmp/mycnfUpgrade.log 2>&1"; + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) + cout << "Error: Problem upgrade my.cnf, check /tmp/mycnfUpgrade.log" << endl; + else + cout << cout << "NOTE: my.cnf file was upgraded based on my.cnf.rpmsave" << endl; + } + //check mysql port changes string MySQLPort; try { @@ -2975,18 +2986,6 @@ int main(int argc, char *argv[]) cout << endl; } } - else - { - // run my.cnf upgrade script - if ( reuseConfig == "y" && MySQLRep == "y" && - IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM ) - { - cmd = installDir + "/bin/mycnfUpgrade > /tmp/mycnfUpgrade.log 2>&1"; - int rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) - cout << "Error: Problem upgrade my.cnf, check /tmp/mycnfUpgrade.log" << endl; - } - } cout << endl; cout << "Next step is to enter the password to access the other Servers." << endl; @@ -3040,15 +3039,6 @@ int main(int argc, char *argv[]) { cout << endl << "===== Running the InfiniDB MySQL setup scripts =====" << endl << endl; - // run my.cnf upgrade script - if ( reuseConfig == "y" && MySQLRep == "y" ) - { - cmd = installDir + "/bin/mycnfUpgrade > /tmp/mycnfUpgrade.log 2>&1"; - int rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) - cout << "Error: Problem upgrade my.cnf, check /tmp/mycnfUpgrade.log" << endl; - } - // call the mysql setup scripts mysqlSetup(); sleep(5); @@ -3606,7 +3596,7 @@ int main(int argc, char *argv[]) cout.flush(); //send message to procmon's to run upgrade script - int status = sendReplicationRequest(IserverTypeInstall, password, mysqlPort); + int status = sendReplicationRequest(IserverTypeInstall, password, mysqlPort, pmwithum); if ( status != 0 ) { cout << endl << " InfiniDB Install Failed" << endl << endl; @@ -4835,28 +4825,21 @@ bool storageSetup(string cloud) cout << endl; while(true) { - pcommand = callReadline("Do you want to enable Non-OAM-Parent-PM EBS failover support? [y,n] (" + AmazonPMFailover + ") > "); + pcommand = callReadline("Do you want to enable Instance failover support? [y,n] (" + AmazonPMFailover + ") > "); if (pcommand) { - if (strlen(pcommand) > 0) - { - AmazonPMFailover = pcommand; - } - else - { - if ( noPrompting ) - continue; - else - { - cout << "Invalid Entry, please enter 'y' for yes or 'n' for no" << endl; - if ( noPrompting ) - exit(1); - continue; - } - } - + if (strlen(pcommand) > 0) AmazonPMFailover = pcommand; callFree(pcommand); } + + if ( AmazonPMFailover == "y" || AmazonPMFailover == "n" ) { + cout << endl; + break; + } + else + cout << "Invalid Entry, please enter 'y' for yes or 'n' for no" << endl; + if ( noPrompting ) + exit(1); } try { @@ -4866,6 +4849,15 @@ bool storageSetup(string cloud) {} } + if( DBRootStorageType == "internal" && cloud == "amazon" ) + { //set AmazonPMFailover + try { + sysConfig->setConfig(InstallSection, "AmazonPMFailover", "n"); + } + catch(...) + {} + } + if ( !writeConfig(sysConfig) ) { cout << "ERROR: Failed trying to update InfiniDB System Configuration file" << endl; return false; diff --git a/oamapps/serverMonitor/diskMonitor.cpp b/oamapps/serverMonitor/diskMonitor.cpp index cbc70c462..33bbafd30 100644 --- a/oamapps/serverMonitor/diskMonitor.cpp +++ b/oamapps/serverMonitor/diskMonitor.cpp @@ -82,42 +82,6 @@ void diskMonitor() bool Externalflag = false; - //check for external disk - DBrootList dbrootList; - if (moduleType == "pm") { - systemStorageInfo_t t; - t = oam.getStorageConfig(); - if ( boost::get<0>(t) == "external") - Externalflag = true; - - // get dbroot list and storage type from config file - DBRootConfigList dbrootConfigList; - oam.getPmDbrootConfig(moduleID, dbrootConfigList); - - DBRootConfigList::iterator pt = dbrootConfigList.begin(); - for( ; pt != dbrootConfigList.end() ; pt++) - { - int dbrootID = *pt; - - string dbroot = "DBRoot" + oam.itoa(dbrootID); - - string dbootdir; - try{ - oam.getSystemConfig(dbroot, dbootdir); - } - catch(...) {} - - if ( dbootdir.empty() || dbootdir == "" ) - continue; - - DBrootData dbrootData; - dbrootData.dbrootDir = dbootdir; - dbrootData.downFlag = false; - - dbrootList.push_back(dbrootData); - } - } - string cloud = oam::UnassignedName; try { oam.getSystemConfig( "Cloud", cloud); @@ -140,6 +104,42 @@ void diskMonitor() while(true) { + //check for external disk + DBrootList dbrootList; + if (moduleType == "pm") { + systemStorageInfo_t t; + t = oam.getStorageConfig(); + if ( boost::get<0>(t) == "external") + Externalflag = true; + + // get dbroot list and storage type from config file + DBRootConfigList dbrootConfigList; + oam.getPmDbrootConfig(moduleID, dbrootConfigList); + + DBRootConfigList::iterator pt = dbrootConfigList.begin(); + for( ; pt != dbrootConfigList.end() ; pt++) + { + int dbrootID = *pt; + + string dbroot = "DBRoot" + oam.itoa(dbrootID); + + string dbootdir; + try{ + oam.getSystemConfig(dbroot, dbootdir); + } + catch(...) {} + + if ( dbootdir.empty() || dbootdir == "" ) + continue; + + DBrootData dbrootData; + dbrootData.dbrootDir = dbootdir; + dbrootData.downFlag = false; + + dbrootList.push_back(dbrootData); + } + } + SystemStatus systemstatus; try { oam.getSystemStatus(systemstatus); @@ -518,7 +518,7 @@ void diskMonitor() args.add("dbroot monitoring: Lost access to "); args.add(dbrootDir); msg.format(args); - ml.logCriticalMessage(msg); + ml.logWarningMessage(msg); oam.sendDeviceNotification(dbrootName, DBROOT_DOWN, moduleName); (*p).downFlag = true; @@ -610,11 +610,12 @@ void diskMonitor() //check disk space every 10 minutes diskSpaceCheck++; - if ( diskSpaceCheck >= 20 ) + if ( diskSpaceCheck >= 20 ) { diskSpaceCheck = 0; - lfs.clear(); - sdl.clear(); + lfs.clear(); + sdl.clear(); + } } // end of while loop } diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index d09c2cdc4..9ab8a6ea0 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1026,6 +1026,7 @@ void BatchPrimitiveProcessor::execute() sessionID, &counterLock, &busyLoaderCount, + sendThread, &vssCache); asyncLoaded[p] = true; } @@ -2012,6 +2013,7 @@ void BatchPrimitiveProcessor::asyncLoadProjectColumns() sessionID, &counterLock, &busyLoaderCount, + sendThread, &vssCache); asyncLoaded[i] = true; } diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 1d7462c3d..0e2973eec 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -796,6 +796,7 @@ struct AsynchLoader { uint32_t sesID, boost::mutex *m, uint32_t *loaderCount, + boost::shared_ptr st, // sendThread for abort upon exception. VSSCache *vCache) : lbid(l), ver(v), @@ -807,6 +808,7 @@ struct AsynchLoader { readCount(rCount), busyLoaders(loaderCount), mutex(m), + sendThread(st), vssCache(vCache) { } @@ -818,9 +820,10 @@ struct AsynchLoader { //cout << "asynch started " << pthread_self() << " l: " << lbid << endl; try { - loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, sessionID, true, vssCache); + loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, true, vssCache); } catch (std::exception& ex) { + sendThread->abort(); cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl; idbassert(asyncCounter > 0); (void)atomicops::atomicDec(&asyncCounter); @@ -834,6 +837,7 @@ struct AsynchLoader { return; } catch (...) { + sendThread->abort(); cerr << "AsynchLoader caught unknown exception: " << endl; //FIXME Use a locked processor primitive? idbassert(asyncCounter > 0); @@ -869,6 +873,7 @@ private: uint32_t *readCount; uint32_t *busyLoaders; boost::mutex *mutex; + boost::shared_ptr sendThread; VSSCache *vssCache; }; @@ -882,6 +887,7 @@ void loadBlockAsync(uint64_t lbid, uint32_t sessionID, boost::mutex *m, uint32_t *busyLoaders, + boost::shared_ptr sendThread, // sendThread for abort upon exception. VSSCache *vssCache) { blockCacheClient bc(*BRPp[cacheNum(lbid)]); @@ -913,7 +919,7 @@ void loadBlockAsync(uint64_t lbid, mutex::scoped_lock sl(*m); try { boost::thread thd(AsynchLoader(lbid, c, txn, compType, cCount, rCount, - LBIDTrace, sessionID, m, busyLoaders, vssCache)); + LBIDTrace, sessionID, m, busyLoaders, sendThread, vssCache)); (*busyLoaders)++; } catch (boost::thread_resource_error &e) { diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 79880c705..2c0e839ff 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -93,14 +93,14 @@ namespace primitiveprocessor typedef std::map BPPMap; extern BPPMap bppMap; - void prefetchBlocks(uint64_t lbid, uint32_t* rCount); + void prefetchBlocks(uint64_t lbid, const int compType, uint32_t* rCount); void prefetchExtent(uint64_t lbid, uint32_t ver, uint32_t txn, uint32_t* rCount); void loadBlock(uint64_t lbid, BRM::QueryContext q, uint32_t txn, int compType, void* bufferPtr, bool* pWasBlockInCache, uint32_t* rCount=NULL, bool LBIDTrace = false, uint32_t sessionID = 0, bool doPrefetch=true, VSSCache *vssCache = NULL); void loadBlockAsync(uint64_t lbid, const BRM::QueryContext &q, uint32_t txn, int CompType, uint32_t *cCount, uint32_t *rCount, bool LBIDTrace, uint32_t sessionID, - boost::mutex *m, uint32_t *busyLoaders, VSSCache* vssCache=0); + boost::mutex *m, uint32_t *busyLoaders, boost::shared_ptr sendThread, VSSCache* vssCache=0); uint32_t loadBlocks(BRM::LBID_t *lbids, BRM::QueryContext q, BRM::VER_t txn, int compType, uint8_t **bufferPtrs, uint32_t *rCount, bool LBIDTrace, uint32_t sessionID, uint32_t blockCount, bool *wasVersioned, bool doPrefetch = true, VSSCache *vssCache = NULL); diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 83b4b800b..c588c4df2 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1216,6 +1216,13 @@ void pingDeviceThread() if (LANOUTAGEACTIVE) break; + try { + oam.getSystemConfig("MySQLRep", MySQLRep); + } + catch(...) { + MySQLRep = "n"; + } + if (moduleInfoList[moduleName] >= ModuleHeartbeatCount || opState == oam::DOWN || opState == oam::AUTO_DISABLED) { @@ -1503,15 +1510,22 @@ void pingDeviceThread() processManager.restartProcessType("ExeMgr", moduleName); } + string moduleType = moduleName.substr(0,MAX_MODULE_TYPE_SIZE); + if ( MySQLRep == "y" ) { - //setup MySQL Replication for started modules - - log.writeLog(__LINE__, "Setup MySQL Replication for module recovering from outage on " + moduleName, LOG_TYPE_DEBUG); - DeviceNetworkList devicenetworklist; - DeviceNetworkConfig devicenetworkconfig; - devicenetworkconfig.DeviceName = moduleName; - devicenetworklist.push_back(devicenetworkconfig); - processManager.setMySQLReplication(devicenetworklist); + if ( moduleType == "um" || + ( moduleType == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || + ( moduleType == "pm" && PMwithUM == "y") ) { + + //setup MySQL Replication for started modules + + log.writeLog(__LINE__, "Setup MySQL Replication for module recovering from outage on " + moduleName, LOG_TYPE_DEBUG); + DeviceNetworkList devicenetworklist; + DeviceNetworkConfig devicenetworkconfig; + devicenetworkconfig.DeviceName = moduleName; + devicenetworklist.push_back(devicenetworkconfig); + processManager.setMySQLReplication(devicenetworklist); + } } else { @@ -1531,6 +1545,8 @@ void pingDeviceThread() //set query system state ready processManager.setQuerySystemState(true); + processManager.setSystemState(oam::ACTIVE); + //clear count moduleInfoList[moduleName] = 0; } @@ -1674,7 +1690,6 @@ void pingDeviceThread() // state = stopped, then try starting, if fail, remove/addmodule to launch new instance // state = terminate or nothing, remove/addmodule to launch new instance if ( amazon ) { - if ( moduleName.find("um") == 0 ) { // resume the dbrm @@ -1932,6 +1947,9 @@ void pingDeviceThread() // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); + + //set query system state ready + processManager.setQuerySystemState(true); } } else @@ -2087,8 +2105,8 @@ void pingDeviceThread() if ( parentOAMModule == config.moduleName() || parentOAMModule == "FAILED" ) { - //send sighup to these guys incase they marked any PrimProcs offline - processManager.reinitProcessType("ExeMgr"); + //srestart to these guys incase they marked any PrimProcs offline + processManager.restartProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index a2bc754b4..2c48f2ff0 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -53,7 +53,7 @@ extern string USER; extern bool HDFS; extern string localHostName; extern string PMwithUM; -extern string MySQLRep; +extern string AmazonPMFailover; typedef map moduleList; extern moduleList moduleInfoList; @@ -617,7 +617,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); //setup MySQL Replication for started modules log.writeLog(__LINE__, "Setup MySQL Replication for module being started", LOG_TYPE_DEBUG); @@ -738,7 +738,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) //distribute config file processManager.distributeConfigFile("system"); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); } } else @@ -1702,7 +1702,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.distributeConfigFile("system"); processManager.reinitProcessType("WriteEngineServer"); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } @@ -1752,7 +1752,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.distributeConfigFile("system"); processManager.reinitProcessType("WriteEngineServer"); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } @@ -2193,7 +2193,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.distributeConfigFile("system"); processManager.reinitProcessType("WriteEngineServer"); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } @@ -2226,7 +2226,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.distributeConfigFile("system"); processManager.reinitProcessType("WriteEngineServer"); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } @@ -2557,6 +2557,28 @@ void processMSG(messageqcpp::IOSocket* cfIos) break; } + case DISABLEMYSQLREP: + { + log.writeLog(__LINE__, "MSG RECEIVED: Disable MySQL Replication"); + + // target = root password + oam::DeviceNetworkList devicenetworklist; + status = processManager.setMySQLReplication(devicenetworklist, oam::UnassignedName, false, true, target, false); + + log.writeLog(__LINE__, "Disable MySQL Replication status: " + oam.itoa(status) ); + + ackMsg << (ByteStream::byte) oam::ACK; + ackMsg << actionType; + ackMsg << target; + ackMsg << (ByteStream::byte) status; + try { + fIos.write(ackMsg); + } + catch(...) {} + + break; + } + case GLUSTERASSIGN: { string dbroot; @@ -2742,7 +2764,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.distributeConfigFile("system"); processManager.reinitProcessType("WriteEngineServer"); - processManager.reinitProcessType("ExeMgr"); + processManager.restartProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } @@ -3302,7 +3324,7 @@ int ProcessManager::disableModule(string target, bool manualFlag) /****************************************************************************************** * @brief recycleProcess * -* purpose: recyle process, general;ly after some disable module is run +* purpose: recyle process, generally after some disable module is run * ******************************************************************************************/ void ProcessManager::recycleProcess(string module) @@ -3327,20 +3349,23 @@ void ProcessManager::recycleProcess(string module) restartProcessType("mysql"); } else - reinitProcessType("ExeMgr"); + restartProcessType("ExeMgr"); if ( PrimaryUMModuleName == module ) { - restartProcessType("DDLProc", "none", false); + restartProcessType("DDLProc", module); +// restartProcessType("DDLProc", module, false); sleep(1); - restartProcessType("DMLProc", "none", false); + restartProcessType("DMLProc", module); +// restartProcessType("DMLProc", module, false); } if( moduleType == "pm" && PrimaryUMModuleName != module) { reinitProcessType("DDLProc"); sleep(1); - restartProcessType("DMLProc", "none", false); + restartProcessType("DMLProc", module); +// restartProcessType("DMLProc", module, false); } return; @@ -8809,10 +8834,10 @@ int ProcessManager::OAMParentModuleChange() {} //do amazon failover - if (amazon) + if (amazon && AmazonPMFailover == "n") { log.writeLog(__LINE__, " ", LOG_TYPE_DEBUG); - log.writeLog(__LINE__, "*** OAMParentModule outage, recover the Instance ***", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "*** OAMParentModule outage, AmazonPMFailover not set, wating for instance to restart ***", LOG_TYPE_DEBUG); string currentIPAddr = oam.getEC2InstanceIpAddress(downOAMParentHostname); if (currentIPAddr == "stopped") @@ -9068,6 +9093,12 @@ int ProcessManager::OAMParentModuleChange() if ( status != 0 ) log.writeLog(__LINE__, "startModuleThread: pthread_create failed, return status = " + oam.itoa(status), LOG_TYPE_ERROR); + if (status == 0) + { + pthread_join(startmodulethread, NULL); + status = startsystemthreadStatus; + } + //restart/reinit processes to force their release of the controller node port if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM) && ( moduleNameList.size() <= 1 && config.moduleType() == "pm") ) @@ -9135,7 +9166,7 @@ int ProcessManager::OAMParentModuleChange() if (opState != oam::AUTO_DISABLED) { if ((*pt).DeviceName != downOAMParentName ) { if ((*pt).DeviceName != config.moduleName() ) { - processManager.setModuleState((*pt).DeviceName, oam::AUTO_INIT); + // processManager.setModuleState((*pt).DeviceName, oam::AUTO_INIT); pthread_t startmodulethread; string moduleName = (*pt).DeviceName; int status = pthread_create (&startmodulethread, NULL, (void*(*)(void*)) &startModuleThread, &moduleName); @@ -9152,25 +9183,6 @@ int ProcessManager::OAMParentModuleChange() } } - //wait until local module is active before continuing - while(true) - { - int opState = oam::ACTIVE; - bool degraded; - try { - oam.getModuleStatus(config.moduleName(), opState, degraded); - } - catch(...) - { -// log.writeLog(__LINE__, "EXCEPTION ERROR on getModuleStatus on module " + config.moduleName() + ": Caught unknown exception!", LOG_TYPE_ERROR); - } - - if (opState == oam::ACTIVE) - break; - - sleep(1); - } - //restart DDLProc/DMLProc to perform any rollbacks, if needed //dont rollback in amazon, wait until down pm recovers if ( ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM ) @@ -9194,13 +9206,8 @@ int ProcessManager::OAMParentModuleChange() // clear alarm aManager.sendAlarmReport(config.moduleName().c_str(), MODULE_SWITCH_ACTIVE, CLEAR); - if (amazon) { - //Set the down module instance state so it will be auto restarted - processManager.setModuleState(downOAMParentName, oam::AUTO_OFFLINE); - - // sleep to give time for local pm to fully go active - sleep(30); - } + //set status to ACTIVE while failover is in progress + processManager.setSystemState(oam::ACTIVE); log.writeLog(__LINE__, "*** Exiting OAMParentModuleChange function ***", LOG_TYPE_DEBUG); @@ -9867,6 +9874,14 @@ int ProcessManager::setMySQLReplication(oam::DeviceNetworkList devicenetworklist { Oam oam; + string MySQLRep; + try { + oam.getSystemConfig("MySQLRep", MySQLRep); + } + catch(...) { + MySQLRep = "n"; + } + if ( MySQLRep == "n" && enable ) return oam::API_SUCCESS; diff --git a/procmon/main.cpp b/procmon/main.cpp index 04059430b..c0f7a1bc1 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -214,7 +214,8 @@ int main(int argc, char **argv) //check if currently configured as Parent OAM Module on startup if ( gOAMParentModuleFlag ) { - if ( config.OAMStandbyName() != oam::UnassignedName ) { + if ( ( config.OAMStandbyName() != oam::UnassignedName ) && + DBRootStorageType != "internal" ) { //try for 20 minutes checking if the standby node is up string parentOAMModule; log.writeLog(__LINE__, "starting has parent, double check. checking with old Standby Module", LOG_TYPE_DEBUG); @@ -600,10 +601,15 @@ int main(int argc, char **argv) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); //mysql status monitor thread - pthread_t mysqlThread; - ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*)) &mysqlMonitorThread, NULL); - if ( ret != 0 ) - log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); + if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || + (PMwithUM == "y") ) + { + + pthread_t mysqlThread; + ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*)) &mysqlMonitorThread, NULL); + if ( ret != 0 ) + log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); + } //update syslog file priviledges aMonitor.changeModLog(); @@ -1086,16 +1092,6 @@ static void chldHandleThread(MonitorConfig config) processRestartPeriod = 120; } - try - { - oam.getProcessStatus(systemprocessstatus); - } - catch(...) - { - sleep(5); - continue; - } - listPtr = aPtr->begin(); for (; listPtr != aPtr->end(); ++listPtr) { @@ -1104,112 +1100,121 @@ static void chldHandleThread(MonitorConfig config) // Update internal process state when in INIT and System is ACTIVE/FAILED // Updated System process state when AOS and different from internal int outOfSyncCount = 0; - if ( delayCount == 10 ) { + if ( delayCount == 2 ) { while(true) { int state = (*listPtr).state; //set as default int PID = (*listPtr).processID; //set as default + try { + ProcessStatus procstat; + oam.getProcessStatus((*listPtr).ProcessName, config.moduleName(), procstat); + state = procstat.ProcessOpState; + PID = procstat.ProcessID; + + if (state == oam::BUSY_INIT ) { + // updated local state ot BUSY_INIT + (*listPtr).state = state; + break; + } - for( unsigned int j = 0 ; j < systemprocessstatus.processstatus.size(); j++) + if ( (state == oam::AUTO_INIT && (*listPtr).state == oam::AUTO_INIT) || + (state == oam::MAN_INIT && (*listPtr).state == oam::MAN_INIT) ) { + // get current time in seconds + time_t cal; + time (&cal); + + if ( (cal - (*listPtr).currentTime) > 20 ) { + // issue ALARM and update status to FAILED + aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_INIT_FAILURE, SET); +// (*listPtr).state = oam::FAILED; +// aMonitor.updateProcessInfo((*listPtr).ProcessName, oam::FAILED, (*listPtr).processID); + + //force restart the un-initted process + log.writeLog(__LINE__, (*listPtr).ProcessName + "/" + oam.itoa((*listPtr).processID) + " failed to init in 20 seconds, force killing it so it can restart", LOG_TYPE_CRITICAL); + //skip killing 0 or 1 + if ( (*listPtr).processID > 1 ) + kill((*listPtr).processID, SIGKILL); + break; + } + break; + } + } + catch (exception& ex) { - if ( systemprocessstatus.processstatus[j].ProcessName == (*listPtr).ProcessName - && systemprocessstatus.processstatus[j].Module.find(config.moduleName(),0) != string::npos) { - - state = systemprocessstatus.processstatus[j].ProcessOpState; - PID = systemprocessstatus.processstatus[j].ProcessID; - - if (state == oam::BUSY_INIT ) { - // updated local state ot BUSY_INIT + string error = ex.what(); +// log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: " + error, LOG_TYPE_ERROR); + break; + } + catch(...) + { +// log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: Caught unknown exception!", LOG_TYPE_ERROR); + break; + } + + if (state != (*listPtr).state || PID != (*listPtr).processID) { + if ( state == oam::STANDBY && (*listPtr).state == oam::ACTIVE ) + break; + else + { + if ( (state == oam::ACTIVE && (*listPtr).state == oam::AUTO_INIT) || + (state == oam::ACTIVE && (*listPtr).state == oam::MAN_INIT) || + (state == oam::ACTIVE && (*listPtr).state == oam::STANDBY) || + (state == oam::ACTIVE && (*listPtr).state == oam::INITIAL) || + (state == oam::ACTIVE && (*listPtr).state == oam::STANDBY_INIT) || + (state == oam::ACTIVE && (*listPtr).state == oam::BUSY_INIT) || + (state == oam::STANDBY && (*listPtr).state == oam::AUTO_INIT) || + (state == oam::STANDBY && (*listPtr).state == oam::MAN_INIT) || + (state == oam::STANDBY && (*listPtr).state == oam::INITIAL) || + (state == oam::STANDBY && (*listPtr).state == oam::BUSY_INIT) || + (state == oam::STANDBY && (*listPtr).state == oam::STANDBY_INIT) ) { + // updated local state to ACTIVE (*listPtr).state = state; break; } - - if ( (state == oam::AUTO_INIT && (*listPtr).state == oam::AUTO_INIT) || - (state == oam::MAN_INIT && (*listPtr).state == oam::MAN_INIT) ) { - // get current time in seconds - time_t cal; - time (&cal); - - if ( (cal - (*listPtr).currentTime) > 20 ) { - // issue ALARM and update status to FAILED - aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_INIT_FAILURE, SET); - - //force restart the un-initted process - log.writeLog(__LINE__, (*listPtr).ProcessName + "/" + oam.itoa((*listPtr).processID) + " failed to init in 20 seconds, force killing it so it can restart", LOG_TYPE_CRITICAL); - //skip killing 0 or 1 - if ( (*listPtr).processID > 1 ) - kill((*listPtr).processID, SIGKILL); - break; - } - break; - } - } + if ( (state == oam::FAILED && (*listPtr).state == oam::AUTO_INIT) || + (state == oam::FAILED && (*listPtr).state == oam::BUSY_INIT) || + (state == oam::FAILED && (*listPtr).state == oam::MAN_INIT) ) { + // issue ALARM and update local status to FAILED + log.writeLog(__LINE__, (*listPtr).ProcessName + " failed initialization", LOG_TYPE_WARNING); + aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_INIT_FAILURE, SET); + (*listPtr).state = state; + + //setModule status to failed + try{ + oam.setModuleStatus(config.moduleName(), oam::FAILED); + } + catch (exception& ex) + { + string error = ex.what(); +// log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: " + error, LOG_TYPE_ERROR); + } + catch(...) + { +// log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: Caught unknown exception!", LOG_TYPE_ERROR); + } - if (state != (*listPtr).state || PID != (*listPtr).processID) { - if ( state == oam::STANDBY && (*listPtr).state == oam::ACTIVE ) break; - else - { - if ( (state == oam::ACTIVE && (*listPtr).state == oam::AUTO_INIT) || - (state == oam::ACTIVE && (*listPtr).state == oam::MAN_INIT) || - (state == oam::ACTIVE && (*listPtr).state == oam::STANDBY) || - (state == oam::ACTIVE && (*listPtr).state == oam::INITIAL) || - (state == oam::ACTIVE && (*listPtr).state == oam::STANDBY_INIT) || - (state == oam::ACTIVE && (*listPtr).state == oam::BUSY_INIT) || - (state == oam::STANDBY && (*listPtr).state == oam::AUTO_INIT) || - (state == oam::STANDBY && (*listPtr).state == oam::MAN_INIT) || - (state == oam::STANDBY && (*listPtr).state == oam::INITIAL) || - (state == oam::STANDBY && (*listPtr).state == oam::BUSY_INIT) || - (state == oam::STANDBY && (*listPtr).state == oam::STANDBY_INIT) ) { - // updated local state to ACTIVE - (*listPtr).state = state; - break; - } - if ( (state == oam::FAILED && (*listPtr).state == oam::AUTO_INIT) || - (state == oam::FAILED && (*listPtr).state == oam::BUSY_INIT) || - (state == oam::FAILED && (*listPtr).state == oam::MAN_INIT) ) { - // issue ALARM and update local status to FAILED - log.writeLog(__LINE__, (*listPtr).ProcessName + " failed initialization", LOG_TYPE_WARNING); - aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_INIT_FAILURE, SET); - (*listPtr).state = state; - - //setModule status to failed - try{ - oam.setModuleStatus(config.moduleName(), oam::FAILED); - } - catch (exception& ex) - { - string error = ex.what(); - log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: " + error, LOG_TYPE_ERROR); - } - catch(...) - { - log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: Caught unknown exception!", LOG_TYPE_ERROR); - } - - break; - } - - if (state == oam::AUTO_OFFLINE || state == oam::INITIAL || - PID != (*listPtr).processID) { - //due to a small window, only process if out-of-sync for more than 1 second - outOfSyncCount++; - if ( outOfSyncCount == 2 ) { - // out of sync, update with internal state/PID - log.writeLog(__LINE__, "State out-of-sync, update on " + (*listPtr).ProcessName + "/" + oam.itoa((*listPtr).state) + "/" + oam.itoa((*listPtr).processID) , LOG_TYPE_DEBUG); - - aMonitor.updateProcessInfo((*listPtr).ProcessName, (*listPtr).state, (*listPtr).processID); - break; - } - sleep(1); - } - else - break; } + + if (state == oam::AUTO_OFFLINE || state == oam::INITIAL || + PID != (*listPtr).processID) { + //due to a small window, only process if out-of-sync for more than 1 second + outOfSyncCount++; + if ( outOfSyncCount == 2 ) { + // out of sync, update with internal state/PID + log.writeLog(__LINE__, "State out-of-sync, update on " + (*listPtr).ProcessName + "/" + oam.itoa((*listPtr).state) + "/" + oam.itoa((*listPtr).processID) , LOG_TYPE_DEBUG); + + aMonitor.updateProcessInfo((*listPtr).ProcessName, (*listPtr).state, (*listPtr).processID); + break; + } + sleep(1); + } + else + break; } - else - break; } + else + break; } } @@ -2326,14 +2331,6 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) fShmProcessStatus[shmIndex].ProcessID = PID; memcpy(fShmProcessStatus[shmIndex].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); - //if DMLProc set to ACTIVE, set system state to ACTIVE - if ( processName == "DMLProc" && state == oam::ACTIVE ) - { - fShmSystemStatus[0].OpState = state; - memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); - log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); - } - //if DMLProc set to BUSY_INIT, set system state to BUSY_INIT if ( processName == "DMLProc" && state == oam::BUSY_INIT ) { @@ -2341,6 +2338,19 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); } + + //if DMLProc set to ACTIVE, set system state to ACTIVE if in an INIT state + if ( processName == "DMLProc" && state == oam::ACTIVE ) + { + if ( fShmSystemStatus[0].OpState == oam::BUSY_INIT || + fShmSystemStatus[0].OpState == oam::MAN_INIT || + fShmSystemStatus[0].OpState == oam::AUTO_INIT ) + { + fShmSystemStatus[0].OpState = state; + memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); + log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); + } + } } break; diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index 87f356a6b..471e5d258 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -546,18 +546,6 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO //Check for SIMPLEX runtype processes initType = checkSpecialProcessState( processconfig.ProcessName, processconfig.RunType, processconfig.ModuleType ); - if ( actIndicator == oam::GRACEFUL_STANDBY) { - //this module running Parent OAM Standby - runStandby = true; - log.writeLog(__LINE__, "ProcMon Running Hot-Standby"); - - // delete any old active alarm log file - unlink ("/var/log/Calpont/activeAlarms"); - } - - //Check for SIMPLEX runtype processes - initType = checkSpecialProcessState( processconfig.ProcessName, processconfig.RunType, processconfig.ModuleType ); - if ( initType == oam::COLD_STANDBY) { //there is a mate active, skip config.buildList(processconfig.ModuleType, @@ -1771,6 +1759,19 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO string masterLogFile = oam::UnassignedName; string masterLogPos = oam::UnassignedName; + if ( (PMwithUM == "n") && (config.moduleType() == "pm") && ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM) ) + { + ackMsg << (ByteStream::byte) ACK; + ackMsg << (ByteStream::byte) MASTERREP; + ackMsg << (ByteStream::byte) oam::API_FAILURE; + ackMsg << masterLogFile; + ackMsg << masterLogPos; + mq.write(ackMsg); + + log.writeLog(__LINE__, "MASTERREP: Error PM invalid msg - ACK back to ProcMgr return status = " + oam.itoa((int) oam::API_FAILURE)); + break; + } + //change local my.cnf file int ret; int retry; @@ -1831,6 +1832,17 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO string port; msg >> port; + if ( (PMwithUM == "n") && (config.moduleType() == "pm") && ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM) ) + { + ackMsg << (ByteStream::byte) ACK; + ackMsg << (ByteStream::byte) SLAVEREP; + ackMsg << (ByteStream::byte) oam::API_FAILURE; + mq.write(ackMsg); + + log.writeLog(__LINE__, "SLAVEREP: Error PM invalid msg - ACK back to ProcMgr return status = " + oam.itoa((int) oam::API_FAILURE)); + break; + } + //change local my.cnf file int ret = changeMyCnf("slave"); @@ -1867,6 +1879,16 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO string module; msg >> module; + if ( (PMwithUM == "n") && (config.moduleType() == "pm") && ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM) ) + { + ackMsg << (ByteStream::byte) ACK; + ackMsg << (ByteStream::byte) MASTERDIST; + ackMsg << (ByteStream::byte) oam::API_FAILURE; + mq.write(ackMsg); + + log.writeLog(__LINE__, "MASTERDIST: runMasterRep - ACK back to ProcMgr return status = " + oam.itoa((int) oam::API_FAILURE)); + } + if ( password == oam::UnassignedName ) password = "ssh"; @@ -3796,8 +3818,6 @@ int ProcessMonitor::createDataDirs(std::string cloud) if (UMStorageType == "external") { if(!amazonVolumeCheck()) { - //Set the alarm - sendAlarm(config.moduleName().c_str(), STARTUP_DIAGNOTICS_FAILURE, SET); return API_FAILURE; } } @@ -3856,8 +3876,6 @@ int ProcessMonitor::createDataDirs(std::string cloud) config.moduleID() == moduleID) { if(!amazonVolumeCheck(id)) { - //Set the alarm - sendAlarm(config.moduleName().c_str(), STARTUP_DIAGNOTICS_FAILURE, SET); return API_FAILURE; } } @@ -4696,33 +4714,16 @@ int ProcessMonitor::changeMyCnf(std::string type) buf = "server-id = 1"; } -/* pos = buf.find("# log-bin=mysql-bin",0); + pos = buf.find("# binlog_format=ROW",0); if ( pos != string::npos ) { - buf = "log-bin=mysql-bin"; + buf = "binlog_format=ROW"; } -*/ + pos = buf.find("infinidb_local_query=1",0); if ( pos != string::npos && pos == 0) { buf = "# infinidb_local_query=1"; } -/* pos = buf.find("slave-skip-errors=all",0); - if ( pos != string::npos && pos == 0) { - buf = "# slave-skip-errors=all" + buf; - } - - pos = buf.find("# relay-log=",0); - if ( pos != string::npos && pos == 0) { - buf = buf; - } - else - { - pos = buf.find("relay-log=",0); - if ( pos != string::npos && pos == 0) { - buf = "# " + buf; - } - } -*/ //output to temp file lines.push_back(buf); } @@ -4823,16 +4824,6 @@ int ProcessMonitor::changeMyCnf(std::string type) buf = "server-id = " + slaveID; } -/* pos = buf.find("# relay-log=",0); - if ( pos != string::npos ) { - buf = "relay-log=" + dbDir + "/relay-bin"; - } - - pos = buf.find("# slave-skip-errors=all",0); - if ( pos != string::npos ) { - buf = "slave-skip-errors=all"; - } -*/ // set local query flag if on pm if ( (PMwithUM == "y") && config.moduleType() == "pm" ) { @@ -4849,11 +4840,11 @@ int ProcessMonitor::changeMyCnf(std::string type) } } -/* pos = buf.find("log-bin=mysql-bin",0); + pos = buf.find("binlog_format=ROW",0); if ( pos != string::npos && pos == 0 ) { - buf = "# log-bin=mysql-bin"; + buf = "# binlog_format=ROW"; } -*/ + //output to temp file lines.push_back(buf); } @@ -4893,6 +4884,11 @@ int ProcessMonitor::changeMyCnf(std::string type) buf = "# log-bin=mysql-bin"; } + pos = buf.find("binlog_format=ROW",0); + if ( pos != string::npos && pos == 0 ) { + buf = "# binlog_format=ROW"; + } + pos = buf.find("infinidb_local_query=1",0); if ( pos != string::npos && pos == 0) { buf = "# infinidb_local_query=1"; @@ -5585,7 +5581,7 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID) } if ( status != "available" ) { - log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_CRITICAL); + log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_WARNING); return false; } else @@ -5610,7 +5606,7 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID) } else { - log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume failed to attached: " + volumeName, LOG_TYPE_CRITICAL); + log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume failed to attached: " + volumeName, LOG_TYPE_WARNING); return false; } } @@ -5642,7 +5638,7 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID) } if ( status != "available" ) { - log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_CRITICAL); + log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_WARNING); return false; } else diff --git a/tools/configMgt/autoConfigure.cpp b/tools/configMgt/autoConfigure.cpp index 46c033692..5afe2d68a 100644 --- a/tools/configMgt/autoConfigure.cpp +++ b/tools/configMgt/autoConfigure.cpp @@ -1805,6 +1805,23 @@ int main(int argc, char *argv[]) catch(...) {} + try { + string AmazonPMFailover = sysConfigOld->getConfig("Installation", "AmazonPMFailover"); + + if ( !AmazonPMFailover.empty() ) + { + try { + sysConfigNew->setConfig("Installation", "AmazonPMFailover", AmazonPMFailover); + } + catch(...) + { + cout << "ERROR: Problem setting AmazonPMFailover in the Calpont System Configuration file" << endl; + exit(-1); + } + } + } + catch(...) + {} //Write out Updated System Configuration File sysConfigNew->write(); diff --git a/writeengine/redistribute/we_redistributecontrolthread.cpp b/writeengine/redistribute/we_redistributecontrolthread.cpp index d1b3d12f5..d83638522 100644 --- a/writeengine/redistribute/we_redistributecontrolthread.cpp +++ b/writeengine/redistribute/we_redistributecontrolthread.cpp @@ -442,17 +442,24 @@ int RedistributeControlThread::executeRedistributePlan() bool isActive = false; while (!isActive) { + bool noExcept = true; SystemStatus systemstatus; - systemstatus.SystemOpState = oam::ACTIVE; try { fControl->fOam->getSystemStatus(systemstatus); } + catch (const std::exception& ex) + { + fErrorMsg += ex.what(); + noExcept = false; + } catch (...) - {} + { + noExcept = false; + } - if ((isActive = (systemstatus.SystemOpState == oam::ACTIVE) == false)) - sleep(1); + if (noExcept && ((isActive = (systemstatus.SystemOpState == oam::ACTIVE)) == false)) + sleep(1);; } #endif