diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index a5632f7cf..8c0dd0231 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -300,8 +300,8 @@ bool comptypesAreCompat(int oldCtype, int newCtype) namespace ddlpackageprocessor { -AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( - ddlpackage::AlterTableStatement& alterTableStmt) +AlterTableProcessor::DDLResult AlterTableProcessor::processPackageInternal( + ddlpackage::SqlStatement* sqlTableStmt) { SUMMARY_INFO("AlterTableProcessor::processPackage"); @@ -312,6 +312,20 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( result.result = NO_ERROR; std::string err; uint64_t tableLockId = 0; + + auto* alterTableStmt = dynamic_cast(sqlTableStmt); + if (!alterTableStmt) + { + logging::Message::Args args; + logging::Message message(9); + args.add("AlterTableStatement wrong cast"); + message.format(args); + result.result = ALTER_ERROR; + result.message = message; + fSessionManager.rolledback(txnID); + return result; + } + DETAIL_INFO(alterTableStmt); int rc = 0; rc = fDbrm->isReadWrite(); @@ -329,8 +343,8 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( } //@Bug 4538. Log the sql statement before grabbing tablelock - string stmt = alterTableStmt.fSql + "|" + (alterTableStmt.fTableName)->fSchema + "|"; - SQLLogger logger(stmt, fDDLLoggingId, alterTableStmt.fSessionID, txnID.id); + string stmt = alterTableStmt->fSql + "|" + (alterTableStmt->fTableName)->fSchema + "|"; + SQLLogger logger(stmt, fDDLLoggingId, alterTableStmt->fSessionID, txnID.id); VERBOSE_INFO("Getting current txnID"); OamCache* oamcache = OamCache::makeOamCache(); @@ -371,18 +385,18 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( { // check table lock boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - systemCatalogPtr->sessionID(alterTableStmt.fSessionID); + systemCatalogPtr->sessionID(alterTableStmt->fSessionID); CalpontSystemCatalog::TableName tableName; - tableName.schema = (alterTableStmt.fTableName)->fSchema; - tableName.table = (alterTableStmt.fTableName)->fName; + tableName.schema = (alterTableStmt->fTableName)->fSchema; + tableName.table = (alterTableStmt->fTableName)->fName; execplan::CalpontSystemCatalog::ROPair roPair; roPair = systemCatalogPtr->tableRID(tableName); uint32_t processID = ::getpid(); int32_t txnid = txnID.id; - int32_t sessionId = alterTableStmt.fSessionID; + int32_t sessionId = alterTableStmt->fSessionID; std::string processName("DDLProc"); int i = 0; @@ -429,7 +443,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( { processID = ::getpid(); txnid = txnID.id; - sessionId = alterTableStmt.fSessionID; + sessionId = alterTableStmt->fSessionID; ; processName = "DDLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, @@ -456,7 +470,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( } } - ddlpackage::AlterTableActionList actionList = alterTableStmt.fActions; + ddlpackage::AlterTableActionList actionList = alterTableStmt->fActions; AlterTableActionList::const_iterator action_iterator = actionList.begin(); while (action_iterator != actionList.end()) @@ -481,7 +495,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( columnDefPtr = addColumns.fColumns[0]; } - addColumn(alterTableStmt.fSessionID, txnID.id, result, columnDefPtr, *(alterTableStmt.fTableName), + addColumn(alterTableStmt->fSessionID, txnID.id, result, columnDefPtr, *(alterTableStmt->fTableName), uniqueId); if (result.result != NO_ERROR) @@ -493,9 +507,9 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( else if (s.find(AlterActionString[6]) != string::npos) { // Drop Column Default - dropColumnDefault(alterTableStmt.fSessionID, txnID.id, result, + dropColumnDefault(alterTableStmt->fSessionID, txnID.id, result, *(dynamic_cast(*action_iterator)), - *(alterTableStmt.fTableName), uniqueId); + *(alterTableStmt->fTableName), uniqueId); if (result.result != NO_ERROR) { @@ -506,15 +520,16 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( else if (s.find(AlterActionString[3]) != string::npos) { // Drop Columns - dropColumns(alterTableStmt.fSessionID, txnID.id, result, - *(dynamic_cast(*action_iterator)), *(alterTableStmt.fTableName), + dropColumns(alterTableStmt->fSessionID, txnID.id, result, + *(dynamic_cast(*action_iterator)), *(alterTableStmt->fTableName), uniqueId); } else if (s.find(AlterActionString[2]) != string::npos) { // Drop a column - dropColumn(alterTableStmt.fSessionID, txnID.id, result, - *(dynamic_cast(*action_iterator)), *(alterTableStmt.fTableName), uniqueId); + dropColumn(alterTableStmt->fSessionID, txnID.id, result, + *(dynamic_cast(*action_iterator)), *(alterTableStmt->fTableName), + uniqueId); } #if 0 @@ -529,9 +544,9 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( else if (s.find(AlterActionString[5]) != string::npos) { // Set Column Default - setColumnDefault(alterTableStmt.fSessionID, txnID.id, result, + setColumnDefault(alterTableStmt->fSessionID, txnID.id, result, *(dynamic_cast(*action_iterator)), - *(alterTableStmt.fTableName), uniqueId); + *(alterTableStmt->fTableName), uniqueId); } #if 0 @@ -547,23 +562,23 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( else if (s.find(AlterActionString[8]) != string::npos) { // Rename Table - renameTable(alterTableStmt.fSessionID, txnID.id, result, - *(dynamic_cast(*action_iterator)), *(alterTableStmt.fTableName), + renameTable(alterTableStmt->fSessionID, txnID.id, result, + *(dynamic_cast(*action_iterator)), *(alterTableStmt->fTableName), uniqueId); } else if (s.find(AlterActionString[10]) != string::npos) { // Rename a Column - renameColumn(alterTableStmt.fSessionID, txnID.id, result, - *(dynamic_cast(*action_iterator)), *(alterTableStmt.fTableName), + renameColumn(alterTableStmt->fSessionID, txnID.id, result, + *(dynamic_cast(*action_iterator)), *(alterTableStmt->fTableName), uniqueId); } else if (s.find(AlterActionString[11]) != string::npos) { // Table Comment - tableComment(alterTableStmt.fSessionID, txnID.id, result, - *(dynamic_cast(*action_iterator)), *(alterTableStmt.fTableName), + tableComment(alterTableStmt->fSessionID, txnID.id, result, + *(dynamic_cast(*action_iterator)), *(alterTableStmt->fTableName), uniqueId); } else @@ -575,7 +590,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( } // Log the DDL statement. - logging::logDDL(alterTableStmt.fSessionID, txnID.id, alterTableStmt.fSql, alterTableStmt.fOwner); + logging::logDDL(alterTableStmt->fSessionID, txnID.id, alterTableStmt->fSql, alterTableStmt->fOwner); DETAIL_INFO("Commiting transaction"); commitTransaction(uniqueId, txnID); @@ -583,11 +598,43 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage( } catch (std::exception& ex) { - rollBackAlter(ex.what(), txnID, alterTableStmt.fSessionID, result, uniqueId); + if (checkPPLostConnection(ex.what())) + { + if (tableLockId) + { + try + { + (void)fDbrm->releaseTableLock(tableLockId); + } + catch (std::exception&) + { + if (result.result == NO_ERROR) + { + logging::Message::Args args; + logging::Message message(1); + args.add("Table lock is not released due to "); + args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); + args.add(""); + args.add(""); + message.format(args); + result.result = ALTER_ERROR; + result.message = message; + return result; + } + } + } + result.result = PP_LOST_CONNECTION; + fWEClient->removeQueue(uniqueId); + return result; + } + else + { + rollBackAlter(ex.what(), txnID, alterTableStmt->fSessionID, result, uniqueId); + } } catch (...) { - rollBackAlter("encountered unknown exception. ", txnID, alterTableStmt.fSessionID, result, uniqueId); + rollBackAlter("encountered unknown exception. ", txnID, alterTableStmt->fSessionID, result, uniqueId); } // release table lock diff --git a/dbcon/ddlpackageproc/altertableprocessor.h b/dbcon/ddlpackageproc/altertableprocessor.h index 48a01a8a6..55868ec69 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.h +++ b/dbcon/ddlpackageproc/altertableprocessor.h @@ -39,11 +39,6 @@ class AlterTableProcessor : public DDLPackageProcessor AlterTableProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } - /** @brief process an alter table statement - * - * @param alterTableStmt the AlterTableStatement - */ - EXPORT DDLResult processPackage(ddlpackage::AlterTableStatement& alterTableStmt); /** @brief add a physical column file * * @param result the result of the operation @@ -151,6 +146,11 @@ class AlterTableProcessor : public DDLPackageProcessor uint64_t uniqueId); private: + /** @brief process an alter table statement + * + * @param alterTableStmt the AlterTableStatement + */ + DDLResult processPackageInternal(ddlpackage::SqlStatement* alterTableStmt) override; }; } // namespace ddlpackageprocessor diff --git a/dbcon/ddlpackageproc/createindexprocessor.cpp b/dbcon/ddlpackageproc/createindexprocessor.cpp index 706d4339e..cb92f65ea 100644 --- a/dbcon/ddlpackageproc/createindexprocessor.cpp +++ b/dbcon/ddlpackageproc/createindexprocessor.cpp @@ -37,8 +37,8 @@ using namespace logging; using namespace BRM; namespace ddlpackageprocessor { -CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( - ddlpackage::CreateIndexStatement& createIndexStmt) +CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackageInternal( + ddlpackage::SqlStatement* sqlStmt) { /* get OIDs for the list & tree files @@ -53,6 +53,19 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( DDLResult result; result.result = NO_ERROR; + auto* createIndexStmt = dynamic_cast(sqlStmt); + if (!createIndexStmt) + { + logging::Message::Args args; + logging::Message message(9); + args.add("CreateIndexStatement wrong cast "); + message.format(args); + + result.result = CREATE_ERROR; + result.message = message; + return result; + } + DETAIL_INFO(createIndexStmt); BRM::TxnID txnID; @@ -62,11 +75,11 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( This is based on the assumption that Front end is already error out if the user trys to create index on non-existing table. */ CalpontSystemCatalog::TableName tableName; - tableName.schema = (createIndexStmt.fTableName)->fSchema; - tableName.table = (createIndexStmt.fTableName)->fName; + tableName.schema = (createIndexStmt->fTableName)->fSchema; + tableName.table = (createIndexStmt->fTableName)->fName; CalpontSystemCatalog::ROPair roPair; boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(createIndexStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(createIndexStmt->fSessionID); try { @@ -75,7 +88,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( catch (exception& ex) { // store primary key name in fPKName - fPKName = createIndexStmt.fIndexName->fName; + fPKName = createIndexStmt->fIndexName->fName; return result; } catch (...) @@ -88,10 +101,10 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( return result; } - fPKName = createIndexStmt.fIndexName->fName; + fPKName = createIndexStmt->fIndexName->fName; int err = 0; - SQLLogger logger(createIndexStmt.fSql, fDDLLoggingId, createIndexStmt.fSessionID, txnID.id); + SQLLogger logger(createIndexStmt->fSql, fDDLLoggingId, createIndexStmt->fSessionID, txnID.id); VERBOSE_INFO("Allocating object IDs for columns"); @@ -102,31 +115,31 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( VERBOSE_INFO("Starting a new transaction"); ddlpackage::DDL_CONSTRAINTS type = - createIndexStmt.fUnique ? ddlpackage::DDL_UNIQUE : ddlpackage::DDL_INVALID_CONSTRAINT; + createIndexStmt->fUnique ? ddlpackage::DDL_UNIQUE : ddlpackage::DDL_INVALID_CONSTRAINT; VERBOSE_INFO("Writing meta data to SYSINDEX"); bool multicol = false; - if (createIndexStmt.fColumnNames.size() > 1) + if (createIndexStmt->fColumnNames.size() > 1) { multicol = true; } // validate index columns CalpontSystemCatalog::TableColName tableColName; - tableColName.schema = (createIndexStmt.fTableName)->fSchema; - tableColName.table = (createIndexStmt.fTableName)->fName; + tableColName.schema = (createIndexStmt->fTableName)->fSchema; + tableColName.table = (createIndexStmt->fTableName)->fName; CalpontSystemCatalog::OID oid; CalpontSystemCatalog::ColType colType; ColumnNameList::const_iterator colIter; int totalWidth = 0; - DDLIndexPopulator pop(&fWriteEngine, &fSessionManager, createIndexStmt.fSessionID, txnID.id, result, - fIdxOID, createIndexStmt.fColumnNames, *createIndexStmt.fTableName, type, + DDLIndexPopulator pop(&fWriteEngine, &fSessionManager, createIndexStmt->fSessionID, txnID.id, result, + fIdxOID, createIndexStmt->fColumnNames, *(createIndexStmt->fTableName), type, getDebugLevel()); if (multicol) { - for (colIter = createIndexStmt.fColumnNames.begin(); colIter != createIndexStmt.fColumnNames.end(); + for (colIter = createIndexStmt->fColumnNames.begin(); colIter != createIndexStmt->fColumnNames.end(); colIter++) { tableColName.column = *colIter; @@ -167,7 +180,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( // writeSysIndexColMetaData(createIndexStmt.fSessionID, txnID.id, result,*createIndexStmt.fTableName, // createIndexStmt.fColumnNames, createIndexStmt.fIndexName->fName ); - if (createIndexStmt.fUnique) + if (createIndexStmt->fUnique) { VERBOSE_INFO("Writing column constraint meta data to SYSCONSTRAINT"); WriteEngine::ColStruct colStruct; @@ -189,7 +202,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( // get the columns for the SYSCONSTRAINT table ColumnList sysConsColumns; ColumnList::const_iterator sysCons_iterator; - getColumnsForTable(createIndexStmt.fSessionID, sysConsTableName.schema, sysConsTableName.table, + getColumnsForTable(createIndexStmt->fSessionID, sysConsTableName.schema, sysConsTableName.table, sysConsColumns); sysCons_iterator = sysConsColumns.begin(); std::string idxData; @@ -201,17 +214,17 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (CONSTRAINTNAME_COL == column.tableColName.column) { - idxData = createIndexStmt.fIndexName->fName; + idxData = createIndexStmt->fIndexName->fName; colTuple.data = idxData; } else if (SCHEMA_COL == column.tableColName.column) { - idxData = (createIndexStmt.fTableName)->fSchema; + idxData = (createIndexStmt->fTableName)->fSchema; colTuple.data = idxData; } else if (TABLENAME_COL == column.tableColName.column) { - idxData = (createIndexStmt.fTableName)->fName; + idxData = (createIndexStmt->fTableName)->fName; colTuple.data = idxData; } else if (CONSTRAINTTYPE_COL == column.tableColName.column) @@ -233,7 +246,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( } else if (INDEXNAME_COL == column.tableColName.column) { - idxData = createIndexStmt.fIndexName->fName; + idxData = createIndexStmt->fIndexName->fName; colTuple.data = idxData; } else @@ -271,7 +284,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (error != WriteEngine::NO_ERROR) { return rollBackCreateIndex(errorString("WE: Error inserting Column Record: ", error), txnID, - createIndexStmt.fSessionID); + createIndexStmt->fSessionID); // logging::Message::Args args; // logging::Message message(9); // args.add("Error updating: "); @@ -305,7 +318,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( // get the columns for the SYSCONSTRAINTCOL table ColumnList sysConsColColumns; ColumnList::const_iterator sysConsCol_iterator; - getColumnsForTable(createIndexStmt.fSessionID, sysConsColTableName.schema, sysConsColTableName.table, + getColumnsForTable(createIndexStmt->fSessionID, sysConsColTableName.schema, sysConsColTableName.table, sysConsColColumns); // write sysconstraintcol sysConsCol_iterator = sysConsColColumns.begin(); @@ -319,22 +332,22 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (SCHEMA_COL == column.tableColName.column) { - colData = (createIndexStmt.fTableName)->fSchema; + colData = (createIndexStmt->fTableName)->fSchema; colTupleCol.data = colData; } else if (TABLENAME_COL == column.tableColName.column) { - colData = (createIndexStmt.fTableName)->fName; + colData = (createIndexStmt->fTableName)->fName; colTupleCol.data = colData; } else if (COLNAME_COL == column.tableColName.column) { - colData = createIndexStmt.fColumnNames[0]; + colData = createIndexStmt->fColumnNames[0]; colTupleCol.data = colData; } else if (CONSTRAINTNAME_COL == column.tableColName.column) { - colData = createIndexStmt.fIndexName->fName; + colData = createIndexStmt->fIndexName->fName; colTupleCol.data = colData; } else @@ -372,7 +385,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (error != WriteEngine::NO_ERROR) { return rollBackCreateIndex(errorString("WE: Error inserting Column Record: ", error), txnID, - createIndexStmt.fSessionID); + createIndexStmt->fSessionID); /* logging::Message::Args args; logging::Message message(9); @@ -398,7 +411,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (err) { return rollBackCreateIndex(errorString("Write engine failed to create the new index. ", err), txnID, - createIndexStmt.fSessionID); + createIndexStmt->fSessionID); } // new if BULK_LOAD close @@ -407,11 +420,11 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (err) { return rollBackCreateIndex(errorString("Failed to populate index with current data. ", err), txnID, - createIndexStmt.fSessionID); + createIndexStmt->fSessionID); } // Log the DDL statement. - logging::logDDL(createIndexStmt.fSessionID, txnID.id, createIndexStmt.fSql, createIndexStmt.fOwner); + logging::logDDL(createIndexStmt->fSessionID, txnID.id, createIndexStmt->fSql, createIndexStmt->fOwner); DETAIL_INFO("Commiting transaction"); err = fWriteEngine.commit(txnID.id); @@ -419,7 +432,7 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( if (err) { return rollBackCreateIndex(errorString("Failed to commit the create index transaction. ", err), txnID, - createIndexStmt.fSessionID); + createIndexStmt->fSessionID); } fSessionManager.committed(txnID); @@ -428,12 +441,12 @@ CreateIndexProcessor::DDLResult CreateIndexProcessor::processPackage( catch (exception& ex) { - result = rollBackCreateIndex(ex.what(), txnID, createIndexStmt.fSessionID); + result = rollBackCreateIndex(ex.what(), txnID, createIndexStmt->fSessionID); } catch (...) { string msg("CreateIndexProcessor::processPackage: caught unknown exception!"); - result = rollBackCreateIndex(msg, txnID, createIndexStmt.fSessionID); + result = rollBackCreateIndex(msg, txnID, createIndexStmt->fSessionID); } return result; diff --git a/dbcon/ddlpackageproc/createindexprocessor.h b/dbcon/ddlpackageproc/createindexprocessor.h index bead73b5f..76ff1179d 100644 --- a/dbcon/ddlpackageproc/createindexprocessor.h +++ b/dbcon/ddlpackageproc/createindexprocessor.h @@ -39,7 +39,7 @@ class CreateIndexProcessor : public DDLPackageProcessor * * @param createIndexStmt the create index statement */ - DDLResult processPackage(ddlpackage::CreateIndexStatement& createIndexStmt); + DDLResult processPackageInternal(ddlpackage::SqlStatement* createIndexStmt); protected: DDLResult rollBackCreateIndex(const std::string& error, BRM::TxnID& txnID, int sessionId); diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index 77263ad1c..0b9f68f00 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -47,8 +47,8 @@ using namespace logging; namespace ddlpackageprocessor { -CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( - ddlpackage::CreateTableStatement& createTableStmt) +CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal( + ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("CreateTableProcessor::processPackage"); @@ -72,8 +72,22 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( return result; } + ddlpackage::CreateTableStatement* createTableStmt = + dynamic_cast(sqlStmt); + + if (!createTableStmt) + { + Message::Args args; + Message message(9); + args.add("CreateTableStatement wrong cast"); + message.format(args); + result.result = CREATE_ERROR; + result.message = message; + return result; + } + DETAIL_INFO(createTableStmt); - ddlpackage::TableDef& tableDef = *createTableStmt.fTableDef; + ddlpackage::TableDef& tableDef = *(createTableStmt->fTableDef); // If schema = CALPONTSYS, do not create table if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA) @@ -89,7 +103,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( // Check whether the table is existed already boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt->fSessionID); execplan::CalpontSystemCatalog::TableName tableName; tableName.schema = tableDef.fQualifiedName->fSchema; tableName.table = tableDef.fQualifiedName->fName; @@ -105,36 +119,44 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( } catch (IDBExcept& ie) { - // TODO: What is and is not an error here? - if (ie.errorCode() == ERR_DATA_OFFLINE) + if (checkPPLostConnection(ie.what())) { - // release transaction - fSessionManager.rolledback(txnID); - // Return the error for display to user - Message::Args args; - Message message(9); - args.add(ie.what()); - message.format(args); - result.result = CREATE_ERROR; - result.message = message; + result.result = NETWORK_ERROR; return result; } - else if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG) + else { - roPair.objnum = 0; - } - else // error out - { - // release transaction - fSessionManager.rolledback(txnID); - // Return the error for display to user - Message::Args args; - Message message(9); - args.add(ie.what()); - message.format(args); - result.result = CREATE_ERROR; - result.message = message; - return result; + // TODO: What is and is not an error here? + if (ie.errorCode() == ERR_DATA_OFFLINE) + { + // release transaction + fSessionManager.rolledback(txnID); + // Return the error for display to user + Message::Args args; + Message message(9); + args.add(ie.what()); + message.format(args); + result.result = CREATE_ERROR; + result.message = message; + return result; + } + else if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG) + { + roPair.objnum = 0; + } + else // error out + { + // release transaction + fSessionManager.rolledback(txnID); + // Return the error for display to user + Message::Args args; + Message message(9); + args.add(ie.what()); + message.format(args); + result.result = CREATE_ERROR; + result.message = message; + return result; + } } } catch (std::exception& ex) // error out @@ -185,8 +207,8 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( // Start a new transaction VERBOSE_INFO("Starting a new transaction"); - string stmt = createTableStmt.fSql + "|" + tableDef.fQualifiedName->fSchema + "|"; - SQLLogger logger(stmt, fDDLLoggingId, createTableStmt.fSessionID, txnID.id); + string stmt = createTableStmt->fSql + "|" + tableDef.fQualifiedName->fSchema + "|"; + SQLLogger logger(stmt, fDDLLoggingId, createTableStmt->fSessionID, txnID.id); std::string err; execplan::ObjectIDManager fObjectIDManager; @@ -255,7 +277,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( #endif uint32_t numColumnOids = numColumns + numDictCols; - numColumnOids += 1; // MCOL-5021 + numColumnOids += 1; // MCOL-5021 if (fStartingColOID < 0) { @@ -276,11 +298,11 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE; bytestream << uniqueId; - bytestream << (uint32_t)createTableStmt.fSessionID; + bytestream << (uint32_t)createTableStmt->fSessionID; bytestream << (uint32_t)txnID.id; bytestream << (uint32_t)fStartingColOID; bytestream << (uint32_t)(fStartingColOID + numColumnOids); - bytestream << (uint32_t)createTableStmt.fTableWithAutoi; + bytestream << (uint32_t)createTableStmt->fTableWithAutoi; uint16_t dbRoot; BRM::OID_t sysOid = 1001; // Find out where systable is @@ -375,7 +397,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( if (rc != NETWORK_ERROR) { - rollBackTransaction(uniqueId, txnID, createTableStmt.fSessionID); // What to do with the error code + rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code } // release transaction @@ -387,7 +409,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN; bytestream << uniqueId; - bytestream << (uint32_t)createTableStmt.fSessionID; + bytestream << (uint32_t)createTableStmt->fSessionID; bytestream << (uint32_t)txnID.id; bytestream << numColumns; @@ -486,22 +508,31 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg << endl; #endif - result.result = (ResultCode)rc; - Message::Args args; - Message message(9); - args.add("(3)Create table failed due to "); - args.add(errorMsg); - message.format(args); - result.message = message; - - if (rc != NETWORK_ERROR) + if (checkPPLostConnection(errorMsg)) { - rollBackTransaction(uniqueId, txnID, createTableStmt.fSessionID); // What to do with the error code + result.result = PP_LOST_CONNECTION; + return result; } + else + { + result.result = (ResultCode)rc; + Message::Args args; + Message message(9); + args.add("(3)Create table failed due to "); + args.add(errorMsg); + message.format(args); + result.message = message; - // release transaction - fSessionManager.rolledback(txnID); - return result; + if (rc != NETWORK_ERROR) + { + rollBackTransaction(uniqueId, txnID, + createTableStmt->fSessionID); // What to do with the error code + } + + // release transaction + fSessionManager.rolledback(txnID); + return result; + } } // Get the number of tables in the database, the current table is included. @@ -627,7 +658,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( if (rc != NETWORK_ERROR) { - rollBackTransaction(uniqueId, txnID, createTableStmt.fSessionID); // What to do with the error code + rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code } // release transaction @@ -716,7 +747,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl; #endif - rollBackTransaction(uniqueId, txnID, createTableStmt.fSessionID); // What to do with the error code + rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code fSessionManager.rolledback(txnID); } else @@ -728,7 +759,7 @@ CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( } // Log the DDL statement. - logDDL(createTableStmt.fSessionID, txnID.id, createTableStmt.fSql, createTableStmt.fOwner); + logDDL(createTableStmt->fSessionID, txnID.id, createTableStmt->fSql, createTableStmt->fOwner); } catch (std::exception& ex) { diff --git a/dbcon/ddlpackageproc/createtableprocessor.h b/dbcon/ddlpackageproc/createtableprocessor.h index 09b3682a0..48d14bb08 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.h +++ b/dbcon/ddlpackageproc/createtableprocessor.h @@ -39,17 +39,17 @@ class CreateTableProcessor : public DDLPackageProcessor CreateTableProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } - /** @brief process a create table statement - * - * @param createTableStmt the CreateTableStatement - */ - EXPORT DDLResult processPackage(ddlpackage::CreateTableStatement& createTableStmt); protected: void rollBackCreateTable(const std::string& error, BRM::TxnID txnID, int sessionId, ddlpackage::TableDef& tableDef, DDLResult& result); private: + /** @brief process a create table statement + * + * @param createTableStmt the CreateTableStatement + */ + DDLResult processPackageInternal(ddlpackage::SqlStatement* sqlTableStmt); }; } // namespace ddlpackageprocessor diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp index 6d6bff705..07a663b51 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp @@ -562,10 +562,9 @@ void DDLPackageProcessor::createFiles(CalpontSystemCatalog::TableName aTableName { SUMMARY_INFO("DDLPackageProcessor::createFiles"); boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(1); + CalpontSystemCatalog::makeCalpontSystemCatalog(1); CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(aTableName); - CalpontSystemCatalog::OID tableAUXColOid = - systemCatalogPtr->tableAUXColumnOID(aTableName); + CalpontSystemCatalog::OID tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName); if (tableAUXColOid > 3000) { @@ -1127,6 +1126,37 @@ void DDLPackageProcessor::createWriteTruncateTableLogFile( throw std::runtime_error(errorMsg); } +DDLPackageProcessor::DDLResult DDLPackageProcessor::processPackage(SqlStatement* sqlStmt) +{ + auto result = processPackageInternal(sqlStmt); + uint32_t tries = 0; + while ((result.result == PP_LOST_CONNECTION) && (tries < 5)) + { + std::cerr << "DDLPackageProcessor: NETWORK ERROR; attempt # " << tries << std::endl; + joblist::ResourceManager* rm = joblist::ResourceManager::instance(true); + joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm); + if (fEc->Setup()) + return result; + + result = processPackageInternal(sqlStmt); + ++tries; + } + return result; +} + +DDLPackageProcessor::DDLResult DDLPackageProcessor::processPackageInternal(SqlStatement* sqlStmt) +{ + // This should not be called. + DDLPackageProcessor::DDLResult result; + result.result = NOT_ACCEPTING_PACKAGES; + return result; +} + +bool DDLPackageProcessor::checkPPLostConnection(std::string error) +{ + return error.find(PPLostConnectionErrorCode) != std::string::npos; +} + void DDLPackageProcessor::returnOIDs(execplan::CalpontSystemCatalog::RIDList& ridList, execplan::CalpontSystemCatalog::DictOIDList& dictOIDList) { diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.h b/dbcon/ddlpackageproc/ddlpackageprocessor.h index 4e2637f24..6b3ccdf07 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.h +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.h @@ -93,7 +93,8 @@ class DDLPackageProcessor NETWORK_ERROR, PARTITION_WARNING, WARN_NO_PARTITION, - DROP_TABLE_NOT_IN_CATALOG_ERROR + DROP_TABLE_NOT_IN_CATALOG_ERROR, + PP_LOST_CONNECTION }; enum DebugLevel /** @brief Debug level type enumeration */ @@ -247,6 +248,18 @@ class DDLPackageProcessor // std::cout << "in DDLPackageProcessor constructor " << this << std::endl; } + /** @brief Function wrapper for `processPackageInternal`. + */ + DDLResult processPackage(ddlpackage::SqlStatement* sqlStmt); + + /** @brief Check that give exception is related to PP lost connection. + */ + bool checkPPLostConnection(std::string error); + + /** @brief Internal implementation for `process` package command. + */ + virtual DDLResult processPackageInternal(ddlpackage::SqlStatement* sqlStmt); + /** @brief destructor */ EXPORT virtual ~DDLPackageProcessor(); @@ -373,6 +386,8 @@ class DDLPackageProcessor */ EXPORT void fetchLogFile(TableLogInfo& tableLogInfos, uint64_t uniqueId); + // virtual EXPORT DDLResult processPackage(ddlpackage::TruncTableStatement& truncTableStmt); + BRM::TxnID fTxnid; protected: @@ -835,6 +850,8 @@ class DDLPackageProcessor void cleanString(std::string& s); // std::string fDDLLogFileName; DebugLevel fDebugLevel; // internal use debug level + + const std::string PPLostConnectionErrorCode = "MCS-2045"; }; /** @brief helper template function to do safe from string to type conversions * @@ -849,4 +866,3 @@ bool from_string(T& t, const std::string& s, std::ios_base& (*f)(std::ios_base&) } // namespace ddlpackageprocessor #undef EXPORT - diff --git a/dbcon/ddlpackageproc/dropindexprocessor.cpp b/dbcon/ddlpackageproc/dropindexprocessor.cpp index e1b5720c8..65d12d441 100644 --- a/dbcon/ddlpackageproc/dropindexprocessor.cpp +++ b/dbcon/ddlpackageproc/dropindexprocessor.cpp @@ -30,8 +30,7 @@ using namespace logging; namespace ddlpackageprocessor { -DropIndexProcessor::DDLResult DropIndexProcessor::processPackage( - ddlpackage::DropIndexStatement& dropIndexStmt) +DropIndexProcessor::DDLResult DropIndexProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("DropIndexProcessor::processPackage"); @@ -49,20 +48,32 @@ DropIndexProcessor::DDLResult DropIndexProcessor::processPackage( int err = 0; + auto* dropIndexStmt = dynamic_cast(sqlStmt); + if (!dropIndexStmt) + { + Message::Args args; + Message message(9); + args.add("DropIndexStatement wrong cast"); + message.format(args); + result.result = DROP_ERROR; + result.message = message; + return result; + } + VERBOSE_INFO(dropIndexStmt); - SQLLogger logger(dropIndexStmt.fSql, fDDLLoggingId, dropIndexStmt.fSessionID, txnID.id); + SQLLogger logger(dropIndexStmt->fSql, fDDLLoggingId, dropIndexStmt->fSessionID, txnID.id); - indexName.schema = dropIndexStmt.fIndexName->fSchema; - indexName.index = dropIndexStmt.fIndexName->fName; + indexName.schema = dropIndexStmt->fIndexName->fSchema; + indexName.index = dropIndexStmt->fIndexName->fName; // Look up table name from indexname. Oracle will error out if same constraintname or indexname exists. - CalpontSystemCatalog::TableName tableName = - sysCatalogPtr->lookupTableForIndex(dropIndexStmt.fIndexName->fName, dropIndexStmt.fIndexName->fSchema); + CalpontSystemCatalog::TableName tableName = sysCatalogPtr->lookupTableForIndex( + dropIndexStmt->fIndexName->fName, dropIndexStmt->fIndexName->fSchema); indexName.table = tableName.table; indexOID = sysCatalogPtr->lookupIndexNbr(indexName); VERBOSE_INFO("Removing the SYSINDEX meta data"); - removeSysIndexMetaData(dropIndexStmt.fSessionID, txnID.id, result, *dropIndexStmt.fIndexName); + removeSysIndexMetaData(dropIndexStmt->fSessionID, txnID.id, result, *(dropIndexStmt->fIndexName)); if (result.result != NO_ERROR) { @@ -71,7 +82,7 @@ DropIndexProcessor::DDLResult DropIndexProcessor::processPackage( } VERBOSE_INFO("Removing the SYSINDEXCOL meta data"); - removeSysIndexColMetaData(dropIndexStmt.fSessionID, txnID.id, result, *dropIndexStmt.fIndexName); + removeSysIndexColMetaData(dropIndexStmt->fSessionID, txnID.id, result, *(dropIndexStmt->fIndexName)); if (result.result != NO_ERROR) { @@ -89,7 +100,7 @@ DropIndexProcessor::DDLResult DropIndexProcessor::processPackage( } // Log the DDL statement - logging::logDDL(dropIndexStmt.fSessionID, txnID.id, dropIndexStmt.fSql, dropIndexStmt.fOwner); + logging::logDDL(dropIndexStmt->fSessionID, txnID.id, dropIndexStmt->fSql, dropIndexStmt->fOwner); // register the changes err = fWriteEngine.commit(txnID.id); @@ -106,7 +117,7 @@ DropIndexProcessor::DDLResult DropIndexProcessor::processPackage( return result; rollback: - fWriteEngine.rollbackTran(txnID.id, dropIndexStmt.fSessionID); + fWriteEngine.rollbackTran(txnID.id, dropIndexStmt->fSessionID); fSessionManager.rolledback(txnID); return result; } diff --git a/dbcon/ddlpackageproc/dropindexprocessor.h b/dbcon/ddlpackageproc/dropindexprocessor.h index 7baa4635d..b92c569ba 100644 --- a/dbcon/ddlpackageproc/dropindexprocessor.h +++ b/dbcon/ddlpackageproc/dropindexprocessor.h @@ -38,7 +38,7 @@ class DropIndexProcessor : public DDLPackageProcessor * * @param dropIndexStmt the drop index statement */ - DDLResult processPackage(ddlpackage::DropIndexStatement& dropIndexStmt); + DDLResult processPackageInternal(ddlpackage::SqlStatement& dropIndexStmt); protected: private: diff --git a/dbcon/ddlpackageproc/droppartitionprocessor.cpp b/dbcon/ddlpackageproc/droppartitionprocessor.cpp index dbf297e20..f5fd55747 100644 --- a/dbcon/ddlpackageproc/droppartitionprocessor.cpp +++ b/dbcon/ddlpackageproc/droppartitionprocessor.cpp @@ -37,20 +37,14 @@ using namespace oam; namespace ddlpackageprocessor { -DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( - ddlpackage::DropPartitionStatement& dropPartitionStmt) +DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackageInternal( + ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("DropPartitionProcessor::processPackage"); DDLResult result; result.result = NO_ERROR; std::string err; - VERBOSE_INFO(dropPartitionStmt); - - // Commit current transaction. - // all DDL statements cause an implicit commit - VERBOSE_INFO("Getting current txnID"); - int rc = 0; rc = fDbrm->isReadWrite(); BRM::TxnID txnID; @@ -69,6 +63,23 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( return result; } + auto* dropPartitionStmt = dynamic_cast(sqlStmt); + if (!dropPartitionStmt) + { + logging::Message::Args args; + logging::Message message(9); + args.add("DropPartitionStatement wrong cast"); + message.format(args); + result.result = ALTER_ERROR; + result.message = message; + return result; + } + + VERBOSE_INFO(dropPartitionStmt); + // Commit current transaction. + // all DDL statements cause an implicit commit + VERBOSE_INFO("Getting current txnID"); + std::vector oidList; CalpontSystemCatalog::OID tableAuxColOid; CalpontSystemCatalog::RIDList tableColRidList; @@ -76,7 +87,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( execplan::CalpontSystemCatalog::ROPair roPair; uint32_t processID = 0; uint64_t uniqueID = 0; - uint32_t sessionID = dropPartitionStmt.fSessionID; + uint32_t sessionID = dropPartitionStmt->fSessionID; std::string processName("DDLProc"); uint64_t uniqueId = 0; @@ -108,19 +119,19 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( return result; } - string stmt = dropPartitionStmt.fSql + "|" + dropPartitionStmt.fTableName->fSchema + "|"; + string stmt = dropPartitionStmt->fSql + "|" + dropPartitionStmt->fTableName->fSchema + "|"; SQLLogger logger(stmt, fDDLLoggingId, sessionID, txnID.id); try { // check table lock boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - systemCatalogPtr->sessionID(dropPartitionStmt.fSessionID); + systemCatalogPtr->sessionID(dropPartitionStmt->fSessionID); CalpontSystemCatalog::TableName tableName; - tableName.schema = dropPartitionStmt.fTableName->fSchema; - tableName.table = dropPartitionStmt.fTableName->fName; + tableName.schema = dropPartitionStmt->fTableName->fSchema; + tableName.table = dropPartitionStmt->fTableName->fName; roPair = systemCatalogPtr->tableRID(tableName); //@Bug 3054 check for system catalog @@ -177,7 +188,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( } while (nanosleep(&abs_ts, &rm_ts) < 0); // reset - sessionID = dropPartitionStmt.fSessionID; + sessionID = dropPartitionStmt->fSessionID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; processID = ::getpid(); @@ -224,8 +235,8 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( // 7. Remove the extents from extentmap for the partition CalpontSystemCatalog::TableName userTableName; - userTableName.schema = dropPartitionStmt.fTableName->fSchema; - userTableName.table = dropPartitionStmt.fTableName->fName; + userTableName.schema = dropPartitionStmt->fTableName->fSchema; + userTableName.table = dropPartitionStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName); @@ -252,7 +263,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( // Mark the partition disabled from extent map string emsg; - rc = fDbrm->markPartitionForDeletion(oidList, dropPartitionStmt.fPartitions, emsg); + rc = fDbrm->markPartitionForDeletion(oidList, dropPartitionStmt->fPartitions, emsg); if (rc != 0 && rc != BRM::ERR_PARTITION_DISABLED && rc != BRM::ERR_INVALID_OP_LAST_PARTITION && rc != BRM::ERR_NOT_EXIST_PARTITION) @@ -277,7 +288,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( set::iterator it; - for (it = dropPartitionStmt.fPartitions.begin(); it != dropPartitionStmt.fPartitions.end(); ++it) + for (it = dropPartitionStmt->fPartitions.begin(); it != dropPartitionStmt->fPartitions.end(); ++it) { if (outOfServicePartitions.find(*it) != outOfServicePartitions.end()) markedPartitions.insert(*it); @@ -293,7 +304,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( // Remove the partition from extent map emsg.clear(); - rc = fDbrm->deletePartition(oidList, dropPartitionStmt.fPartitions, emsg); + rc = fDbrm->deletePartition(oidList, dropPartitionStmt->fPartitions, emsg); if (rc != 0) throw std::runtime_error(emsg); @@ -359,7 +370,8 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage( } // Log the DDL statement - logging::logDDL(dropPartitionStmt.fSessionID, txnID.id, dropPartitionStmt.fSql, dropPartitionStmt.fOwner); + logging::logDDL(dropPartitionStmt->fSessionID, txnID.id, dropPartitionStmt->fSql, + dropPartitionStmt->fOwner); // Remove the log file // release the transaction diff --git a/dbcon/ddlpackageproc/droppartitionprocessor.h b/dbcon/ddlpackageproc/droppartitionprocessor.h index 2c23fbd31..6c0b66d39 100644 --- a/dbcon/ddlpackageproc/droppartitionprocessor.h +++ b/dbcon/ddlpackageproc/droppartitionprocessor.h @@ -39,14 +39,14 @@ class DropPartitionProcessor : public DDLPackageProcessor DropPartitionProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } + + protected: + private: /** @brief process a drop table statement * * @param dropTableStmt the drop table statement */ - EXPORT DDLResult processPackage(ddlpackage::DropPartitionStatement& dropPartitionStmt); - - protected: - private: + DDLResult processPackageInternal(ddlpackage::SqlStatement* dropPartitionStmt); }; } // namespace ddlpackageprocessor diff --git a/dbcon/ddlpackageproc/droptableprocessor.cpp b/dbcon/ddlpackageproc/droptableprocessor.cpp index d40aac3f2..79d3d35f8 100644 --- a/dbcon/ddlpackageproc/droptableprocessor.cpp +++ b/dbcon/ddlpackageproc/droptableprocessor.cpp @@ -50,14 +50,24 @@ using namespace oam; namespace ddlpackageprocessor { -DropTableProcessor::DDLResult DropTableProcessor::processPackage( - ddlpackage::DropTableStatement& dropTableStmt) +DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("DropTableProcessor::processPackage"); - DDLResult result; result.result = NO_ERROR; std::string err; + + auto* dropTableStmt = dynamic_cast(sqlStmt); + if (!dropTableStmt) + { + Message::Args args; + Message message(9); + args.add("DropTableStatement wrong cast"); + message.format(args); + result.result = DROP_ERROR; + result.message = message; + return result; + } VERBOSE_INFO(dropTableStmt); // Commit current transaction. @@ -82,8 +92,8 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( return result; } - string stmt = dropTableStmt.fSql + "|" + dropTableStmt.fTableName->fSchema + "|"; - SQLLogger logger(stmt, fDDLLoggingId, dropTableStmt.fSessionID, txnID.id); + string stmt = dropTableStmt->fSql + "|" + dropTableStmt->fTableName->fSchema + "|"; + SQLLogger logger(stmt, fDDLLoggingId, dropTableStmt->fSessionID, txnID.id); std::vector oidList; CalpontSystemCatalog::RIDList tableColRidList; @@ -136,12 +146,12 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( { // check table lock boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - systemCatalogPtr->sessionID(dropTableStmt.fSessionID); + systemCatalogPtr->sessionID(dropTableStmt->fSessionID); CalpontSystemCatalog::TableName tableName; - tableName.schema = dropTableStmt.fTableName->fSchema; - tableName.table = dropTableStmt.fTableName->fName; + tableName.schema = dropTableStmt->fTableName->fSchema; + tableName.table = dropTableStmt->fTableName->fName; try { @@ -158,34 +168,42 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( } catch (IDBExcept& ie) { - if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG) + if (checkPPLostConnection(ie.what())) { - Message::Args args; - Message message(1); - args.add("Table does not exist in ColumnStore."); - message.format(args); - result.result = DROP_TABLE_NOT_IN_CATALOG_ERROR; - result.message = message; - fSessionManager.rolledback(txnID); + result.result = PP_LOST_CONNECTION; return result; } else { - result.result = DROP_ERROR; - Message::Args args; - Message message(9); - args.add("Drop table failed due to "); - args.add(ie.what()); - message.format(args); - result.message = message; - fSessionManager.rolledback(txnID); - return result; + if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG) + { + Message::Args args; + Message message(1); + args.add("Table does not exist in ColumnStore."); + message.format(args); + result.result = DROP_TABLE_NOT_IN_CATALOG_ERROR; + result.message = message; + fSessionManager.rolledback(txnID); + return result; + } + else + { + result.result = DROP_ERROR; + Message::Args args; + Message message(9); + args.add("Drop table failed due to "); + args.add(ie.what()); + message.format(args); + result.message = message; + fSessionManager.rolledback(txnID); + return result; + } } } uint32_t processID = ::getpid(); int32_t txnid = txnID.id; - int32_t sessionId = dropTableStmt.fSessionID; + int32_t sessionId = dropTableStmt->fSessionID; std::string processName("DDLProc"); int i = 0; @@ -228,12 +246,11 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); - try { processID = ::getpid(); txnid = txnID.id; - sessionId = dropTableStmt.fSessionID; + sessionId = dropTableStmt->fSessionID; ; processName = "DDLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, @@ -272,8 +289,8 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( // 10.Return the OIDs CalpontSystemCatalog::TableName userTableName; - userTableName.schema = dropTableStmt.fTableName->fSchema; - userTableName.table = dropTableStmt.fTableName->fName; + userTableName.schema = dropTableStmt->fTableName->fSchema; + userTableName.table = dropTableStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); @@ -300,10 +317,10 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( #endif bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE; bytestream << uniqueId; - bytestream << (uint32_t)dropTableStmt.fSessionID; + bytestream << (uint32_t)dropTableStmt->fSessionID; bytestream << (uint32_t)txnID.id; - bytestream << dropTableStmt.fTableName->fSchema; - bytestream << dropTableStmt.fTableName->fName; + bytestream << dropTableStmt->fTableName->fSchema; + bytestream << dropTableStmt->fTableName->fName; // Find out where systable is BRM::OID_t sysOid = 1001; @@ -378,30 +395,39 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( if (rc != 0) { - cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str() - << endl; - Message::Args args; - Message message(9); - args.add("Error in dropping table from systables."); - args.add(errorMsg); - message.format(args); - result.result = (ResultCode)rc; - result.message = message; - // release table lock and session - fSessionManager.rolledback(txnID); - (void)fDbrm->releaseTableLock(tableLockId); - fWEClient->removeQueue(uniqueId); - return result; + if (checkPPLostConnection(errorMsg)) + { + result.result = PP_LOST_CONNECTION; + (void)fDbrm->releaseTableLock(tableLockId); + fWEClient->removeQueue(uniqueId); + return result; + } + else + { + cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str() + << endl; + Message::Args args; + Message message(9); + args.add("Error in dropping table from systables."); + args.add(errorMsg); + message.format(args); + result.result = (ResultCode)rc; + result.message = message; + fSessionManager.rolledback(txnID); + (void)fDbrm->releaseTableLock(tableLockId); + fWEClient->removeQueue(uniqueId); + return result; + } } // remove from syscolumn bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN; bytestream << uniqueId; - bytestream << (uint32_t)dropTableStmt.fSessionID; + bytestream << (uint32_t)dropTableStmt->fSessionID; bytestream << (uint32_t)txnID.id; - bytestream << dropTableStmt.fTableName->fSchema; - bytestream << dropTableStmt.fTableName->fName; + bytestream << dropTableStmt->fTableName->fSchema; + bytestream << dropTableStmt->fTableName->fName; // Find out where syscolumn is sysOid = 1021; @@ -518,7 +544,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( } // Log the DDL statement - logDDL(dropTableStmt.fSessionID, txnID.id, dropTableStmt.fSql, dropTableStmt.fOwner); + logDDL(dropTableStmt->fSessionID, txnID.id, dropTableStmt->fSql, dropTableStmt->fOwner); } catch (std::exception& ex) { @@ -738,8 +764,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( return result; } -TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( - ddlpackage::TruncTableStatement& truncTableStmt) +TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("TruncTableProcessor::processPackage"); // 1. lock the table @@ -756,6 +781,8 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( DDLResult result; result.result = NO_ERROR; std::string err; + + auto* truncTableStmt = dynamic_cast(sqlStmt); VERBOSE_INFO(truncTableStmt); // @Bug 4150. Check dbrm status before doing anything to the table. @@ -778,8 +805,8 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( } //@Bug 5765 log the schema. - string stmt = truncTableStmt.fSql + "|" + truncTableStmt.fTableName->fSchema + "|"; - SQLLogger logger(stmt, fDDLLoggingId, truncTableStmt.fSessionID, txnID.id); + string stmt = truncTableStmt->fSql + "|" + truncTableStmt->fTableName->fSchema + "|"; + SQLLogger logger(stmt, fDDLLoggingId, truncTableStmt->fSessionID, txnID.id); std::vector columnOidList; std::vector allOidList; @@ -789,12 +816,11 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( execplan::CalpontSystemCatalog::ROPair roPair; std::string processName("DDLProc"); uint32_t processID = ::getpid(); - ; int32_t txnid = txnID.id; boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - systemCatalogPtr->sessionID(truncTableStmt.fSessionID); + systemCatalogPtr->sessionID(truncTableStmt->fSessionID); CalpontSystemCatalog::TableInfo tableInfo; uint64_t uniqueId = 0; @@ -840,10 +866,10 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( // check table lock CalpontSystemCatalog::TableName tableName; - tableName.schema = truncTableStmt.fTableName->fSchema; - tableName.table = truncTableStmt.fTableName->fName; + tableName.schema = truncTableStmt->fTableName->fSchema; + tableName.table = truncTableStmt->fTableName->fName; roPair = systemCatalogPtr->tableRID(tableName); - int32_t sessionId = truncTableStmt.fSessionID; + int32_t sessionId = truncTableStmt->fSessionID; std::string processName("DDLProc"); int i = 0; @@ -886,12 +912,11 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); - try { processID = ::getpid(); txnid = txnID.id; - sessionId = truncTableStmt.fSessionID; + sessionId = truncTableStmt->fSessionID; processName = "DDLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING); @@ -916,8 +941,8 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( } CalpontSystemCatalog::TableName userTableName; - userTableName.schema = truncTableStmt.fTableName->fSchema; - userTableName.table = truncTableStmt.fTableName->fName; + userTableName.schema = truncTableStmt->fTableName->fSchema; + userTableName.table = truncTableStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName); @@ -950,30 +975,61 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( } catch (std::exception& ex) { - cerr << "TruncateTableProcessor::processPackage: " << ex.what() << endl; - - Message::Args args; - Message message(9); - args.add("Truncate table failed: "); - args.add(ex.what()); - args.add(""); - fSessionManager.rolledback(txnID); - - try + if (checkPPLostConnection(ex.what())) { - (void)fDbrm->releaseTableLock(tableLockId); + if (tableLockId != 0) + { + Message::Args args; + Message message(9); + args.add("Truncate table failed: "); + args.add(ex.what()); + args.add(""); + try + { + (void)fDbrm->releaseTableLock(tableLockId); + } + catch (std::exception&) + { + args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); + fSessionManager.rolledback(txnID); + message.format(args); + fWEClient->removeQueue(uniqueId); + result.result = TRUNC_ERROR; + return result; + } + } + + fWEClient->removeQueue(uniqueId); + result.result = PP_LOST_CONNECTION; + return result; } - catch (std::exception&) + else { - args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); + cerr << "TruncateTableProcessor::processPackage: " << ex.what() << endl; + + Message::Args args; + Message message(9); + args.add("Truncate table failed: "); + args.add(ex.what()); + args.add(""); + fSessionManager.rolledback(txnID); + + try + { + (void)fDbrm->releaseTableLock(tableLockId); + } + catch (std::exception&) + { + args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); + } + + fWEClient->removeQueue(uniqueId); + message.format(args); + + result.result = TRUNC_ERROR; + result.message = message; + return result; } - - fWEClient->removeQueue(uniqueId); - message.format(args); - - result.result = TRUNC_ERROR; - result.message = message; - return result; } catch (...) { @@ -1328,7 +1384,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( if (rc != 0) { - rollBackTransaction(uniqueId, txnID, truncTableStmt.fSessionID); // What to do with the error code + rollBackTransaction(uniqueId, txnID, truncTableStmt->fSessionID); // What to do with the error code fSessionManager.rolledback(txnID); } @@ -1341,7 +1397,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( } // Log the DDL statement - logDDL(truncTableStmt.fSessionID, txnID.id, truncTableStmt.fSql, truncTableStmt.fOwner); + logDDL(truncTableStmt->fSessionID, txnID.id, truncTableStmt->fSql, truncTableStmt->fOwner); try { @@ -1380,4 +1436,3 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage( } } // namespace ddlpackageprocessor - diff --git a/dbcon/ddlpackageproc/droptableprocessor.h b/dbcon/ddlpackageproc/droptableprocessor.h index 90ab26173..1a86706e3 100644 --- a/dbcon/ddlpackageproc/droptableprocessor.h +++ b/dbcon/ddlpackageproc/droptableprocessor.h @@ -39,14 +39,14 @@ class DropTableProcessor : public DDLPackageProcessor DropTableProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } + + protected: + private: /** @brief process a drop table statement * * @param dropTableStmt the drop table statement */ - EXPORT DDLResult processPackage(ddlpackage::DropTableStatement& dropTableStmt); - - protected: - private: + DDLResult processPackageInternal(ddlpackage::SqlStatement* dropTableStmt); }; /** @brief specialization of a DDLPacakageProcessor @@ -59,14 +59,14 @@ class TruncTableProcessor : public DDLPackageProcessor TruncTableProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } + + protected: + private: /** @brief process a truncate table statement * * @param truncTableStmt the truncate table statement */ - EXPORT DDLResult processPackage(ddlpackage::TruncTableStatement& truncTableStmt); - - protected: - private: + DDLResult processPackageInternal(ddlpackage::SqlStatement* truncTableStmt); }; } // namespace ddlpackageprocessor diff --git a/dbcon/ddlpackageproc/markpartitionprocessor.cpp b/dbcon/ddlpackageproc/markpartitionprocessor.cpp index 6e5f5eb11..6c87d5e0e 100644 --- a/dbcon/ddlpackageproc/markpartitionprocessor.cpp +++ b/dbcon/ddlpackageproc/markpartitionprocessor.cpp @@ -34,16 +34,28 @@ using namespace oam; namespace ddlpackageprocessor { -MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage( - ddlpackage::MarkPartitionStatement& markPartitionStmt) +MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackageInternal( + ddlpackage::SqlStatement* sqlStmt) { - SUMMARY_INFO("RestorePartitionProcessor::processPackage"); + SUMMARY_INFO("MarkPartitionProcessor::processPackage"); DDLResult result; result.result = NO_ERROR; std::string err; - VERBOSE_INFO(markPartitionStmt); + auto* markPartitionStmt = dynamic_cast(sqlStmt); + if (!markPartitionStmt) + { + logging::Message::Args args; + logging::Message message(9); + args.add("MarkPartitionStatement wrong cast"); + message.format(args); + result.result = DROP_ERROR; + result.message = message; + return result; + } + + VERBOSE_INFO(markPartitionStmt); BRM::TxnID txnID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; @@ -69,30 +81,30 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage( CalpontSystemCatalog::DictOIDList dictOIDList; std::string processName("DDLProc"); - string stmt = markPartitionStmt.fSql + "|" + markPartitionStmt.fTableName->fSchema + "|"; - SQLLogger logger(stmt, fDDLLoggingId, markPartitionStmt.fSessionID, txnID.id); + string stmt = markPartitionStmt->fSql + "|" + markPartitionStmt->fTableName->fSchema + "|"; + SQLLogger logger(stmt, fDDLLoggingId, markPartitionStmt->fSessionID, txnID.id); uint32_t processID = 0; uint64_t uniqueID = 0; - uint32_t sessionID = markPartitionStmt.fSessionID; + uint32_t sessionID = markPartitionStmt->fSessionID; execplan::CalpontSystemCatalog::ROPair roPair; try { // check table lock boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(markPartitionStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(markPartitionStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - systemCatalogPtr->sessionID(markPartitionStmt.fSessionID); + systemCatalogPtr->sessionID(markPartitionStmt->fSessionID); CalpontSystemCatalog::TableName tableName; - tableName.schema = markPartitionStmt.fTableName->fSchema; - tableName.table = markPartitionStmt.fTableName->fName; + tableName.schema = markPartitionStmt->fTableName->fSchema; + tableName.table = markPartitionStmt->fTableName->fName; roPair = systemCatalogPtr->tableRID(tableName); //@Bug 3054 check for system catalog if (roPair.objnum < 3000) { - throw std::runtime_error("Drop partition cannot be operated on Calpont system catalog."); + throw std::runtime_error("Mark partition cannot be operated on Calpont system catalog."); } int i = 0; @@ -142,7 +154,7 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage( } while (nanosleep(&abs_ts, &rm_ts) < 0); // reset - sessionID = markPartitionStmt.fSessionID; + sessionID = markPartitionStmt->fSessionID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; processID = ::getpid(); @@ -186,8 +198,8 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage( // 6. Remove the column and dictionary files for the partition CalpontSystemCatalog::TableName userTableName; - userTableName.schema = markPartitionStmt.fTableName->fSchema; - userTableName.table = markPartitionStmt.fTableName->fName; + userTableName.schema = markPartitionStmt->fTableName->fSchema; + userTableName.table = markPartitionStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName); @@ -214,7 +226,7 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage( // Remove the partition from extent map string emsg; - rc = fDbrm->markPartitionForDeletion(oidList, markPartitionStmt.fPartitions, emsg); + rc = fDbrm->markPartitionForDeletion(oidList, markPartitionStmt->fPartitions, emsg); if (rc != 0) { @@ -281,7 +293,7 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage( } // Log the DDL statement - logging::logDDL(markPartitionStmt.fSessionID, 0, markPartitionStmt.fSql, markPartitionStmt.fOwner); + logging::logDDL(markPartitionStmt->fSessionID, 0, markPartitionStmt->fSql, markPartitionStmt->fOwner); try { diff --git a/dbcon/ddlpackageproc/markpartitionprocessor.h b/dbcon/ddlpackageproc/markpartitionprocessor.h index 4f706bd0a..c3661a81d 100644 --- a/dbcon/ddlpackageproc/markpartitionprocessor.h +++ b/dbcon/ddlpackageproc/markpartitionprocessor.h @@ -39,14 +39,14 @@ class MarkPartitionProcessor : public DDLPackageProcessor MarkPartitionProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } + + protected: + private: /** @brief process a create table statement * * @param createTableStmt the CreateTableStatement */ - EXPORT DDLResult processPackage(ddlpackage::MarkPartitionStatement& MarkPartitionStmt); - - protected: - private: + DDLResult processPackageInternal(ddlpackage::SqlStatement* MarkPartitionStmt); }; } // namespace ddlpackageprocessor diff --git a/dbcon/ddlpackageproc/restorepartitionprocessor.cpp b/dbcon/ddlpackageproc/restorepartitionprocessor.cpp index fea8f7b54..d2883222b 100644 --- a/dbcon/ddlpackageproc/restorepartitionprocessor.cpp +++ b/dbcon/ddlpackageproc/restorepartitionprocessor.cpp @@ -34,14 +34,27 @@ using namespace WriteEngine; namespace ddlpackageprocessor { -RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage( - ddlpackage::RestorePartitionStatement& restorePartitionStmt) +RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackageInternal( + ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("RestorePartitionProcessor::processPackage"); DDLResult result; result.result = NO_ERROR; std::string err; + + auto* restorePartitionStmt = dynamic_cast(sqlStmt); + if (!restorePartitionStmt) + { + logging::Message::Args args; + logging::Message message(9); + args.add("RestorePartitionStatement wrong cast"); + message.format(args); + result.result = DROP_ERROR; + result.message = message; + return result; + } + VERBOSE_INFO(restorePartitionStmt); BRM::TxnID txnID; @@ -69,24 +82,24 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage( CalpontSystemCatalog::DictOIDList dictOIDList; std::string processName("DDLProc"); - string stmt = restorePartitionStmt.fSql + "|" + restorePartitionStmt.fTableName->fSchema + "|"; - SQLLogger logger(stmt, fDDLLoggingId, restorePartitionStmt.fSessionID, txnID.id); + string stmt = restorePartitionStmt->fSql + "|" + restorePartitionStmt->fTableName->fSchema + "|"; + SQLLogger logger(stmt, fDDLLoggingId, restorePartitionStmt->fSessionID, txnID.id); uint32_t processID = 0; uint64_t uniqueID = 0; - uint32_t sessionID = restorePartitionStmt.fSessionID; + uint32_t sessionID = restorePartitionStmt->fSessionID; execplan::CalpontSystemCatalog::ROPair roPair; try { // check table lock boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(restorePartitionStmt.fSessionID); + CalpontSystemCatalog::makeCalpontSystemCatalog(restorePartitionStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); - systemCatalogPtr->sessionID(restorePartitionStmt.fSessionID); + systemCatalogPtr->sessionID(restorePartitionStmt->fSessionID); CalpontSystemCatalog::TableName tableName; - tableName.schema = restorePartitionStmt.fTableName->fSchema; - tableName.table = restorePartitionStmt.fTableName->fName; + tableName.schema = restorePartitionStmt->fTableName->fSchema; + tableName.table = restorePartitionStmt->fTableName->fName; roPair = systemCatalogPtr->tableRID(tableName); //@Bug 3054 check for system catalog @@ -142,7 +155,7 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage( } while (nanosleep(&abs_ts, &rm_ts) < 0); // reset - sessionID = restorePartitionStmt.fSessionID; + sessionID = restorePartitionStmt->fSessionID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; processID = ::getpid(); @@ -188,8 +201,8 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage( // 6. Remove the column and dictionary files for the partition CalpontSystemCatalog::TableName userTableName; - userTableName.schema = restorePartitionStmt.fTableName->fSchema; - userTableName.table = restorePartitionStmt.fTableName->fName; + userTableName.schema = restorePartitionStmt->fTableName->fSchema; + userTableName.table = restorePartitionStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName); @@ -216,7 +229,7 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage( // Remove the partition from extent map string emsg; - rc = fDbrm->restorePartition(oidList, restorePartitionStmt.fPartitions, emsg); + rc = fDbrm->restorePartition(oidList, restorePartitionStmt->fPartitions, emsg); if (rc != 0) { @@ -281,8 +294,8 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage( } // Log the DDL statement - logging::logDDL(restorePartitionStmt.fSessionID, txnID.id, restorePartitionStmt.fSql, - restorePartitionStmt.fOwner); + logging::logDDL(restorePartitionStmt->fSessionID, txnID.id, restorePartitionStmt->fSql, + restorePartitionStmt->fOwner); try { diff --git a/dbcon/ddlpackageproc/restorepartitionprocessor.h b/dbcon/ddlpackageproc/restorepartitionprocessor.h index 760d6433c..8e77c7eb9 100644 --- a/dbcon/ddlpackageproc/restorepartitionprocessor.h +++ b/dbcon/ddlpackageproc/restorepartitionprocessor.h @@ -39,14 +39,14 @@ class RestorePartitionProcessor : public DDLPackageProcessor RestorePartitionProcessor(BRM::DBRM* aDbrm) : DDLPackageProcessor(aDbrm) { } + + protected: + private: /** @brief process a drop table statement * * @param dropTableStmt the drop table statement */ - EXPORT DDLResult processPackage(ddlpackage::RestorePartitionStatement& RestorePartitionStmt); - - protected: - private: + DDLResult processPackageInternal(ddlpackage::SqlStatement* RestorePartitionStmt); }; } // namespace ddlpackageprocessor diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index 764ecc609..7c3c4794d 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -53,7 +53,7 @@ namespace dmlpackageprocessor /*static*/ std::set CommandPackageProcessor::fActiveClearTableLockCmds; /*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex; -DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage( +DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal( dmlpackage::CalpontDMLPackage& cpackage) { SUMMARY_INFO("CommandPackageProcessor::processPackage"); @@ -485,17 +485,24 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage( } catch (std::exception& ex) { - cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl; + if (checkPPLostConnection(ex)) + { + result.result = PP_LOST_CONNECTION; + } + else + { + cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl; - logging::Message::Args args; - logging::Message message(1); - args.add(ex.what()); - args.add(""); - args.add(""); - message.format(args); + logging::Message::Args args; + logging::Message message(1); + args.add(ex.what()); + args.add(""); + args.add(""); + message.format(args); - result.result = COMMAND_ERROR; - result.message = message; + result.result = COMMAND_ERROR; + result.message = message; + } } catch (...) { diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.h b/dbcon/dmlpackageproc/commandpackageprocessor.h index b0d9a67a6..ec0218e59 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.h +++ b/dbcon/dmlpackageproc/commandpackageprocessor.h @@ -47,17 +47,13 @@ class CommandPackageProcessor : public DMLPackageProcessor CommandPackageProcessor(BRM::DBRM* aDbrm, uint32_t sid) : DMLPackageProcessor(aDbrm, sid) { } - /** @brief process an CommandDMLPackage - * - * @param cpackage the CommandDMLPackage to process - */ - EXPORT DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage); protected: private: void viewTableLock(const dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result); void clearTableLock(uint64_t uniqueId, const dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result); void establishTableLockToClear(uint64_t tableLockID, BRM::TableLockInfo& lockInfo); + DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) override; // Tracks active cleartablelock commands by storing set of table lock IDs static std::set fActiveClearTableLockCmds; diff --git a/dbcon/dmlpackageproc/deletepackageprocessor.cpp b/dbcon/dmlpackageproc/deletepackageprocessor.cpp index abe88fbac..42e3b4ee4 100644 --- a/dbcon/dmlpackageproc/deletepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/deletepackageprocessor.cpp @@ -57,7 +57,8 @@ using namespace messageqcpp; using namespace oam; namespace dmlpackageprocessor { -DMLPackageProcessor::DMLResult DeletePackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage) +DMLPackageProcessor::DMLResult DeletePackageProcessor::processPackageInternal( + dmlpackage::CalpontDMLPackage& cpackage) { SUMMARY_INFO("DeletePackageProcessor::processPackage"); @@ -174,7 +175,6 @@ DMLPackageProcessor::DMLResult DeletePackageProcessor::processPackage(dmlpackage abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); - try { processID = ::getpid(); @@ -264,15 +264,22 @@ DMLPackageProcessor::DMLResult DeletePackageProcessor::processPackage(dmlpackage } catch (exception& ex) { - cerr << "DeletePackageProcessor::processPackage: " << ex.what() << endl; - - //@Bug 4994 Cancelled job is not error - if (result.result == 0) + if (checkPPLostConnection(ex)) { - result.result = DELETE_ERROR; + result.result = PP_LOST_CONNECTION; } + else + { + cerr << "DeletePackageProcessor::processPackage: " << ex.what() << endl; - result.message = Message(ex.what()); + //@Bug 4994 Cancelled job is not error + if (result.result == 0) + { + result.result = DELETE_ERROR; + } + + result.message = Message(ex.what()); + } } catch (...) { diff --git a/dbcon/dmlpackageproc/deletepackageprocessor.h b/dbcon/dmlpackageproc/deletepackageprocessor.h index bf487b41c..084e004db 100644 --- a/dbcon/dmlpackageproc/deletepackageprocessor.h +++ b/dbcon/dmlpackageproc/deletepackageprocessor.h @@ -45,14 +45,11 @@ class DeletePackageProcessor : public DMLPackageProcessor DeletePackageProcessor(BRM::DBRM* aDbrm, uint32_t sid) : DMLPackageProcessor(aDbrm, sid) { } - /** @brief process a DeleteDMLPackage - * - * @param cpackage the delete dml package to process - */ - EXPORT DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage); protected: private: + DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) override; + /** @brief delete a row * * @param txnID the transaction id diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp index 0f49bc215..e5d288d0d 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp @@ -287,6 +287,31 @@ int32_t DMLPackageProcessor::tryToRollBackTransaction(uint64_t uniqueId, BRM::Tx return weRc; } +DMLPackageProcessor::DMLResult DMLPackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage) +{ + auto result = processPackageInternal(cpackage); + uint32_t tries = 0; + // Try to setup connection and process package one more time. + while ((result.result == PP_LOST_CONNECTION) && (tries < 5)) + { + std::cerr << "DMLPackageProcessor: NETWORK ERROR; attempt # " << tries << std::endl; + joblist::ResourceManager* rm = joblist::ResourceManager::instance(true); + joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm); + if (fEc->Setup()) + return result; + + result = processPackageInternal(cpackage); + ++tries; + } + return result; +} + +bool DMLPackageProcessor::checkPPLostConnection(std::exception& ex) +{ + std::string error = ex.what(); + return error.find(PPLostConnectionErrorCode) != std::string::npos; +} + int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, std::string& errorMsg) { diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index 5038ca4bd..ec7956c81 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -98,7 +98,8 @@ class DMLPackageProcessor TABLE_LOCK_ERROR, JOB_ERROR, JOB_CANCELED, - DBRM_READ_ONLY + DBRM_READ_ONLY, + PP_LOST_CONNECTION }; enum DebugLevel /** @brief Debug level type enumeration */ @@ -212,7 +213,11 @@ class DMLPackageProcessor * * @param cpackage the CalpontDMLPackage to process */ - virtual DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage) = 0; + DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage); + + /** @brief Check that give exception is related to PP lost connection. + */ + bool checkPPLostConnection(std::exception& ex); inline void setRM(joblist::ResourceManager* frm) { @@ -502,6 +507,8 @@ class DMLPackageProcessor execplan::ClientRotator* fExeMgr; private: + virtual DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) = 0; + /** @brief clean beginning and ending glitches and spaces from string * * @param s string to be cleaned @@ -509,6 +516,8 @@ class DMLPackageProcessor void cleanString(std::string& s); DebugLevel fDebugLevel; // internal use debug level + + const std::string PPLostConnectionErrorCode = "MCS-2045"; }; /** @brief helper template function to do safe from string to type conversions diff --git a/dbcon/dmlpackageproc/insertpackageprocessor.cpp b/dbcon/dmlpackageproc/insertpackageprocessor.cpp index 6257a3aed..524acb677 100644 --- a/dbcon/dmlpackageproc/insertpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/insertpackageprocessor.cpp @@ -50,7 +50,8 @@ using namespace messageqcpp; namespace dmlpackageprocessor { -DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage) +DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal( + dmlpackage::CalpontDMLPackage& cpackage) { SUMMARY_INFO("InsertPackageProcessor::processPackage"); @@ -184,7 +185,6 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackage(dmlpackage abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); - try { processID = ::getpid(); @@ -365,21 +365,28 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackage(dmlpackage } catch (exception& ex) { - cerr << "InsertPackageProcessor::processPackage: " << ex.what() << endl; - - logging::Message::Args args; - logging::Message message(1); - args.add("Insert Failed: "); - args.add(ex.what()); - args.add(""); - args.add(""); - message.format(args); - - if (result.result != VB_OVERFLOW_ERROR) + if (checkPPLostConnection(ex)) { - result.result = INSERT_ERROR; - result.message = message; - errorMsg = ex.what(); + result.result = PP_LOST_CONNECTION; + } + else + { + cerr << "InsertPackageProcessor::processPackage: " << ex.what() << endl; + + logging::Message::Args args; + logging::Message message(1); + args.add("Insert Failed: "); + args.add(ex.what()); + args.add(""); + args.add(""); + message.format(args); + + if (result.result != VB_OVERFLOW_ERROR) + { + result.result = INSERT_ERROR; + result.message = message; + errorMsg = ex.what(); + } } } catch (...) @@ -397,7 +404,19 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackage(dmlpackage result.message = message; } - if ((rc != 0) && (rc != IDBRANGE_WARNING)) + if (rc == 1) + { + logging::Message::Args args; + logging::Message message(1); + args.add("Insert Failed: "); + args.add(errorMsg); + args.add(""); + args.add(""); + message.format(args); + result.result = PP_LOST_CONNECTION; + result.message = message; + } + else if ((rc != 0) && (rc != IDBRANGE_WARNING)) { logging::Message::Args args; logging::Message message(1); @@ -427,4 +446,3 @@ DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackage(dmlpackage } } // namespace dmlpackageprocessor - diff --git a/dbcon/dmlpackageproc/insertpackageprocessor.h b/dbcon/dmlpackageproc/insertpackageprocessor.h index a868bccfd..7be135207 100644 --- a/dbcon/dmlpackageproc/insertpackageprocessor.h +++ b/dbcon/dmlpackageproc/insertpackageprocessor.h @@ -45,14 +45,10 @@ class InsertPackageProcessor : public DMLPackageProcessor InsertPackageProcessor(BRM::DBRM* aDbrm, uint32_t sid) : DMLPackageProcessor(aDbrm, sid) { } - /** @brief process an InsertDMLPackage - * - * @param cpackage the InsertDMLPackage to process - */ - EXPORT DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage); protected: private: + DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) override; }; } // namespace dmlpackageprocessor diff --git a/dbcon/dmlpackageproc/updatepackageprocessor.cpp b/dbcon/dmlpackageproc/updatepackageprocessor.cpp index 35b4cb166..ab2127721 100644 --- a/dbcon/dmlpackageproc/updatepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/updatepackageprocessor.cpp @@ -61,7 +61,8 @@ using namespace oam; namespace dmlpackageprocessor { // StopWatch timer; -DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage) +DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal( + dmlpackage::CalpontDMLPackage& cpackage) { SUMMARY_INFO("UpdatePackageProcessor::processPackage"); @@ -201,7 +202,6 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackage(dmlpackage abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); - try { processID = ::getpid(); @@ -301,22 +301,28 @@ DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackage(dmlpackage } catch (std::exception& ex) { - cerr << "UpdatePackageProcessor::processPackage:" << ex.what() << endl; - - if (result.result == 0) + if (checkPPLostConnection(ex)) { - result.result = UPDATE_ERROR; + result.result = PP_LOST_CONNECTION; } + else + { + cerr << "UpdatePackageProcessor::processPackage:" << ex.what() << endl; + if (result.result == 0) + { + result.result = UPDATE_ERROR; + } - result.message = Message(ex.what()); - result.rowCount = 0; - LoggingID logid(DMLLoggingId, fSessionID, txnid.id); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("End SQL statement with error"); - msg.format(args1); - logging::Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + result.message = Message(ex.what()); + result.rowCount = 0; + LoggingID logid(DMLLoggingId, fSessionID, txnid.id); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("End SQL statement with error"); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + } } catch (...) { diff --git a/dbcon/dmlpackageproc/updatepackageprocessor.h b/dbcon/dmlpackageproc/updatepackageprocessor.h index 42ad8abcb..72b2e0b40 100644 --- a/dbcon/dmlpackageproc/updatepackageprocessor.h +++ b/dbcon/dmlpackageproc/updatepackageprocessor.h @@ -42,14 +42,11 @@ class UpdatePackageProcessor : public DMLPackageProcessor UpdatePackageProcessor(BRM::DBRM* aDbrm, uint32_t sid) : DMLPackageProcessor(aDbrm, sid) { } - /** @brief process an UpdateDMLPackage - * - * @param cpackage the UpdateDMLPackage to process - */ - EXPORT DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage); protected: private: + DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) override; + /** @brief send execution plan to ExeMgr and fetch rows * * @param cpackage the UpdateDMLPackage to process diff --git a/ddlproc/ddlprocessor.cpp b/ddlproc/ddlprocessor.cpp index 292e7e87a..330c537f3 100644 --- a/ddlproc/ddlprocessor.cpp +++ b/ddlproc/ddlprocessor.cpp @@ -504,7 +504,7 @@ class PackageHandler qts.schema_name = createTableStmt.schemaName(); fQtc.postQueryTele(qts); - result = processor->processPackage(createTableStmt); + result = processor->processPackage(&createTableStmt); systemCatalogPtr->removeCalpontSystemCatalog(createTableStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(createTableStmt.fSessionID | 0x80000000); @@ -539,7 +539,7 @@ class PackageHandler processor->fTimeZone = alterTableStmt.getTimeZone(); - result = processor->processPackage(alterTableStmt); + result = processor->processPackage(&alterTableStmt); systemCatalogPtr->removeCalpontSystemCatalog(alterTableStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(alterTableStmt.fSessionID | 0x80000000); @@ -574,7 +574,7 @@ class PackageHandler fQtc.postQueryTele(qts); // cout << "Drop table using txnid " << fTxnid.id << endl; - result = processor->processPackage(dropTableStmt); + result = processor->processPackage(&dropTableStmt); systemCatalogPtr->removeCalpontSystemCatalog(dropTableStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(dropTableStmt.fSessionID | 0x80000000); @@ -608,7 +608,7 @@ class PackageHandler qts.schema_name = truncTableStmt.schemaName(); fQtc.postQueryTele(qts); - result = processor->processPackage(truncTableStmt); + result = processor->processPackage(&truncTableStmt); systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID | 0x80000000); @@ -628,7 +628,7 @@ class PackageHandler boost::scoped_ptr processor(new MarkPartitionProcessor(fDbrm)); (processor->fTxnid).id = fTxnid.id; (processor->fTxnid).valid = true; - result = processor->processPackage(markPartitionStmt); + result = processor->processPackage(&markPartitionStmt); systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID | 0x80000000); } @@ -643,7 +643,7 @@ class PackageHandler boost::scoped_ptr processor(new RestorePartitionProcessor(fDbrm)); (processor->fTxnid).id = fTxnid.id; (processor->fTxnid).valid = true; - result = processor->processPackage(restorePartitionStmt); + result = processor->processPackage(&restorePartitionStmt); systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID | 0x80000000); } @@ -658,7 +658,7 @@ class PackageHandler boost::scoped_ptr processor(new DropPartitionProcessor(fDbrm)); (processor->fTxnid).id = fTxnid.id; (processor->fTxnid).valid = true; - result = processor->processPackage(dropPartitionStmt); + result = processor->processPackage(&dropPartitionStmt); systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID); systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID | 0x80000000); } diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index a7f44fe45..bd0f87d61 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -577,7 +577,18 @@ void PackageHandler::run() CalpontSystemCatalog::TableName tableName; tableName.schema = insertPkg.get_Table()->get_SchemaName(); tableName.table = insertPkg.get_Table()->get_TableName(); - CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); + CalpontSystemCatalog::ROPair roPair; + try + { + roPair = fcsc->tableRID(tableName); + } + catch (...) + { + if (setupDec()) + throw; + roPair = fcsc->tableRID(tableName); + } + fTableOid = roPair.objnum; } synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid @@ -976,7 +987,17 @@ void PackageHandler::run() CalpontSystemCatalog::TableName tableName; tableName.schema = updatePkg->get_Table()->get_SchemaName(); tableName.table = updatePkg->get_Table()->get_TableName(); - CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); + CalpontSystemCatalog::ROPair roPair; + try + { + roPair = fcsc->tableRID(tableName); + } + catch (...) + { + if (setupDec()) + throw; + roPair = fcsc->tableRID(tableName); + } fTableOid = roPair.objnum; } synchTable.setPackage(this, @@ -1036,7 +1057,17 @@ void PackageHandler::run() CalpontSystemCatalog::TableName tableName; tableName.schema = deletePkg->get_Table()->get_SchemaName(); tableName.table = deletePkg->get_Table()->get_TableName(); - CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); + CalpontSystemCatalog::ROPair roPair; + try + { + roPair = fcsc->tableRID(tableName); + } + catch (...) + { + if (setupDec()) + throw; + roPair = fcsc->tableRID(tableName); + } fTableOid = roPair.objnum; } synchTable.setPackage(this, diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index fbc7c32d6..5d1c0dc0e 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -199,6 +199,13 @@ class PackageHandler } private: + int32_t setupDec() + { + joblist::ResourceManager* rm = joblist::ResourceManager::instance(true); + joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm); + return fEc->Setup(); + } + messageqcpp::IOSocket fIos; boost::shared_ptr fByteStream; boost::scoped_ptr fProcessor; @@ -311,22 +318,21 @@ class RollbackTransactionProcessor : public dmlpackageprocessor::DMLPackageProce RollbackTransactionProcessor(BRM::DBRM* aDbrm) : DMLPackageProcessor(aDbrm, 1) { } - /** @brief process an Rollback transactions - * - * @param cpackage the UpdateDMLPackage to process - */ - inline DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage) - { - DMLResult result; - result.result = NO_ERROR; - return result; - } void processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, oam::OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased); protected: + private: + /** @brief process an Rollback transactions + * + * @param cpackage the UpdateDMLPackage to process + */ + DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) + { + DMLResult result; + result.result = NO_ERROR; + return result; + } }; - } // namespace dmlprocessor -