From d5e873e1981d50fec690df62ba545aa2dd0c7ed6 Mon Sep 17 00:00:00 2001 From: david hill Date: Mon, 17 Jul 2017 15:43:18 -0500 Subject: [PATCH] MCOL-814 --- dbcon/joblist/distributedenginecomm.cpp | 42 +- procmgr/processmanager.cpp | 889 +++++++++++++----------- 2 files changed, 519 insertions(+), 412 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index a059623fc..4407c4d6d 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -171,7 +171,7 @@ namespace joblist DistributedEngineComm* DistributedEngineComm::fInstance = 0; /*static*/ - DistributedEngineComm* DistributedEngineComm::instance(ResourceManager& rm, bool isExeMgr) + DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr) { if (fInstance == 0) fInstance = new DistributedEngineComm(rm, isExeMgr); @@ -186,9 +186,9 @@ namespace joblist fInstance = 0; } - DistributedEngineComm::DistributedEngineComm(ResourceManager& rm, bool isExeMgr) : + DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) : fRm(rm), - fLBIDShift(fRm.getPsLBID_Shift()), + fLBIDShift(fRm->getPsLBID_Shift()), pmCount(0), fIsExeMgr(isExeMgr) { @@ -219,10 +219,10 @@ void DistributedEngineComm::Setup() newClients.clear(); newLocks.clear(); - throttleThreshold = fRm.getDECThrottleThreshold(); - uint32_t newPmCount = fRm.getPsCount(); - int cpp = (fIsExeMgr ? fRm.getPsConnectionsPerPrimProc() : 1); - tbpsThreadCount = fRm.getJlNumScanReceiveThreads(); + throttleThreshold = fRm->getDECThrottleThreshold(); + uint32_t newPmCount = fRm->getPsCount(); + int cpp = (fIsExeMgr ? 
fRm->getPsConnectionsPerPrimProc() : 1); + tbpsThreadCount = fRm->getJlNumScanReceiveThreads(); unsigned numConnections = newPmCount * cpp; oam::Oam oam; ModuleTypeConfig moduletypeconfig; @@ -246,7 +246,7 @@ void DistributedEngineComm::Setup() string fServer (oss.str()); boost::shared_ptr - cl(new MessageQueueClient(fServer, fRm.getConfig())); + cl(new MessageQueueClient(fServer, fRm->getConfig())); boost::shared_ptr nl(new boost::mutex()); try { if (cl->connect()) { @@ -297,7 +297,7 @@ void DistributedEngineComm::Setup() int DistributedEngineComm::Close() { - //cout << "DistributedEngineComm::Close() called" << endl; + cout << "DistributedEngineComm::Close() called" << endl; makeBusy(false); // for each MessageQueueClient in pmConnections delete the MessageQueueClient; @@ -337,9 +337,9 @@ Error: // @bug 488 - error condition! push 0 length bs to messagequeuemap and // eventually let jobstep error out. mutex::scoped_lock lk(fMlock); - //cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl; + cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl; - MessageQueueMap::iterator map_tok; +/* MessageQueueMap::iterator map_tok; sbs.reset(new ByteStream(0)); for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) @@ -356,21 +356,21 @@ Error: { mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = client->moduleName(); - //cout << "moduleName=" << moduleName << endl; + cout << "moduleName=" << moduleName << endl; for ( uint32_t i = 0; i < fPmConnections.size(); i++) { if (moduleName != fPmConnections[i]->moduleName()) tempConns.push_back(fPmConnections[i]); //else - //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl; + cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl; } if (tempConns.size() == fPmConnections.size()) return; fPmConnections.swap(tempConns); 
pmCount = (pmCount == 0 ? 0 : pmCount - 1); - //cout << "PMCOUNT=" << pmCount << endl; - + cout << "PMCOUNT=" << pmCount << endl; +*/ // send alarm & log it ALARMManager alarmMgr; string alarmItem = client->addr2String(); @@ -380,7 +380,7 @@ Error: ostringstream os; os << "DEC: lost connection to " << client->addr2String(); writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL); - } +// } return; } @@ -861,9 +861,9 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin { // @bug 488. error out under such condition instead of re-trying other connection, // by pushing 0 size bytestream to messagequeue and throw excpetion - SBS sbs; +/* SBS sbs; lk.lock(); - //cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl; + cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl; MessageQueueMap::iterator map_tok; sbs.reset(new ByteStream(0)); @@ -879,10 +879,10 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin // reconfig the connection array ClientList tempConns; { - //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl; + cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl; mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = fPmConnections[index]->moduleName(); - //cout << "module name = " << moduleName << endl; + cout << "module name = " << moduleName << endl; if (index >= fPmConnections.size()) return 0; for (uint32_t i = 0; i < fPmConnections.size(); i++) @@ -894,7 +894,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin fPmConnections.swap(tempConns); pmCount = (pmCount == 0 ? 
0 : pmCount - 1); } - +*/ // send alarm ALARMManager alarmMgr; string alarmItem("UNKNOWN"); diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index ae76401ca..e5c783c40 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -618,11 +618,11 @@ void processMSG(messageqcpp::IOSocket* cfIos) oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); - processManager.restartProcessType("ExeMgr"); +// processManager.restartProcessType("ExeMgr"); //setup MySQL Replication for started modules - log.writeLog(__LINE__, "Setup MySQL Replication for module being started", LOG_TYPE_DEBUG); - processManager.setMySQLReplication(startdevicenetworklist); +// log.writeLog(__LINE__, "Setup MySQL Replication for module being started", LOG_TYPE_DEBUG); +// processManager.setMySQLReplication(startdevicenetworklist); } } else @@ -2791,7 +2791,17 @@ void processMSG(messageqcpp::IOSocket* cfIos) log.writeLog(__LINE__, "MSG RECEIVED: Process Restarted on " + moduleName + "/" + processName); - //request reinit after Process is active + //set query system states not ready + BRM::DBRM dbrm; + dbrm.setSystemQueryReady(false); + + processManager.setQuerySystemState(false); + + processManager.setSystemState(oam::BUSY_INIT); + + processManager.reinitProcessType("cpimport"); + + //request reinit after Process is active for ( int i = 0; i < 600 ; i++ ) { try { ProcessStatus procstat; @@ -2805,7 +2815,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.distributeConfigFile("system"); processManager.reinitProcessType("WriteEngineServer"); - processManager.restartProcessType("ExeMgr"); + processManager.reinitProcessType("ExeMgr"); processManager.reinitProcessType("DDLProc"); processManager.reinitProcessType("DMLProc"); } @@ -2851,7 +2861,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) } // Wait for DMLProc to be ACTIVE - BRM::DBRM dbrm; + //BRM::DBRM dbrm; state = AUTO_OFFLINE; while (state == oam::MAN_OFFLINE || 
state == oam::AUTO_OFFLINE @@ -2865,7 +2875,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) break; sleep(1); } - dbrm.setSystemQueryReady(true); + //dbrm.setSystemQueryReady(true); } // if a DDLProc was restarted, reinit DMLProc @@ -2916,6 +2926,13 @@ void processMSG(messageqcpp::IOSocket* cfIos) break; } } + + //enable query stats + dbrm.setSystemQueryReady(true); + + processManager.setQuerySystemState(true); + + processManager.setSystemState(oam::ACTIVE); } break; @@ -3797,10 +3814,8 @@ void ProcessManager::setSystemState(uint16_t state) else if ( state == oam::AUTO_OFFLINE ) aManager.sendAlarmReport(system.c_str(), SYSTEM_DOWN_AUTO, SET); - //this alarm doesnt get clear by reporter, so clear on stopage aManager.sendAlarmReport(system.c_str(), CONN_FAILURE, CLEAR); } - pthread_mutex_unlock(&STATUS_LOCK); } @@ -4420,6 +4435,18 @@ int ProcessManager::addModule(oam::DeviceNetworkList devicenetworklist, std::str pthread_mutex_lock(&THREAD_LOCK); + //get Distributed Install + string DistributedInstall = "y"; + + try + { + oam.getSystemConfig("DistributedInstall", DistributedInstall); + } + catch (...) 
+ { + log.writeLog(__LINE__, "addModule - ERROR: get DistributedInstall", LOG_TYPE_ERROR); + } + int AddModuleCount = devicenetworklist.size(); DeviceNetworkList::iterator listPT = devicenetworklist.begin(); string moduleType = (*listPT).DeviceName.substr(0,MAX_MODULE_TYPE_SIZE); @@ -4510,8 +4537,6 @@ int ProcessManager::addModule(oam::DeviceNetworkList devicenetworklist, std::str return API_FAILURE; } - - //check if pkgs are located in /root directory string homedir = "/root"; if (!rootUser) { char* p= getenv("HOME"); @@ -4519,6 +4544,13 @@ int ProcessManager::addModule(oam::DeviceNetworkList devicenetworklist, std::str homedir = p; } + //clear out the known_host file, sometimes causes a failure on amazon during addModule + if ( amazon ) + { + string cmd = "sudo unlink " + homedir + ".ssh/know_hosts > /dev/null 2>&1"; + system(cmd.c_str()); + } + if ( packageType == "rpm") calpontPackage = homedir + "/mariadb-columnstore*" + systemsoftware.Version + "-" + systemsoftware.Release + "*.rpm.tar.gz"; else @@ -4527,55 +4559,59 @@ int ProcessManager::addModule(oam::DeviceNetworkList devicenetworklist, std::str else calpontPackage = homedir + "/mariadb-columnstore*" + systemsoftware.Version + "-" + systemsoftware.Release + "*.bin.tar.gz"; - string cmd = "ls " + calpontPackage + " > /dev/null 2>&1"; - int rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) { - log.writeLog(__LINE__, "addModule - ERROR: Package not found: " + calpontPackage, LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - return API_FILE_OPEN_ERROR; - } - log.writeLog(__LINE__, "addModule - Calpont Package found:" + calpontPackage, LOG_TYPE_DEBUG); - - // - // Verify Host IP and Password - // - - if ( password == "ssh" && amazon ) - { // check if there is a root password stored - string rpw = oam::UnassignedName; - try - { - oam.getSystemConfig("rpw", rpw); - } - catch(...) 
- { - rpw = "mariadb1"; - } - - if (rpw != oam::UnassignedName) - password = rpw; - } - - listPT = devicenetworklist.begin(); - for( ; listPT != devicenetworklist.end() ; listPT++) + if ( DistributedInstall == "y" ) { - HostConfigList::iterator pt1 = (*listPT).hostConfigList.begin(); - string newHostName = (*pt1).HostName; - if ( newHostName == oam::UnassignedName ) - continue; + //check if pkgs are located in /root directory + string cmd = "ls " + calpontPackage + " > /dev/null 2>&1"; + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) { + log.writeLog(__LINE__, "addModule - ERROR: Package not found: " + calpontPackage, LOG_TYPE_ERROR); + pthread_mutex_unlock(&THREAD_LOCK); + return API_FILE_OPEN_ERROR; + } + log.writeLog(__LINE__, "addModule - ColumnStore Package found:" + calpontPackage, LOG_TYPE_DEBUG); + + // + // Verify Host IP and Password + // - string newIPAddr = (*pt1).IPAddr; - string cmd = installDir + "/bin/remote_command.sh " + newIPAddr + " " + password + " ls"; - log.writeLog(__LINE__, cmd, LOG_TYPE_DEBUG); - int rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) { - log.writeLog(__LINE__, "addModule - ERROR: Remote login test failed, Invalid IP / Password " + newIPAddr, LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - return API_FAILURE; - } - log.writeLog(__LINE__, "addModule - Remote login test successful: " + newIPAddr, LOG_TYPE_DEBUG); + if ( password == "ssh" && amazon ) + { // check if there is a root password stored + string rpw = oam::UnassignedName; + try + { + oam.getSystemConfig("rpw", rpw); + } + catch(...) 
+ { + rpw = "mariadb1"; + } + + if (rpw != oam::UnassignedName) + password = rpw; + } + + listPT = devicenetworklist.begin(); + for( ; listPT != devicenetworklist.end() ; listPT++) + { + HostConfigList::iterator pt1 = (*listPT).hostConfigList.begin(); + string newHostName = (*pt1).HostName; + if ( newHostName == oam::UnassignedName ) + continue; + + string newIPAddr = (*pt1).IPAddr; + string cmd = installDir + "/bin/remote_command.sh " + newIPAddr + " " + password + " ls"; + log.writeLog(__LINE__, cmd, LOG_TYPE_DEBUG); + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) { + log.writeLog(__LINE__, "addModule - ERROR: Remote login test failed, Invalid IP / Password " + newIPAddr, LOG_TYPE_ERROR); + pthread_mutex_unlock(&THREAD_LOCK); + return API_FAILURE; + } + log.writeLog(__LINE__, "addModule - Remote login test successful: " + newIPAddr, LOG_TYPE_DEBUG); + } } - + // //Get System Configuration file // @@ -4971,343 +5007,366 @@ int ProcessManager::addModule(oam::DeviceNetworkList devicenetworklist, std::str } } - //PMwithUM config - string PMwithUM = "n"; - try { - oam.getSystemConfig( "PMwithUM", PMwithUM); - } - catch(...) { - PMwithUM = "n"; - } - - //check mysql port changes - string MySQLPort; - try { - oam.getSystemConfig( "MySQLPort", MySQLPort); - } - catch(...) 
- {} - - if ( MySQLPort.empty() || MySQLPort == "" || MySQLPort == oam::UnassignedName ) - MySQLPort = "3306"; - - string version = systemsoftware.Version + "-" + systemsoftware.Release; - - //setup and push custom OS files - listPT = devicenetworklist.begin(); - for( ; listPT != devicenetworklist.end() ; listPT++) - { - string remoteModuleName = (*listPT).DeviceName; - string remoteModuleType = remoteModuleName.substr(0,MAX_MODULE_TYPE_SIZE); - HostConfigList::iterator pt1 = (*listPT).hostConfigList.begin(); - string remoteModuleIP = (*pt1).IPAddr; - string remoteHostName = (*pt1).HostName; - - //create and copy custom OS - //run remote installer script - string dir = installDir + "/local/etc/" + remoteModuleName; - - string cmd = "mkdir " + dir + " > /dev/null 2>&1"; - system(cmd.c_str()); - - if ( remoteModuleType == "um" ) { - cmd = "cp " + installDir + "/local/etc/um1/* " + dir + "/."; - system(cmd.c_str()); - } - else - { - if ( remoteModuleType == "pm") { - cmd = "cp " + installDir + "/local/etc/pm1/* " + dir + "/."; - system(cmd.c_str()); - } - } - log.writeLog(__LINE__, "addModule - created directory and custom OS files for " + remoteModuleName, LOG_TYPE_DEBUG); - - //create module file - if( !createModuleFile(remoteModuleName) ) { - log.writeLog(__LINE__, "addModule - ERROR: createModuleFile failed", LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - return API_FAILURE; - } - log.writeLog(__LINE__, "addModule - create module file for " + remoteModuleName, LOG_TYPE_DEBUG); - - if ( remoteModuleType == "pm" ) { - //setup Standby OAM Parent, if needed - if ( config.OAMStandbyName() == oam::UnassignedName ) - setStandbyModule(remoteModuleName, false); - } - - //set root password - if (amazon) { - cmd = startup::StartUp::installDir() + "/bin/remote_command.sh " + remoteModuleIP + " " + password + " '/root/.scripts/updatePassword.sh " + password + "' > /tmp/password_change.log"; - log.writeLog(__LINE__, "addModule - cmd: " + cmd, LOG_TYPE_DEBUG); - 
rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) == 0) - log.writeLog(__LINE__, "addModule - update root password: " + remoteModuleName, LOG_TYPE_DEBUG); - else - log.writeLog(__LINE__, "addModule - ERROR: update root password: " + remoteModuleName, LOG_TYPE_DEBUG); - } - - //default - string binaryInstallDir = installDir; - - //run installer on remote module - if ( remoteModuleType == "um" || - ( remoteModuleType == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || - ( remoteModuleType == "pm" && PMwithUM == "y" ) ) { - //run remote installer script - if ( packageType != "binary" ) { - log.writeLog(__LINE__, "addModule - user_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); - - string cmd = installDir + "/bin/user_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + version + " initial " + packageType + " --nodeps none " + MySQLPort + " 1 > /tmp/user_installer.log"; - - log.writeLog(__LINE__, "addModule cmd: " + cmd, LOG_TYPE_DEBUG); - - bool passed = false; - for ( int retry = 0 ; retry < 20 ; retry++ ) - { - rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) { - // if log file size is zero, retry - ifstream in("/tmp/user_installer.log"); - in.seekg(0, std::ios::end); - int size = in.tellg(); - if ( size == 0 ) - { - log.writeLog(__LINE__, "addModule - ERROR: user_installer.sh failed, retry", LOG_TYPE_DEBUG); - sleep(5); - continue; - } - else - break; - } - else - { - passed = true; - break; - } - } - - if ( !passed ) - { - log.writeLog(__LINE__, "addModule - ERROR: user_installer.sh failed", LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - system("/bin/cp -f /tmp/user_installer.log /tmp/user_installer.log.failed"); - processManager.setModuleState(remoteModuleName, oam::FAILED); - return API_FAILURE; - } - } - else - { // do a binary package install - log.writeLog(__LINE__, "addModule - binary_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); - - string binservertype 
= oam.itoa(config.ServerInstallType()); - if ( PMwithUM == "y" ) - binservertype = "pmwithum"; - string cmd = installDir + "/bin/binary_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + calpontPackage + " " + remoteModuleType + " initial " + binservertype + " " + MySQLPort + " 1 " + binaryInstallDir + " > /tmp/binary_installer.log"; - - log.writeLog(__LINE__, "addModule - " + cmd, LOG_TYPE_DEBUG); - - bool passed = false; - for ( int retry = 0 ; retry < 20 ; retry++ ) - { - rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) { - // if log file size is zero, retry - ifstream in("/tmp/binary_installer.log"); - in.seekg(0, std::ios::end); - int size = in.tellg(); - if ( size == 0 ) - { - log.writeLog(__LINE__, "addModule - ERROR: binary_installer.sh failed, retry", LOG_TYPE_DEBUG); - sleep(5); - continue; - } - else - break; - } - else - { - passed = true; - break; - } - } - - if ( !passed ) - { - log.writeLog(__LINE__, "addModule - ERROR: binary_installer.sh failed", LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - system("/bin/cp -f /tmp/binary_installer.log /tmp/binary_installer.log.failed"); - processManager.setModuleState(remoteModuleName, oam::FAILED); - return API_FAILURE; - } - } - } - else - { - if ( remoteModuleType == "pm" ) { - if ( packageType != "binary" ) { - log.writeLog(__LINE__, "addModule - performance_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); - string cmd = installDir + "/bin/performance_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + version + " initial " + packageType + + " --nodeps 1 > /tmp/performance_installer.log"; - log.writeLog(__LINE__, "addModule cmd: " + cmd, LOG_TYPE_DEBUG); - - rtnCode = system(cmd.c_str()); - - bool passed = false; - for ( int retry = 0 ; retry < 20 ; retry++ ) - { - rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) { - // if log file size is zero, retry - ifstream in("/tmp/performance_installer.log"); - 
in.seekg(0, std::ios::end); - int size = in.tellg(); - if ( size == 0 ) - { - log.writeLog(__LINE__, "addModule - ERROR: performance_installer.sh failed, retry", LOG_TYPE_DEBUG); - sleep(5); - continue; - } - else - break; - } - else - { - passed = true; - break; - } - } - - if ( !passed ) - { - log.writeLog(__LINE__, "addModule - ERROR: performance_installer.sh failed", LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - system("/bin/cp -f /tmp/performance_installer.log /tmp/performance_installer.log.failed"); - processManager.setModuleState(remoteModuleName, oam::FAILED); - return API_FAILURE; - } - } - else - { // do a binary package install - log.writeLog(__LINE__, "addModule - binary_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); - - string binservertype = oam.itoa(config.ServerInstallType()); - if ( PMwithUM == "y" ) - binservertype = "pmwithum"; - - string cmd = installDir + "/bin/binary_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + calpontPackage + " " + remoteModuleType + " initial " + binservertype + " " + MySQLPort + " 1 " + binaryInstallDir + " > /tmp/binary_installer.log"; - - log.writeLog(__LINE__, "addModule - " + cmd, LOG_TYPE_DEBUG); - - bool passed = false; - for ( int retry = 0 ; retry < 20 ; retry++ ) - { - rtnCode = system(cmd.c_str()); - if (WEXITSTATUS(rtnCode) != 0) { - // if log file size is zero, retry - ifstream in("/tmp/binary_installer.log"); - in.seekg(0, std::ios::end); - int size = in.tellg(); - if ( size == 0 ) - { - log.writeLog(__LINE__, "addModule - ERROR: binary_installer.sh failed, retry", LOG_TYPE_DEBUG); - sleep(5); - continue; - } - else - break; - } - else - { - passed = true; - break; - } - } - - if ( !passed ) - { - log.writeLog(__LINE__, "addModule - ERROR: binary_installer.sh failed", LOG_TYPE_ERROR); - pthread_mutex_unlock(&THREAD_LOCK); - system("/bin/cp -f /tmp/binary_installer.log /tmp/binary_installer.log.failed"); - processManager.setModuleState(remoteModuleName, 
oam::FAILED); - return API_FAILURE; - } - } - } - } - } - - //Start new modules by starting up local Process-Monitor - listPT = devicenetworklist.begin(); - for( ; listPT != devicenetworklist.end() ; listPT++) - { - string remoteModuleName = (*listPT).DeviceName; - - if (manualFlag) - //set new module to disable state if manual add - disableModule(remoteModuleName, true); - - HostConfigList::iterator pt1 = (*listPT).hostConfigList.begin(); - string remoteModuleIP = (*pt1).IPAddr; - string remoteHostName = (*pt1).HostName; - - //send start service commands - string cmd = installDir + "/bin/remote_command.sh " + remoteModuleIP + " " + password + " '" + installDir + "/bin/columnstore restart;" + installDir + "/mysql/mysqld-Calpont restart' 0"; - system(cmd.c_str()); - log.writeLog(__LINE__, "addModule - restart columnstore service " + remoteModuleName, LOG_TYPE_DEBUG); - - // add to monitor list - moduleInfoList.insert(moduleList::value_type(remoteModuleName, 0)); - if (amazon) { - //check and assign Elastic IP Address - int AmazonElasticIPCount = 0; - try{ - oam.getSystemConfig("AmazonElasticIPCount", AmazonElasticIPCount); - } - catch(...) { - AmazonElasticIPCount = 0; - } - - for ( int id = 1 ; id < AmazonElasticIPCount+1 ; id++ ) - { - string AmazonElasticModule = "AmazonElasticModule" + oam.itoa(id); - string ELmoduleName; - try{ - oam.getSystemConfig(AmazonElasticModule, ELmoduleName); - } - catch(...) {} - - if ( ELmoduleName == remoteModuleName ) - { //match found assign Elastic IP Address - string AmazonElasticIPAddr = "AmazonElasticIPAddr" + oam.itoa(id); - string ELIPaddress; - try{ - oam.getSystemConfig(AmazonElasticIPAddr, ELIPaddress); - } - catch(...) {} - - try{ - oam.assignElasticIP(remoteHostName, ELIPaddress); - log.writeLog(__LINE__, "addModule - Set Elastic IP Address: " + remoteModuleName + "/" + ELIPaddress, LOG_TYPE_DEBUG); - } - catch(...) 
{ - log.writeLog(__LINE__, "addModule - Failed to Set Elastic IP Address: " + remoteModuleName + "/" + ELIPaddress, LOG_TYPE_ERROR); - } - break; - } - } - } - } - - //if amazon, delay to give time for ProcMon to start - if (amazon) { - log.writeLog(__LINE__, "addModule - sleep 30 - give ProcMon time to start on new Instance", LOG_TYPE_DEBUG); - sleep(30); - } - //distribute config file distributeConfigFile("system"); + if ( DistributedInstall == "y" ) { + + //PMwithUM config + string PMwithUM = "n"; + try { + oam.getSystemConfig( "PMwithUM", PMwithUM); + } + catch(...) { + PMwithUM = "n"; + } + + string version = systemsoftware.Version + "-" + systemsoftware.Release; + + string AmazonInstall = "0"; + if ( amazon ) + AmazonInstall = "1"; + + //setup and push custom OS files + listPT = devicenetworklist.begin(); + for( ; listPT != devicenetworklist.end() ; listPT++) + { + string remoteModuleName = (*listPT).DeviceName; + string remoteModuleType = remoteModuleName.substr(0,MAX_MODULE_TYPE_SIZE); + HostConfigList::iterator pt1 = (*listPT).hostConfigList.begin(); + string remoteModuleIP = (*pt1).IPAddr; + string remoteHostName = (*pt1).HostName; + + //create and copy custom OS + //run remote installer script + string dir = installDir + "/local/etc/" + remoteModuleName; + + string cmd = "mkdir " + dir + " > /dev/null 2>&1"; + system(cmd.c_str()); + + if ( remoteModuleType == "um" ) { + cmd = "cp " + installDir + "/local/etc/um1/* " + dir + "/."; + system(cmd.c_str()); + } + else + { + if ( remoteModuleType == "pm") { + cmd = "cp " + installDir + "/local/etc/pm1/* " + dir + "/."; + system(cmd.c_str()); + } + } + log.writeLog(__LINE__, "addModule - created directory and custom OS files for " + remoteModuleName, LOG_TYPE_DEBUG); + + //create module file + if( !createModuleFile(remoteModuleName) ) { + log.writeLog(__LINE__, "addModule - ERROR: createModuleFile failed", LOG_TYPE_ERROR); + pthread_mutex_unlock(&THREAD_LOCK); + return API_FAILURE; + } + log.writeLog(__LINE__, 
"addModule - create module file for " + remoteModuleName, LOG_TYPE_DEBUG); + + if ( remoteModuleType == "pm" ) { + //setup Standby OAM Parent, if needed + if ( config.OAMStandbyName() == oam::UnassignedName ) + setStandbyModule(remoteModuleName, false); + } + + //set root password + if (amazon) { + cmd = startup::StartUp::installDir() + "/bin/remote_command.sh " + remoteModuleIP + " " + password + " '/root/.scripts/updatePassword.sh " + password + "' > /tmp/password_change.log"; + log.writeLog(__LINE__, "addModule - cmd: " + cmd, LOG_TYPE_DEBUG); + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) == 0) + log.writeLog(__LINE__, "addModule - update root password: " + remoteModuleName, LOG_TYPE_DEBUG); + else + log.writeLog(__LINE__, "addModule - ERROR: update root password: " + remoteModuleName, LOG_TYPE_DEBUG); + } + + //default + string binaryInstallDir = installDir; + + //run installer on remote module + if ( remoteModuleType == "um" || + ( remoteModuleType == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || + ( remoteModuleType == "pm" && PMwithUM == "y" ) ) { + //run remote installer script + if ( packageType != "binary" ) { + string logFile = "/tmp/" + remoteModuleName + "_user_installer.log"; + log.writeLog(__LINE__, "addModule - user_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); + + string cmd = installDir + "/bin/user_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + version + " initial " + AmazonInstall + " " + packageType + " --nodeps none 1 > " + logFile; + + log.writeLog(__LINE__, "addModule cmd: " + cmd, LOG_TYPE_DEBUG); + + bool passed = false; + for ( int retry = 0 ; retry < 20 ; retry++ ) + { + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) { + // if log file size is zero, retry + ifstream in(logFile.c_str()); + in.seekg(0, std::ios::end); + int size = in.tellg(); + if ( size == 0 ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " 
failed, retry", LOG_TYPE_DEBUG); + sleep(5); + continue; + } + else + break; + } + else + { + passed = true; + break; + } + } + + if ( !passed ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed", LOG_TYPE_ERROR); + pthread_mutex_unlock(&THREAD_LOCK); + cmd = "/bin/cp -f " + logFile + " " + logFile + "failed"; + system(cmd.c_str()); + processManager.setModuleState(remoteModuleName, oam::FAILED); + return API_FAILURE; + } + } + else + { // do a binary package install + string logFile = "/tmp/" + remoteModuleName + "_binary_installer.log"; + log.writeLog(__LINE__, "addModule - binary_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); + + string binservertype = oam.itoa(config.ServerInstallType()); + if ( PMwithUM == "y" ) + binservertype = "pmwithum"; + string cmd = installDir + "/bin/binary_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + calpontPackage + " initial " + AmazonInstall + " 1 " + binaryInstallDir + " > " + logFile; + + log.writeLog(__LINE__, "addModule - " + cmd, LOG_TYPE_DEBUG); + + bool passed = false; + for ( int retry = 0 ; retry < 20 ; retry++ ) + { + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) { + // if log file size is zero, retry + ifstream in(logFile.c_str()); + in.seekg(0, std::ios::end); + int size = in.tellg(); + if ( size == 0 ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed, retry", LOG_TYPE_DEBUG); + sleep(5); + continue; + } + else + break; + } + else + { + passed = true; + break; + } + } + + if ( !passed ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed, retry", LOG_TYPE_DEBUG); + pthread_mutex_unlock(&THREAD_LOCK); + cmd = "/bin/cp -f " + logFile + " " + logFile + "failed"; + system(cmd.c_str()); + processManager.setModuleState(remoteModuleName, oam::FAILED); + return API_FAILURE; + } + } + } + else + { + if ( remoteModuleType == "pm" ) { + if ( packageType != "binary" ) { + string logFile = "/tmp/" + 
remoteModuleName + "_performance_installer.log"; + log.writeLog(__LINE__, "addModule - performance_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); + string cmd = installDir + "/bin/performance_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + version + " initial " + AmazonInstall + " " + packageType + + " --nodeps 1 > " + logFile; + log.writeLog(__LINE__, "addModule cmd: " + cmd, LOG_TYPE_DEBUG); + + system(cmd.c_str()); + + bool passed = false; + for ( int retry = 0 ; retry < 20 ; retry++ ) + { + int rtnCode = system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) { + // if log file size is zero, retry + ifstream in(logFile.c_str()); + in.seekg(0, std::ios::end); + int size = in.tellg(); + if ( size == 0 ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed, retry", LOG_TYPE_DEBUG); + sleep(5); + continue; + } + else + break; + } + else + { + passed = true; + break; + } + } + + if ( !passed ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed, retry", LOG_TYPE_DEBUG); + pthread_mutex_unlock(&THREAD_LOCK); + cmd = "/bin/cp -f " + logFile + " " + logFile + "failed"; + system(cmd.c_str()); + processManager.setModuleState(remoteModuleName, oam::FAILED); + return API_FAILURE; + } + } + else + { // do a binary package install + string logFile = "/tmp/" + remoteModuleName + "_binary_installer.log"; + log.writeLog(__LINE__, "addModule - binary_installer run for " + remoteModuleName, LOG_TYPE_DEBUG); + + string binservertype = oam.itoa(config.ServerInstallType()); + if ( PMwithUM == "y" ) + binservertype = "pmwithum"; + + string cmd = installDir + "/bin/binary_installer.sh " + remoteModuleName + " " + remoteModuleIP + " " + password + " " + calpontPackage + " initial " + AmazonInstall + " 1 " + binaryInstallDir + " > " + logFile; + + log.writeLog(__LINE__, "addModule - " + cmd, LOG_TYPE_DEBUG); + + bool passed = false; + for ( int retry = 0 ; retry < 20 ; retry++ ) + { + int rtnCode = 
system(cmd.c_str()); + if (WEXITSTATUS(rtnCode) != 0) { + // if log file size is zero, retry + ifstream in(logFile.c_str()); + in.seekg(0, std::ios::end); + int size = in.tellg(); + if ( size == 0 ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed, retry", LOG_TYPE_DEBUG); + sleep(5); + continue; + } + else + break; + } + else + { + passed = true; + break; + } + } + + if ( !passed ) + { + log.writeLog(__LINE__, "addModule - ERROR: " + logFile + " failed, retry", LOG_TYPE_DEBUG); + pthread_mutex_unlock(&THREAD_LOCK); + cmd = "/bin/cp -f " + logFile + " " + logFile + "failed"; + system(cmd.c_str()); + processManager.setModuleState(remoteModuleName, oam::FAILED); + return API_FAILURE; + } + } + } + } + } + + //distribute config file + distributeConfigFile("system"); + + //Start new modules by starting up local Process-Monitor + listPT = devicenetworklist.begin(); + for( ; listPT != devicenetworklist.end() ; listPT++) + { + string remoteModuleName = (*listPT).DeviceName; + + if (manualFlag) + //set new module to disable state if manual add + disableModule(remoteModuleName, true); + + HostConfigList::iterator pt1 = (*listPT).hostConfigList.begin(); + string remoteModuleIP = (*pt1).IPAddr; + string remoteHostName = (*pt1).HostName; + + + // add to monitor list + moduleInfoList.insert(moduleList::value_type(remoteModuleName, 0)); + if (amazon) { + //check and assign Elastic IP Address + int AmazonElasticIPCount = 0; + try{ + oam.getSystemConfig("AmazonElasticIPCount", AmazonElasticIPCount); + } + catch(...) { + AmazonElasticIPCount = 0; + } + + for ( int id = 1 ; id < AmazonElasticIPCount+1 ; id++ ) + { + string AmazonElasticModule = "AmazonElasticModule" + oam.itoa(id); + string ELmoduleName; + try{ + oam.getSystemConfig(AmazonElasticModule, ELmoduleName); + } + catch(...) 
{} + + if ( ELmoduleName == remoteModuleName ) + { //match found assign Elastic IP Address + string AmazonElasticIPAddr = "AmazonElasticIPAddr" + oam.itoa(id); + string ELIPaddress; + try{ + oam.getSystemConfig(AmazonElasticIPAddr, ELIPaddress); + } + catch(...) {} + + try{ + oam.assignElasticIP(remoteHostName, ELIPaddress); + log.writeLog(__LINE__, "addModule - Set Elastic IP Address: " + remoteModuleName + "/" + ELIPaddress, LOG_TYPE_DEBUG); + } + catch(...) { + log.writeLog(__LINE__, "addModule - Failed to Set Elastic IP Address: " + remoteModuleName + "/" + ELIPaddress, LOG_TYPE_ERROR); + } + break; + } + } + } + } + + listPT = devicenetworklist.begin(); + for( ; listPT != devicenetworklist.end() ; listPT++) + { + string moduleName = (*listPT).DeviceName; + + processManager.configureModule(moduleName); + sleep(10); + } + + //if amazon, delay to give time for ProcMon to start +// if (amazon) { +// log.writeLog(__LINE__, "addModule - sleep 30 - give ProcMon time to start on new Instance", LOG_TYPE_DEBUG); +// sleep(30); +// } + } + else + { + listPT = devicenetworklist.begin(); + for( ; listPT != devicenetworklist.end() ; listPT++) + { + string moduleName = (*listPT).DeviceName; + + processManager.configureModule(moduleName); + sleep(10); + } + } + log.writeLog(__LINE__, "Setup MySQL Replication for new Modules being Added", LOG_TYPE_DEBUG); processManager.setMySQLReplication(devicenetworklist, oam::UnassignedName, false, true, password ); @@ -5607,6 +5666,20 @@ int ProcessManager::removeModule(oam::DeviceNetworkList devicenetworklist, bool return API_FAILURE; } + //clear out the ssh known_hosts file (<homedir>/.ssh/known_hosts; homedir is /root, or $HOME when not running as root), sometimes causes a failure on amazon during addModule + if ( amazon ) + { + string homedir = "/root"; + if (!rootUser) { + char* p= getenv("HOME"); + if (p && *p) + homedir = p; + } + + string cmd = "sudo unlink " + homedir + "/.ssh/known_hosts > /dev/null 2>&1"; + system(cmd.c_str()); + } + pthread_mutex_unlock(&THREAD_LOCK); //check if any removed modules was Standby OAM
or Active OAM @@ -6010,6 +6083,40 @@ int ProcessManager::reconfigureModule(oam::DeviceNetworkList devicenetworklist) return API_SUCCESS; } +/****************************************************************************************** +* @brief configureModule +* +* purpose: Calls distributeConfigFile for the named module, then sends a CONFIGURE request (request ID followed by the module name) to that module's Process-Monitor via sendMsgProcMon; returns API_SUCCESS when sendMsgProcMon reports API_SUCCESS, otherwise logs an error and returns API_FAILURE +* +******************************************************************************************/ +int ProcessManager::configureModule(std::string moduleName) +{ + //distribute config file (passing moduleName to distributeConfigFile) + distributeConfigFile(moduleName); + + // + //Build and send the Configure msg (requestID, then moduleName) to the Module's Process-Monitor + // + ByteStream msg; + ByteStream::byte requestID = CONFIGURE; + + msg << requestID; + msg << moduleName; + + int returnStatus = sendMsgProcMon( moduleName, msg, requestID ); + + if ( returnStatus == API_SUCCESS) + //log the event at debug level and fall through to the API_SUCCESS return below + log.writeLog(__LINE__, "configureModule - procmon configure successful", LOG_TYPE_DEBUG); + else + { + log.writeLog(__LINE__, "configureModule - procmon configure failed", LOG_TYPE_ERROR); + return API_FAILURE; + } + + return API_SUCCESS; +} + /****************************************************************************************** * @brief sendMsgProcMon