1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Checkpointing some stuff. Doesn't work yet.

This commit is contained in:
Patrick LeBlanc
2019-06-06 15:47:15 -05:00
parent 98c9cd7b22
commit 15c256b011
4 changed files with 59 additions and 27 deletions

View File

@ -9169,11 +9169,14 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa
string oidFile; string oidFile;
oam.getSystemConfig("OIDBitmapFile", oidFile); oam.getSystemConfig("OIDBitmapFile", oidFile);
// StorageManager: Need to make these existence checks use an idbfilesystem op?
string currentDbrmFile; string currentDbrmFile;
ifstream oldFile (currentFileName.c_str()); ifstream oldFile (currentFileName.c_str());
if (oldFile) if (oldFile)
{ {
// current file found, check for OIDBitmapFile // current file found, check for OIDBitmapFile
ifstream mapFile (oidFile.c_str()); ifstream mapFile (oidFile.c_str());
@ -9290,6 +9293,7 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa
} }
// put oid file and current file in list // put oid file and current file in list
// StorageManager: no need to distribute these files
dbrmFiles.push_back(currentFileName); dbrmFiles.push_back(currentFileName);
ifstream file1 (journalFileName.c_str()); ifstream file1 (journalFileName.c_str());
@ -9324,6 +9328,7 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa
//remove any file of size 0 //remove any file of size 0
std::vector<std::string>::iterator pt1 = dbrmFiles.begin(); std::vector<std::string>::iterator pt1 = dbrmFiles.begin();
// StorageManager: ?
for ( ; pt1 != dbrmFiles.end() ; pt1++) for ( ; pt1 != dbrmFiles.end() ; pt1++)
{ {
string fileName = *pt1; string fileName = *pt1;
@ -9352,13 +9357,13 @@ int ProcessManager::getDBRMData(messageqcpp::IOSocket fIos, std::string moduleNa
} }
catch (...) catch (...)
{ {
log.writeLog(__LINE__, "EXCEPTION ERROR on cfIos.write: Unknow exception", LOG_TYPE_ERROR); log.writeLog(__LINE__, "EXCEPTION ERROR on cfIos.write: Unknown exception", LOG_TYPE_ERROR);
pthread_mutex_unlock(&THREAD_LOCK); pthread_mutex_unlock(&THREAD_LOCK);
return oam::API_FAILURE; return oam::API_FAILURE;
} }
// StorageManager: ?
pt1 = dbrmFiles.begin(); pt1 = dbrmFiles.begin();
for ( ; pt1 != dbrmFiles.end() ; pt1++) for ( ; pt1 != dbrmFiles.end() ; pt1++)
{ {
ByteStream fnmsg, fdmsg; ByteStream fnmsg, fdmsg;

View File

@ -577,8 +577,8 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
} }
if (processName == "StorageManager") // storagemanager doesn't send its own response if (processName == "StorageManager") // storagemanager doesn't send its own response
{ {
ackMsg << (uint8_t) ACK << (uint8_t) START << (uint8_t) API_SUCCESS; //ackMsg << (uint8_t) ACK << (uint8_t) START << (uint8_t) API_SUCCESS;
mq.write(ackMsg); //mq.write(ackMsg);
} }
ProcessConfig processconfig; ProcessConfig processconfig;
@ -663,9 +663,13 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
processconfig.LogFile, processconfig.LogFile,
initType, initType,
actIndicator); actIndicator);
if (processName == "StorageManager") if (processconfig.ProcessName == "StorageManager")
log.writeLog(__LINE__, "START: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); {
log.writeLog(__LINE__, "StorageManager WTF? 6", LOG_TYPE_DEBUG);
oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()),
oam::ACTIVE, processID);
}
if ( processID > oam::API_MAX ) if ( processID > oam::API_MAX )
processID = oam::API_SUCCESS; processID = oam::API_SUCCESS;
@ -712,8 +716,8 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
} }
if (processName == "StorageManager") // storagemanager doesn't send its own response if (processName == "StorageManager") // storagemanager doesn't send its own response
{ {
ackMsg << (uint8_t) ACK << (uint8_t) RESTART << (uint8_t) API_SUCCESS; //ackMsg << (uint8_t) ACK << (uint8_t) RESTART << (uint8_t) API_SUCCESS;
mq.write(ackMsg); // mq.write(ackMsg);
} }
processList::iterator listPtr; processList::iterator listPtr;
@ -765,9 +769,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
(*listPtr).DepModuleName, (*listPtr).DepModuleName,
(*listPtr).LogFile, (*listPtr).LogFile,
initType); initType);
if (processName == "StorageManager") if (listPtr->ProcessName == "StorageManager")
log.writeLog(__LINE__, "RESTART: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); {
log.writeLog(__LINE__, "StorageManager WTF? 7", LOG_TYPE_DEBUG);
oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()),
oam::ACTIVE, listPtr->processID);
}
if ( processID > oam::API_MAX ) if ( processID > oam::API_MAX )
processID = oam::API_SUCCESS; processID = oam::API_SUCCESS;
@ -1115,8 +1122,8 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
} }
if (processName == "StorageManager") // storagemanager doesn't send its own status if (processName == "StorageManager") // storagemanager doesn't send its own status
{ {
ackMsg << (uint8_t) ACK << (uint8_t) STARTALL << (uint8_t) API_SUCCESS; //ackMsg << (uint8_t) ACK << (uint8_t) STARTALL << (uint8_t) API_SUCCESS;
mq.write(ackMsg); //mq.write(ackMsg);
} }
if ( config.moduleType() == "pm" ) if ( config.moduleType() == "pm" )
@ -1212,9 +1219,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
(*listPtr).DepModuleName, (*listPtr).DepModuleName,
(*listPtr).LogFile, (*listPtr).LogFile,
initType); initType);
if (processName == "StorageManager") if (listPtr->ProcessName == "StorageManager")
log.writeLog(__LINE__, "STARTALL: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); {
log.writeLog(__LINE__, "StorageManager WTF? 4", LOG_TYPE_DEBUG);
oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()),
oam::ACTIVE, processID);
}
if ( processID > oam::API_MAX ) if ( processID > oam::API_MAX )
{ {
@ -1281,9 +1291,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
(*listPtr).DepModuleName, (*listPtr).DepModuleName,
(*listPtr).LogFile, (*listPtr).LogFile,
initType); initType);
if (processName == "StorageManager") if (listPtr->ProcessName == "StorageManager")
log.writeLog(__LINE__, "STARTALL: supposedly StorageManager was started, got processID " + processID, LOG_TYPE_DEBUG); {
log.writeLog(__LINE__, "StorageManager WTF? 5", LOG_TYPE_DEBUG);
oam.setProcessStatus("StorageManager", boost::get<0>(oam.getModuleInfo()),
oam::ACTIVE, processID);
}
if ( processID > oam::API_MAX ) if ( processID > oam::API_MAX )
processID = oam::API_SUCCESS; processID = oam::API_SUCCESS;
@ -2684,7 +2697,8 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName,
updateProcessInfo(processName, initType, 0); updateProcessInfo(processName, initType, 0);
//sleep, give time for INIT state to be update, prevent race condition with ACTIVE //sleep, give time for INIT state to be update, prevent race condition with ACTIVE
sleep(1); if (processName != "StorageManager")
sleep(1);
//check and setup for logfile //check and setup for logfile
time_t now; time_t now;
@ -2839,7 +2853,8 @@ pid_t ProcessMonitor::startProcess(string processModuleType, string processName,
} }
//give time to get INIT status updated in shared memory //give time to get INIT status updated in shared memory
sleep(1); if (processName != "StorageManager")
sleep(1);
execv(processLocation.c_str(), argList); execv(processLocation.c_str(), argList);
if (processName == "StorageManager") if (processName == "StorageManager")
@ -4810,6 +4825,7 @@ int ProcessMonitor::runHDFSTest()
ifstream File (DataFilePlugin.c_str()); ifstream File (DataFilePlugin.c_str());
#if 0 // for storagemanager
if (!File) if (!File)
{ {
log.writeLog(__LINE__, "Error: Hadoop Datafile Plugin File (" + DataFilePlugin + ") doesn't exist", LOG_TYPE_CRITICAL); log.writeLog(__LINE__, "Error: Hadoop Datafile Plugin File (" + DataFilePlugin + ") doesn't exist", LOG_TYPE_CRITICAL);
@ -4843,6 +4859,7 @@ int ProcessMonitor::runHDFSTest()
fail = true; fail = true;
} }
} }
#endif
if (!fail) if (!fail)
{ {

View File

@ -86,10 +86,20 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
{ {
uint count = 0; uint count = 0;
uint length = in.length(); uint length = in.length();
int sock = getSocket(); int sock = -1;
const uint8_t *inbuf = in.buf(); const uint8_t *inbuf = in.buf();
int err = 0; int err = 0;
while (sock < 0)
{
sock = getSocket();
if (sock < 0)
{
log(logging::LOG_TYPE_ERROR, "SocketPool::send_recv(): failed to get a connection, retrying in 5 sec...");
sleep(5);
}
}
/* TODO: make these writes not send SIGPIPE */ /* TODO: make these writes not send SIGPIPE */
storagemanager::sm_msg_header hdr; storagemanager::sm_msg_header hdr;
hdr.type = storagemanager::SM_MSG_START; hdr.type = storagemanager::SM_MSG_START;

View File

@ -120,14 +120,14 @@ bool IDBPolicy::isLocalFile( const std::string& path )
strmblen = funcexp::utf8::idb_wcstombs(outbuf, filepath.extension().c_str(), strmblen); strmblen = funcexp::utf8::idb_wcstombs(outbuf, filepath.extension().c_str(), strmblen);
string fileExt(outbuf, strmblen); string fileExt(outbuf, strmblen);
#else #else
string fileExt = filepath.extension().c_str(); //string fileExt = filepath.extension().c_str();
#endif #endif
bool isXml = (fileExt == ".xml"); bool isXml = filepath.extension() == ".xml";
bool isDbrm = path.find("dbrm") != string::npos; // StorageManager: make this depend on config values
bool isVb = path.find("versionbuffer") != string::npos; bool isVb = path.find("versionbuffer") != string::npos;
bool isScratch = path.find(s_hdfsRdwrScratch) == 0; bool isScratch = path.find(s_hdfsRdwrScratch) == 0;
return isXml || isVb || isScratch; return isXml || isDbrm || isVb || isScratch;
} }
IDBDataFile::Types IDBPolicy::getType( const std::string& path, Contexts ctxt ) IDBDataFile::Types IDBPolicy::getType( const std::string& path, Contexts ctxt )