diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 0f5b1d2c3..623f1951b 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -9169,11 +9169,14 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa string oidFile; oam.getSystemConfig("OIDBitmapFile", oidFile); + + // StorageManager: Need to make these existence checks use an idbfilesystem op? string currentDbrmFile; ifstream oldFile (currentFileName.c_str()); if (oldFile) { + // current file found, check for OIDBitmapFile ifstream mapFile (oidFile.c_str()); @@ -9290,6 +9293,7 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa } // put oid file and current file in list + // StorageManager: no need to distribute these files dbrmFiles.push_back(currentFileName); ifstream file1 (journalFileName.c_str()); @@ -9324,6 +9328,7 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa //remove any file of size 0 std::vector::iterator pt1 = dbrmFiles.begin(); + // StorageManager: ? for ( ; pt1 != dbrmFiles.end() ; pt1++) { string fileName = *pt1; @@ -9352,13 +9357,13 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa } catch (...) { - log.writeLog(__LINE__, "EXCEPTION ERROR on cfIos.write: Unknow exception", LOG_TYPE_ERROR); + log.writeLog(__LINE__, "EXCEPTION ERROR on cfIos.write: Unknown exception", LOG_TYPE_ERROR); pthread_mutex_unlock(&THREAD_LOCK); return oam::API_FAILURE; } + // StorageManager: ? pt1 = dbrmFiles.begin(); - for ( ; pt1 != dbrmFiles.end() ; pt1++) { ByteStream fnmsg, fdmsg; diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index 54a255b91..fd501d8a8 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -577,8 +577,8 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO } if (processName == "StorageManager") // storagemanager doesn't send its own response { - ackMsg << (uint8_t) ACK << (uint8_t) START << (uint8_t) API_SUCCESS; - mq.write(ackMsg); + //ackMsg << (uint8_t) ACK << (uint8_t) START << (uint8_t) API_SUCCESS; + //mq.write(ackMsg); } ProcessConfig processconfig; @@ -663,9 +663,13 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO processconfig.LogFile, initType, actIndicator); - if (processName == "StorageManager") - log.writeLog(__LINE__, "START: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); - + if (processconfig.ProcessName == "StorageManager") + { + log.writeLog(__LINE__, "StorageManager WTF? 6", LOG_TYPE_DEBUG); + oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()), + oam::ACTIVE, processID); + } + if ( processID > oam::API_MAX ) processID = oam::API_SUCCESS; @@ -712,8 +716,8 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO } if (processName == "StorageManager") // storagemanager doesn't send its own response { - ackMsg << (uint8_t) ACK << (uint8_t) RESTART << (uint8_t) API_SUCCESS; - mq.write(ackMsg); + //ackMsg << (uint8_t) ACK << (uint8_t) RESTART << (uint8_t) API_SUCCESS; + // mq.write(ackMsg); } processList::iterator listPtr; @@ -765,9 +769,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO (*listPtr).DepModuleName, (*listPtr).LogFile, initType); - if (processName == "StorageManager") - log.writeLog(__LINE__, "RESTART: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); - + if (listPtr->ProcessName == "StorageManager") + { + log.writeLog(__LINE__, "StorageManager WTF? 7", LOG_TYPE_DEBUG); + oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()), + oam::ACTIVE, listPtr->processID); + } if ( processID > oam::API_MAX ) processID = oam::API_SUCCESS; @@ -1115,8 +1122,8 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO } if (processName == "StorageManager") // storagemanager doesn't send its own status { - ackMsg << (uint8_t) ACK << (uint8_t) STARTALL << (uint8_t) API_SUCCESS; - mq.write(ackMsg); + //ackMsg << (uint8_t) ACK << (uint8_t) STARTALL << (uint8_t) API_SUCCESS; + //mq.write(ackMsg); } if ( config.moduleType() == "pm" ) @@ -1212,9 +1219,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO (*listPtr).DepModuleName, (*listPtr).LogFile, initType); - if (processName == "StorageManager") - log.writeLog(__LINE__, "STARTALL: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); - + if (listPtr->ProcessName == "StorageManager") + { + log.writeLog(__LINE__, "StorageManager WTF? 4", LOG_TYPE_DEBUG); + oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()), + oam::ACTIVE, processID); + } if ( processID > oam::API_MAX ) { @@ -1281,9 +1291,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO (*listPtr).DepModuleName, (*listPtr).LogFile, initType); - if (processName == "StorageManager") - log.writeLog(__LINE__, "STARTALL: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); - + if (listPtr->ProcessName == "StorageManager") + { + log.writeLog(__LINE__, "StorageManager WTF? 5", LOG_TYPE_DEBUG); + oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()), + oam::ACTIVE, processID); + } if ( processID > oam::API_MAX ) processID = oam::API_SUCCESS; @@ -2684,7 +2697,8 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName, updateProcessInfo(processName, initType, 0); //sleep, give time for INIT state to be update, prevent race condition with ACTIVE - sleep(1); + if (processName != "StorageManager") + sleep(1); //check and setup for logfile time_t now; @@ -2839,7 +2853,8 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName, } //give time to get INIT status updated in shared memory - sleep(1); + if (processName != "StorageManager") + sleep(1); execv(processLocation.c_str(), argList); if (processName == "StorageManager") @@ -4810,6 +4825,7 @@ int ProcessMonitor::runHDFSTest() ifstream File (DataFilePlugin.c_str()); +#if 0 // for storagemanager if (!File) { log.writeLog(__LINE__, "Error: Hadoop Datafile Plugin File (" + DataFilePlugin + ") doesn't exist", LOG_TYPE_CRITICAL); @@ -4843,6 +4859,7 @@ int ProcessMonitor::runHDFSTest() fail = true; } } +#endif if (!fail) { diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index 349350bb6..df7fab540 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -86,10 +86,20 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * { uint count = 0; uint length = in.length(); - int sock = getSocket(); + int sock = -1; const uint8_t *inbuf = in.buf(); int err = 0; + while (sock < 0) + { + sock = getSocket(); + if (sock < 0) + { + log(logging::LOG_TYPE_ERROR, "SocketPool::send_recv(): failed to get a connection, retrying in 5 sec..."); + sleep(5); + } + } + /* TODO: make these writes not send SIGPIPE */ storagemanager::sm_msg_header hdr; hdr.type = storagemanager::SM_MSG_START; diff --git a/utils/idbdatafile/IDBPolicy.cpp b/utils/idbdatafile/IDBPolicy.cpp index 7bcdde27f..a4edfc851 100644 --- a/utils/idbdatafile/IDBPolicy.cpp +++ b/utils/idbdatafile/IDBPolicy.cpp @@ -120,14 +120,14 @@ bool IDBPolicy::isLocalFile( const std::string& path ) strmblen = funcexp::utf8::idb_wcstombs(outbuf, filepath.extension().c_str(), strmblen); string fileExt(outbuf, strmblen); #else - string fileExt = filepath.extension().c_str(); + //string fileExt = filepath.extension().c_str(); #endif - bool isXml = (fileExt == ".xml"); - + bool isXml = filepath.extension() == ".xml"; + bool isDbrm = path.find("dbrm") != string::npos; // StorageManager: make this depend on config values bool isVb = path.find("versionbuffer") != string::npos; bool isScratch = path.find(s_hdfsRdwrScratch) == 0; - return isXml || isVb || isScratch; + return isXml || isDbrm || isVb || isScratch; } IDBDataFile::Types IDBPolicy::getType( const std::string& path, Contexts ctxt )