diff --git a/dbcon/ddlpackage/ddl.l b/dbcon/ddlpackage/ddl.l index 179e5c14b..aa909374f 100644 --- a/dbcon/ddlpackage/ddl.l +++ b/dbcon/ddlpackage/ddl.l @@ -75,7 +75,7 @@ ident_cont [A-Za-z\200-\377_0-9\$] identifier {ident_start}{ident_cont}* extended_identifier {ident_start}{extended_ident_cont}* /* fully qualified names regexes */ -fq_identifier {identifier}\.{identifier} +ident_w_spaces {identifier}\x20* identifier_quoted {grave_accent}{extended_identifier}{grave_accent} identifier_double_quoted {double_quote}{extended_identifier}{double_quote} diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 789d85ad3..3b109ade0 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4853,7 +4853,6 @@ void gp_walk(const Item* item, void* arg) { gp_walk_info* gwip = reinterpret_cast(arg); idbassert(gwip); - bool isCached = false; //Bailout... if (gwip->fatalParseError) return; @@ -4866,15 +4865,17 @@ void gp_walk(const Item* item, void* arg) if (itype == Item::FUNC_ITEM && ((Item_func*)item)->functype() == Item_func::XOR_FUNC ) itype = Item::COND_ITEM; - if (item->type() == Item::CACHE_ITEM) - { - item = ((Item_cache*)item)->get_example(); - itype = item->type(); - isCached = true; - } - switch (itype) { + case Item::CACHE_ITEM: + { + // The item or condition is cached as per MariaDB server view but + // for InfiniDB it need to be parsed and executed. + // MCOL-1188 and MCOL-1029 + Item* orig_item = ((Item_cache*)item)->get_example(); + orig_item->traverse_cond(gp_walk, gwip, Item::POSTFIX); + break; + } case Item::FIELD_ITEM: { Item_field* ifp = (Item_field*)item; @@ -5086,14 +5087,10 @@ void gp_walk(const Item* item, void* arg) cc->resultType(colType_MysqlToIDB(item)); } - // cached item comes in one piece - if (!isCached) - { - for (uint32_t i = 0; i < ifp->argument_count() && !gwip->rcWorkStack.empty(); i++) - { - gwip->rcWorkStack.pop(); - } - } + for (uint32_t i = 0; i < ifp->argument_count() && !gwip->rcWorkStack.empty(); i++) + { + gwip->rcWorkStack.pop(); + } // bug 3137. If filter constant like 1=0, put it to ptWorkStack // MariaDB bug 750. Breaks if compare is an argument to a function. @@ -5170,14 +5167,6 @@ void gp_walk(const Item* item, void* arg) bool isOr = (ftype == Item_func::COND_OR_FUNC); bool isXor = (ftype == Item_func::XOR_FUNC); - // MCOL-1029 A cached COND_ITEM is something like: - // AND (TRUE OR FALSE) - // We can skip it - if (isCached) - { - break; - } - List* argumentList; List xorArgumentList; diff --git a/ddlproc/ddlproc.cpp b/ddlproc/ddlproc.cpp index 03cbe08cd..bc70b01d5 100644 --- a/ddlproc/ddlproc.cpp +++ b/ddlproc/ddlproc.cpp @@ -139,8 +139,30 @@ int main(int argc, char* argv[]) { oam.processInitComplete("DDLProc", ACTIVE); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(23, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DDLProc init caught exception: "); + args1.add(ex.what()); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; + } catch (...) { + cerr << "Caught unknown exception in init!" << endl; + LoggingID logid(23, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DDLProc init caught unknown exception"); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; } } @@ -151,21 +173,28 @@ int main(int argc, char* argv[]) catch (std::exception& ex) { cerr << ex.what() << endl; + LoggingID logid(23, 0, 0); Message::Args args; Message message(8); args.add("DDLProc failed on: "); args.add(ex.what()); message.format( args ); - + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, message, logid); + return 1; } catch (...) { cerr << "Caught unknown exception!" << endl; + LoggingID logid(23, 0, 0); Message::Args args; Message message(8); args.add("DDLProc failed on: "); - args.add("receiving DDLPackage"); + args.add("receiving DDLPackage (unknown exception)"); message.format( args ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, message, logid); + return 1; } return 0; diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 64282d3ca..8369a2003 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -534,8 +534,30 @@ int main(int argc, char* argv[]) // At first we set to BUSY_INIT oam.processInitComplete("DMLProc", oam::BUSY_INIT); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught exception: "); + args1.add(ex.what()); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; + } catch (...) { + cerr << "Caught unknown exception in init!" << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught unknown exception"); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; } //@Bug 1627 @@ -632,8 +654,30 @@ int main(int argc, char* argv[]) { oam.processInitComplete("DMLProc", ACTIVE); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught exception: "); + args1.add(ex.what()); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; + } catch (...) { + cerr << "Caught unknown exception in init!" << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught unknown exception"); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; } Dec = DistributedEngineComm::instance(rm); diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index f143ced83..861198d45 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1225,8 +1225,28 @@ void DMLServer::start() cancelThread.join(); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + logging::LoggingID lid(21); + Message::Args args; + Message message(8); + args.add("DMLProc init caught exception: "); + args.add(ex.what()); + message.format(args); + logging::Logger logger(lid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); + } catch (...) { + cerr << "Caught unknown exception!" << endl; + logging::LoggingID lid(21); + Message::Args args; + Message message(8); + args.add("DMLProc init caught unknown exception"); + message.format(args); + logging::Logger logger(lid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); } } diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 4e3b42347..528990ec8 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1389,8 +1389,34 @@ void cleanTempDir() assert(tmpPrefix != "/"); /* This is quite scary as ExeMgr usually runs as root */ - boost::filesystem::remove_all(tmpPrefix); - boost::filesystem::create_directories(tmpPrefix); + try + { + boost::filesystem::remove_all(tmpPrefix); + boost::filesystem::create_directories(tmpPrefix); + } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(16, 0, 0); + Message::Args args; + Message message(8); + args.add("Execption whilst cleaning tmpdir: "); + args.add(ex.what()); + message.format( args ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_WARNING, message, logid); + } + catch (...) + { + cerr << "Caught unknown exception during tmpdir cleanup" << endl; + LoggingID logid(16, 0, 0); + Message::Args args; + Message message(8); + args.add("Unknown execption whilst cleaning tmpdir"); + message.format( args ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_WARNING, message, logid); + } } diff --git a/oam/install_scripts/columnstoreAlias b/oam/install_scripts/columnstoreAlias index cd225c1a9..255eb7e7e 100644 --- a/oam/install_scripts/columnstoreAlias +++ b/oam/install_scripts/columnstoreAlias @@ -10,5 +10,8 @@ alias core='cd /var/log/mariadb/columnstore/corefiles' alias tmsg='tail -f /var/log/messages' alias tdebug='tail -f /var/log/mariadb/columnstore/debug.log' alias tinfo='tail -f /var/log/mariadb/columnstore/info.log' +alias terror='tail -f /var/log/mariadb/columnstore/err.log' +alias twarning='tail -f /var/log/mariadb/columnstore/warning.log' +alias tcrit='tail -f /var/log/mariadb/columnstore/crit.log' alias dbrm='cd /usr/local/mariadb/columnstore/data1/systemFiles/dbrm' alias module='cat /usr/local/mariadb/columnstore/local/module' diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index bba892b0d..9c5c80eab 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -8259,8 +8259,7 @@ std::string Oam::getEC2LocalInstance(std::string name) // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh getInstance > /tmp/getInstanceInfo_" + name; int status = system(cmd.c_str()); - - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; // get Instance Name @@ -8293,8 +8292,7 @@ std::string Oam::getEC2LocalInstanceType(std::string name) // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh getType > /tmp/getInstanceType_" + name; int status = system(cmd.c_str()); - - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; // get Instance Name @@ -8327,8 +8325,7 @@ std::string Oam::getEC2LocalInstanceSubnet(std::string name) // run script to get Instance Subnet string cmd = InstallDir + "/bin/MCSInstanceCmds.sh getSubnet > /tmp/getInstanceSubnet_" + name; int status = system(cmd.c_str()); - - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; // get Instance Name @@ -8362,8 +8359,7 @@ std::string Oam::launchEC2Instance( const std::string name, const std::string IP // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh launchInstance " + IPAddress + " " + type + " " + group + " > /tmp/getInstance_" + name; int status = system(cmd.c_str()); - - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; if (checkLogStatus("/tmp/getInstance", "Required") ) @@ -8441,8 +8437,7 @@ bool Oam::startEC2Instance(std::string instanceName) // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh startInstance " + instanceName + " > /tmp/startEC2Instance_" + instanceName; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; @@ -8461,8 +8456,7 @@ bool Oam::assignElasticIP(std::string instanceName, std::string IpAddress) // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh assignElasticIP " + instanceName + " " + IpAddress + " > /tmp/assignElasticIP_" + instanceName; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) exceptionControl("assignElasticIP", oam::API_FAILURE); return true; @@ -8481,8 +8475,7 @@ bool Oam::deassignElasticIP(std::string IpAddress) // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh deassignElasticIP " + IpAddress + " > /tmp/deassignElasticIP_" + IpAddress; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) exceptionControl("deassignElasticIP", oam::API_FAILURE); return true; @@ -8501,9 +8494,9 @@ std::string Oam::getEC2VolumeStatus(std::string volumeName) // run script to get Volume Status string cmd = InstallDir + "/bin/MCSVolumeCmds.sh describe " + volumeName + " > /tmp/getVolumeStatus_" + volumeName; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ){ return "failed"; + } // get status string status; @@ -8535,8 +8528,7 @@ std::string Oam::createEC2Volume(std::string size, std::string name) // run script to get Volume Status string cmd = InstallDir + "/bin/MCSVolumeCmds.sh create " + size + " " + name + " > /tmp/createVolumeStatus_" + name; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return "failed"; // get status @@ -8584,12 +8576,16 @@ bool Oam::attachEC2Volume(std::string volumeName, std::string deviceName, std::s string cmd = InstallDir + "/bin/MCSVolumeCmds.sh attach " + volumeName + " " + instanceName + " " + deviceName + " > /tmp/attachVolumeStatus_" + volumeName; ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) == 0 ) - return true; + if (WEXITSTATUS(ret) == 1 ) + { + //failing to attach, dettach and retry + writeLog("attachEC2Volume: Attach failed, call detach:" + volumeName + " " + instanceName + " " + deviceName, LOG_TYPE_ERROR ); - //failing to attach, dettach and retry - detachEC2Volume(volumeName); - } + detachEC2Volume(volumeName); + } + else + return true; + } if (ret == 0 ) return true; @@ -8610,8 +8606,7 @@ bool Oam::detachEC2Volume(std::string volumeName) // run script to attach Volume string cmd = InstallDir + "/bin/MCSVolumeCmds.sh detach " + volumeName + " > /tmp/detachVolumeStatus_" + volumeName; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; @@ -8630,8 +8625,7 @@ bool Oam::deleteEC2Volume(std::string volumeName) // run script to delete Volume string cmd = InstallDir + "/bin/MCSVolumeCmds.sh delete " + volumeName + " > /tmp/deleteVolumeStatus_" + volumeName; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; @@ -8650,8 +8644,7 @@ bool Oam::createEC2tag(std::string resourceName, std::string tagName, std::strin // run script to create a tag string cmd = InstallDir + "/bin/MCSVolumeCmds.sh createTag " + resourceName + " " + tagName + " " + tagValue + " > /tmp/createTagStatus_" + resourceName; int ret = system(cmd.c_str()); - - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp index c1f42f52c..95b759812 100644 --- a/oamapps/mcsadmin/mcsadmin.cpp +++ b/oamapps/mcsadmin/mcsadmin.cpp @@ -7841,15 +7841,24 @@ int processCommand(string* arguments) { try { - cout << endl << " Starting Modules" << endl; - oam.startModule(devicenetworklist, ackTemp); - - //reload DBRM with new configuration, needs to be done here after startModule - cmd = startup::StartUp::installDir() + "/bin/dbrmctl reload > /dev/null 2>&1"; - system(cmd.c_str()); - sleep(15); - - cout << " Successful start of Modules " << endl; + cout << endl << " Restarting System " << endl; + gracefulTemp = oam::FORCEFUL; + int returnStatus = oam.restartSystem(gracefulTemp, ackTemp); + switch (returnStatus) + { + case API_SUCCESS: + if ( waitForActive() ) + cout << endl << " Successful restart of System " << endl << endl; + else + cout << endl << "**** restartSystem Failed : check log files" << endl; + break; + case API_CANCELLED: + cout << endl << " Restart of System canceled" << endl << endl; + break; + default: + cout << endl << "**** restartSystem Failed : Check system logs" << endl; + break; + } } catch (exception& e) { diff --git a/primitives/blockcache/filebuffermgr.cpp b/primitives/blockcache/filebuffermgr.cpp index 65ad2b8f0..0d3032743 100644 --- a/primitives/blockcache/filebuffermgr.cpp +++ b/primitives/blockcache/filebuffermgr.cpp @@ -120,7 +120,10 @@ void FileBufferMgr::flushCache() // the block pool should not be freed in the above block to allow us // to continue doing concurrent unprotected-but-"safe" memcpys // from that memory - + if (fReportFrequency) + { + fLog << "Clearing entire cache" << endl; + } fFBPool.clear(); // fFBPool.reserve(fMaxNumBlocks); } @@ -154,7 +157,15 @@ void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt) BRM::LBID_t lbid; BRM::VER_t ver; filebuffer_uset_iter_t iter; - + if (fReportFrequency) + { + fLog << "flushMany " << cnt << " items: "; + for (uint32_t j = 0; j < cnt; j++) + { + fLog << "lbid: " << laVptr[j].LBID << " ver: " << laVptr[j].Ver << ", "; + } + fLog << endl; + } for (uint32_t j = 0; j < cnt; j++) { lbid = static_cast(laVptr->LBID); @@ -163,6 +174,10 @@ void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt) if (iter != fbSet.end()) { + if (fReportFrequency) + { + fLog << "flushMany hit, lbid: " << lbid << " index: " << iter->poolIdx << endl; + } //remove it from fbList uint32_t idx = iter->poolIdx; fbList.erase(fFBPool[idx].listLoc()); @@ -186,6 +201,16 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt) mutex::scoped_lock lk(fWLock); + if (fReportFrequency) + { + fLog << "flushManyAllversion " << cnt << " items: "; + for (uint32_t i = 0; i < cnt; i++) + { + fLog << laVptr[i] << ", "; + } + fLog << endl; + } + if (fCacheSize == 0 || cnt == 0) return; @@ -194,8 +219,10 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt) for (it = fbSet.begin(); it != fbSet.end();) { - if (uniquer.find(it->lbid) != uniquer.end()) - { + if (fReportFrequency) + { + fLog << "flushManyAllversion hit: " << it->lbid << " index: " << it->poolIdx << endl; + } const uint32_t idx = it->poolIdx; fbList.erase(fFBPool[idx].listLoc()); fEmptyPoolSlots.push_back(idx); @@ -222,6 +249,16 @@ void FileBufferMgr::flushOIDs(const uint32_t* oids, uint32_t count) pair itList; filebuffer_uset_t::iterator it; + if (fReportFrequency) + { + fLog << "flushOIDs " << count << " items: "; + for (uint32_t i = 0; i < count; i++) + { + fLog << oids[i] << ", "; + } + fLog << endl; + } + // If there are more than this # of extents to drop, the whole cache will be cleared const uint32_t clearThreshold = 50000; @@ -286,6 +323,22 @@ void FileBufferMgr::flushPartition(const vector& oids, const set::iterator sit; + fLog << "flushPartition oids: "; + for (uint32_t i = 0; i < count; i++) + { + fLog << oids[i] << ", "; + } + fLog << "flushPartition partitions: "; + for (sit = partitions.begin(); sit != partitions.end(); ++sit) + { + fLog << (*sit).toString() << ", "; + } + fLog << endl; + } + if (fCacheSize == 0 || oids.size() == 0 || partitions.size() == 0) return; @@ -554,7 +607,7 @@ int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const ui { struct timespec tm; clock_gettime(CLOCK_MONOTONIC, &tm); - fLog + fLog << "insert: " << left << fixed << ((double)(tm.tv_sec + (1.e-9 * tm.tv_nsec))) << " " << right << setw(12) << fBlksLoaded << " " << right << setw(12) << fBlksNotUsed << endl; @@ -743,9 +796,13 @@ int FileBufferMgr::bulkInsert(const vector& ops) mutex::scoped_lock lk(fWLock); - for (i = 0; i < ops.size(); i++) + if (fReportFrequency) { - const CacheInsert_t& op = ops[i]; + fLog << "bulkInsert: "; + } + + for (i = 0; i < ops.size(); i++) { + const CacheInsert_t &op = ops[i]; if (gPMProfOn && gPMStatsPtr) #ifdef _MSC_VER @@ -770,7 +827,10 @@ int FileBufferMgr::bulkInsert(const vector& ops) continue; } - //cout << "FBM: inserting <" << op.lbid << ", " << op.ver << endl; + if (fReportFrequency) + { + fLog << op.lbid << " " << op.ver << ", "; + } fCacheSize++; fBlksLoaded++; FBData_t fbdata = {op.lbid, op.ver, 0}; @@ -790,7 +850,10 @@ int FileBufferMgr::bulkInsert(const vector& ops) #endif ret++; } - + if (fReportFrequency) + { + fLog << endl; + } idbassert(fCacheSize <= maxCacheSize()); return ret; diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 0aff2dcd6..2ca22a936 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1617,7 +1617,7 @@ void pingDeviceThread() if (moduleInfoList[moduleName] >= ModuleHeartbeatCount || opState == oam::DOWN || opState == oam::AUTO_DISABLED) { - log.writeLog(__LINE__, "Module alive, bring it back online: " + moduleName, LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "*** Module alive, bring it back online: " + moduleName, LOG_TYPE_DEBUG); string PrimaryUMModuleName = config.moduleName(); @@ -2089,7 +2089,7 @@ void pingDeviceThread() { //Log failure, issue alarm, set moduleOpState Configuration config; - log.writeLog(__LINE__, "module is down: " + moduleName, LOG_TYPE_CRITICAL); + log.writeLog(__LINE__, "*** module is down: " + moduleName, LOG_TYPE_CRITICAL); //set query system state not ready processManager.setQuerySystemState(false); @@ -2173,9 +2173,6 @@ void pingDeviceThread() // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); - - //set recycle process - processManager.recycleProcess(moduleName); } // return values = 'ip address' for running or rebooting, stopped or terminated diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 595a86f88..2594ff5a0 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -629,10 +629,12 @@ void processMSG(messageqcpp::IOSocket* cfIos) if ( count > 0 ) { + string module = oam::UnassignedName; for (int i = 0; i < count; i++) { msg >> value; devicenetworkconfig.DeviceName = value; + module = value; msg >> value; devicenetworkconfig.UserTempDeviceName = value; msg >> value; @@ -662,24 +664,21 @@ void processMSG(messageqcpp::IOSocket* cfIos) if ( status == API_SUCCESS) { - //distribute config file - processManager.distributeConfigFile("system"); + processManager.setSystemState(oam::BUSY_INIT); - //call dbrm control - oam.dbrmctl("halt"); - log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG); + //set query system state not ready + processManager.setQuerySystemState(false); - oam.dbrmctl("reload"); - log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); + //set recycle process + processManager.recycleProcess(target, true); - oam.dbrmctl("resume"); - log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); + //distribute config file + processManager.distributeConfigFile("system"); + + //set query system state ready + processManager.setQuerySystemState(true); -// processManager.restartProcessType("ExeMgr"); - - //setup MySQL Replication for started modules -// log.writeLog(__LINE__, "Setup MySQL Replication for module being started", LOG_TYPE_DEBUG); -// processManager.setMySQLReplication(startdevicenetworklist); + processManager.setSystemState(oam::ACTIVE); } } else @@ -922,7 +921,6 @@ void processMSG(messageqcpp::IOSocket* cfIos) //set query system state ready processManager.setQuerySystemState(true); - } else { @@ -1771,7 +1769,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) oam::DeviceNetworkList devicenetworklist; pthread_t startsystemthread; - pthread_create (&startsystemthread, NULL, (void* (*)(void*)) &startSystemThread, &devicenetworklist); + status = pthread_create (&startsystemthread, NULL, (void*(*)(void*)) &startSystemThread, &devicenetworklist); if ( status != 0 ) { @@ -1781,20 +1779,19 @@ void processMSG(messageqcpp::IOSocket* cfIos) if (status == 0 && ackIndicator) { - // BUG 4554 We don't need the join because calpont console is now looking for "Active" - // We need to return the ack right away to let console know we got the message. -// pthread_join(startsystemthread, NULL); -// status = startsystemthreadStatus; + pthread_join(startsystemthread, NULL); + status = startsystemthreadStatus; } - - // setup MySQL Replication after switchover command - /* if (graceful == FORCEFUL) + + // setup MySQL Replication after FORCE restart command + if ( (status == API_SUCCESS) && + (graceful == oam::FORCEFUL) ) { - log.writeLog(__LINE__, "Setup MySQL Replication for restartSystem FORCE, used by switch-parent command", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "Setup MySQL Replication for restartSystem FORCE", LOG_TYPE_DEBUG); oam::DeviceNetworkList devicenetworklist; - processManager.setMySQLReplication(devicenetworklist); + processManager.setMySQLReplication(devicenetworklist, oam::UnassignedName, true); } - */ + log.writeLog(__LINE__, "RESTARTSYSTEM: Start System Request Completed", LOG_TYPE_INFO); } @@ -3064,15 +3061,16 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.reinitProcessType("cpimport"); //request reinit after Process is active - for ( int i = 0; i < 600 ; i++ ) - { - try - { - ProcessStatus procstat; - oam.getProcessStatus(processName, moduleName, procstat); + for ( int i = 0; i < 10 ; i++ ) { + try { + ProcessStatus procstat; + oam.getProcessStatus(processName, moduleName, procstat); - if (procstat.ProcessOpState == oam::ACTIVE) - { + if (procstat.ProcessOpState == oam::COLD_STANDBY) + break; + + if ( (procstat.ProcessOpState == oam::ACTIVE) || + (procstat.ProcessOpState == oam::STANDBY) ) { // if a PrimProc was restarted, reinit ACTIVE ExeMgr(s) and DDL/DMLProc if ( processName == "PrimProc") { @@ -3159,11 +3157,14 @@ void processMSG(messageqcpp::IOSocket* cfIos) } - // if a DDLProc was restarted, reinit DMLProc + // if a DDLProc was restarted, restart DMLProc if ( processName == "DDLProc") { processManager.reinitProcessType("DMLProc"); + //set query system states ready processManager.setQuerySystemState(true); + + processManager.setSystemState(oam::ACTIVE); } //only run on auto process restart @@ -3221,6 +3222,8 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.setQuerySystemState(true); processManager.setSystemState(oam::ACTIVE); + + log.writeLog(__LINE__, "MSG RECEIVED: Process Restarted Completed"); } break; @@ -3603,6 +3606,7 @@ int ProcessManager::shutdownModule(string target, ByteStream::byte actionIndicat int ProcessManager::disableModule(string target, bool manualFlag) { Oam oam; + ProcessManager processManager(config, log); ModuleConfig moduleconfig; log.writeLog(__LINE__, "disableModule request for " + target, LOG_TYPE_DEBUG); @@ -3718,6 +3722,11 @@ int ProcessManager::disableModule(string target, bool manualFlag) if ( updateWorkerNodeconfig() != API_SUCCESS ) return API_FAILURE; + processManager.recycleProcess(target); + + //check for SIMPLEX Processes on mate might need to be started + processManager.checkSimplexModule(target); + //distribute config file distributeConfigFile("system"); @@ -3729,7 +3738,7 @@ int ProcessManager::disableModule(string target, bool manualFlag) /****************************************************************************************** * @brief recycleProcess * -* purpose: recyle process, generally after some disable module is run +* purpose: recyle process, done after disable/enable module * ******************************************************************************************/ void ProcessManager::recycleProcess(string module, bool enableModule) @@ -3749,53 +3758,40 @@ void ProcessManager::recycleProcess(string module, bool enableModule) } catch (...) {} - // restart DBRM Process and DMLProc and return if enable module is being done - if (enableModule) - { - //recycle DBRM processes in all cases - restartProcessType("DBRMControllerNode"); - restartProcessType("DBRMWorkerNode"); + stopProcessType("WriteEngineServer"); - restartProcessType("DMLProc"); - return; - } + stopProcessType("ExeMgr"); + + stopProcessType("PrimProc"); - //recycle DBRM processes in all cases - restartProcessType("DBRMControllerNode", module); - restartProcessType("DBRMWorkerNode"); + stopProcessType("DBRMControllerNode"); + stopProcessType("DBRMWorkerNode"); - // only recycle dmlproc, if down/up module is non-parent UM - if ( ( moduleType == "um" ) && - ( PrimaryUMModuleName != module) ) - { - restartProcessType("DMLProc", module); - return; - } + stopProcessType("DDLProc"); + stopProcessType("DMLProc"); - if ( PrimaryUMModuleName == module) - { - stopProcessType("DDLProc"); - stopProcessType("DMLProc"); - } + stopProcessType("mysqld"); - stopProcessType("ExeMgr"); +// restartProcessType("mysqld"); + + startProcessType("DBRMControllerNode"); + startProcessType("DBRMWorkerNode"); - restartProcessType("PrimProc"); - sleep(1); - - restartProcessType("mysqld"); - - restartProcessType("WriteEngineServer"); - sleep(1); + startProcessType("PrimProc"); + sleep(5); + + startProcessType("WriteEngineServer"); + sleep(3); startProcessType("ExeMgr"); - sleep(1); startProcessType("DDLProc"); sleep(1); startProcessType("DMLProc"); + startProcessType("mysqld"); + return; } @@ -3843,11 +3839,7 @@ int ProcessManager::enableModule(string target, int state, bool failover) if ( newStandbyModule == target) setStandbyModule(newStandbyModule); - - //set recycle process - if (!failover) - recycleProcess(target); - + log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG); return API_SUCCESS; @@ -7108,17 +7100,27 @@ void ProcessManager::setQuerySystemState(bool set) Oam oam; BRM::DBRM dbrm; - log.writeLog(__LINE__, "setQuerySystemState = " + oam.itoa(set), LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setQuerySystemState called = " + oam.itoa(set), LOG_TYPE_DEBUG); try { dbrm.setSystemQueryReady(set); - log.writeLog(__LINE__, "setQuerySystemState successful", LOG_TYPE_DEBUG); - } - catch (...) - { - log.writeLog(__LINE__, "setQuerySystemState failed", LOG_TYPE_DEBUG); - log.writeLog(__LINE__, "setQuerySystemState failed", LOG_TYPE_ERROR); + log.writeLog(__LINE__, "setSystemQueryReady = " + oam.itoa(set), LOG_TYPE_DEBUG); + + try { + dbrm.setSystemReady(set); + log.writeLog(__LINE__, "setSystemReady = " + oam.itoa(set), LOG_TYPE_DEBUG); + } + catch(...) + { + log.writeLog(__LINE__, "setSystemReady failed", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemReady failed", LOG_TYPE_ERROR); + } + } + catch(...) + { + log.writeLog(__LINE__, "setSystemQueryReady failed", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemQueryReady failed", LOG_TYPE_ERROR); } } @@ -7723,25 +7725,30 @@ void startSystemThread(oam::DeviceNetworkList Devicenetworklist) if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) { - rtn = oam::ACTIVE; + rtn = oam::ACTIVE; break; } if (DMLprocessstatus.ProcessOpState == oam::FAILED) { - rtn = oam::FAILED; + rtn = oam::FAILED; + status = oam::API_FAILURE; break; } - // wait some more - sleep(2); - } + // wait some more + sleep(2); + } + + if ( rtn = oam::ACTIVE ) + //set query system state not ready + processManager.setQuerySystemState(true); - processManager.setSystemState(rtn); + processManager.setSystemState(rtn); } + else + processManager.setSystemState(oam::FAILED); - //set query system state ready - processManager.setQuerySystemState(true); // exit thread log.writeLog(__LINE__, "startSystemThread Exit", LOG_TYPE_DEBUG); diff --git a/procmon/main.cpp b/procmon/main.cpp index eb2e6dd1f..3e383ab10 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -224,7 +224,7 @@ int main(int argc, char** argv) } catch (...) {} - if ( cloud == "amazon-ec2" ) + if ( cloud == "amazon-ec2" || cloud == "amazon-vpc" ) { { if (!aMonitor.amazonIPCheck()) { @@ -1522,7 +1522,7 @@ static void chldHandleThread(MonitorConfig config) catch (...) {} - // check if process failover is needed due to process outage + // check if Mdoule failover is needed due to process outage aMonitor.checkModuleFailover((*listPtr).ProcessName); //check the db health @@ -1604,17 +1604,20 @@ static void chldHandleThread(MonitorConfig config) restartStatus = " restart failed with hard failure, don't retry!!"; (*listPtr).processID = 0; - // check if process failover is needed due to process outage + // check if Module failover is needed due to process outage aMonitor.checkModuleFailover((*listPtr).ProcessName); break; } else { if ( (*listPtr).processID != oam::API_MINOR_FAILURE ) + { //restarted successful + //Inform Process Manager that Process restart + aMonitor.processRestarted( (*listPtr).ProcessName, false); break; - } - + } + } // restart failed with minor error, sleep and try sleep(5); } @@ -2695,22 +2698,6 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); } - - //if DMLProc set to ACTIVE, set system state to ACTIVE if in an INIT state - if ( processName == "DMLProc" && state == oam::ACTIVE ) - { - if ( fShmSystemStatus[0].OpState == oam::BUSY_INIT || - fShmSystemStatus[0].OpState == oam::MAN_INIT || - fShmSystemStatus[0].OpState == oam::AUTO_INIT ) - { - fShmSystemStatus[0].OpState = state; - memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); - log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); - } - - BRM::DBRM dbrm; - dbrm.setSystemQueryReady(true); - } } break; diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index 2eb038189..e02f821bd 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -664,9 +664,6 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO else log.writeLog(__LINE__, "START: process already active " + processName); - //Inform Process Manager that Process restart - //processRestarted(processName); - ackMsg << (ByteStream::byte) ACK; ackMsg << (ByteStream::byte) START; ackMsg << (ByteStream::byte) requestStatus; @@ -771,9 +768,6 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO requestStatus = API_FAILURE; } - //Inform Process Manager that Process restart - //processRestarted(processName); - ackMsg << (ByteStream::byte) ACK; ackMsg << (ByteStream::byte) RESTART; ackMsg << (ByteStream::byte) requestStatus; @@ -4999,7 +4993,6 @@ void ProcessMonitor::checkModuleFailover( std::string processName) { // found a AVAILABLE mate, start it log.writeLog(__LINE__, "Change UM Master to module " + systemprocessstatus.processstatus[i].Module, LOG_TYPE_DEBUG); - log.writeLog(__LINE__, "Disable local UM module " + config.moduleName(), LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Stop local UM module " + config.moduleName(), LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Disable Local will Enable UM module " + systemprocessstatus.processstatus[i].Module, LOG_TYPE_DEBUG); @@ -5916,7 +5909,6 @@ bool ProcessMonitor::amazonIPCheck() log.writeLog(__LINE__, "Assign Elastic IP Address failed : '" + moduleName + "' / '" + ELIPaddress, LOG_TYPE_ERROR); break; } - break; } @@ -6095,8 +6087,11 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID) if (oam.attachEC2Volume(volumeName, deviceName, instanceName)) { + log.writeLog(__LINE__, "amazonVolumeCheck function , volume to attached: " + volumeName, LOG_TYPE_DEBUG); + string cmd = "mount " + startup::StartUp::installDir() + "/data" + oam.itoa(dbrootID) + " > /dev/null"; system(cmd.c_str()); + log.writeLog(__LINE__, "amazonVolumeCheck function , volume to mounted: " + volumeName, LOG_TYPE_DEBUG); return true; } else diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 4c043ebbb..92fd0ad98 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -42,15 +42,22 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads uint midThreads, uint lowThreads, uint ID) : _stop(false), weightPerRun(targetWeightPerRun), id(ID) { + boost::thread* newThread; for (uint32_t i = 0; i < highThreads; i++) - threads.create_thread(ThreadHelper(this, HIGH)); - + { + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); + } for (uint32_t i = 0; i < midThreads; i++) - threads.create_thread(ThreadHelper(this, MEDIUM)); - + { + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); + } for (uint32_t i = 0; i < lowThreads; i++) - threads.create_thread(ThreadHelper(this, LOW)); - + { + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); + } cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; @@ -65,6 +72,7 @@ PriorityThreadPool::~PriorityThreadPool() void PriorityThreadPool::addJob(const Job& job, bool useLock) { + boost::thread* newThread; mutex::scoped_lock lk(mutex, defer_lock_t()); if (useLock) @@ -73,19 +81,22 @@ void PriorityThreadPool::addJob(const Job& job, bool useLock) // Create any missing threads if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) { - threads.create_thread(ThreadHelper(this, HIGH)); + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); threadCounts[HIGH]++; } if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) { - threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); threadCounts[MEDIUM]++; } if (defaultThreadCounts[LOW] != threadCounts[LOW]) { - threads.create_thread(ThreadHelper(this, LOW)); + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); threadCounts[LOW]++; } @@ -281,7 +292,6 @@ void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveproce void PriorityThreadPool::stop() { _stop = true; - threads.join_all(); } } // namespace threadpool diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index f1aa4ec19..a12e574b8 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -43,7 +43,8 @@ ThreadPool::ThreadPool() } ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize ) - : fMaxThreads( maxThreads ), fQueueSize( queueSize ) + :fMaxThreads( maxThreads ), fQueueSize( queueSize ), + fPruneThread( NULL ) { init(); } @@ -72,6 +73,7 @@ void ThreadPool::init() fStop = false; fNextFunctor = fWaitingFunctors.end(); fNextHandle = 1; + fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this)); } void ThreadPool::setQueueSize(size_t queueSize) @@ -80,6 +82,39 @@ void ThreadPool::setQueueSize(size_t queueSize) fQueueSize = queueSize; } +void ThreadPool::pruneThread() +{ + boost::mutex::scoped_lock lock2(fPruneMutex); + + while(true) + { + boost::system_time timeout = boost::get_system_time() + boost::posix_time::minutes(1); + if (!fPruneThreadEnd.timed_wait(fPruneMutex, timeout)) + { + while(!fPruneThreads.empty()) + { + if (fDebug) + { + ostringstream oss; + oss << "pruning thread " << fPruneThreads.top(); + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } + fThreads.join_one(fPruneThreads.top()); + fPruneThreads.pop(); + } + } + else + { + break; + } + } +} void ThreadPool::setMaxThreads(size_t maxThreads) { @@ -93,6 +128,9 @@ void ThreadPool::stop() fStop = true; lock1.unlock(); + fPruneThreadEnd.notify_all(); + fPruneThread->join(); + delete fPruneThread; fNeedThread.notify_all(); fThreads.join_all(); } @@ -305,6 +343,8 @@ void ThreadPool::beginThread() throw() { if (fThreadCount > fMaxThreads) { + boost::mutex::scoped_lock lock2(fPruneMutex); + fPruneThreads.push(boost::this_thread::get_id()); --fThreadCount; return; } diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index 11d67338f..7bc605472 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,106 @@ namespace threadpool { + +// Taken from boost::thread_group and adapted +class ThreadPoolGroup +{ +private: + ThreadPoolGroup(ThreadPoolGroup const&); + ThreadPoolGroup& operator=(ThreadPoolGroup const&); +public: + ThreadPoolGroup() {} + ~ThreadPoolGroup() + { + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + delete *it; + } + } + + template + boost::thread* create_thread(F threadfunc) + { + boost::lock_guard guard(m); + std::auto_ptr new_thread(new boost::thread(threadfunc)); + threads.push_back(new_thread.get()); + return new_thread.release(); + } + + void add_thread(boost::thread* thrd) + { + if(thrd) + { + boost::lock_guard guard(m); + threads.push_back(thrd); + } + } + + void remove_thread(boost::thread* thrd) + { + boost::lock_guard guard(m); + std::list::iterator const it=std::find(threads.begin(),threads.end(),thrd); + if(it!=threads.end()) + { + threads.erase(it); + } + } + + void join_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->join(); + } + } + + void interrupt_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->interrupt(); + } + } + + size_t size() const + { + boost::shared_lock guard(m); + return threads.size(); + } + + void join_one(boost::thread::id id) + { + boost::shared_lock guard(m); + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + if ((*it)->get_id() == id) + { + (*it)->join(); + threads.erase(it); + return; + } + } + + } + +private: + std::list threads; + mutable boost::shared_mutex m; +}; + + /** @brief ThreadPool is a component for working with pools of threads and asynchronously * executing tasks. It is responsible for creating threads and tracking which threads are "busy" * and which are idle. Idle threads are utilized as "work" is added to the system. @@ -207,6 +308,7 @@ private: */ void beginThread() throw(); + void pruneThread(); ThreadPool(const ThreadPool&); ThreadPool& operator = (const ThreadPool&); @@ -245,7 +347,7 @@ private: boost::mutex fMutex; boost::condition fThreadAvailable; // triggered when a thread is available boost::condition fNeedThread; // triggered when a thread is needed - boost::thread_group fThreads; + ThreadPoolGroup fThreads; bool fStop; long fGeneralErrors; @@ -255,6 +357,10 @@ private: std::string fName; // Optional to add a name to the pool for debugging. bool fDebug; + boost::mutex fPruneMutex; + boost::condition fPruneThreadEnd; + boost::thread* fPruneThread; + std::stack fPruneThreads; // A list of stale thread IDs to be joined }; // This class, if instantiated, will continuously log details about the indicated threadpool diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index da28204d2..2885d1462 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -736,6 +736,17 @@ int TableInfo::setParseComplete(const int& columnId, #ifdef PROFILE Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS); #endif + if (fLog->isDebug(DEBUG_2)) + { + ostringstream oss; + oss << "Dictionary cache flush: "; + for (uint32_t i = 0; i < fDictFlushBlks.size(); i++) + { + oss << fDictFlushBlks[i] << ", "; + } + oss << endl; + fLog->logMsg( oss.str(), MSGLVL_INFO1 ); + } cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks); #ifdef PROFILE Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS); diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 86625d013..057ef2504 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -2229,12 +2229,13 @@ uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::s { std::set::iterator lbidIter; std::vector dictFlushBlks; - + cerr << "API Flushing blocks: "; for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++) { + cerr << *lbidIter << ", "; dictFlushBlks.push_back((*lbidIter)); } - + cerr << endl; cacheutils::flushPrimProcAllverBlocks(dictFlushBlks); fWEWrapper.getDictMap().erase(txnID); } diff --git a/writeengine/shared/we_bulkrollbackmgr.cpp b/writeengine/shared/we_bulkrollbackmgr.cpp index 75d9de8c9..e9872da50 100644 --- a/writeengine/shared/we_bulkrollbackmgr.cpp +++ b/writeengine/shared/we_bulkrollbackmgr.cpp @@ -198,14 +198,16 @@ int BulkRollbackMgr::rollback ( bool keepMetaFile ) // the user but keep going. std::vector allOIDs; std::set::const_iterator iter = fAllColDctOIDs.begin(); - + cerr << "Rollback flushing: "; while (iter != fAllColDctOIDs.end()) { + cerr << *iter << ", "; //std::cout << "Flushing OID from PrimProc cache " << *iter << // std::endl; allOIDs.push_back(*iter); ++iter; } + cerr << endl; int cache_rc = cacheutils::flushOIDsFromCache( allOIDs ); diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 923871ef9..b84f3bce9 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -187,6 +187,37 @@ int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colS return NO_ERROR; } +/*@brief findSmallestColumn --Find the smallest column for this table + */ +/*********************************************************** + * DESCRIPTION: + * Find the smallest column for this table + * PARAMETERS: + * lowColLen - returns smallest column width + * colId - returns smallest column id + * colStructList - column struct list + * RETURN: + * void + ***********************************************************/ +void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList) +// MCOL-1675: find the smallest column width to calculate the RowID from so +// that all HWMs will be incremented by this operation +{ + int32_t lowColLen = 8192; + for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++) + { + if (colStructList[colIt].colWidth < lowColLen) + { + colId = colIt; + lowColLen = colStructList[colId].colWidth; + if ( lowColLen == 1 ) + { + break; + } + } + } +} + /*@convertValArray - Convert interface values to internal values */ /*********************************************************** @@ -953,6 +984,11 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, for (i = 0; i < colStructList.size(); i++) Convertor::convertColType(&colStructList[i]); + uint32_t colId = 0; + // MCOL-1675: find the smallest column width to calculate the RowID from so + // that all HWMs will be incremented by this operation + findSmallestColumn(colId, colStructList); + // rc = checkValid(txnid, colStructList, colValueList, ridList); // if (rc != NO_ERROR) // return rc; @@ -979,8 +1015,8 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, //-------------------------------------------------------------------------- if (isFirstBatchPm) { - currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx(); - extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList(); + currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx(); + extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList(); dbRoot = extentInfo[currentDBrootIdx].fDbRoot; partitionNum = extentInfo[currentDBrootIdx].fPartition; @@ -1027,7 +1063,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, { colOp = m_colOp[op(colStructList[i].fCompressionType)]; colOp->initColumn(curCol); - colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType, + colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum); rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot, @@ -1165,7 +1201,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, } // if (isFirstBatchPm) else //get the extent info from tableMetaData { - ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid); + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) @@ -1201,7 +1237,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, //-------------------------------------------------------------------------- // allocate row id(s) //-------------------------------------------------------------------------- - curColStruct = colStructList[0]; + curColStruct = colStructList[colId]; colOp = m_colOp[op(curColStruct.fCompressionType)]; colOp->initColumn(curCol); @@ -1212,26 +1248,29 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, vector fileInfo; dbRoot = curColStruct.fColDbRoot; //use the first column to calculate row id - ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid); + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { - if ((it->dbRoot == colStructList[0].fColDbRoot) && (it->partNum == colStructList[0].fColPartition) && (it->segNum == colStructList[0].fColSegment) && it->current ) + if ((it->dbRoot == colStructList[colId].fColDbRoot) && + (it->partNum == colStructList[colId].fColPartition) && + (it->segNum == colStructList[colId].fColSegment) && it->current ) + { break; - + } it++; } if (it != aColExtsInfo.end()) { hwm = it->hwm; - //cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl; + //cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl; } oldHwm = hwm; //Save this info for rollback //need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, + colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment); rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file @@ -1261,8 +1300,8 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm); - //cout << "after allocrowid, total row = " < 256K. // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- -// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? + // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow - rowsLeft) > 0) && (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK)) { - for (size_t k = 1; k < colStructList.size(); k++) + for (unsigned k=0; ksetColParam(expandCol, 0, @@ -1683,19 +1725,11 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, for (i = 0; i < colStructList.size(); i++) Convertor::convertColType(&colStructList[i]); - // MCOL-984: find the smallest column width to calculate the RowID from so + uint32_t colId = 0; + // MCOL-1675: find the smallest column width to calculate the RowID from so // that all HWMs will be incremented by this operation - int32_t lowColLen = 8192; - int32_t colId = 0; + findSmallestColumn(colId, colStructList); - for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++) - { - if (colStructList[colIt].colWidth < lowColLen) - { - colId = colIt; - lowColLen = colStructList[colId].colWidth; - } - } // rc = checkValid(txnid, colStructList, colValueList, ridList); // if (rc != NO_ERROR) @@ -2019,7 +2053,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, // Expand initial abbreviated extent if any RID in 1st extent is > 256K. // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- -// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? + // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow - rowsLeft) > 0) && @@ -2032,7 +2066,8 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, continue; Column expandCol; - colOp = m_colOp[op(colStructList[k].fCompressionType)]; + colOp = m_colOp[op(colStructList[k].fCompressionType)]; + // Shouldn't we change 0 to colId here? colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType, @@ -3106,6 +3141,11 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, for (i = 0; i < colStructList.size(); i++) Convertor::convertColType(&colStructList[i]); + uint32_t colId = 0; + // MCOL-1675: find the smallest column width to calculate the RowID from so + // that all HWMs will be incremented by this operation + findSmallestColumn(colId, colStructList); + rc = checkValid(txnid, colStructList, colValueList, ridList); if (rc != NO_ERROR) @@ -3124,7 +3164,7 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, //-------------------------------------------------------------------------- // allocate row id(s) //-------------------------------------------------------------------------- - curColStruct = colStructList[0]; + curColStruct = colStructList[colId]; colOp = m_colOp[op(curColStruct.fCompressionType)]; colOp->initColumn(curCol); @@ -3161,7 +3201,7 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, oldHwm = hwm; //Save this info for rollback //need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, + colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, dbRoot, partitionNum, segmentNum); @@ -3279,13 +3319,15 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? - if ((colStructList[0].fColPartition == 0) && - (colStructList[0].fColSegment == 0) && + if ((colStructList[colId].fColPartition == 0) && + (colStructList[colId].fColSegment == 0) && ((totalRow - rowsLeft) > 0) && (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK)) { - for (size_t k = 1; k < colStructList.size(); k++) + for (unsigned k=0; ksetColParam(expandCol, 0, diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index f0fe5c995..864d064b1 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -633,6 +633,11 @@ private: int checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const; /** + * @brief Find the smallest column for this table + */ + void findSmallestColumn(uint32_t &colId, ColStructList colStructList); + + /** * @brief Convert interface column type to a internal column type */ // void convertColType(void* curStruct, const FuncType curType = FUNC_WRITE_ENGINE) const;