/* Copyright (C) 2014 InfiniDB, Inc. * Copyright (C) 2016 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id$ * *******************************************************************************/ /* * we_sdhandler.cpp * * Created on: Oct 17, 2011 * Author: bpaul */ #include #include #include using namespace std; #include "we_messages.h" #include "resourcemanager.h" #include #include #include #include using namespace boost; #include "mcsconfig.h" #include "configcpp.h" using namespace config; //----- #include "messagequeue.h" #include "bytestream.h" using namespace messageqcpp; #include "calpontsystemcatalog.h" using namespace execplan; #include "batchloader.h" using namespace batchloader; //----- #include "we_sdhandler.h" #include "we_splitterapp.h" #include "we_respreadthread.h" #include "we_filereadthread.h" #include "we_brmupdater.h" #include "we_tablelockgrabber.h" #include "we_simplesyslog.h" #include "installdir.h" namespace WriteEngine { //------------------------------------------------------------------------------ // /** @brief Add PM to the list * @param - PmId */ void WEPmList::addPm2List(int PmId) { boost::mutex::scoped_lock aLock(fListMutex); fPmList.push_back(PmId); aLock.unlock(); } //------------------------------------------------------------------------------ void WEPmList::addPriorityPm2List(int PmId) { boost::mutex::scoped_lock aLock(fListMutex); fPmList.push_front(PmId); aLock.unlock(); } //------------------------------------------------------------------------------ int WEPmList::getNextPm() { boost::mutex::scoped_lock aLock(fListMutex); int aPmId = 0; if (!fPmList.empty()) { aPmId = fPmList.front(); fPmList.pop_front(); } return aPmId; } //------------------------------------------------------------------------------ // incase a shutdown or roll back is called we need to clear the data // so that the sendingthreads will not keep sending data. void WEPmList::clearPmList() { boost::mutex::scoped_lock aLock(fListMutex); if (!fPmList.empty()) fPmList.clear(); aLock.unlock(); } //------------------------------------------------------------------------------ bool WEPmList::check4Pm(int PmId) { boost::mutex::scoped_lock aLock(fListMutex); WePmList::iterator aIt = fPmList.begin(); bool aFound = false; while (aIt != fPmList.end()) { if ((*aIt) == PmId) { aFound = true; fPmList.erase(aIt); break; } ++aIt; } return aFound; } //------------------------------------------------------------------------------ // WESDHandler Definitions //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ WESDHandler::WESDHandler(WESplitterApp& Ref) : fRef(Ref) , fLog() , fQId(101) , // 101 - took it from air fOam() , fModuleTypeConfig() , fDebugLvl(0) , fPmCount(0) , fTableLock(0) , fTableOId(0) , fFixedBinaryRecLen(0) , fRespMutex() , fRespCond() , fSendMutex() , fRespList() , fpRespThread(0) , fDataFeedList() , fFileReadThread(*this) , fDisconnectFailure(false) , fForcedFailure(false) , fAllCpiStarted(false) , fFirstDataSent(false) , fFirstPmToSend(0) , fSelectOtherPm(false) , fContinue(true) , fWeSplClients(MAX_PMS) , fBrmRptVec() , fpBatchLoader(0) { fRm = joblist::ResourceManager::instance(); } //------------------------------------------------------------------------------ WESDHandler::~WESDHandler() { try { for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { delete fWeSplClients[aCnt]; fWeSplClients[aCnt] = 0; } } delete fpRespThread; fpRespThread = 0; delete fpBatchLoader; fpBatchLoader = 0; } catch (...) { std::string aStr = "Handled an error in ~WESDHandler"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStr); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); cout << aStr << endl; } } //------------------------------------------------------------------------------ void WESDHandler::reset() { fTableLock = 0; fTableOId = 0; fFixedBinaryRecLen = 0; // fpRespThread = 0; // fFileReadThread(*this); fForcedFailure = false; fAllCpiStarted = false; fFirstDataSent = false; fFirstPmToSend = 0; fSelectOtherPm = false; fContinue = true; // fWeSplClients(); // fpBatchLoader = 0; fImportRslt.reset(); fLog.closeLog(); try { for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { delete fWeSplClients[aCnt]; fWeSplClients[aCnt] = 0; } } delete fpRespThread; fpRespThread = 0; delete fpBatchLoader; fpBatchLoader = 0; } catch (...) { std::string aStr = "Handled an error in ~WESDHandler"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStr); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); cout << aStr << endl; } } //------------------------------------------------------------------------------ // BP 10/24/2011 14:22 //------------------------------------------------------------------------------ void WESDHandler::send2Pm(ByteStream& Bs, unsigned int PmId) { // mutex::scoped_lock aLock(fSendMutex); if (PmId == 0) // send it to everyone { for (int aIdx = 1; aIdx <= getPmCount(); ++aIdx) { if (fWeSplClients[aIdx] != 0) { boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fWriteMutex); fWeSplClients[aIdx]->write(Bs); aLock.unlock(); } } } else { boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fWriteMutex); fWeSplClients[PmId]->write(Bs); aLock.unlock(); } // aLock.unlock(); } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void WESDHandler::send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId) { // mutex::scoped_lock aLock(fSendMutex); if (PmId == 0) // send it to everyone { for (int aIdx = 1; aIdx <= getPmCount(); ++aIdx) { if (fWeSplClients[aIdx] != 0) { boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fSentQMutex); fWeSplClients[aIdx]->add2SendQueue(Sbs); aLock.unlock(); } } } else { boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fSentQMutex); fWeSplClients[PmId]->add2SendQueue(Sbs); aLock.unlock(); } // aLock.unlock(); } //------------------------------------------------------------------------------ // void WESDHandler::sendEODMsg() { // BUG 5035 Sending multiple EOD so that to avoid 'silly window syndrome' for (int idx = 0; idx < 3; idx++) { messageqcpp::SBS aSbs(new messageqcpp::ByteStream); *aSbs << (ByteStream::byte)WE_CLT_SRV_EOD; send2Pm(aSbs); } { std::stringstream aStrStr; aStrStr << "Send EOD message to All PMs"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); } } //------------------------------------------------------------------------------ // check for the messages from all WE servers //------------------------------------------------------------------------------ void WESDHandler::checkForRespMsgs() { ByteStream aBs; ByteStream::byte aPmId; ByteStream::byte aMsgId; // boost::shared_ptr aSbs; messageqcpp::SBS aSbs; while (isContinue()) { boost::mutex::scoped_lock aLock(fRespMutex); // NOTE - if isContinue is not checked thread will hang on shutdown // by locking again on fRespList.empty() while ((fRespList.empty()) && (isContinue())) fRespCond.wait(aLock); // if(!isContinue()) { aLock.unlock(); break; } testing for rare hanging // cout <<"wait signaled checkForRespMsgs" << endl; while (!fRespList.empty()) { // mutex::scoped_lock aLock (fRespMutex); aSbs = fRespList.front(); fRespList.pop_front(); // aLock.unlock(); *aSbs >> aMsgId; *aSbs >> aPmId; // Debugging // cout << "aMsgid = " << static_cast(aMsgId) << endl; switch (aMsgId) { case WE_CLT_SRV_KEEPALIVE: onKeepAliveMessage(static_cast(aPmId)); break; case WE_CLT_SRV_ACK: onAckResponse(static_cast(aPmId)); break; case WE_CLT_SRV_DATARQST: onDataRqstResponse(static_cast(aPmId)); break; case WE_CLT_SRV_EOD: onEodResponse(static_cast(aPmId)); break; case WE_CLT_SRV_STARTCPI: onStartCpiResponse(static_cast(aPmId)); break; case WE_CLT_SRV_CPIPASS: onCpimportPass(static_cast(aPmId)); break; case WE_CLT_SRV_CPIFAIL: onCpimportFail(static_cast(aPmId)); break; case WE_CLT_SRV_BRMRPT: onBrmReport(static_cast(aPmId), aSbs); break; case WE_CLT_SRV_ROLLBACK: onRollbackResult(static_cast(aPmId), aSbs); break; case WE_CLT_SRV_CLEANUP: onCleanupResult(static_cast(aPmId), aSbs); break; case WE_CLT_SRV_DBRCNT: onDBRootCount(static_cast(aPmId), aSbs); break; case WE_CLT_SRV_ERRLOG: onErrorFile(static_cast(aPmId), aSbs); break; case WE_CLT_SRV_BADLOG: onBadFile(static_cast(aPmId), aSbs); break; case WE_CLT_SRV_IMPFILEERROR: onImpFileError(static_cast(aPmId)); break; default: break; } // switch aSbs.reset(); } // while not empty() aLock.unlock(); // yield here so that other threads get slice } // while } //------------------------------------------------------------------------------ void WESDHandler::add2RespQueue(const messageqcpp::SBS& Sbs) { boost::mutex::scoped_lock aLock(fRespMutex); fRespList.push_back(Sbs); aLock.unlock(); // cout <<"Notifing from add2RespQueue" << endl; fRespCond.notify_one(); } //------------------------------------------------------------------------------ void WESDHandler::setup() { std::stringstream aPid; bool bRollback; bool bForce; aPid << getpid(); std::string aTimeStamp = getTime2Str(); std::string aLogName; std::string aErrLogName; if (fRef.fCmdArgs.isJobLogOnly()) aLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_Job_" + fRef.fCmdArgs.getJobId() + ".log"; else aLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_" + aTimeStamp + "_" + aPid.str() + ".log"; if (getDebugLvl() > 1) cout << "LogName : " << aLogName << endl; if (fRef.fCmdArgs.isJobLogOnly()) aErrLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_Job_" + fRef.fCmdArgs.getJobId() + ".err"; else aErrLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_" + aTimeStamp + "_" + aPid.str() + ".err"; if (getDebugLvl() > 1) cout << "ErrLogName : " << aErrLogName << endl; // consoleFlag false will only output only MSGLOG_LVL1 to console // and MSGLOG_LVL2 to log file without writing to console. // fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), false); fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), getConsoleLog()); // In mode 0 and Mode 1, we need to construct the input file list and check availability if (0 == fRef.fCmdArgs.getMode() || 1 == fRef.fCmdArgs.getMode()) setInputFileList(fRef.getLocFile()); fImportRslt.startTimer(); if (fPmCount == 0) // Should have already set in cpimport invoke check { throw(runtime_error("Configuration Error. PM Count = 0")); } if (fRef.fCmdArgs.getPmVecSize() == 0) // No Pms listed in Cmd line { // BUG 4668 - Added this code to find the PM's realtime from OAM // fOam.getSystemConfig("pm", fModuleTypeConfig); //commented out since- // - we are calling that function in check4CpiInvokeMode() oam::DeviceNetworkList::iterator pt = fModuleTypeConfig.ModuleNetworkList.begin(); for (; pt != fModuleTypeConfig.ModuleNetworkList.end(); pt++) { int moduleID = atoi((*pt).DeviceName.substr(oam::MAX_MODULE_TYPE_SIZE, oam::MAX_MODULE_ID_SIZE).c_str()); if (getDebugLvl() > 1) cout << "Adding PmId - " << moduleID << endl; fRef.fCmdArgs.add2PmVec(moduleID); } /* for (int PmId = 1; ((PmId <= fPmCount) && (PmId < MAX_PMS)); ++PmId) { if(getDebugLvl()>1) cout<<"Adding PmId - "<& aVec = fRef.fCmdArgs.getPmVec(); for (unsigned int PmId = 1; (PmId <= static_cast(fPmCount)); ++PmId) { int opState = oam::ACTIVE; bool aDegraded = false; ostringstream aOss; aOss << "pm" << PmId; std::string aModName = aOss.str(); if (getDebugLvl()) cout << "getModuleStatus() ModName = " << aModName << endl; try { fOam.getModuleStatus(aModName, opState, aDegraded); if (getDebugLvl()) cout << "ModName = " << aModName << " opState = " << opState << endl; } catch (std::exception& ex) {} if (opState != oam::ACTIVE ) // BUG 4668 { aVec.erase(std::remove(aVec.begin(), aVec.end(), PmId), aVec.end()); } } } */ int rtn = fDbrm.getSystemReady(); if (rtn < 1) { ostringstream oss; oss << "System is not ready (" << rtn << "). Verify that Columnstore is up and ready "; // fLog.logMsg( oss.str(), MSGLVL_ERROR ); setContinue(false); throw runtime_error(oss.str()); } else if (BRM::ERR_OK != fDbrm.isReadWrite()) { ostringstream oss; oss << "Error: System in ReadOnly state."; // fLog.logMsg(oss.str(), MSGLVL_ERROR); setContinue(false); throw runtime_error(oss.str()); } else if (fDbrm.getSystemShutdownPending(bRollback, bForce) > 0) { ostringstream oss; oss << "System is being shutdown. Can't start a new import"; setContinue(false); throw runtime_error(oss.str()); } else if (fDbrm.getSystemSuspendPending(bRollback) > 0 || fDbrm.getSystemSuspended()) { ostringstream oss; oss << "System is in write disabled state. Can't start a new import"; setContinue(false); throw runtime_error(oss.str()); } if ((fRef.fCmdArgs.getMode() == 1) || (fRef.fCmdArgs.getMode() == 2)) { fTableOId = 0; fFixedBinaryRecLen = 0; try { int32_t tblOid = getTableOID(fRef.fCmdArgs.getSchemaName(), fRef.fCmdArgs.getTableName()); fTableOId = tblOid; if (getDebugLvl()) cout << "Table OID = " << fTableOId << endl; if (fRef.fCmdArgs.getImportDataMode() != IMPORT_DATA_TEXT) { fFixedBinaryRecLen = calcTableRecLen(fRef.fCmdArgs.getSchemaName(), fRef.fCmdArgs.getTableName()); } } catch (std::exception& ex) { std::string aDetails = fRef.fCmdArgs.getSchemaName() + "." + fRef.fCmdArgs.getTableName() + " ERROR : "; std::string aStr = aDetails + ex.what(); logging::Message::Args errMsgArgs; // BUG 4152 errMsgArgs.add(fRef.fCmdArgs.getSchemaName()); errMsgArgs.add(fRef.fCmdArgs.getTableName()); errMsgArgs.add(ex.what()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0097); fLog.logMsg(aStr, MSGLVL_ERROR); setContinue(false); if (fTableOId == 0) // error getting table OID throw runtime_error("Please make sure both schema and table exists!!"); else // error getting fixed binary record length throw runtime_error(ex.what()); } int aWaitIntvl = 10; // In seconds try { string aWaitPeriod = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod"); if (!aWaitPeriod.empty()) aWaitIntvl = atoi(aWaitPeriod.c_str()); if (getDebugLvl()) cout << "aWaitPeriod = " << aWaitPeriod << endl; } catch (std::exception&) { aWaitIntvl = 10; } std::vector aPmVec = fRef.fCmdArgs.getPmVec(); WETableLockGrabber aTLG(*this); string errMsg; for (int aIdx = 0; aIdx < (aWaitIntvl * 10); aIdx++) { try { fTableLock = aTLG.grabTableLock(aPmVec, fTableOId); if (getDebugLvl() > 1) cout << "Table Lock = " << fTableLock << endl; } catch (std::exception& ex) { errMsg = ex.what(); } if (fTableLock != 0) break; usleep(100000); } if (fTableLock == 0) { ostringstream oss; oss << "Failed to acquire Table Lock of "; oss << fRef.fCmdArgs.getSchemaName() << "."; oss << fRef.fCmdArgs.getTableName() << "; " << errMsg; // fLog.logMsg( oss.str(), MSGLVL_ERROR ); setContinue(false); throw runtime_error(oss.str()); } if (0 != fTableOId) { try { if (getDebugLvl() > 1) { for (unsigned int idx = 0; idx < aPmVec.size(); idx++) cout << "PmId = " << aPmVec[idx] << std::endl; } fpBatchLoader = new BatchLoader(fTableOId, 0, aPmVec); // int aRet=fpBatchLoader->selectFirstPM(fFirstPmToSend, fSelectOtherPm); // if (aRet != 0) throw runtime_error("BatchLoader error.. exiting"); } catch (std::exception& ex) { releaseTableLocks(); ostringstream oss; oss << ex.what() << " ... import exiting"; // fLog.logMsg( oss.str(), MSGLVL_ERROR ); setContinue(false); throw runtime_error(oss.str()); } } } for (int PmId = 1; ((PmId <= fPmCount) && (PmId < MAX_PMS)); ++PmId) { try { if (fRef.getPmStatus(PmId)) { fWeSplClients.at(PmId) = new WESplClient(*this, PmId); if (fWeSplClients[PmId] != NULL) { fWeSplClients[PmId]->setup(); if (2 == fRef.fCmdArgs.getMode()) fWeSplClients[PmId]->setRdSecTo(fPmCount); // Set Rd T/O to 1 sec } else { std::string aStr; aStr = std::string("Encountered NULL WESplClient : ") + std::to_string(PmId); cout << aStr << endl; fLog.logMsg(aStr, MSGLVL_ERROR); throw WESdHandlerException(aStr); } } } catch (const std::exception& ex) { std::string aStr = ex.what(); logging::Message::Args errMsgArgs; errMsgArgs.add(aStr); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); releaseTableLocks(); // BUG 4295 - release table lock as connection fails // cout << aStr << endl; fLog.logMsg(aStr, MSGLVL_ERROR); throw runtime_error("Error in connection setup."); } } // to initiate adding data to the SendQueue, that many BS // BUG 5031 - Initial Data request count set to 100. This will be the max // Q size on the WES side. Here after a batch send for every data rqst for (int aIdx = 0; aIdx < MAX_WES_QSIZE; aIdx++) { for (int PmId = 1; PmId <= fPmCount; PmId++) { if (fWeSplClients[PmId] != 0) fWeSplClients[PmId]->incDataRqstCount(); // fDataFeedList.addPm2List(PmId); } } fpRespThread = new boost::thread(WERespReadThread(*this)); try { // start the File Read thread if ((fRef.fCmdArgs.getMode() == 0) || (fRef.fCmdArgs.getMode() == 1)) { if (fRef.getLocFile() == "STDIN") { ostringstream oss; oss << "Reading input from STDIN to import into table "; oss << fRef.fCmdArgs.getSchemaName() << "."; oss << fRef.fCmdArgs.getTableName() << "..."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss.str(), MSGLVL_INFO1); } if (getDebugLvl()) cout << "BatchQuantity=" << fRef.fCmdArgs.getBatchQuantity() << endl; fFileReadThread.setBatchQty(fRef.fCmdArgs.getBatchQuantity()); fFileReadThread.setup(fRef.getLocFile()); } } catch (std::exception& ex) { releaseTableLocks(); // BUG 4295 throw runtime_error(ex.what()); } // Output "Running distributed import (Mode{x}) on [all/] PMs" { ostringstream oss; oss << "Running distributed import (mode "; oss << fRef.fCmdArgs.getMode() << ") on "; if (fRef.fCmdArgs.getPmVecSize() == fPmCount) oss << "all PMs..."; else { oss << " PM "; std::vector aPmVec = fRef.fCmdArgs.getPmVec(); unsigned int aIdx = 0; while (aIdx < aPmVec.size()) { oss << aPmVec[aIdx]; aIdx++; if (aIdx != aPmVec.size()) oss << ","; } oss << " ..."; } if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss.str(), MSGLVL_INFO1); } } //----------------------------------------------------------------------------- bool WESDHandler::updateCPAndHWMInBRM() { if (getDebugLvl()) cout << "Inside updateCPAndHWMInBRM()" << endl; WEBrmUpdater aBrmUpdater(*this); bool aRslt = aBrmUpdater.updateCasualPartitionAndHighWaterMarkInBRM(); return aRslt; } //----------------------------------------------------------------------------- void WESDHandler::cancelOutstandingCpimports() { std::string aStr = "Canceling outstanding cpimports"; if (getDebugLvl()) cout << aStr << endl; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStr, MSGLVL_INFO1); fFileReadThread.shutdown(); bool aSetFail = false; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { if ((!fWeSplClients[aCnt]->isCpiFailed()) && (!fWeSplClients[aCnt]->isCpiPassed())) { if (getDebugLvl()) cout << "Canceling Cpimport in " << aCnt << endl; // clear the sendQ fWeSplClients[aCnt]->clearSendQueue(); messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_EOD; if (fWeSplClients[aCnt]->isConnected()) { boost::mutex::scoped_lock aLock(fWeSplClients[aCnt]->fWriteMutex); fWeSplClients[aCnt]->write(aBs); aLock.unlock(); } else { fWeSplClients[aCnt]->setCpiFailed(true); // Setting as FAILED } } // if it is passed already set it as canceled so that we can rollback else if (fWeSplClients[aCnt]->isCpiPassed()) { fWeSplClients[aCnt]->setCpiPassed(false); fWeSplClients[aCnt]->setCpiFailed(true); // Setting as FAILED aSetFail = true; } } } // setting Manual Failed caused a Rollback. Warrented if it the last PM if (aSetFail) { if (checkAllCpiFailStatus()) doRollback(); } if (getDebugLvl()) cout << "Canceled all outstanding cpimports!!" << endl; } //----------------------------------------------------------------------------- bool WESDHandler::checkForRollbackAndCleanup() { bool aRetStatus = true; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { // If Anyone of the client is if ((!fWeSplClients[aCnt]->isCpiFailed()) && // NOT Failed (!fWeSplClients[aCnt]->isCpiPassed()) && // NOT Passed (fWeSplClients[aCnt]->isConnected())) // NOT Disconnected { aRetStatus = false; // then its not time to Rollback/Cleanup break; } } } return aRetStatus; } //----------------------------------------------------------------------------- bool WESDHandler::checkForCpiFailStatus() { bool aRetStatus = false; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { // If Anyone of the client is if (fWeSplClients[aCnt]->isCpiFailed()) // Failed { aRetStatus = true; // encounterd a Failure break; } } } // if no CPI failed but a Forced Failure simulated, return Status as true if ((!aRetStatus) && (fForcedFailure)) aRetStatus = true; return aRetStatus; } //----------------------------------------------------------------------------- void WESDHandler::checkForConnections() { time_t aNow = time(0); for (int PmId = 1; PmId <= fPmCount; ++PmId) { if (fWeSplClients[PmId] != 0) { if (aNow - fWeSplClients[PmId]->getLastInTime() > 180) { std::string aStr; aStr = std::string("Heartbeats missed - Non Responsive PM") + std::to_string(PmId); fLog.logMsg(aStr, MSGLVL_ERROR); fWeSplClients[PmId]->onDisconnect(); exit(1); // Otherwise; have to wait till write() comes out } } } } //----------------------------------------------------------------------------- void WESDHandler::sendHeartbeats() { messageqcpp::SBS aSbs(new messageqcpp::ByteStream); *aSbs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE; send2Pm(aSbs); } //----------------------------------------------------------------------------- void WESDHandler::shutdown() { if (fDisconnectFailure) { onDisconnectFailure(); } if (fRef.fSigHup) { onHandlingSigHup(); } if (fRef.fSignaled) { onHandlingSignal(); } fDataFeedList.clearPmList(); fFileReadThread.shutdown(); if (fFileReadThread.getFpThread()) fFileReadThread.getFpThread()->join(); if (getDebugLvl()) cout << "cleaning up Client threads " << endl; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { fWeSplClients[aCnt]->setContinue(false); fWeSplClients[aCnt]->setConnected(false); if (fWeSplClients[aCnt]->getFpThread() != NULL) fWeSplClients[aCnt]->getFpThread()->join(); if (getDebugLvl()) fWeSplClients[aCnt]->printStats(); } } boost::mutex::scoped_lock aLock(fRespMutex); this->setContinue(false); usleep(100000); // so that response thread get updated. fRespCond.notify_all(); aLock.unlock(); if (fpRespThread) fpRespThread->join(); fLog.logMsg("Shutdown of all child threads Finished!!", MSGLVL_INFO2); } //------------------------------------------------------------------------------ void WESDHandler::onStartCpiResponse(int PmId) { if (getDebugLvl()) cout << "On Start CPI response arrived " << PmId << endl; messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_STARTCPI; send2Pm(aBs, PmId); fWeSplClients[PmId]->setCpiStarted(true); } //------------------------------------------------------------------------------ void WESDHandler::onDataRqstResponse(int PmId) { // may be we should add this pmid to the queue // cout << "Received an DataRqst from "<< PmId << endl; - for debug // BUG 5031 - Don't allow to accumulate RqstCnt uncontrollably // since that will end up sending too many messages to WES. // - Walt don't want this checking since we are going FIXED Q size int aCnt = fWeSplClients[PmId]->getDataRqstCount(); if (aCnt < MAX_WES_QSIZE) // 100 { if (getDebugLvl() > 2) cout << "DataReqst [" << PmId << "] = " << fWeSplClients[PmId]->getDataRqstCount() << endl; fWeSplClients[PmId]->incDataRqstCount(); } // fDataFeedList.addPm2List(PmId); } //------------------------------------------------------------------------------ void WESDHandler::onAckResponse(int PmId) { // may be we should add this pmid to the queue if (getDebugLvl()) cout << "Received an ACK from " << PmId << endl; // fDataFeedList.addPm2List(PmId); } //------------------------------------------------------------------------------ void WESDHandler::onNakResponse(int PmId) { if (getDebugLvl()) cout << "Received a NAK from " << PmId << endl; } //------------------------------------------------------------------------------ // Eod response means we are do not send anymore data to this PM // Increase the read timeout for this PM thread so that it won't // consume too much CPU void WESDHandler::onEodResponse(int PmId) { if (getDebugLvl() > 2) cout << "Received a EOD from " << PmId << endl; if (fRef.fCmdArgs.getMode() == 0) { // This is when one PM fail on Mode 0 if (checkForCpiFailStatus()) // someone else failed, { // so set this as failed for rollback. if (getDebugLvl()) cout << "Setting CPI failed on " << PmId << endl; fWeSplClients[PmId]->setCpiPassed(false); fWeSplClients[PmId]->setCpiFailed(true); if (getDebugLvl()) cout << "Calling onSigInterrupt from onEodResponse() " << endl; if (checkAllCpiFailStatus()) fRef.onSigInterrupt(1); } else { if (getDebugLvl()) cout << "Calling onCpimportPass() from onEodResponse() " << endl; onCpimportPass(PmId); // set dummy Cpimport Pass if (checkAllCpiPassStatus()) // Mode 0 won't don't have BRM report { fImportRslt.stopTimer(); ostringstream oss1; // Bulk load completed, total run time : 2.98625 seconds oss1 << "Load file distribution completed, total run time : "; oss1 << fImportRslt.getTotalRunTime() << " seconds" << endl; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss1.str(), MSGLVL_INFO1); fRef.onSigInterrupt(0); // 1 for the sake of it } } } else if (fRef.fCmdArgs.getMode() == 1) { // Only time when we get a EOD on Mode 1 is when there is failure // in one of the PMs. Stop sending data and Send EOD to all PMS. // Make sure fileread thread is not working anymore. if (fFileReadThread.isContinue()) // check already stopped running { sendEODMsg(); fFileReadThread.shutdown(); } } } //------------------------------------------------------------------------------ // BUG 4244 // ERROR because // either an File already exist or the directory doesn't exists void WESDHandler::onImpFileError(int PmId) { if (fRef.fCmdArgs.getMode() != 0) { cout << "ERROR : we should not be here. Mode Non-Zero" << endl; } std::stringstream aStrStr; aStrStr << "Target file Error from PM" << PmId << " - File already exists or path doesn't exist"; char aDefCon[16], aRedCol[16]; snprintf(aDefCon, sizeof(aDefCon), "\033[0m"); snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m"); cout << aRedCol << aStrStr.str() << aDefCon << endl; fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); if (!fWeSplClients[PmId]->isCpiFailed()) fWeSplClients[PmId]->setCpiFailed(true); else return; // this failure already reported so get out // If IMPFILE open Failed, then stop sending data to other PMs too if (checkAllCpiFailStatus()) { // If all are failed we will get out; no rollback for mode 0 fRef.onSigInterrupt(1); // 1 for the sake of it } else { // Stop sending data to all PMs. Also send EOF to all PMs cancelOutstandingCpimports(); } } //------------------------------------------------------------------------------ void WESDHandler::onPmErrorResponse(int PmId) { if (getDebugLvl()) cout << "Received a NAK from " << PmId << endl; } //------------------------------------------------------------------------------ void WESDHandler::onKeepAliveMessage(int PmId) { if (getDebugLvl()) cout << "Received a Keep Alive from " << PmId << endl; } //------------------------------------------------------------------------------ void WESDHandler::onCpimportPass(int PmId) { std::stringstream aStrStr; aStrStr << "Received a Cpimport Pass from PM" << PmId; logging::Message::Args errMsgArgs; // BUG 4152 errMsgArgs.add(PmId); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0098); if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); fWeSplClients[PmId]->setCpiPassed(true); // Every CPI passed, BRM report will be send to us b4 this Msg. if (checkForCpiFailStatus()) // someone else failed, { // so set this as failed for rollback. fWeSplClients[PmId]->setCpiPassed(false); fWeSplClients[PmId]->setCpiFailed(true); if (checkAllCpiFailStatus()) doRollback(); } } //------------------------------------------------------------------------------ void WESDHandler::onCpimportFail(int PmId, bool SigHandle) { std::stringstream aStrStr; char aDefCon[16], aRedCol[16]; snprintf(aDefCon, sizeof(aDefCon), "\033[0m"); snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m"); aStrStr << aRedCol << "Received a Cpimport Failure from PM" << PmId << aDefCon; logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); if (0 != PmId) { fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); // cout << aRedCol << aStrStr.str() << aDefCon << endl; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); std::stringstream aStrStr2; aStrStr2 << "Please verify error log files in PM" << PmId; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr2.str(), MSGLVL_INFO1); if (!fWeSplClients[PmId]->isCpiFailed()) fWeSplClients[PmId]->setCpiFailed(true); else return; // this failure already reported so get out } if ((SigHandle) && (0 == PmId)) { fForcedFailure = true; } // if (!fWeSplClients[PmId]->isCpiFailed()) // fWeSplClients[PmId]->setCpiFailed(true); // else // return; // this failure already reported so get out // If Any CPI Failed, then stop other CPIMPORTS too // cancelOutstandingCpimports() - called below // TODO - later do a total rollback and release locks. if (checkAllCpiFailStatus()) { doRollback(); // fRef.onSigInterrupt(1); // 1 for the sake of it } else { // Stop sending data to all PMs. Also send EOF to all PMs // so that all cpimports will finish bulk upload cancelOutstandingCpimports(); } } //------------------------------------------------------------------------------ void WESDHandler::onBrmReport(int PmId, messageqcpp::SBS& Sbs) { std::stringstream aStrStr; aStrStr << "Received a BRM-Report from " << PmId; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); if (getDebugLvl()) cout << aStrStr.str() << endl; fWeSplClients[PmId]->setBrmRptRcvd(true); std::string aStr; int64_t aTotRows = 0; int64_t aInsRows = 0; int aColNum = 0; CalpontSystemCatalog::ColDataType aColType = CalpontSystemCatalog::INT; int aOorVal = 0; std::string aBadFileName; std::string aErrFileName; std::string aColName; if (getDebugLvl() > 2) cout << "BRM Report length : " << (*Sbs).length() << endl; while ((*Sbs).length() > 0) { (*Sbs) >> aStr; if (getDebugLvl() > 2) cout << "BRM Report value : " << aStr << endl; bool aRet = WEBrmUpdater::prepareRowsInsertedInfo(aStr, aTotRows, aInsRows); if (aRet) { setRowsUploadInfo(PmId, aTotRows, aInsRows); fImportRslt.updateRowsProcessed(aTotRows); fImportRslt.updateRowsInserted(aInsRows); } aRet = WEBrmUpdater::prepareColumnOutOfRangeInfo(aStr, aColNum, aColType, aColName, aOorVal); if (aRet) { add2ColOutOfRangeInfo(PmId, aColNum, aColType, aColName, aOorVal); fImportRslt.updateColOutOfRangeInfo(aColNum, aColType, aColName, aOorVal); } aRet = WEBrmUpdater::prepareBadDataFileInfo(aStr, aBadFileName); if (aRet) { setBadFileName(PmId, aBadFileName); // BUG 4324 - Mode 2 bad/err files left in PM(s). if (1 == fRef.fCmdArgs.getMode()) getBadLog(PmId, aBadFileName); else if (2 == fRef.fCmdArgs.getMode()) { std::stringstream aOss; aOss << "Bad File : " << aBadFileName << " @ PM" << PmId; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aOss.str(), MSGLVL_INFO1); } } aRet = WEBrmUpdater::prepareErrorFileInfo(aStr, aErrFileName); if (aRet) { setErrorFileName(PmId, aErrFileName); // BUG 4324 - Mode 2 bad/err files left in PM(s). if (1 == fRef.fCmdArgs.getMode()) getErrorLog(PmId, aErrFileName); else if (2 == fRef.fCmdArgs.getMode()) { std::stringstream aOss; aOss << "Err File : " << aErrFileName << " @ PM" << PmId; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aOss.str(), MSGLVL_INFO1); } } aRet = check4CriticalErrMsgs(aStr); // check 4 Critical msg from .bin if (aRet) { std::stringstream aOss; aOss << "PM" << PmId << " : " << aStr; logging::Message::Args errMsgArgs; errMsgArgs.add(aOss.str()); if (!fRef.fCmdArgs.getConsoleOutput()) { ostringstream oss; oss << startup::StartUp::tmpDir() << fTableOId << ".txt"; ofstream dmlFile(oss.str().c_str(), std::ofstream::app); if (dmlFile.is_open()) { dmlFile << aOss.str(); dmlFile << endl; } dmlFile.close(); } fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); if (getDebugLvl()) cout << aOss.str() << endl; fLog.logMsg(aOss.str(), MSGLVL_ERROR); } else // do not add Crit Msgs to BRMRpt vector fBrmRptVec.push_back(aStr); } // Even when CPI fail, we get BRMRpt to get the Err/Bad file. if (checkForCpiFailStatus()) return; // if a PM failed, don't update BRM. // cout << "Checking clients for BRM Reports" << endl; // TODO we should update BRM with the report we got. if (check4AllBrmReports()) { bool aRslt = updateCPAndHWMInBRM(); if (aRslt) { std::stringstream aStrStr; aStrStr << "BRM updated successfully "; if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); if (fTableLock != 0) { WETableLockGrabber aTLG(*this); bool aRet = aTLG.changeTableLockState(fTableLock); if (aRet) { if (getDebugLvl()) cout << "\tSuccessfully changed TableLock State" << endl; doCleanup(true); } else { std::stringstream aStrStr; aStrStr << "Failed to change TableLock state to cleanup"; fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); if (getDebugLvl()) cout << aStrStr.str() << endl; } } } else { std::stringstream aStrStr; aStrStr << "\tBRM update Failed : Need to Manually release the table locks"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); fRef.onSigInterrupt(1); // BUG 4701 //failure in BRM update & process, ie. 1 } } else { if (getDebugLvl()) cout << "Still Brm Reports to come in!!" << endl; } } //------------------------------------------------------------------------------ bool WESDHandler::check4CriticalErrMsgs(std::string& Entry) { bool aFound = false; if ((!Entry.empty()) && (Entry.at(0) == 'M')) { aFound = true; // start from after "MERR: " std::string aTmp(Entry.begin() + 6, Entry.end()); Entry = aTmp; } return aFound; } //------------------------------------------------------------------------------ void WESDHandler::onErrorFile(int PmId, messageqcpp::SBS& Sbs) { std::stringstream aStrStr; aStrStr << "Received ErrReport from " << PmId; logging::Message::Args errMsgArgs; // BUG 4152 errMsgArgs.add(PmId); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0099); if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); // TODO - Open the ERROR log file and append to it ofstream aErrFile; std::string aErrFileName; std::string aTmpFileName; std::string aData; (*Sbs) >> aTmpFileName; // BUG 4324 - Mode 1 bad/err files stored in datafile loc or CWD size_t aPos = aTmpFileName.rfind('/'); if (aPos != std::string::npos) { std::string aFile = aTmpFileName.substr(aPos + 1); //+1 to pass '/' std::string aInFile = fRef.fCmdArgs.getErrorDir(); if (aInFile != "/dev/stdin") { size_t aPos2 = aInFile.rfind('/'); if (aPos2 != std::string::npos) { std::string aStr = aInFile.substr(0, aPos2 + 1); // std::cout << "Point 1 " << aStr << std::endl; std::stringstream aStrStr1; aStrStr1 << aStr << aFile; aTmpFileName = aStrStr1.str(); // std::cout << "Point 2 " << aTmpFileName << std::endl; } else aTmpFileName = aFile; } else aTmpFileName = aFile; if (getDebugLvl()) std::cout << "Prep ErrFile " << aTmpFileName << std::endl; } aStrStr.str(std::string()); aStrStr << aTmpFileName << "_" << PmId; aErrFileName = aStrStr.str(); // PmId+"_"+aTmpFileName; if (getDebugLvl()) cout << "Error File Name: " << aErrFileName << endl; if (getDebugLvl()) cout << "Error Data: " << endl; try { aErrFile.open(aErrFileName.c_str()); while ((*Sbs).length() > 0) { (*Sbs) >> aData; if (getDebugLvl() > 1) cout << aData << endl; aErrFile << aData; aErrFile << endl; } aErrFile.close(); setErrorFileName(PmId, aErrFileName); aStrStr.str(std::string()); aStrStr << "Row numbers with error reasons are listed in file : " << aErrFileName; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr.str(), MSGLVL_INFO1); } catch (std::exception&) { cout << "Error in opening the ERROR file!!" << aErrFileName << endl; cout << "Error in opening the ERROR file!!" << aTmpFileName << endl; cout << "Check for ErrorFile " << aTmpFileName << "in Pm " << PmId << endl; } } //------------------------------------------------------------------------------ // Process a bulk load *.bad file containing the rejected rows from a PM. //------------------------------------------------------------------------------ void WESDHandler::onBadFile(int PmId, messageqcpp::SBS& Sbs) { std::stringstream aStrStr; aStrStr << "Received BadData Report from " << PmId; logging::Message::Args errMsgArgs; // BUG 4152 errMsgArgs.add(PmId); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0100); if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); // TODO - Open the ERROR log file and append to it ofstream aBadFile; std::string aBadFileName; std::string aTmpFileName; std::string aData; (*Sbs) >> aTmpFileName; // BUG 4324 - Mode 1 bad/err files stored in datafile loc or CWD size_t aPos = aTmpFileName.rfind('/'); if (aPos != std::string::npos) { std::string aFile = aTmpFileName.substr(aPos + 1); //+1 to pass '/' std::string aInFile = fRef.fCmdArgs.getErrorDir(); if (aInFile != "/dev/stdin") { size_t aPos2 = aInFile.rfind('/'); if (aPos2 != std::string::npos) { std::string aStr = aInFile.substr(0, aPos2 + 1); // std::cout << "Point 1 " << aStr << std::endl; std::stringstream aStrStr1; aStrStr1 << aStr << aFile; aTmpFileName = aStrStr1.str(); // std::cout << "Point 2 " << aTmpFileName << std::endl; } else aTmpFileName = aFile; } else aTmpFileName = aFile; if (getDebugLvl() > 1) std::cout << "Prep BadFile " << aTmpFileName << std::endl; } aStrStr.str(std::string()); aStrStr << aTmpFileName << "_" << PmId; aBadFileName = aStrStr.str(); // PmId+"_"+aTmpFileName; if (getDebugLvl() > 1) cout << "Bad File Name: " << aBadFileName << endl; if ((getDebugLvl() > 1) && (fRef.fCmdArgs.getImportDataMode() == IMPORT_DATA_TEXT)) cout << "Bad Data: " << endl; std::string task; try { task = "opening"; aBadFile.open(aBadFileName.c_str()); task = "copying rejected rows to"; while ((*Sbs).length() > 0) { (*Sbs) >> aData; if ((getDebugLvl() > 1) && (fRef.fCmdArgs.getImportDataMode() == IMPORT_DATA_TEXT)) { cout << aData; } aBadFile.write(aData.c_str(), aData.length()); } aBadFile.close(); setBadFileName(PmId, aBadFileName); aStrStr.str(std::string()); aStrStr << "Exact error rows are listed in file : " << aBadFileName; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr.str(), MSGLVL_INFO1); } catch (std::exception& ex) { cout << "Error in " << task << " the bad file " << aBadFileName << "; " << ex.what() << endl; cout << "Check for Bad File " << aTmpFileName << " on Pm " << PmId << endl; } } //------------------------------------------------------------------------------ void WESDHandler::getErrorLog(int PmId, const std::string& ErrFileName) { if (getDebugLvl()) cout << "Requesting Error Log" << endl; // TODO code appropriately for the message messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_ERRLOG; aBs << ErrFileName; send2Pm(aBs, PmId); // fWeSplClients[PmId]->setErrLogRqst(true); } //------------------------------------------------------------------------------ void WESDHandler::getBadLog(int PmId, const std::string& BadFileName) { if (getDebugLvl()) cout << "Requesting Bad Log" << endl; // TODO code appropriately for the message messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_BADLOG; aBs << BadFileName; send2Pm(aBs, PmId); // fWeSplClients[PmId]->setBadLogRqst(true); } //------------------------------------------------------------------------------ void WESDHandler::onRollbackResult(int PmId, messageqcpp::SBS& Sbs) { ByteStream::byte aRslt = 0; (*Sbs) >> aRslt; if (getDebugLvl()) cout << "Rollback rslt arrived PmId = " << PmId << " Rslt = " << (int)aRslt << endl; if (aRslt) fWeSplClients[PmId]->setRollbackRslt(1); else { fWeSplClients[PmId]->setRollbackRslt(-1); std::stringstream aStrStr; aStrStr << "Rollback Failed on PM : " << PmId; logging::Message::Args errMsgArgs; // BUG 4152 errMsgArgs.add(PmId); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0101); fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); if (getDebugLvl()) cout << aStrStr.str() << endl; } int aStatus = check4RollbackRslts(); if (aStatus == -1) { std::stringstream aStrStr; aStrStr << "Rollback Failed on one or more PMs"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); if (getDebugLvl()) cout << aStrStr.str() << endl; // fRef.onSigInterrupt(1); // 1 for the sake of it if (check4AllRollbackStatus()) { fRef.onSigInterrupt(1); // process altogether is a failure } } else if (aStatus == 1) { std::stringstream aStrStr; aStrStr << "Rollback succeed on all PMs"; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); if (getDebugLvl()) cout << aStrStr.str() << endl; // false flag sent to doCleanup says to not delete HDFS temp db files, // because the bulk rollback will have already deleted them. We still // call doCleanup for other file cleanup (like deleting meta file). doCleanup(false); } } //------------------------------------------------------------------------------ void WESDHandler::onCleanupResult(int PmId, messageqcpp::SBS& Sbs) { ByteStream::byte aRslt = 0; (*Sbs) >> aRslt; if (getDebugLvl()) cout << "Cleanup rslt arrived PmId = " << PmId << " Rslt = " << (int)aRslt << endl; if (aRslt) fWeSplClients[PmId]->setCleanupRslt(1); else { fWeSplClients[PmId]->setCleanupRslt(-1); std::stringstream aStrStr; aStrStr << "ERROR: Cleanup Failed on PM : " << PmId; logging::Message::Args errMsgArgs; // BUG 4152 errMsgArgs.add(PmId); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0102); if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); } int aStatus = check4CleanupRslts(); if (aStatus == -1) { std::stringstream aStrStr; aStrStr << "Cleanup Failed on one or more PMs"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); fLog.logMsg(aStrStr.str(), MSGLVL_ERROR); if (getDebugLvl()) cout << aStrStr.str() << endl; // fRef.onSigInterrupt(1); // 1 for the sake of it - // We need to wait till all the results comes back. if (check4AllCleanupStatus()) { fRef.onSigInterrupt(1); // failure in cleanup & process, ie. 1 } } else if (aStatus == 1) { releaseTableLocks(); std::stringstream aStrStr; aStrStr << "Cleanup succeed on all PMs"; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); if (getDebugLvl()) cout << aStrStr.str() << endl; if (checkAllCpiPassStatus()) // Cleanup and entire process success. { ostringstream oss; // For table walt.abc: 1000 rows processed and 1000 rows inserted. oss << "For table "; oss << fRef.fCmdArgs.getSchemaName() << "."; oss << fRef.fCmdArgs.getTableName() << ": "; oss << fImportRslt.fRowsPro << " rows processed and "; oss << fImportRslt.fRowsIns << " rows inserted."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss.str(), MSGLVL_INFO1); // BUG 4399 Print out WARN messages for out of range counts WEColOorVec::iterator aIt = fImportRslt.fColOorVec.begin(); ofstream dmlFile; if (!fRef.fCmdArgs.getConsoleOutput()) // for DML to use file { ostringstream oss; oss << startup::StartUp::tmpDir() << fTableOId << ".txt"; dmlFile.open(oss.str().c_str()); } while (aIt != fImportRslt.fColOorVec.end()) { if ((*aIt).fNoOfOORs > 0) { ostringstream ossSatCnt; ossSatCnt << "Column " << (*aIt).fColName << "; Number of "; switch ((*aIt).fColType) { case CalpontSystemCatalog::DATE: ossSatCnt << "invalid dates replaced with zero value: "; break; case CalpontSystemCatalog::DATETIME: ossSatCnt << "invalid date/times replaced with zero value: "; break; case CalpontSystemCatalog::TIMESTAMP: ossSatCnt << "invalid timestamps replaced with zero value: "; break; case CalpontSystemCatalog::TIME: ossSatCnt << "invalid times replaced with zero value: "; break; case CalpontSystemCatalog::CHAR: ossSatCnt << "character strings truncated: "; break; case CalpontSystemCatalog::VARCHAR: ossSatCnt << "varchar strings truncated: "; break; default: ossSatCnt << "rows inserted with saturated values: "; break; } ossSatCnt << (*aIt).fNoOfOORs; fLog.logMsg(ossSatCnt.str(), MSGLVL_WARNING); if (!fRef.fCmdArgs.getConsoleOutput()) // for DML to use { if (dmlFile.is_open()) { dmlFile << (*aIt).fNoOfOORs; dmlFile << endl; } } } aIt++; } dmlFile.close(); fImportRslt.stopTimer(); ostringstream oss1; // Bulk load completed, total run time : 2.98625 seconds oss1 << "Bulk load completed, total run time : "; oss1 << fImportRslt.getTotalRunTime() << " seconds"; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss1.str(), MSGLVL_INFO1); fRef.onSigInterrupt(0); // 0 for entire success } else { ostringstream oss; oss << "Table " << fRef.fCmdArgs.getSchemaName() << "."; oss << fRef.fCmdArgs.getTableName() << ": (OID-"; oss << this->getTableOID() << ") was NOT successfully loaded."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss.str(), MSGLVL_INFO1); fImportRslt.stopTimer(); ostringstream oss1; // Bulk load completed, total run time : 2.98625 seconds oss1 << "Bulk load completed, total run time : "; oss1 << fImportRslt.getTotalRunTime() << " seconds"; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(oss1.str(), MSGLVL_INFO1); // Even though cleanup is success, entire process is failure fRef.onSigInterrupt(1); // therefore 1 } } } //------------------------------------------------------------------------------ void WESDHandler::onDBRootCount(int PmId, messageqcpp::SBS& Sbs) { ByteStream::byte aDbrCnt = 0; (*Sbs) >> aDbrCnt; if (getDebugLvl()) cout << "No of DBRoots in PM" << PmId << " = " << (int)aDbrCnt << endl; if (aDbrCnt > 0) { fWeSplClients[PmId]->setDbRootCnt(static_cast(aDbrCnt)); fWeSplClients[PmId]->resetDbRootVar(); } } //------------------------------------------------------------------------------ void WESDHandler::doRollback() { std::string aAppName = "cpimport"; messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK; aBs << (ByteStream::octbyte)fTableLock; aBs << (ByteStream::quadbyte)fTableOId; aBs << fRef.fCmdArgs.getTableName(); aBs << aAppName; boost::mutex::scoped_lock aLock(fSendMutex); send2Pm(aBs); aLock.unlock(); } //------------------------------------------------------------------------------ void WESDHandler::doCleanup(bool deleteHdfsTempDbFiles) { if (getDebugLvl()) cout << "A cleanup is called!!" << endl; messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_CLEANUP; aBs << (ByteStream::quadbyte)fTableOId; aBs << (ByteStream::byte)deleteHdfsTempDbFiles; boost::mutex::scoped_lock aLock(fSendMutex); send2Pm(aBs); aLock.unlock(); } //------------------------------------------------------------------------------ bool WESDHandler::releaseTableLocks() { if (fTableLock != 0) { WETableLockGrabber aTLG(*this); // BUG 4398. Move the call to takeSnapshot() from cpimport.bin to here fDbrm.takeSnapshot(); bool aRet = aTLG.releaseTableLock(fTableLock); if (aRet) { std::stringstream aStrStr; aStrStr << "Released Table Lock"; logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); if (getDebugLvl()) cout << aStrStr.str() << endl; fLog.logMsg(aStrStr.str(), MSGLVL_INFO2); return true; } } return false; } //------------------------------------------------------------------------------ int WESDHandler::check4RollbackRslts() { int aStatus = 1; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { int aRslt = fWeSplClients[aCnt]->getRollbackRslt(); if (aRslt == -1) { // cout << "Rollback Failed in PM - " << aCnt << endl; aStatus = -1; break; } else if (aRslt == 0) // not all results available yet { aStatus = 0; break; } } } return aStatus; } //------------------------------------------------------------------------------ int WESDHandler::check4CleanupRslts() { int aStatus = 1; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { int aRslt = fWeSplClients[aCnt]->getCleanupRslt(); if (aRslt == -1) { // cout << "Cleanup Failed in PM - " << aCnt << endl; aStatus = -1; break; } else if (aRslt == 0) // not all results available yet { aStatus = 0; break; } } } return aStatus; } //------------------------------------------------------------------------------ bool WESDHandler::check4AllRollbackStatus() { bool aStatus = true; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { int aRslt = fWeSplClients[aCnt]->getRollbackRslt(); if (aRslt == 0) // not all results available yet; either -1/1 { aStatus = false; break; } } } return aStatus; } //------------------------------------------------------------------------------ bool WESDHandler::check4AllCleanupStatus() { bool aStatus = true; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { int aRslt = fWeSplClients[aCnt]->getCleanupRslt(); if (aRslt == 0) // not all results available yet; either -1/1 { aStatus = false; break; } } } return aStatus; } //------------------------------------------------------------------------------ bool WESDHandler::checkAllCpiPassStatus() { bool aStatus = true; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { if (!fWeSplClients[aCnt]->isCpiPassed()) { if (getDebugLvl()) cout << "CPI Pass status still False in " << aCnt << endl; aStatus = false; break; } } } return aStatus; } // //------------------------------------------------------------------------------ bool WESDHandler::checkAllCpiFailStatus() { bool aStatus = true; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { if (!fWeSplClients[aCnt]->isCpiFailed()) { if (getDebugLvl()) cout << "CPI Fail status still False in " << aCnt << endl; aStatus = false; break; } } } return aStatus; } //----------------------------------------------------------------------------- bool WESDHandler::check4AllBrmReports() { for (int PmId = 1; PmId <= fPmCount; ++PmId) { if (fWeSplClients[PmId] != NULL) { if (!fWeSplClients[PmId]->isBrmRptRcvd()) { if (getDebugLvl()) cout << "BRMReport from " << PmId << " still not received" << endl; return false; } } } return true; } //------------------------------------------------------------------------------ int WESDHandler::getNextPm2Feed() { // int aLdPm = leastDataSendPm(); int aLdPm = getNextDbrPm2Send(); if (fWeSplClients[aLdPm] != 0) { // Balancing the total bytes sent and Q Size int aSz = fWeSplClients[aLdPm]->getSendQSize(); if (aSz > MAX_QSIZE) { // cout << "Queue Size = " << aSz << endl; if (fpBatchLoader) fpBatchLoader->reverseSequence(); aLdPm = 0; // Filled Q } // Check enough DataRqst to send Data if (getDebugLvl() > 2) cout << "NextPm2Feed " << aLdPm << endl; // will work only specific to mode 0 (fpBatchLoader==0) if ((aLdPm > 0) && (!fpBatchLoader)) fWeSplClients[aLdPm]->decDbRootVar(); } else if (aLdPm != 0) { cout << "Next PMid Error : PmId = " << aLdPm << endl; aLdPm = 0; } return aLdPm; } //------------------------------------------------------------------------------ int WESDHandler::getNextDbrPm2Send() { unsigned int aDbrVar = 0; unsigned int aPmId = 0; if ((!fAllCpiStarted) && (1 == fRef.fCmdArgs.getMode())) { check4AllCpiStarts(); return aPmId; // Not all Cpi started. } // NOTE : Implementing BatchLoader, which will be used to select the // FIRST PM to send data and subsequent PM's to send data if (fpBatchLoader) // for mode 1 and 2 only, since mode 0 don't have tableOID { if ((!fFirstDataSent) && (1 == fRef.fCmdArgs.getMode())) { try { aPmId = fFirstPmToSend; fFirstDataSent = true; if (fSelectOtherPm) aPmId = 0; // Select the other method if (getDebugLvl()) cout << "1st PM for data = " << aPmId << endl; fpBatchLoader->prepareForSecondPM(); } catch (std::exception& ex) { fLog.logMsg(ex.what(), MSGLVL_ERROR); logging::Message::Args errMsgArgs; errMsgArgs.add(ex.what()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); } } else { aPmId = fpBatchLoader->selectNextPM(); } if (getDebugLvl() > 2) cout << "Next PM to get data = " << aPmId << endl; return aPmId; } //--------- The part below is for mode 0, which don't have table relation for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { if (aPmId == 0) // init { aDbrVar = fWeSplClients[aCnt]->getDbRootVar(); aPmId = aCnt; } else if (fWeSplClients[aCnt]->getDbRootVar() > aDbrVar) { aPmId = aCnt; aDbrVar = fWeSplClients[aCnt]->getDbRootVar(); } } } if (aDbrVar == 0) { for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { fWeSplClients[aCnt]->resetDbRootVar(); } } aPmId = 0; } if (getDebugLvl() > 2) cout << "DbrPM2Send [" << aPmId << "] = " << aDbrVar << endl; return aPmId; } //------------------------------------------------------------------------------ int WESDHandler::leastDataSendPm() { messageqcpp::BSSizeType aTx = 0; int aPmId = 0; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { if (aPmId == 0) // init { aTx = fWeSplClients[aCnt]->getBytesTx(); aPmId = aCnt; } else if (fWeSplClients[aCnt]->getBytesTx() < aTx) { aPmId = aCnt; aTx = fWeSplClients[aCnt]->getBytesTx(); } } } return aPmId; } //------------------------------------------------------------------------------ int WESDHandler::getTableOID(std::string Schema, std::string Table) { execplan::CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::TableName tableName(Schema, Table); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(); roPair = systemCatalogPtr->tableRID(tableName); return roPair.objnum; } //------------------------------------------------------------------------------ // Get the expected import binary fixed record length for the specified table. //------------------------------------------------------------------------------ unsigned int WESDHandler::calcTableRecLen(const std::string& schema, const std::string table) { unsigned int recLen = 0; CalpontSystemCatalog::TableName tableName(schema, table); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(); CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(tableName, true); CalpontSystemCatalog::RIDList::const_iterator rid_iterator = colRidList.begin(); std::set colListInJobFile; fRef.fCmdArgs.getColumnList(colListInJobFile); std::set::const_iterator setIter; // Add up the column widths to get the total record length while (rid_iterator != colRidList.end()) { CalpontSystemCatalog::ROPair roPair = *rid_iterator; CalpontSystemCatalog::OID oid = roPair.objnum; CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); // If we have a list of column names taken from an XML job file, // then we filter against that list if (colListInJobFile.size() > 0) { CalpontSystemCatalog::TableColName colName; colName = systemCatalogPtr->colName(oid); setIter = colListInJobFile.find(colName.column); if (setIter != colListInJobFile.end()) { recLen += colType.colWidth; } } else { recLen += colType.colWidth; } ++rid_iterator; } if (getDebugLvl()) cout << "Binary record length for " << schema << '.' << table << " is: " << recLen << endl; return recLen; } //------------------------------------------------------------------------------ void WESDHandler::check4CpiInvokeMode() { try { oam::oamModuleInfo_t aModInfo = fOam.getModuleInfo(); string aModuleType = boost::get<1>(aModInfo); if (getDebugLvl()) cout << "ModuleType " << aModuleType << endl; int aInstallType = boost::get<5>(aModInfo); if (getDebugLvl()) cout << "InstallType " << aInstallType << endl; fOam.getSystemConfig("pm", fModuleTypeConfig); fPmCount = fModuleTypeConfig.ModuleCount; if (getDebugLvl()) cout << "PM Count " << fPmCount << endl; // oam::INSTALL_COMBINE_* 2,3,4 ie UM+PM in 1 Machine if ((aInstallType > 1) && (fPmCount == 1)) { // Mode arg was NOT provided; set to to default Mode 3 if (fRef.fCmdArgs.getArgMode() == -1) { fRef.fCmdArgs.setMode(3); fRef.fCmdArgs.setCpiInvoke(); } else if (fRef.fCmdArgs.getArgMode() == 3) // BUG 4210 { fRef.fCmdArgs.setCpiInvoke(); } else if (fRef.fCmdArgs.getArgMode() == 0) { throw runtime_error("Mode 0 allowed only in Multi-Nodes."); } } else if ((aInstallType == oam::INSTALL_NORMAL) && (aModuleType == "um")) { // BUG 4165 if (fRef.fCmdArgs.getMode() == 3) { throw runtime_error("Mode 3 imports can only be run on a PM."); } else if (fRef.fCmdArgs.getMode() == -1) // default mode //BUG 4210 { fRef.fCmdArgs.setMode(1); } } else if (aModuleType == "pm") // BUG 4210 { // BUG 4210 if ((fPmCount >= 1) && (fRef.fCmdArgs.getMode() == 3)) { fRef.fCmdArgs.setCpiInvoke(); } // Single node default without argument option m else if ((fPmCount == 1) && (fRef.fCmdArgs.getArgMode() == -1)) { fRef.fCmdArgs.setMode(3); fRef.fCmdArgs.setCpiInvoke(); } // Multi-node default without argument option m else if ((fPmCount > 1) && (fRef.fCmdArgs.getArgMode() == -1)) { fRef.fCmdArgs.setMode(1); } } } catch (runtime_error& exp) { throw runtime_error(exp.what()); } catch (...) { std::string aStr = "oam.getModuleInfo/getSystemConfig error : WESDHandler check4CpiInvoke()"; throw runtime_error(aStr); } check4PmArguments(); } //------------------------------------------------------------------------------ bool WESDHandler::check4PmArguments() { if (fRef.fCmdArgs.getPmVecSize() > 0) { std::vector aPmVec = fRef.fCmdArgs.getPmVec(); int aSize = aPmVec.size(); for (int aIdx = 0; aIdx < aSize; aIdx++) { if ((fPmCount < static_cast(aPmVec[aIdx])) || (0 == aPmVec[aIdx])) { std::stringstream aStrStr; aStrStr << "Invalid argument PMid " << aPmVec[aIdx] << endl; throw runtime_error(aStrStr.str()); } } } return true; } //------------------------------------------------------------------------------ bool WESDHandler::check4AllCpiStarts() { bool aStarted = true; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) { if (fWeSplClients[aCnt] != 0) { if (!fWeSplClients[aCnt]->isCpiStarted()) aStarted = false; } } if (aStarted) fAllCpiStarted = aStarted; return aStarted; } //------------------------------------------------------------------------------ void WESDHandler::exportJobFile(std::string& JobId, std::string& JobFileName) { if (getDebugLvl()) cout << "JobFile Name is " << JobFileName << endl; ifstream aInFile; aInFile.open(JobFileName.c_str()); if ((aInFile.is_open()) && (!aInFile.eof())) { std::stringstream aSs; aSs << fRef.fCmdArgs.getTmpFileDir(); aSs << "/Job_"; aSs << JobId; aSs << ".xml"; messageqcpp::ByteStream aBs; aBs << (ByteStream::byte)WE_CLT_SRV_JOBID; aBs << aSs.str(); send2Pm(aBs); if (getDebugLvl()) cout << "exportJobFile::Send RmtFileName " << aSs.str() << endl; // Read everything to a String std::string aData((std::istreambuf_iterator(aInFile)), std::istreambuf_iterator()); if (getDebugLvl()) cout << "Sending XML FileData " << aData << endl; aBs.restart(); aBs << (ByteStream::byte)WE_CLT_SRV_JOBDATA; aBs << aData; send2Pm(aBs); } else { throw runtime_error("unable to open Job File"); } } //------------------------------------------------------------------------------ bool WESDHandler::getConsoleLog() { return fRef.fCmdArgs.getConsoleLog(); } //------------------------------------------------------------------------------ char WESDHandler::getEnclChar() { return fRef.fCmdArgs.getEnclChar(); } //------------------------------------------------------------------------------ char WESDHandler::getEscChar() { return fRef.fCmdArgs.getEscChar(); } //------------------------------------------------------------------------------ int WESDHandler::getReadBufSize() { return fRef.fCmdArgs.getReadBufSize(); } //------------------------------------------------------------------------------ char WESDHandler::getDelimChar() { return fRef.fCmdArgs.getDelimChar(); } //------------------------------------------------------------------------------ std::string WESDHandler::getTableName() const { return fRef.fCmdArgs.getTableName(); } //------------------------------------------------------------------------------ std::string WESDHandler::getSchemaName() const { return fRef.fCmdArgs.getSchemaName(); } ImportDataMode WESDHandler::getImportDataMode() const { return fRef.fCmdArgs.getImportDataMode(); } //------------------------------------------------------------------------------ void WESDHandler::sysLog(const logging::Message::Args& msgArgs, logging::LOG_TYPE logType, logging::Message::MessageID msgId) { fRef.fpSysLog->logMsg(msgArgs, logType, msgId); } //------------------------------------------------------------------------------ std::string WESDHandler::getTime2Str() const { char aBuff[64]; time_t aTime; struct tm pTm; time(&aTime); localtime_r(&aTime, &pTm); // M D H M S snprintf(aBuff, sizeof(aBuff), "%02d%02d%02d%02d%02d", pTm.tm_mon + 1, pTm.tm_mday, pTm.tm_hour, pTm.tm_min, pTm.tm_sec); return aBuff; } //------------------------------------------------------------------------------ void WESDHandler::setInputFileList(std::string InFileName) { fFileReadThread.chkForListOfFiles(InFileName); } //------------------------------------------------------------------------------ void WESDHandler::onHandlingSignal() { std::stringstream aStrStr; char aDefCon[16], aRedCol[16]; snprintf(aDefCon, sizeof(aDefCon), "\033[0m"); snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m"); aStrStr << aRedCol << "Received signal to terminate the process." << aDefCon; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr.str(), MSGLVL_INFO1); logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); std::stringstream aStrStr1; aStrStr1 << "Handling signal ......"; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr1.str(), MSGLVL_INFO1); fRef.fSignaled = false; bool aTblLockReleased = false; bool aRollbackSuccess = false; onCpimportFail(0, true); usleep(2000000 * fPmCount); // BUG 4649 - Some systems taking too long to finish the process. // So we have to wait some more time. std::stringstream aStrStr2; aStrStr2 << "Rolling back .........."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr2.str(), MSGLVL_INFO1); for (int aIdx = 0; aIdx < 60; aIdx++) { int aStatus = check4RollbackRslts(); if (1 == aStatus) { if (getDebugLvl()) cout << "Rollback Successful... " << endl; aRollbackSuccess = true; break; } else if (-1 == aStatus) { if (getDebugLvl()) cout << "Rollback Failed... " << endl; break; } usleep(2000000 * fPmCount); } // Bug 5774 - if rollback failed, leave the tablelock if (!aRollbackSuccess) { std::stringstream aStrStr2a; aStrStr2a << "Rollback Failed; Leaving Tablelock ... "; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr2a.str(), MSGLVL_INFO1); return; } std::stringstream aStrStr3; aStrStr3 << "Cleaning up .........."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr3.str(), MSGLVL_INFO1); for (int aIdx = 0; aIdx < 60; aIdx++) { int aStatus = check4CleanupRslts(); if (aStatus == 1) { if (getDebugLvl()) cout << "Cleanup Successful... " << endl; releaseTableLocks(); aTblLockReleased = true; break; } usleep(2000000 * fPmCount); } if ((!aTblLockReleased) && (aRollbackSuccess)) { releaseTableLocks(); } } //------------------------------------------------------------------------------ void WESDHandler::onHandlingSigHup() { std::stringstream aStrStr; char aDefCon[16], aRedCol[16]; snprintf(aDefCon, sizeof(aDefCon), "\033[0m"); snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m"); aStrStr << aRedCol << "Interrupt received .... Program exiting." << aDefCon; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr.str(), MSGLVL_INFO1); logging::Message::Args errMsgArgs; errMsgArgs.add(aStrStr.str()); fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000); fRef.fSigHup = false; // onCpimportFail(0, true); - cancelOutstandingCpimports will hang on send() // cancelOutstandingCpimports(); exit(1); // Hard exit on SIGHUP signal } //------------------------------------------------------------------------------ void WESDHandler::onDisconnectFailure() { string aStr("Trying to reconnect and rollback"); if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStr, MSGLVL_INFO1); for (int aSec = 0; aSec < 15; aSec++) { bool aDisconnect = false; usleep(1000000); for (int PmId = 1; PmId <= fPmCount; ++PmId) { if (fWeSplClients[PmId] != 0) { if (!fWeSplClients[PmId]->isConnected()) { aDisconnect = true; try { fWeSplClients[PmId]->setup(); } catch (runtime_error&) { cout << "Unable to connect to PM" << PmId << "; Trying again..." << endl; } } } } if (!aDisconnect) break; } doRollback(); bool aTblLockReleased = false; bool aRollbackSuccess = true; // BUG 4649 - Some systems taking too long to finish the process. // So we have to wait some more time. std::stringstream aStrStr2; aStrStr2 << "Rolling back .........."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr2.str(), MSGLVL_INFO1); for (int aIdx = 0; aIdx < 10; aIdx++) { int aStatus = check4RollbackRslts(); if (1 == aStatus) { if (getDebugLvl()) cout << "Rollback Successful... " << endl; break; } else if (-1 == aStatus) { if (getDebugLvl()) cout << "Rollback Failed... " << endl; aRollbackSuccess = false; break; } usleep(2000000 * fPmCount); } std::stringstream aStrStr3; aStrStr3 << "Cleaning up .........."; if (fRef.fCmdArgs.getConsoleOutput()) fLog.logMsg(aStrStr3.str(), MSGLVL_INFO1); for (int aIdx = 0; aIdx < 10; aIdx++) { int aStatus = check4CleanupRslts(); if (aStatus == 1) { if (getDebugLvl()) cout << "Cleanup Successful... " << endl; if (aRollbackSuccess) { releaseTableLocks(); aTblLockReleased = true; } break; } usleep(2000000 * fPmCount); } if ((!aTblLockReleased) && (aRollbackSuccess)) { releaseTableLocks(); } } //------------------------------------------------------------------------------ void WESDHandler::setDisconnectFailure(bool Flag) { if (fFileReadThread.isContinue()) // check already stopped running { sendEODMsg(); fFileReadThread.shutdown(); } fDisconnectFailure = Flag; fRef.onSigInterrupt(1); // process altogether is a failure } //------------------------------------------------------------------------------ } /* namespace WriteEngine */