From 07bd4130530b228b8ef8d3fa575f1d0c3e375ec4 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Fri, 17 Aug 2018 22:27:02 +0300 Subject: [PATCH 01/15] MCOL-1660/1659 Table/column identifiers support spaces in DDL. MCOL-1660/1659 Table/column identifiers support spaces in DDL. --- dbcon/ddlpackage/ddl.l | 6 +++--- dbcon/ddlpackage/ddl.y | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbcon/ddlpackage/ddl.l b/dbcon/ddlpackage/ddl.l index 7f9362cee..926a836e2 100644 --- a/dbcon/ddlpackage/ddl.l +++ b/dbcon/ddlpackage/ddl.l @@ -72,9 +72,9 @@ ident_start [A-Za-z\200-\377_] ident_cont [A-Za-z\200-\377_0-9\$] identifier {ident_start}{ident_cont}* /* fully qualified names regexes */ -fq_identifier {identifier}\.{identifier} -identifier_quoted {grave_accent}{identifier}{grave_accent} -identifier_double_quoted {double_quote}{identifier}{double_quote} +ident_w_spaces {identifier}\x20* +identifier_quoted {grave_accent}{ident_w_spaces}+{grave_accent} +identifier_double_quoted {double_quote}{ident_w_spaces}+{double_quote} integer [-+]?{digit}+ decimal ([-+]?({digit}*\.{digit}+)|({digit}+\.{digit}*)) diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index 2556b8340..c9fc805ed 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -604,7 +604,7 @@ table_name: ; qualified_name: - | ident { + ident { if (x->fDBSchema.size()) $$ = new QualifiedName((char*)x->fDBSchema.c_str(), $1); else From 4572c25534f6fbbbd877e1dcd460cfc6ef75d996 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 28 Aug 2018 11:29:38 +0300 Subject: [PATCH 02/15] MCOL-1675 When insert record calculate HWM using a column with the smallest width instead of the first column in the same way as in MCOL-984. --- writeengine/wrapper/writeengine.cpp | 111 +++++++++++++++++++--------- writeengine/wrapper/writeengine.h | 5 ++ 2 files changed, 82 insertions(+), 34 deletions(-) diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index afea06fee..d163d1f42 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -182,6 +182,37 @@ int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colS return NO_ERROR; } +/*@brief findSmallestColumn --Find the smallest column for this table + */ +/*********************************************************** + * DESCRIPTION: + * Find the smallest column for this table + * PARAMETERS: + * lowColLen - returns smallest column width + * colId - returns smallest column id + * colStructList - column struct list + * RETURN: + * void + ***********************************************************/ +void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList) +// MCOL-1675: find the smallest column width to calculate the RowID from so +// that all HWMs will be incremented by this operation +{ + int32_t lowColLen = 8192; + for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++) + { + if (colStructList[colIt].colWidth < lowColLen) + { + colId = colIt; + lowColLen = colStructList[colId].colWidth; + if ( lowColLen == 1 ) + { + break; + } + } + } +} + /*@convertValArray - Convert interface values to internal values */ /*********************************************************** @@ -847,6 +878,11 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, for (i = 0; i < colStructList.size(); i++) Convertor::convertColType(&colStructList[i]); + uint32_t colId = 0; + // MCOL-1675: find the smallest column width to calculate the RowID from so + // that all HWMs will be incremented by this operation + findSmallestColumn(colId, colStructList); + // rc = checkValid(txnid, colStructList, colValueList, ridList); // if (rc != NO_ERROR) // return rc; @@ -873,8 +909,8 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, //-------------------------------------------------------------------------- if (isFirstBatchPm) { - currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx(); - extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList(); + currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx(); + extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList(); dbRoot = extentInfo[currentDBrootIdx].fDbRoot; partitionNum = extentInfo[currentDBrootIdx].fPartition; @@ -914,7 +950,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, { colOp = m_colOp[op(colStructList[i].fCompressionType)]; colOp->initColumn(curCol); - colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType, + colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum); rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot, @@ -1040,7 +1076,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, } // if (isFirstBatchPm) else //get the extent info from tableMetaData { - ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid); + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { @@ -1073,7 +1109,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, //-------------------------------------------------------------------------- // allocate row id(s) //-------------------------------------------------------------------------- - curColStruct = colStructList[0]; + curColStruct = colStructList[colId]; colOp = m_colOp[op(curColStruct.fCompressionType)]; colOp->initColumn(curCol); @@ -1084,23 +1120,27 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, vector fileInfo; dbRoot = curColStruct.fColDbRoot; //use the first column to calculate row id - ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid); + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { - if ((it->dbRoot == colStructList[0].fColDbRoot) && (it->partNum == colStructList[0].fColPartition) && (it->segNum == colStructList[0].fColSegment) && it->current ) + if ((it->dbRoot == colStructList[colId].fColDbRoot) && + (it->partNum == colStructList[colId].fColPartition) && + (it->segNum == colStructList[colId].fColSegment) && it->current ) + { break; + } it++; } if (it != aColExtsInfo.end()) { hwm = it->hwm; - //cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl; + //cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl; } oldHwm = hwm; //Save this info for rollback //need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, + colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment); rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file @@ -1123,13 +1163,13 @@ timer.start("allocRowId"); if (idbdatafile::IDBPolicy::useHdfs()) insertSelect = true; - rc = colOp->allocRowId(txnid, bUseStartExtent, + rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm); - //cout << "after allocrowid, total row = " < 256K. // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- -// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? + // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow-rowsLeft) > 0) && (rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK)) { - for (unsigned k=1; ksetColParam(expandCol, 0, @@ -1505,18 +1548,10 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, for (i = 0; i < colStructList.size(); i++) Convertor::convertColType(&colStructList[i]); - // MCOL-984: find the smallest column width to calculate the RowID from so - // that all HWMs will be incremented by this operation - int32_t lowColLen = 8192; - int32_t colId = 0; - for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++) - { - if (colStructList[colIt].colWidth < lowColLen) - { - colId = colIt; - lowColLen = colStructList[colId].colWidth; - } - } + uint32_t colId = 0; + // MCOL-1675: find the smallest column width to calculate the RowID from so + // that all HWMs will be incremented by this operation + findSmallestColumn(colId, colStructList); // rc = checkValid(txnid, colStructList, colValueList, ridList); // if (rc != NO_ERROR) @@ -1809,7 +1844,7 @@ timer.stop("allocRowId"); // Expand initial abbreviated extent if any RID in 1st extent is > 256K. // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- -// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? + // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow-rowsLeft) > 0) && @@ -1821,7 +1856,8 @@ timer.stop("allocRowId"); if (k == colId) continue; Column expandCol; - colOp = m_colOp[op(colStructList[k].fCompressionType)]; + colOp = m_colOp[op(colStructList[k].fCompressionType)]; + // Shouldn't we change 0 to colId here? colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType, @@ -2782,6 +2818,11 @@ StopWatch timer; for (i = 0; i < colStructList.size(); i++) Convertor::convertColType(&colStructList[i]); + uint32_t colId = 0; + // MCOL-1675: find the smallest column width to calculate the RowID from so + // that all HWMs will be incremented by this operation + findSmallestColumn(colId, colStructList); + rc = checkValid(txnid, colStructList, colValueList, ridList); if (rc != NO_ERROR) return rc; @@ -2799,7 +2840,7 @@ StopWatch timer; //-------------------------------------------------------------------------- // allocate row id(s) //-------------------------------------------------------------------------- - curColStruct = colStructList[0]; + curColStruct = colStructList[colId]; colOp = m_colOp[op(curColStruct.fCompressionType)]; colOp->initColumn(curCol); @@ -2834,7 +2875,7 @@ StopWatch timer; oldHwm = hwm; //Save this info for rollback //need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, + colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, dbRoot, partitionNum, segmentNum); @@ -2944,13 +2985,15 @@ timer.stop("allocRowId"); // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? - if ((colStructList[0].fColPartition == 0) && - (colStructList[0].fColSegment == 0) && + if ((colStructList[colId].fColPartition == 0) && + (colStructList[colId].fColSegment == 0) && ((totalRow-rowsLeft) > 0) && (rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK)) { - for (unsigned k=1; ksetColParam(expandCol, 0, diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index 099854f77..93729ed75 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -607,6 +607,11 @@ private: */ int checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const; + /** + * @brief Find the smallest column for this table + */ + void findSmallestColumn(uint32_t &colId, ColStructList colStructList); + /** * @brief Convert interface column type to a internal column type */ From 58bc867ca17b85d2447fe25835787d58bea2c73c Mon Sep 17 00:00:00 2001 From: Ravi Prakash Date: Thu, 30 Aug 2018 20:24:24 -0700 Subject: [PATCH 03/15] MCOL-1188 assertion 'fColumn.get() && fSub && fFunc' failed, ... MySQL server crashed. The problem was in processing a subquery in the where clause that was categorized as a CACHE_ITEM in the parse tree. The fix involved how we walk the parse tree in gp_walk(). --- dbcon/mysql/ha_calpont_execplan.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 42d26108c..47f07c67b 100755 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4096,15 +4096,17 @@ void gp_walk(const Item *item, void *arg) if (itype == Item::FUNC_ITEM && ((Item_func*)item)->functype() == Item_func::XOR_FUNC ) itype = Item::COND_ITEM; - if(item->type() == Item::CACHE_ITEM) - { - item = ((Item_cache*)item)->get_example(); - itype = item->type(); - isCached = true; - } - switch (itype) { + case Item::CACHE_ITEM: + { + // The item or condition is cached as per MariaDB server view but + // for InfiniDB it need to be executed. + // MCOL-1188 and + Item* orig_item = ((Item_cache*)item)->get_example(); + orig_item->traverse_cond(gp_walk, gwip, Item::POSTFIX); + break; + } case Item::FIELD_ITEM: { Item_field* ifp = (Item_field*)item; From 5247dfa0825186047c1650f0e626e145615a584e Mon Sep 17 00:00:00 2001 From: Ravi Prakash Date: Tue, 4 Sep 2018 12:20:40 -0700 Subject: [PATCH 04/15] MCOL-1188 assertion 'fColumn.get() && fSub && fFunc' failed,... Some cleanup for the previous check-in. --- dbcon/mysql/ha_calpont_execplan.cpp | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 47f07c67b..2de5dfa56 100755 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4084,7 +4084,6 @@ void gp_walk(const Item *item, void *arg) { gp_walk_info* gwip = reinterpret_cast(arg); idbassert(gwip); - bool isCached = false; //Bailout... if (gwip->fatalParseError) return; @@ -4101,8 +4100,8 @@ void gp_walk(const Item *item, void *arg) case Item::CACHE_ITEM: { // The item or condition is cached as per MariaDB server view but - // for InfiniDB it need to be executed. - // MCOL-1188 and + // for InfiniDB it need to be parsed and executed. + // MCOL-1188 and MCOL-1029 Item* orig_item = ((Item_cache*)item)->get_example(); orig_item->traverse_cond(gp_walk, gwip, Item::POSTFIX); break; @@ -4286,13 +4285,9 @@ void gp_walk(const Item *item, void *arg) cc->resultType(colType_MysqlToIDB(item)); } - // cached item comes in one piece - if (!isCached) + for (uint32_t i = 0; i < ifp->argument_count() && !gwip->rcWorkStack.empty(); i++) { - for (uint32_t i = 0; i < ifp->argument_count() && !gwip->rcWorkStack.empty(); i++) - { - gwip->rcWorkStack.pop(); - } + gwip->rcWorkStack.pop(); } // bug 3137. If filter constant like 1=0, put it to ptWorkStack // MariaDB bug 750. Breaks if compare is an argument to a function. @@ -4360,14 +4355,6 @@ void gp_walk(const Item *item, void *arg) bool isOr = (ftype == Item_func::COND_OR_FUNC); bool isXor = (ftype == Item_func::XOR_FUNC); - // MCOL-1029 A cached COND_ITEM is something like: - // AND (TRUE OR FALSE) - // We can skip it - if (isCached) - { - break; - } - List *argumentList; List xorArgumentList; if (isXor) From e5f18964f06e8624bf27bea088fdee792447c40c Mon Sep 17 00:00:00 2001 From: David Hill Date: Tue, 4 Sep 2018 16:41:44 -0500 Subject: [PATCH 05/15] MCOL-1523 --- oam/install_scripts/columnstoreAlias | 3 + oamapps/mcsadmin/mcsadmin.cpp | 30 +++- oamapps/postConfigure/postConfigure.cpp | 188 +++++++++++------------- procmgr/processmanager.cpp | 149 ++++++++++++------- 4 files changed, 212 insertions(+), 158 deletions(-) diff --git a/oam/install_scripts/columnstoreAlias b/oam/install_scripts/columnstoreAlias index cd225c1a9..255eb7e7e 100644 --- a/oam/install_scripts/columnstoreAlias +++ b/oam/install_scripts/columnstoreAlias @@ -10,5 +10,8 @@ alias core='cd /var/log/mariadb/columnstore/corefiles' alias tmsg='tail -f /var/log/messages' alias tdebug='tail -f /var/log/mariadb/columnstore/debug.log' alias tinfo='tail -f /var/log/mariadb/columnstore/info.log' +alias terror='tail -f /var/log/mariadb/columnstore/err.log' +alias twarning='tail -f /var/log/mariadb/columnstore/warning.log' +alias tcrit='tail -f /var/log/mariadb/columnstore/crit.log' alias dbrm='cd /usr/local/mariadb/columnstore/data1/systemFiles/dbrm' alias module='cat /usr/local/mariadb/columnstore/local/module' diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp index 935d081cc..6090058d4 100644 --- a/oamapps/mcsadmin/mcsadmin.cpp +++ b/oamapps/mcsadmin/mcsadmin.cpp @@ -7036,15 +7036,33 @@ int processCommand(string* arguments) if (systemstatus.SystemOpState == oam::ACTIVE ) { try { - cout << endl << " Starting Modules" << endl; - oam.startModule(devicenetworklist, ackTemp); +// cout << endl << " Starting Modules" << endl; +// oam.startModule(devicenetworklist, ackTemp); //reload DBRM with new configuration, needs to be done here after startModule - cmd = startup::StartUp::installDir() + "/bin/dbrmctl reload > /dev/null 2>&1"; - system(cmd.c_str()); - sleep(15); +// cmd = startup::StartUp::installDir() + "/bin/dbrmctl reload > /dev/null 2>&1"; +// system(cmd.c_str()); +// sleep(15); - cout << " Successful start of Modules " << endl; +// cout << " Successful start of Modules " << endl; + + cout << endl << " Restarting System "; + int returnStatus = oam.restartSystem(gracefulTemp, ackTemp); + switch (returnStatus) + { + case API_SUCCESS: + if ( waitForActive() ) + cout << endl << " Successful restart of System " << endl << endl; + else + cout << endl << "**** restartSystem Failed : check log files" << endl; + break; + case API_CANCELLED: + cout << endl << " Restart of System canceled" << endl << endl; + break; + default: + cout << endl << "**** restartSystem Failed : Check system logs" << endl; + break; + } } catch (exception& e) { diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp index 98227da9d..e69677039 100644 --- a/oamapps/postConfigure/postConfigure.cpp +++ b/oamapps/postConfigure/postConfigure.cpp @@ -1247,26 +1247,16 @@ int main(int argc, char *argv[]) //amazon install setup check bool amazonInstall = false; string cloud = oam::UnassignedName; - system("aws --version > /tmp/amazon.log 2>&1"); - - ifstream in("/tmp/amazon.log"); - - in.seekg(0, std::ios::end); - int size = in.tellg(); - if ( size == 0 || oam.checkLogStatus("/tmp/amazon.log", "not found")) + + if (!multi_server_quick_install) { - // not running on amazon with ec2-api-tools - if (amazon_quick_install) - { - cout << "ERROR: Amazon Quick Installer was specified, bu the AMazon CLI API packages isnt installed, exiting" << endl; - exit(1); - } + system("aws --version > /tmp/amazon.log 2>&1"); - amazonInstall = false; - } - else - { - if ( size == 0 || oam.checkLogStatus("/tmp/amazon.log", "not installed")) + ifstream in("/tmp/amazon.log"); + + in.seekg(0, std::ios::end); + int size = in.tellg(); + if ( size == 0 || oam.checkLogStatus("/tmp/amazon.log", "not found")) { // not running on amazon with ec2-api-tools if (amazon_quick_install) @@ -1278,9 +1268,23 @@ int main(int argc, char *argv[]) amazonInstall = false; } else - amazonInstall = true; - } + { + if ( size == 0 || oam.checkLogStatus("/tmp/amazon.log", "not installed")) + { + // not running on amazon with ec2-api-tools + if (amazon_quick_install) + { + cout << "ERROR: Amazon Quick Installer was specified, bu the AMazon CLI API packages isnt installed, exiting" << endl; + exit(1); + } + amazonInstall = false; + } + else + amazonInstall = true; + } + } + try { cloud = sysConfig->getConfig(InstallSection, "Cloud"); } @@ -3090,7 +3094,9 @@ int main(int argc, char *argv[]) //check if dbrm data resides in older directory path and inform user if it does dbrmDirCheck(); - if ( IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM && pmNumber == 1) { + if ( ( IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM ) || + ( (IserverTypeInstall != oam::INSTALL_COMBINE_DM_UM_PM) && pmwithum ) ) + { //run the mysql / mysqld setup scripts cout << endl << "===== Running the MariaDB ColumnStore MariaDB Server setup scripts =====" << endl << endl; @@ -3098,7 +3104,57 @@ int main(int argc, char *argv[]) // call the mysql setup scripts mysqlSetup(); - sleep(5); + sleep(3); + } + + if ( IserverTypeInstall != oam::INSTALL_COMBINE_DM_UM_PM || + pmNumber > 1 ) + { + if ( password.empty() ) + { + cout << endl; + cout << "Next step is to enter the password to access the other Servers." << endl; + cout << "This is either your password or you can default to using a ssh key" << endl; + cout << "If using a password, the password needs to be the same on all Servers." << endl << endl; + + if ( noPrompting ) { + cout << "Enter password, hit 'enter' to default to using a ssh key, or 'exit' > " << endl; + password = "ssh"; + } + else + { + while(true) + { + char *pass1, *pass2; + + pass1=getpass("Enter password, hit 'enter' to default to using a ssh key, or 'exit' > "); + if ( strcmp(pass1, "") == 0 ) { + password = "ssh"; + break; + } + + string p1 = pass1; + if ( p1 == "exit") + exit(0); + + pass2=getpass("Confirm password > "); + string p2 = pass2; + if ( p1 == p2 ) { + password = p2; + break; + } + else + cout << "Password mismatch, please re-enter" << endl; + } + + //add single quote for special characters + if ( password != "ssh" ) + { + password = "'" + password + "'"; + } + + } + } } int thread_id = 0; @@ -3116,7 +3172,7 @@ int main(int argc, char *argv[]) //skip interface with remote servers and perform install if ( !nonDistribute ) { - // + // // perform remote install of other servers in the system // cout << endl << "===== System Installation =====" << endl << endl; @@ -3173,67 +3229,8 @@ int main(int argc, char *argv[]) if( !pkgCheck(columnstorePackage) ) exit(1); - if ( password.empty() ) - { - cout << endl; - cout << "Next step is to enter the password to access the other Servers." << endl; - cout << "This is either your password or you can default to using a ssh key" << endl; - cout << "If using a password, the password needs to be the same on all Servers." << endl << endl; - } - - while(true) - { - char *pass1, *pass2; - - if ( noPrompting ) { - cout << "Enter password, hit 'enter' to default to using a ssh key, or 'exit' > " << endl; - if ( password.empty() ) - password = "ssh"; - break; - } - - //check for command line option password - if ( !password.empty() ) - break; - - pass1=getpass("Enter password, hit 'enter' to default to using a ssh key, or 'exit' > "); - if ( strcmp(pass1, "") == 0 ) { - password = "ssh"; - break; - } - - if ( pass1 == "exit") - exit(0); - - string p1 = pass1; - pass2=getpass("Confirm password > "); - string p2 = pass2; - if ( p1 == p2 ) { - password = p2; - break; - } - else - cout << "Password mismatch, please re-enter" << endl; - } - - //add single quote for special characters - if ( password != "ssh" ) - { - password = "'" + password + "'"; - } - checkSystemMySQLPort(mysqlPort, sysConfig, USER, password, childmodulelist, IserverTypeInstall, pmwithum); - if ( ( IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM ) || - ( (IserverTypeInstall != oam::INSTALL_COMBINE_DM_UM_PM) && pmwithum ) ) - { - cout << endl << "===== Running the MariaDB ColumnStore MariaDB ColumnStore setup scripts =====" << endl << endl; - - // call the mysql setup scripts - mysqlSetup(); - sleep(5); - } - string AmazonInstall = "0"; if ( amazonInstall ) AmazonInstall = "1"; @@ -3411,19 +3408,7 @@ int main(int argc, char *argv[]) cout << " DONE" << endl; } } - else - { - if ( ( IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM ) || - ( (IserverTypeInstall != oam::INSTALL_COMBINE_DM_UM_PM) && pmwithum ) ) - { - cout << endl << "===== Running the MariaDB ColumnStore MariaDB ColumnStore setup scripts =====" << endl << endl; - - // call the mysql setup scripts - mysqlSetup(); - sleep(5); - } - } - + //configure data redundancy if (DataRedundancy) { @@ -3641,9 +3626,6 @@ int main(int argc, char *argv[]) } //set mysql replication, if wasn't setup before on system -// if ( ( mysqlRep && pmwithum ) || -// ( mysqlRep && (umNumber > 1) ) || -// ( mysqlRep && (pmNumber > 1) && (IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM) ) ) if ( mysqlRep ) { cout << endl << "Run MariaDB ColumnStore Replication Setup.. "; @@ -3665,7 +3647,10 @@ int main(int argc, char *argv[]) cout << "Enter the following command to define MariaDB ColumnStore Alias Commands" << endl << endl; - cout << ". " + installDir + "/bin/columnstoreAlias" << endl << endl; + if ( !rootUser ) + cout << ". /etc/profile.d/columnstoreEnv.sh" << endl; + + cout << ". /etc/profile.d/columnstoreAlias.sh" << endl << endl; cout << "Enter 'mcsmysql' to access the MariaDB ColumnStore SQL console" << endl; cout << "Enter 'mcsadmin' to access the MariaDB ColumnStore Admin console" << endl << endl; @@ -3682,7 +3667,10 @@ int main(int argc, char *argv[]) cout << "Enter the following command to define MariaDB ColumnStore Alias Commands" << endl << endl; - cout << ". " + installDir + "/bin/columnstoreAlias" << endl << endl; + if ( !rootUser ) + cout << ". /etc/profile.d/columnstoreEnv.sh" << endl; + + cout << ". /etc/profile.d/columnstoreAlias.sh" << endl << endl; cout << "Enter 'mcsmysql' to access the MariaDB ColumnStore SQL console" << endl; cout << "Enter 'mcsadmin' to access the MariaDB ColumnStore Admin console" << endl << endl; diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 0a054f9c3..89f9a145c 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -575,10 +575,12 @@ void processMSG(messageqcpp::IOSocket* cfIos) if ( count > 0 ) { + string module = oam::UnassignedName; for (int i = 0; i < count; i++) { msg >> value; devicenetworkconfig.DeviceName = value; + module = value; msg >> value; devicenetworkconfig.UserTempDeviceName = value; msg >> value; @@ -606,11 +608,24 @@ void processMSG(messageqcpp::IOSocket* cfIos) } if( status == API_SUCCESS) { + processManager.setSystemState(oam::BUSY_INIT); + + //set query system state not ready + processManager.setQuerySystemState(false); + + //set recycle process + processManager.recycleProcess(target, true); + //distribute config file processManager.distributeConfigFile("system"); + processManager.setSystemState(oam::ACTIVE); + + //set query system state ready + processManager.setQuerySystemState(true); + //call dbrm control - oam.dbrmctl("halt"); +/* oam.dbrmctl("halt"); log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG); oam.dbrmctl("reload"); @@ -618,13 +633,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); - -// processManager.restartProcessType("ExeMgr"); - - //setup MySQL Replication for started modules -// log.writeLog(__LINE__, "Setup MySQL Replication for module being started", LOG_TYPE_DEBUG); -// processManager.setMySQLReplication(startdevicenetworklist); - } +*/ } } else { @@ -829,8 +838,10 @@ void processMSG(messageqcpp::IOSocket* cfIos) if (opState == oam::MAN_OFFLINE || opState == oam::MAN_DISABLED || opState == oam::AUTO_DISABLED || opState == oam::AUTO_OFFLINE) { - oam.dbrmctl("halt"); - log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG); + processManager.setSystemState(oam::BUSY_INIT); + + //set query system state not ready + processManager.setQuerySystemState(false); status = processManager.disableModule(moduleName, true); log.writeLog(__LINE__, "Disable Module Completed on " + moduleName, LOG_TYPE_INFO); @@ -839,14 +850,11 @@ void processMSG(messageqcpp::IOSocket* cfIos) //check for SIMPLEX Processes on mate might need to be started processManager.checkSimplexModule(moduleName); + + processManager.setSystemState(oam::ACTIVE); - //call dbrm control -// oam.dbrmctl("reload"); -// log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); - - // resume the dbrm - oam.dbrmctl("resume"); - log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); + //set query system state ready + processManager.setQuerySystemState(true); } else { @@ -910,7 +918,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) DeviceNetworkList::iterator listPT = devicenetworklist.begin(); - //stopModules being removed with the REMOVE option, which will stop process + // do stopmodule then enable for( ; listPT != devicenetworklist.end() ; listPT++) { string moduleName = (*listPT).DeviceName; @@ -933,6 +941,9 @@ void processMSG(messageqcpp::IOSocket* cfIos) } if (opState == oam::MAN_DISABLED) { + processManager.stopModule(moduleName, graceful, manualFlag); + log.writeLog(__LINE__, "stop Module Completed on " + moduleName, LOG_TYPE_INFO); + status = processManager.enableModule(moduleName, oam::MAN_OFFLINE); log.writeLog(__LINE__, "Enable Module Completed on " + moduleName, LOG_TYPE_INFO); } @@ -1246,6 +1257,9 @@ void processMSG(messageqcpp::IOSocket* cfIos) log.writeLog(__LINE__, "STOPSYSTEM: ACK back to sender"); } + //set query system state ready + processManager.setQuerySystemState(true); + startsystemthreadStop = false; break; @@ -2758,9 +2772,6 @@ void processMSG(messageqcpp::IOSocket* cfIos) log.writeLog(__LINE__, "MSG RECEIVED: Process Restarted on " + moduleName + "/" + processName); //set query system states not ready - BRM::DBRM dbrm; - dbrm.setSystemQueryReady(false); - processManager.setQuerySystemState(false); processManager.setSystemState(oam::BUSY_INIT); @@ -2841,12 +2852,14 @@ void processMSG(messageqcpp::IOSocket* cfIos) break; sleep(1); } - dbrm.setSystemQueryReady(true); + processManager.setQuerySystemState(true); + } // if a DDLProc was restarted, reinit DMLProc if( processName == "DDLProc") { processManager.reinitProcessType("DMLProc"); + processManager.setQuerySystemState(true); } //only run on auto process restart @@ -2893,9 +2906,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) } } - //enable query stats - dbrm.setSystemQueryReady(true); - + //set query system states ready processManager.setQuerySystemState(true); processManager.setSystemState(oam::ACTIVE); @@ -3386,7 +3397,7 @@ int ProcessManager::disableModule(string target, bool manualFlag) /****************************************************************************************** * @brief recycleProcess * -* purpose: recyle process, generally after some disable module is run +* purpose: recyle process, done after disable/enable module * ******************************************************************************************/ void ProcessManager::recycleProcess(string module, bool enableModule) @@ -3410,48 +3421,65 @@ void ProcessManager::recycleProcess(string module, bool enableModule) //recycle DBRM processes in all cases restartProcessType("DBRMControllerNode"); restartProcessType("DBRMWorkerNode"); + sleep(5); restartProcessType("DMLProc"); return; } //recycle DBRM processes in all cases - restartProcessType("DBRMControllerNode", module); - restartProcessType("DBRMWorkerNode"); +// restartProcessType("DBRMControllerNode", module); +// restartProcessType("DBRMWorkerNode"); - - // only recycle dmlproc, if down/up module is non-parent UM - if ( ( moduleType == "um" ) && - ( PrimaryUMModuleName != module) ) + // only recycle ddl/dmlproc, if down/up module is non-parent UM +/* if ( ( moduleType == "um" ) && + if ( PrimaryUMModuleName != module) { + restartProcessType("DDLProc",module); restartProcessType("DMLProc",module); return; } - - if( PrimaryUMModuleName == module) - { - stopProcessType("DDLProc"); - stopProcessType("DMLProc"); - } +*/ +// if( PrimaryUMModuleName == module) +// { +// stopProcessType("DDLProc"); +// stopProcessType("DMLProc"); +// } + + stopProcessType("WriteEngineServer"); stopProcessType("ExeMgr"); + + stopProcessType("PrimProc"); - restartProcessType("PrimProc"); - sleep(1); + stopProcessType("DBRMControllerNode"); + stopProcessType("DBRMWorkerNode"); + + stopProcessType("DDLProc"); + stopProcessType("DMLProc"); - restartProcessType("mysqld"); + stopProcessType("mysqld"); - restartProcessType("WriteEngineServer"); - sleep(1); +// restartProcessType("mysqld"); + + startProcessType("DBRMControllerNode"); + startProcessType("DBRMWorkerNode"); + + startProcessType("PrimProc"); + sleep(5); + + startProcessType("WriteEngineServer"); + sleep(3); startProcessType("ExeMgr"); - sleep(1); startProcessType("DDLProc"); sleep(1); startProcessType("DMLProc"); + startProcessType("mysqld"); + return; } @@ -3500,8 +3528,8 @@ int ProcessManager::enableModule(string target, int state, bool failover) setStandbyModule(newStandbyModule); //set recycle process - if (!failover) - recycleProcess(target); +// if (!failover) +// recycleProcess(target); log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG); @@ -3774,6 +3802,7 @@ void ProcessManager::setSystemState(uint16_t state) Oam oam; ALARMManager aManager; Configuration config; + ProcessManager processManager(config, log); log.writeLog(__LINE__, "Set System State = " + oamState[state], LOG_TYPE_DEBUG); @@ -3794,6 +3823,9 @@ void ProcessManager::setSystemState(uint16_t state) // Process Alarms string system = "System"; if( state == oam::ACTIVE ) { + //set query system states ready + processManager.setQuerySystemState(true); + //clear alarms if set aManager.sendAlarmReport(system.c_str(), SYSTEM_DOWN_AUTO, CLEAR); aManager.sendAlarmReport(system.c_str(), SYSTEM_DOWN_MANUAL, CLEAR); @@ -6244,7 +6276,7 @@ int ProcessManager::sendMsgProcMon( std::string module, ByteStream msg, int requ string IPAddr = sysConfig->getConfig(msgPort, "IPAddr"); if ( IPAddr == oam::UnassignedIpAddr ) { - log.writeLog(__LINE__, "sendMsgProcMon ping failure", LOG_TYPE_ERROR); + log.writeLog(__LINE__, "sendMsgProcMon ping failure " + module + " " + IPAddr, LOG_TYPE_ERROR); return oam::API_SUCCESS; } @@ -6253,7 +6285,7 @@ int ProcessManager::sendMsgProcMon( std::string module, ByteStream msg, int requ string cmd = cmdLine + IPAddr + cmdOption; if ( system(cmd.c_str()) != 0) { //ping failure - log.writeLog(__LINE__, "sendMsgProcMon ping failure", LOG_TYPE_ERROR); + log.writeLog(__LINE__, "sendMsgProcMon ping failure " + module + " " + IPAddr, LOG_TYPE_ERROR); return oam::API_SUCCESS; } } @@ -6490,12 +6522,22 @@ void ProcessManager::setQuerySystemState(bool set) try { dbrm.setSystemQueryReady(set); - log.writeLog(__LINE__, "setQuerySystemState successful", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemQueryReady successful", LOG_TYPE_DEBUG); + + try { + dbrm.setSystemReady(set); + log.writeLog(__LINE__, "setSystemReady successful", LOG_TYPE_DEBUG); + } + catch(...) + { + log.writeLog(__LINE__, "setSystemReady failed", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemReady failed", LOG_TYPE_ERROR); + } } catch(...) { - log.writeLog(__LINE__, "setQuerySystemState failed", LOG_TYPE_DEBUG); - log.writeLog(__LINE__, "setQuerySystemState failed", LOG_TYPE_ERROR); + log.writeLog(__LINE__, "setSystemQueryReady failed", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemQueryReady failed", LOG_TYPE_ERROR); } } @@ -6993,7 +7035,7 @@ void startSystemThread(oam::DeviceNetworkList Devicenetworklist) } //set query system state not ready - processManager.setQuerySystemState(true); + processManager.setQuerySystemState(false); // Bug 4554: Wait until DMLProc is finished with rollback if (status == oam::API_SUCCESS) @@ -7062,6 +7104,9 @@ void startSystemThread(oam::DeviceNetworkList Devicenetworklist) processManager.setSystemState(rtn); } + //set query system state ready + processManager.setQuerySystemState(true); + // exit thread log.writeLog(__LINE__, "startSystemThread Exit", LOG_TYPE_DEBUG); startsystemthreadStatus = status; From 8b0507b9872ce3946006e3de90448a39606b3fda Mon Sep 17 00:00:00 2001 From: David Hill Date: Wed, 5 Sep 2018 14:53:13 -0500 Subject: [PATCH 06/15] MCOL-1523 --- oamapps/mcsadmin/mcsadmin.cpp | 11 +-- oamapps/postConfigure/postConfigure.cpp | 2 +- procmgr/main.cpp | 10 +-- procmgr/processmanager.cpp | 113 ++++++++---------------- procmon/main.cpp | 22 ++--- 5 files changed, 52 insertions(+), 106 deletions(-) diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp index 6090058d4..befcb68fa 100644 --- a/oamapps/mcsadmin/mcsadmin.cpp +++ b/oamapps/mcsadmin/mcsadmin.cpp @@ -7036,17 +7036,8 @@ int processCommand(string* arguments) if (systemstatus.SystemOpState == oam::ACTIVE ) { try { -// cout << endl << " Starting Modules" << endl; -// oam.startModule(devicenetworklist, ackTemp); - - //reload DBRM with new configuration, needs to be done here after startModule -// cmd = startup::StartUp::installDir() + "/bin/dbrmctl reload > /dev/null 2>&1"; -// system(cmd.c_str()); -// sleep(15); - -// cout << " Successful start of Modules " << endl; - cout << endl << " Restarting System "; + gracefulTemp = oam::FORCEFUL; int returnStatus = oam.restartSystem(gracefulTemp, ackTemp); switch (returnStatus) { diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp index e69677039..fe7a3b337 100644 --- a/oamapps/postConfigure/postConfigure.cpp +++ b/oamapps/postConfigure/postConfigure.cpp @@ -3172,7 +3172,7 @@ int main(int argc, char *argv[]) //skip interface with remote servers and perform install if ( !nonDistribute ) { - // + // // perform remote install of other servers in the system // cout << endl << "===== System Installation =====" << endl << endl; diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 2747fda16..5ef5113f1 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1489,7 +1489,7 @@ void pingDeviceThread() if (moduleInfoList[moduleName] >= ModuleHeartbeatCount || opState == oam::DOWN || opState == oam::AUTO_DISABLED) { - log.writeLog(__LINE__, "Module alive, bring it back online: " + moduleName, LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "*** Module alive, bring it back online: " + moduleName, LOG_TYPE_DEBUG); string PrimaryUMModuleName = config.moduleName(); try { @@ -1927,7 +1927,7 @@ void pingDeviceThread() { //Log failure, issue alarm, set moduleOpState Configuration config; - log.writeLog(__LINE__, "module is down: " + moduleName, LOG_TYPE_CRITICAL); + log.writeLog(__LINE__, "*** module is down: " + moduleName, LOG_TYPE_CRITICAL); //set query system state not ready BRM::DBRM dbrm; @@ -2013,9 +2013,6 @@ void pingDeviceThread() // resume the dbrm oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); - - //set recycle process - processManager.recycleProcess(moduleName); } // return values = 'ip address' for running or rebooting, stopped or terminated @@ -2234,9 +2231,6 @@ void pingDeviceThread() oam.dbrmctl("resume"); log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); - //set recycle process - processManager.recycleProcess(moduleName); - //enable query stats dbrm.setSystemQueryReady(true); diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 89f9a145c..daacdbf10 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -619,21 +619,11 @@ void processMSG(messageqcpp::IOSocket* cfIos) //distribute config file processManager.distributeConfigFile("system"); - processManager.setSystemState(oam::ACTIVE); - //set query system state ready processManager.setQuerySystemState(true); - //call dbrm control -/* oam.dbrmctl("halt"); - log.writeLog(__LINE__, "'dbrmctl halt' done", LOG_TYPE_DEBUG); - - oam.dbrmctl("reload"); - log.writeLog(__LINE__, "'dbrmctl reload' done", LOG_TYPE_DEBUG); - - oam.dbrmctl("resume"); - log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); -*/ } + processManager.setSystemState(oam::ACTIVE); + } } else { @@ -846,15 +836,10 @@ void processMSG(messageqcpp::IOSocket* cfIos) status = processManager.disableModule(moduleName, true); log.writeLog(__LINE__, "Disable Module Completed on " + moduleName, LOG_TYPE_INFO); - processManager.recycleProcess(moduleName); - - //check for SIMPLEX Processes on mate might need to be started - processManager.checkSimplexModule(moduleName); - - processManager.setSystemState(oam::ACTIVE); - //set query system state ready processManager.setQuerySystemState(true); + + processManager.setSystemState(oam::ACTIVE); } else { @@ -1611,6 +1596,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) status = retStatus; } } + //now stop local module processManager.stopModule(config.moduleName(), graceful, manualFlag ); @@ -1627,7 +1613,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) oam::DeviceNetworkList devicenetworklist; pthread_t startsystemthread; - pthread_create (&startsystemthread, NULL, (void*(*)(void*)) &startSystemThread, &devicenetworklist); + status = pthread_create (&startsystemthread, NULL, (void*(*)(void*)) &startSystemThread, &devicenetworklist); if ( status != 0 ) { log.writeLog(__LINE__, "STARTMODULE: pthread_create failed, return status = " + oam.itoa(status)); @@ -1636,20 +1622,19 @@ void processMSG(messageqcpp::IOSocket* cfIos) if (status == 0 && ackIndicator) { - // BUG 4554 We don't need the join because calpont console is now looking for "Active" - // We need to return the ack right away to let console know we got the message. -// pthread_join(startsystemthread, NULL); -// status = startsystemthreadStatus; + pthread_join(startsystemthread, NULL); + status = startsystemthreadStatus; } - - // setup MySQL Replication after switchover command -/* if (graceful == FORCEFUL) + + // setup MySQL Replication after FORCE restart command + if ( (status == API_SUCCESS) && + (graceful == oam::FORCEFUL) ) { - log.writeLog(__LINE__, "Setup MySQL Replication for restartSystem FORCE, used by switch-parent command", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "Setup MySQL Replication for restartSystem FORCE", LOG_TYPE_DEBUG); oam::DeviceNetworkList devicenetworklist; processManager.setMySQLReplication(devicenetworklist); } -*/ + log.writeLog(__LINE__, "RESTARTSYSTEM: Start System Request Completed", LOG_TYPE_INFO); } @@ -3277,6 +3262,7 @@ int ProcessManager::shutdownModule(string target, ByteStream::byte actionIndicat int ProcessManager::disableModule(string target, bool manualFlag) { Oam oam; + ProcessManager processManager(config, log); ModuleConfig moduleconfig; log.writeLog(__LINE__, "disableModule request for " + target, LOG_TYPE_DEBUG); @@ -3386,6 +3372,11 @@ int ProcessManager::disableModule(string target, bool manualFlag) if ( updateWorkerNodeconfig() != API_SUCCESS ) return API_FAILURE; + processManager.recycleProcess(target); + + //check for SIMPLEX Processes on mate might need to be started + processManager.checkSimplexModule(target); + //distribute config file distributeConfigFile("system"); @@ -3414,37 +3405,6 @@ void ProcessManager::recycleProcess(string module, bool enableModule) oam.getSystemConfig("PrimaryUMModuleName", PrimaryUMModuleName); } catch(...) {} - - // restart DBRM Process and DMLProc and return if enable module is being done - if (enableModule) - { - //recycle DBRM processes in all cases - restartProcessType("DBRMControllerNode"); - restartProcessType("DBRMWorkerNode"); - sleep(5); - - restartProcessType("DMLProc"); - return; - } - - //recycle DBRM processes in all cases -// restartProcessType("DBRMControllerNode", module); -// restartProcessType("DBRMWorkerNode"); - - // only recycle ddl/dmlproc, if down/up module is non-parent UM -/* if ( ( moduleType == "um" ) && - if ( PrimaryUMModuleName != module) - { - restartProcessType("DDLProc",module); - restartProcessType("DMLProc",module); - return; - } -*/ -// if( PrimaryUMModuleName == module) -// { -// stopProcessType("DDLProc"); -// stopProcessType("DMLProc"); -// } stopProcessType("WriteEngineServer"); @@ -3526,10 +3486,6 @@ int ProcessManager::enableModule(string target, int state, bool failover) if ( newStandbyModule == target) setStandbyModule(newStandbyModule); - - //set recycle process -// if (!failover) -// recycleProcess(target); log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG); @@ -6518,15 +6474,15 @@ void ProcessManager::setQuerySystemState(bool set) Oam oam; BRM::DBRM dbrm; - log.writeLog(__LINE__, "setQuerySystemState = " + oam.itoa(set), LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setQuerySystemState called = " + oam.itoa(set), LOG_TYPE_DEBUG); try { dbrm.setSystemQueryReady(set); - log.writeLog(__LINE__, "setSystemQueryReady successful", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemQueryReady = " + oam.itoa(set), LOG_TYPE_DEBUG); try { dbrm.setSystemReady(set); - log.writeLog(__LINE__, "setSystemReady successful", LOG_TYPE_DEBUG); + log.writeLog(__LINE__, "setSystemReady = " + oam.itoa(set), LOG_TYPE_DEBUG); } catch(...) { @@ -7089,23 +7045,28 @@ void startSystemThread(oam::DeviceNetworkList Devicenetworklist) } if (DMLprocessstatus.ProcessOpState == oam::ACTIVE) { - rtn = oam::ACTIVE; + rtn = oam::ACTIVE; break; } if (DMLprocessstatus.ProcessOpState == oam::FAILED) { - rtn = oam::FAILED; + rtn = oam::FAILED; + status = oam::API_FAILURE; break; } - // wait some more - sleep(2); - } - processManager.setSystemState(rtn); + // wait some more + sleep(2); + } + + if ( rtn = oam::ACTIVE ) + //set query system state not ready + processManager.setQuerySystemState(true); + + processManager.setSystemState(rtn); } - - //set query system state ready - processManager.setQuerySystemState(true); + else + processManager.setSystemState(oam::FAILED); // exit thread log.writeLog(__LINE__, "startSystemThread Exit", LOG_TYPE_DEBUG); diff --git a/procmon/main.cpp b/procmon/main.cpp index d6edd4ac7..a30de1fa1 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -2451,17 +2451,17 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) } //if DMLProc set to ACTIVE, set system state to ACTIVE if in an INIT state - if ( processName == "DMLProc" && state == oam::ACTIVE ) - { - if ( fShmSystemStatus[0].OpState == oam::BUSY_INIT || - fShmSystemStatus[0].OpState == oam::MAN_INIT || - fShmSystemStatus[0].OpState == oam::AUTO_INIT ) - { - fShmSystemStatus[0].OpState = state; - memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); - log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); - } - } +// if ( processName == "DMLProc" && state == oam::ACTIVE ) +// { +// if ( fShmSystemStatus[0].OpState == oam::BUSY_INIT || +// fShmSystemStatus[0].OpState == oam::MAN_INIT || +// fShmSystemStatus[0].OpState == oam::AUTO_INIT ) +// { +// fShmSystemStatus[0].OpState = state; +// memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); +// log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); +// } +// } } break; From 14d3a34c2893d4d66d62802382f33f0d39f87195 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Fri, 7 Sep 2018 11:43:54 +0100 Subject: [PATCH 07/15] MCOL-1694 & MCOL-1505 Improved exception handling This patch catches exceptions in DDLProc, DMLProc and ExeMgr which could potentially happen during startup. Logging them instead of silently ignoring them (or crashing in ExeMgr). --- ddlproc/ddlproc.cpp | 33 ++++++++++++++++++++++++++++-- dmlproc/dmlproc.cpp | 44 ++++++++++++++++++++++++++++++++++++++++ dmlproc/dmlprocessor.cpp | 20 ++++++++++++++++++ exemgr/main.cpp | 30 +++++++++++++++++++++++++-- 4 files changed, 123 insertions(+), 4 deletions(-) diff --git a/ddlproc/ddlproc.cpp b/ddlproc/ddlproc.cpp index 45bc6a48d..d74295cfa 100644 --- a/ddlproc/ddlproc.cpp +++ b/ddlproc/ddlproc.cpp @@ -135,8 +135,30 @@ int main(int argc, char* argv[]) { oam.processInitComplete("DDLProc", ACTIVE); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(23, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DDLProc init caught exception: "); + args1.add(ex.what()); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; + } catch (...) { + cerr << "Caught unknown exception in init!" << endl; + LoggingID logid(23, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DDLProc init caught unknown exception"); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; } } @@ -147,21 +169,28 @@ int main(int argc, char* argv[]) catch (std::exception& ex) { cerr << ex.what() << endl; + LoggingID logid(23, 0, 0); Message::Args args; Message message(8); args.add("DDLProc failed on: "); args.add(ex.what()); message.format( args ); - + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, message, logid); + return 1; } catch (...) { cerr << "Caught unknown exception!" << endl; + LoggingID logid(23, 0, 0); Message::Args args; Message message(8); args.add("DDLProc failed on: "); - args.add("receiving DDLPackage"); + args.add("receiving DDLPackage (unknown exception)"); message.format( args ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, message, logid); + return 1; } return 0; } diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index df17fbed6..106977824 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -494,8 +494,30 @@ int main(int argc, char* argv[]) // At first we set to BUSY_INIT oam.processInitComplete("DMLProc", oam::BUSY_INIT); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught exception: "); + args1.add(ex.what()); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; + } catch (...) { + cerr << "Caught unknown exception in init!" << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught unknown exception"); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; } //@Bug 1627 @@ -584,8 +606,30 @@ int main(int argc, char* argv[]) { oam.processInitComplete("DMLProc", ACTIVE); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught exception: "); + args1.add(ex.what()); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; + } catch (...) { + cerr << "Caught unknown exception in init!" << endl; + LoggingID logid(21, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("DMLProc init caught unknown exception"); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_CRITICAL, msg, logid); + return 1; } Dec = DistributedEngineComm::instance(rm); diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 2205d1712..3b3a5cffc 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1155,8 +1155,28 @@ void DMLServer::start() } cancelThread.join(); } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + logging::LoggingID lid(21); + Message::Args args; + Message message(8); + args.add("DMLProc init caught exception: "); + args.add(ex.what()); + message.format(args); + logging::Logger logger(lid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); + } catch (...) { + cerr << "Caught unknown exception!" << endl; + logging::LoggingID lid(21); + Message::Args args; + Message message(8); + args.add("DMLProc init caught unknown exception"); + message.format(args); + logging::Logger logger(lid.fSubsysID); + logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); } } diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 92f949f57..716c2bf54 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1300,8 +1300,34 @@ void cleanTempDir() assert(tmpPrefix != "/"); /* This is quite scary as ExeMgr usually runs as root */ - boost::filesystem::remove_all(tmpPrefix); - boost::filesystem::create_directories(tmpPrefix); + try + { + boost::filesystem::remove_all(tmpPrefix); + boost::filesystem::create_directories(tmpPrefix); + } + catch (std::exception& ex) + { + cerr << ex.what() << endl; + LoggingID logid(16, 0, 0); + Message::Args args; + Message message(8); + args.add("Execption whilst cleaning tmpdir: "); + args.add(ex.what()); + message.format( args ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_WARNING, message, logid); + } + catch (...) + { + cerr << "Caught unknown exception during tmpdir cleanup" << endl; + LoggingID logid(16, 0, 0); + Message::Args args; + Message message(8); + args.add("Unknown execption whilst cleaning tmpdir"); + message.format( args ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_WARNING, message, logid); + } } From c50f5fa05dc5e0c9d5fa8093ebc44e9194cc7cbf Mon Sep 17 00:00:00 2001 From: David Hill Date: Mon, 10 Sep 2018 13:03:35 -0500 Subject: [PATCH 08/15] bump version --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 8a397bd3a..04362fcf8 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ COLUMNSTORE_VERSION_MAJOR=1 COLUMNSTORE_VERSION_MINOR=1 -COLUMNSTORE_VERSION_PATCH=6 +COLUMNSTORE_VERSION_PATCH=7 COLUMNSTORE_VERSION_RELEASE=1 From f7a2b50b21d3bbe1fd2cb6bec850b92f0ed9c5c7 Mon Sep 17 00:00:00 2001 From: David Hill Date: Tue, 11 Sep 2018 15:47:25 -0500 Subject: [PATCH 09/15] MCOL-1699 - fix iss with adddbroot amazon --- oam/oamcpp/liboamcpp.cpp | 37 +++++++++++++++++++++---------------- procmon/main.cpp | 2 +- procmon/processmonitor.cpp | 4 +++- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 7483ca239..baa8dbe94 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -7713,7 +7713,7 @@ namespace oam // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh getInstance > /tmp/getInstanceInfo_" + name; int status = system(cmd.c_str()); - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; // get Instance Name @@ -7744,7 +7744,7 @@ namespace oam // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh getType > /tmp/getInstanceType_" + name; int status = system(cmd.c_str()); - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; // get Instance Name @@ -7775,7 +7775,7 @@ namespace oam // run script to get Instance Subnet string cmd = InstallDir + "/bin/MCSInstanceCmds.sh getSubnet > /tmp/getInstanceSubnet_" + name; int status = system(cmd.c_str()); - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; // get Instance Name @@ -7807,7 +7807,7 @@ namespace oam // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh launchInstance " + IPAddress + " " + type + " " + group + " > /tmp/getInstance_" + name; int status = system(cmd.c_str()); - if (WEXITSTATUS(status) != 0 ) + if (WEXITSTATUS(status) == 1 ) return "failed"; if (checkLogStatus("/tmp/getInstance", "Required") ) @@ -7883,7 +7883,7 @@ namespace oam // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh startInstance " + instanceName + " > /tmp/startEC2Instance_" + instanceName; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; @@ -7902,7 +7902,7 @@ namespace oam // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh assignElasticIP " + instanceName + " " + IpAddress + " > /tmp/assignElasticIP_" + instanceName; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) exceptionControl("assignElasticIP", oam::API_FAILURE); return true; @@ -7921,7 +7921,7 @@ namespace oam // run script to get Instance status and IP Address string cmd = InstallDir + "/bin/MCSInstanceCmds.sh deassignElasticIP " + IpAddress + " > /tmp/deassignElasticIP_" + IpAddress; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) exceptionControl("deassignElasticIP", oam::API_FAILURE); return true; @@ -7940,8 +7940,9 @@ namespace oam // run script to get Volume Status string cmd = InstallDir + "/bin/MCSVolumeCmds.sh describe " + volumeName + " > /tmp/getVolumeStatus_" + volumeName; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ){ return "failed"; + } // get status string status; @@ -7971,7 +7972,7 @@ namespace oam // run script to get Volume Status string cmd = InstallDir + "/bin/MCSVolumeCmds.sh create " + size + " " + name + " > /tmp/createVolumeStatus_" + name; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return "failed"; // get status @@ -8016,11 +8017,15 @@ namespace oam string cmd = InstallDir + "/bin/MCSVolumeCmds.sh attach " + volumeName + " " + instanceName + " " + deviceName + " > /tmp/attachVolumeStatus_" + volumeName; ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) == 0 ) + if (WEXITSTATUS(ret) == 1 ) + { + //failing to attach, dettach and retry + writeLog("attachEC2Volume: Attach failed, call detach:" + volumeName + " " + instanceName + " " + deviceName, LOG_TYPE_ERROR ); + + detachEC2Volume(volumeName); + } + else return true; - - //failing to attach, dettach and retry - detachEC2Volume(volumeName); } if (ret == 0 ) @@ -8042,7 +8047,7 @@ namespace oam // run script to attach Volume string cmd = InstallDir + "/bin/MCSVolumeCmds.sh detach " + volumeName + " > /tmp/detachVolumeStatus_" + volumeName; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; @@ -8061,7 +8066,7 @@ namespace oam // run script to delete Volume string cmd = InstallDir + "/bin/MCSVolumeCmds.sh delete " + volumeName + " > /tmp/deleteVolumeStatus_" + volumeName; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; @@ -8080,7 +8085,7 @@ namespace oam // run script to create a tag string cmd = InstallDir + "/bin/MCSVolumeCmds.sh createTag " + resourceName + " " + tagName + " " + tagValue + " > /tmp/createTagStatus_" + resourceName; int ret = system(cmd.c_str()); - if (WEXITSTATUS(ret) != 0 ) + if (WEXITSTATUS(ret) == 1 ) return false; return true; diff --git a/procmon/main.cpp b/procmon/main.cpp index ad05a4f95..d877bdcfe 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -210,7 +210,7 @@ int main(int argc, char **argv) } catch(...) {} - if ( cloud == "amazon-ec2" ) { + if ( cloud == "amazon-ec2" || cloud == "amazon-vpc" ) { if(!aMonitor.amazonIPCheck()) { log.writeLog(__LINE__, "ERROR: amazonIPCheck failed, exiting", LOG_TYPE_CRITICAL); sleep(2); diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index efa01c449..bf471e62c 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -5491,7 +5491,6 @@ bool ProcessMonitor::amazonIPCheck() log.writeLog(__LINE__, "Assign Elastic IP Address failed : '" + moduleName + "' / '" + ELIPaddress, LOG_TYPE_ERROR); break; } - break; } @@ -5653,8 +5652,11 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID) {} if (oam.attachEC2Volume(volumeName, deviceName, instanceName)) { + log.writeLog(__LINE__, "amazonVolumeCheck function , volume to attached: " + volumeName, LOG_TYPE_DEBUG); + string cmd = "mount " + startup::StartUp::installDir() + "/data" + oam.itoa(dbrootID) + " > /dev/null"; system(cmd.c_str()); + log.writeLog(__LINE__, "amazonVolumeCheck function , volume to mounted: " + volumeName, LOG_TYPE_DEBUG); return true; } else From 21f108896d02d74a2e6ca9945d40f67e816dd176 Mon Sep 17 00:00:00 2001 From: David Hill Date: Wed, 12 Sep 2018 08:36:13 -0500 Subject: [PATCH 10/15] MCOL-1523 - fix compile issue --- procmon/main.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/procmon/main.cpp b/procmon/main.cpp index a3b8d52ef..ac7c761cc 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -2443,10 +2443,6 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos) memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE); log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG); } - - BRM::DBRM dbrm; - dbrm.setSystemQueryReady(true); - } } break; From 8ec02bfce5efd124a950e4be706b037df81f147f Mon Sep 17 00:00:00 2001 From: David Hill Date: Wed, 12 Sep 2018 14:31:23 -0500 Subject: [PATCH 11/15] MCOL-1423 --- procmgr/processmanager.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 97e42533f..42d1c167b 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -2846,10 +2846,13 @@ void processMSG(messageqcpp::IOSocket* cfIos) } - // if a DDLProc was restarted, reinit DMLProc + // if a DDLProc was restarted, restart DMLProc if( processName == "DDLProc") { processManager.reinitProcessType("DMLProc"); + //set query system states ready processManager.setQuerySystemState(true); + + processManager.setSystemState(oam::ACTIVE); } //only run on auto process restart @@ -2900,6 +2903,8 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.setQuerySystemState(true); processManager.setSystemState(oam::ACTIVE); + + log.writeLog(__LINE__, "MSG RECEIVED: Process Restarted Completed"); } break; From 3ac9d93597f2fba2d8eebcad2ab434460c58c486 Mon Sep 17 00:00:00 2001 From: David Hill Date: Sat, 15 Sep 2018 14:28:46 -0500 Subject: [PATCH 12/15] MCOL-1523 - addiiotnal fixes --- oamapps/mcsadmin/mcsadmin.cpp | 2 +- procmgr/processmanager.cpp | 10 +++++++--- procmon/main.cpp | 8 ++++++-- procmon/processmonitor.cpp | 7 ------- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp index befcb68fa..29d53b35f 100644 --- a/oamapps/mcsadmin/mcsadmin.cpp +++ b/oamapps/mcsadmin/mcsadmin.cpp @@ -7036,7 +7036,7 @@ int processCommand(string* arguments) if (systemstatus.SystemOpState == oam::ACTIVE ) { try { - cout << endl << " Restarting System "; + cout << endl << " Restarting System " << endl; gracefulTemp = oam::FORCEFUL; int returnStatus = oam.restartSystem(gracefulTemp, ackTemp); switch (returnStatus) diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 42d1c167b..8ce936c97 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -1637,7 +1637,7 @@ void processMSG(messageqcpp::IOSocket* cfIos) { log.writeLog(__LINE__, "Setup MySQL Replication for restartSystem FORCE", LOG_TYPE_DEBUG); oam::DeviceNetworkList devicenetworklist; - processManager.setMySQLReplication(devicenetworklist); + processManager.setMySQLReplication(devicenetworklist, oam::UnassignedName, true); } log.writeLog(__LINE__, "RESTARTSYSTEM: Start System Request Completed", LOG_TYPE_INFO); @@ -2769,12 +2769,16 @@ void processMSG(messageqcpp::IOSocket* cfIos) processManager.reinitProcessType("cpimport"); //request reinit after Process is active - for ( int i = 0; i < 600 ; i++ ) { + for ( int i = 0; i < 10 ; i++ ) { try { ProcessStatus procstat; oam.getProcessStatus(processName, moduleName, procstat); - if (procstat.ProcessOpState == oam::ACTIVE) { + if (procstat.ProcessOpState == oam::COLD_STANDBY) + break; + + if ( (procstat.ProcessOpState == oam::ACTIVE) || + (procstat.ProcessOpState == oam::STANDBY) ) { // if a PrimProc was restarted, reinit ACTIVE ExeMgr(s) and DDL/DMLProc if( processName == "PrimProc") { diff --git a/procmon/main.cpp b/procmon/main.cpp index 4be046511..63f0140b2 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -1388,7 +1388,7 @@ static void chldHandleThread(MonitorConfig config) catch(...) {} - // check if process failover is needed due to process outage + // check if Mdoule failover is needed due to process outage aMonitor.checkModuleFailover((*listPtr).ProcessName); //check the db health @@ -1463,15 +1463,19 @@ static void chldHandleThread(MonitorConfig config) restartStatus = " restart failed with hard failure, don't retry!!"; (*listPtr).processID = 0; - // check if process failover is needed due to process outage + // check if Module failover is needed due to process outage aMonitor.checkModuleFailover((*listPtr).ProcessName); break; } else { if ( (*listPtr).processID != oam::API_MINOR_FAILURE ) + { //restarted successful + //Inform Process Manager that Process restart + aMonitor.processRestarted( (*listPtr).ProcessName, false); break; + } } // restart failed with minor error, sleep and try sleep(5); diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index bf471e62c..2109d8ed1 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -621,9 +621,6 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO else log.writeLog(__LINE__, "START: process already active " + processName); - //Inform Process Manager that Process restart - //processRestarted(processName); - ackMsg << (ByteStream::byte) ACK; ackMsg << (ByteStream::byte) START; ackMsg << (ByteStream::byte) requestStatus; @@ -720,9 +717,6 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO requestStatus = API_FAILURE; } - //Inform Process Manager that Process restart - //processRestarted(processName); - ackMsg << (ByteStream::byte) ACK; ackMsg << (ByteStream::byte) RESTART; ackMsg << (ByteStream::byte) requestStatus; @@ -4650,7 +4644,6 @@ void ProcessMonitor::checkModuleFailover( std::string processName) systemprocessstatus.processstatus[i].ProcessOpState == oam::FAILED ) { // found a AVAILABLE mate, start it log.writeLog(__LINE__, "Change UM Master to module " + systemprocessstatus.processstatus[i].Module, LOG_TYPE_DEBUG); - log.writeLog(__LINE__, "Disable local UM module " + config.moduleName(), LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Stop local UM module " + config.moduleName(), LOG_TYPE_DEBUG); log.writeLog(__LINE__, "Disable Local will Enable UM module " + systemprocessstatus.processstatus[i].Module, LOG_TYPE_DEBUG); From 24c5e937565f044919b18662309db4a133ed9291 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Fri, 21 Sep 2018 09:50:10 +0100 Subject: [PATCH 13/15] MCOL-1737 Add debug logging options for LRU cache This adds options which are user enabled to debug the LRU cache inside ColumnStore. Specifically cache flushing. It adds the following: * PrimProc flush information when SIGUSR2 mode is enabled * cpimport dictionary flush information when -d2 is used * WriteEngineServer DML flush information to STDERR --- primitives/blockcache/filebuffermgr.cpp | 74 ++++++++++++++++++++++- writeengine/bulk/we_tableinfo.cpp | 11 ++++ writeengine/server/we_dmlcommandproc.cpp | 3 + writeengine/shared/we_bulkrollbackmgr.cpp | 3 + 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/primitives/blockcache/filebuffermgr.cpp b/primitives/blockcache/filebuffermgr.cpp index e6cab58ac..eb5f9a2a9 100644 --- a/primitives/blockcache/filebuffermgr.cpp +++ b/primitives/blockcache/filebuffermgr.cpp @@ -117,7 +117,10 @@ void FileBufferMgr::flushCache() // the block pool should not be freed in the above block to allow us // to continue doing concurrent unprotected-but-"safe" memcpys // from that memory - + if (fReportFrequency) + { + fLog << "Clearing entire cache" << endl; + } fFBPool.clear(); // fFBPool.reserve(fMaxNumBlocks); } @@ -150,6 +153,15 @@ void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt) BRM::LBID_t lbid; BRM::VER_t ver; filebuffer_uset_iter_t iter; + if (fReportFrequency) + { + fLog << "flushMany " << cnt << " items: "; + for (uint32_t j = 0; j < cnt; j++) + { + fLog << "lbid: " << laVptr[j].LBID << " ver: " << laVptr[j].Ver << ", "; + } + fLog << endl; + } for (uint32_t j = 0; j < cnt; j++) { lbid = static_cast(laVptr->LBID); @@ -157,6 +169,10 @@ void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt) iter = fbSet.find(HashObject_t(lbid, ver, 0)); if (iter != fbSet.end()) { + if (fReportFrequency) + { + fLog << "flushMany hit, lbid: " << lbid << " index: " << iter->poolIdx << endl; + } //remove it from fbList uint32_t idx = iter->poolIdx; fbList.erase(fFBPool[idx].listLoc()); @@ -179,6 +195,16 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt) mutex::scoped_lock lk(fWLock); + if (fReportFrequency) + { + fLog << "flushManyAllversion " << cnt << " items: "; + for (uint32_t i = 0; i < cnt; i++) + { + fLog << laVptr[i] << ", "; + } + fLog << endl; + } + if (fCacheSize == 0 || cnt == 0) return; @@ -187,6 +213,10 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt) for (it = fbSet.begin(); it != fbSet.end();) { if (uniquer.find(it->lbid) != uniquer.end()) { + if (fReportFrequency) + { + fLog << "flushManyAllversion hit: " << it->lbid << " index: " << it->poolIdx << endl; + } const uint32_t idx = it->poolIdx; fbList.erase(fFBPool[idx].listLoc()); fEmptyPoolSlots.push_back(idx); @@ -213,6 +243,16 @@ void FileBufferMgr::flushOIDs(const uint32_t *oids, uint32_t count) pair itList; filebuffer_uset_t::iterator it; + if (fReportFrequency) + { + fLog << "flushOIDs " << count << " items: "; + for (uint32_t i = 0; i < count; i++) + { + fLog << oids[i] << ", "; + } + fLog << endl; + } + // If there are more than this # of extents to drop, the whole cache will be cleared const uint32_t clearThreshold = 50000; @@ -269,6 +309,22 @@ void FileBufferMgr::flushPartition(const vector &oids, const set::iterator sit; + fLog << "flushPartition oids: "; + for (uint32_t i = 0; i < count; i++) + { + fLog << oids[i] << ", "; + } + fLog << "flushPartition partitions: "; + for (sit = partitions.begin(); sit != partitions.end(); ++sit) + { + fLog << (*sit).toString() << ", "; + } + fLog << endl; + } + if (fCacheSize == 0 || oids.size() == 0 || partitions.size() == 0) return; @@ -496,7 +552,7 @@ int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const ui if (fReportFrequency && (fBlksLoaded%fReportFrequency)==0) { struct timespec tm; clock_gettime(CLOCK_MONOTONIC, &tm); - fLog + fLog << "insert: " << left << fixed << ((double)(tm.tv_sec+(1.e-9*tm.tv_nsec))) << " " << right << setw(12) << fBlksLoaded << " " << right << setw(12) << fBlksNotUsed << endl; @@ -671,6 +727,11 @@ int FileBufferMgr::bulkInsert(const vector &ops) mutex::scoped_lock lk(fWLock); + if (fReportFrequency) + { + fLog << "bulkInsert: "; + } + for (i = 0; i < ops.size(); i++) { const CacheInsert_t &op = ops[i]; @@ -694,7 +755,10 @@ int FileBufferMgr::bulkInsert(const vector &ops) continue; } - //cout << "FBM: inserting <" << op.lbid << ", " << op.ver << endl; + if (fReportFrequency) + { + fLog << op.lbid << " " << op.ver << ", "; + } fCacheSize++; fBlksLoaded++; FBData_t fbdata = {op.lbid, op.ver, 0}; @@ -712,6 +776,10 @@ int FileBufferMgr::bulkInsert(const vector &ops) #endif ret++; } + if (fReportFrequency) + { + fLog << endl; + } idbassert(fCacheSize <= maxCacheSize()); return ret; diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index 0e651b55d..922e0a24b 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -709,6 +709,17 @@ int TableInfo::setParseComplete(const int &columnId, #ifdef PROFILE Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS); #endif + if (fLog->isDebug(DEBUG_2)) + { + ostringstream oss; + oss << "Dictionary cache flush: "; + for (uint32_t i = 0; i < fDictFlushBlks.size(); i++) + { + oss << fDictFlushBlks[i] << ", "; + } + oss << endl; + fLog->logMsg( oss.str(), MSGLVL_INFO1 ); + } cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks); #ifdef PROFILE Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS); diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 0fda76182..90831afd6 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -2036,10 +2036,13 @@ uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::s { std::set::iterator lbidIter; std::vector dictFlushBlks; + cerr << "API Flushing blocks: "; for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++) { + cerr << *lbidIter << ", "; dictFlushBlks.push_back((*lbidIter)); } + cerr << endl; cacheutils::flushPrimProcAllverBlocks(dictFlushBlks); fWEWrapper.getDictMap().erase(txnID); } diff --git a/writeengine/shared/we_bulkrollbackmgr.cpp b/writeengine/shared/we_bulkrollbackmgr.cpp index 2c41734e0..6f8c986b8 100644 --- a/writeengine/shared/we_bulkrollbackmgr.cpp +++ b/writeengine/shared/we_bulkrollbackmgr.cpp @@ -195,13 +195,16 @@ int BulkRollbackMgr::rollback ( bool keepMetaFile ) // the user but keep going. std::vector allOIDs; std::set::const_iterator iter=fAllColDctOIDs.begin(); + cerr << "Rollback flushing: "; while (iter != fAllColDctOIDs.end()) { + cerr << *iter << ", "; //std::cout << "Flushing OID from PrimProc cache " << *iter << // std::endl; allOIDs.push_back(*iter); ++iter; } + cerr << endl; int cache_rc = cacheutils::flushOIDsFromCache( allOIDs ); if (cache_rc != 0) From 94dfacfe2590e2a4524959e9b309402bf592b66b Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Fri, 28 Sep 2018 07:21:49 +0100 Subject: [PATCH 14/15] MCOL-1750 Fix threadpool stack leaks When a thread has been idle for 10 minutes and we have too many threads in the threadpool the thread will be pruned. This is done by the thread's main function just returning. Unfortunately this does not free up the memory, the thread either needs to be joined or detatched. We cannot use detached threads since there are mutexes and conditional variables between the main thread and the threadpool threads. If the main thread finishes before the threadpool threads (as would happen in cpimport) then crashes occur. The parent needs to wait on the child threads which is the whole point in joining. So this fix spawns a new thread which every minute will check the list of threads to be joined due to timeout and join them. We have had to use an adapted version of boost::thread_group so that we can join a single thread based off its thread ID. In addition with have modified PriorityThreadPool to use detached threads since this does not need to signal the child threads at the end. --- utils/threadpool/prioritythreadpool.cpp | 27 ++++-- utils/threadpool/threadpool.cpp | 42 ++++++++- utils/threadpool/threadpool.h | 108 +++++++++++++++++++++++- 3 files changed, 168 insertions(+), 9 deletions(-) diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 4d19df91e..a5c713eab 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -42,12 +42,22 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads uint midThreads, uint lowThreads, uint ID) : _stop(false), weightPerRun(targetWeightPerRun), id(ID) { + boost::thread* newThread; for (uint32_t i = 0; i < highThreads; i++) - threads.create_thread(ThreadHelper(this, HIGH)); + { + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); + } for (uint32_t i = 0; i < midThreads; i++) - threads.create_thread(ThreadHelper(this, MEDIUM)); + { + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); + } for (uint32_t i = 0; i < lowThreads; i++) - threads.create_thread(ThreadHelper(this, LOW)); + { + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); + } cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; @@ -62,6 +72,7 @@ PriorityThreadPool::~PriorityThreadPool() void PriorityThreadPool::addJob(const Job &job, bool useLock) { + boost::thread* newThread; mutex::scoped_lock lk(mutex, defer_lock_t()); if (useLock) @@ -70,17 +81,20 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock) // Create any missing threads if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) { - threads.create_thread(ThreadHelper(this, HIGH)); + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); threadCounts[HIGH]++; } if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) { - threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); threadCounts[MEDIUM]++; } if (defaultThreadCounts[LOW] != threadCounts[LOW]) { - threads.create_thread(ThreadHelper(this, LOW)); + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); threadCounts[LOW]++; } @@ -261,7 +275,6 @@ void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveproce void PriorityThreadPool::stop() { _stop = true; - threads.join_all(); } } // namespace threadpool diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index d903f9892..0b1546e98 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -43,7 +43,8 @@ ThreadPool::ThreadPool() } ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize ) - :fMaxThreads( maxThreads ), fQueueSize( queueSize ) + :fMaxThreads( maxThreads ), fQueueSize( queueSize ), + fPruneThread( NULL ) { init(); } @@ -72,6 +73,7 @@ void ThreadPool::init() fStop = false; fNextFunctor = fWaitingFunctors.end(); fNextHandle=1; + fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this)); } void ThreadPool::setQueueSize(size_t queueSize) @@ -80,6 +82,39 @@ void ThreadPool::setQueueSize(size_t queueSize) fQueueSize = queueSize; } +void ThreadPool::pruneThread() +{ + boost::mutex::scoped_lock lock2(fPruneMutex); + + while(true) + { + boost::system_time timeout = boost::get_system_time() + boost::posix_time::minutes(1); + if (!fPruneThreadEnd.timed_wait(fPruneMutex, timeout)) + { + while(!fPruneThreads.empty()) + { + if (fDebug) + { + ostringstream oss; + oss << "pruning thread " << fPruneThreads.top(); + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } + fThreads.join_one(fPruneThreads.top()); + fPruneThreads.pop(); + } + } + else + { + break; + } + } +} void ThreadPool::setMaxThreads(size_t maxThreads) { @@ -93,6 +128,9 @@ void ThreadPool::stop() fStop = true; lock1.unlock(); + fPruneThreadEnd.notify_all(); + fPruneThread->join(); + delete fPruneThread; fNeedThread.notify_all(); fThreads.join_all(); } @@ -293,6 +331,8 @@ void ThreadPool::beginThread() throw() { if (fThreadCount > fMaxThreads) { + boost::mutex::scoped_lock lock2(fPruneMutex); + fPruneThreads.push(boost::this_thread::get_id()); --fThreadCount; return; } diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index f11bb4b2b..95f7d4c7e 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,106 @@ namespace threadpool { + +// Taken from boost::thread_group and adapted +class ThreadPoolGroup +{ +private: + ThreadPoolGroup(ThreadPoolGroup const&); + ThreadPoolGroup& operator=(ThreadPoolGroup const&); +public: + ThreadPoolGroup() {} + ~ThreadPoolGroup() + { + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + delete *it; + } + } + + template + boost::thread* create_thread(F threadfunc) + { + boost::lock_guard guard(m); + std::unique_ptr new_thread(new boost::thread(threadfunc)); + threads.push_back(new_thread.get()); + return new_thread.release(); + } + + void add_thread(boost::thread* thrd) + { + if(thrd) + { + boost::lock_guard guard(m); + threads.push_back(thrd); + } + } + + void remove_thread(boost::thread* thrd) + { + boost::lock_guard guard(m); + std::list::iterator const it=std::find(threads.begin(),threads.end(),thrd); + if(it!=threads.end()) + { + threads.erase(it); + } + } + + void join_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->join(); + } + } + + void interrupt_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->interrupt(); + } + } + + size_t size() const + { + boost::shared_lock guard(m); + return threads.size(); + } + + void join_one(boost::thread::id id) + { + boost::shared_lock guard(m); + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + if ((*it)->get_id() == id) + { + (*it)->join(); + threads.erase(it); + return; + } + } + + } + +private: + std::list threads; + mutable boost::shared_mutex m; +}; + + /** @brief ThreadPool is a component for working with pools of threads and asynchronously * executing tasks. It is responsible for creating threads and tracking which threads are "busy" * and which are idle. Idle threads are utilized as "work" is added to the system. @@ -183,6 +284,7 @@ private: */ void beginThread() throw(); + void pruneThread(); ThreadPool(const ThreadPool&); ThreadPool& operator = (const ThreadPool&); @@ -221,7 +323,7 @@ private: boost::mutex fMutex; boost::condition fThreadAvailable; // triggered when a thread is available boost::condition fNeedThread; // triggered when a thread is needed - boost::thread_group fThreads; + ThreadPoolGroup fThreads; bool fStop; long fGeneralErrors; @@ -231,6 +333,10 @@ private: std::string fName; // Optional to add a name to the pool for debugging. bool fDebug; + boost::mutex fPruneMutex; + boost::condition fPruneThreadEnd; + boost::thread* fPruneThread; + std::stack fPruneThreads; // A list of stale thread IDs to be joined }; // This class, if instantiated, will continuously log details about the indicated threadpool From 5092b4fd13ba01f8e727184687a236e14a4f3a56 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Fri, 28 Sep 2018 07:55:06 +0100 Subject: [PATCH 15/15] MCOL-1750 unique_ptr doesn't work in all OSes --- utils/threadpool/threadpool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index 95f7d4c7e..1f0c6d0aa 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -75,7 +75,7 @@ public: boost::thread* create_thread(F threadfunc) { boost::lock_guard guard(m); - std::unique_ptr new_thread(new boost::thread(threadfunc)); + std::auto_ptr new_thread(new boost::thread(threadfunc)); threads.push_back(new_thread.get()); return new_thread.release(); }