/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: insertpackageprocessor.cpp 9642 2013-06-24 14:57:42Z rdempsey $ * * ***********************************************************************/ #include #include "insertpackageprocessor.h" #include "autoincrementdata.h" #include #include #include #include "messagelog.h" #include "sqllogger.h" #include #include "oamcache.h" #include "bytestream.h" #include #include #include #include "we_messages.h" #include "tablelockdata.h" using namespace boost::algorithm; using namespace std; using namespace WriteEngine; using namespace dmlpackage; using namespace execplan; using namespace dataconvert; using namespace logging; using namespace oam; using namespace messageqcpp; namespace dmlpackageprocessor { DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal( dmlpackage::CalpontDMLPackage& cpackage) { SUMMARY_INFO("InsertPackageProcessor::processPackage"); DMLResult result; result.result = NO_ERROR; BRM::TxnID txnid; // set-up the transaction txnid.id = cpackage.get_TxnID(); txnid.valid = true; fSessionID = cpackage.get_SessionID(); DMLTable* tablePtr = cpackage.get_Table(); LoggingID logid(DMLLoggingId, fSessionID, txnid.id); logging::Message::Args args1; logging::Message msg(1); args1.add("Start SQL statement: "); ostringstream oss; oss << cpackage.get_SQLStatement() << "; |" << tablePtr->get_SchemaName() << "|"; args1.add(oss.str()); msg.format(args1); Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); // WriteEngine::ChunkManager* cm = cpackage.get_ChunkManager(); // fWriteEngine.setChunkManager(cm); // std::map oids; VERBOSE_INFO("Processing Insert DML Package..."); uint64_t uniqueId = 0; // Bug 5070. Added exception handling try { uniqueId = fDbrm->getUnique64(); } catch (std::exception& ex) { logging::Message::Args args; logging::Message message(9); args.add(ex.what()); message.format(args); result.result = INSERT_ERROR; result.message = message; fSessionManager.rolledback(txnid); return result; } catch (...) { logging::Message::Args args; logging::Message message(9); args.add("Unknown error occurred while getting unique number."); message.format(args); result.result = INSERT_ERROR; result.message = message; fSessionManager.rolledback(txnid); return result; } uint64_t tableLockId = 0; int rc = 0; std::string errorMsg; OamCache* oamcache = OamCache::makeOamCache(); std::vector moduleIds = oamcache->getModuleIds(); std::vector pms; try { for (unsigned int i = 0; i < moduleIds.size(); i++) { pms.push_back((uint32_t)moduleIds[i]); } // cout << "single insert get transaction id " << txnid.id << endl; // get the table object from the package boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); // cout << "DMLProc using syscatptr:sessionid = " << systemCatalogPtr <<":" << fSessionID<< endl; CalpontSystemCatalog::TableName tableName; execplan::CalpontSystemCatalog::ROPair roPair; TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID); if (0 != tablePtr) { // check table lock systemCatalogPtr->identity(CalpontSystemCatalog::EC); systemCatalogPtr->sessionID(fSessionID); tableName.schema = tablePtr->get_SchemaName(); tableName.table = tablePtr->get_TableName(); roPair = systemCatalogPtr->tableRID(tableName); tableLockId = tablelockData->getTablelockId( roPair.objnum); // check whether this table is locked already for this session if (tableLockId == 0) { // cout << "tablelock is not found in cache, getting from dbrm" << endl; uint32_t processID = ::getpid(); int32_t txnId = txnid.id; int32_t sessionId = fSessionID; std::string processName("DMLProc"); int i = 0; try { tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, BRM::LOADING); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId == 0) { int waitPeriod = 10; int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second waitPeriod = Config::getWaitPeriod(); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; for (; i < numTries; i++) { struct timespec abs_ts; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); try { processID = ::getpid(); txnId = txnid.id; sessionId = fSessionID; processName = "DMLProc"; tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId, BRM::LOADING); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (tableLockId > 0) break; } if (i >= numTries) // error out { result.result = INSERT_ERROR; logging::Message::Args args; string strOp("insert"); args.add(strOp); args.add(processName); args.add((uint64_t)processID); args.add(sessionId); throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args)); } } } // cout << " tablelock is obtained with id " << tableLockId << endl; tablelockData->setTablelock(roPair.objnum, tableLockId); int pmNum = 0; // Select PM to receive the row. // 1. Get BRM information // 2. Find the DBRoot with the fewest in-service blocks. // DBRoots having no blocks are excluded // 3. Map the selected DBRoot to the corresponding PM CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName, true); std::vector allInfo(pms.size()); for (unsigned i = 0; i < pms.size(); i++) { rc = fDbrm->getDbRootHWMInfo((ridList[0].objnum), pms[i], allInfo[i]); if (rc != 0) //@Bug 4760. { result.result = INSERT_ERROR; ostringstream oss; oss << "Error getting extent information for table " << tableName.table; throw std::runtime_error(oss.str()); } } // Find DBRoot with fewest blocks; if all DBRoots // have 0 blocks, then we select the first DBRoot BRM::EmDbRootHWMInfo tmp; bool tmpSet = false; for (unsigned i = 0; i < allInfo.size(); i++) { BRM::EmDbRootHWMInfo_v emDbRootHWMInfos = allInfo[i]; for (unsigned j = 0; j < emDbRootHWMInfos.size(); j++) { if (!tmpSet) { tmp = emDbRootHWMInfos[j]; tmpSet = true; } else if (emDbRootHWMInfos[j].totalBlocks > 0) { if ((emDbRootHWMInfos[j].totalBlocks < tmp.totalBlocks) || (tmp.totalBlocks == 0)) { tmp = emDbRootHWMInfos[j]; } } } } // Select the PM to receive the row uint32_t dbroot; if (tmpSet) { dbroot = tmp.dbRoot; boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbroot]; //@Bug 4760. validate pm value if (pmNum == 0) { result.result = INSERT_ERROR; ostringstream oss; oss << "Error mapping extent/DBRoot to PM for table " << tableName.table; throw std::runtime_error(oss.str()); } } else { result.result = INSERT_ERROR; ostringstream oss; oss << "There is no extent information for table " << tableName.table; throw std::runtime_error(oss.str()); } // This is for single insert only. Batch insert is handled in dmlprocessor. // cout << "fWEClient = " << fWEClient << endl; fWEClient->addQueue(uniqueId); ByteStream bytestream; bytestream << (uint8_t)WE_SVR_SINGLE_INSERT; bytestream << uniqueId; bytestream << (uint32_t)txnid.id; bytestream << dbroot; cpackage.write(bytestream); boost::shared_ptr bsIn; ByteStream::byte rc1; try { fWEClient->write(bytestream, (uint32_t)pmNum); #ifdef IDB_DML_DEBUG cout << "Single insert sending WE_SVR_SINGLE_INSERT to pm " << pmNum << endl; #endif bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) // read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; } else { *bsIn >> rc1; if (rc1 != 0) { *bsIn >> errorMsg; rc = rc1; } } } catch (runtime_error& ex) // write error { #ifdef IDB_DML_DEBUG cout << "Single insert got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { errorMsg = "Caught ... exception during single row insert"; rc = NETWORK_ERROR; #ifdef IDB_DML_DEBUG cout << "Single insert got unknown exception" << endl; #endif } // Log the insert statement. LoggingID logid(DMLLoggingId, fSessionID, txnid.id); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement"); msg.format(args1); Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_SQLStatement() + ";", cpackage.get_SchemaName()); } } catch (exception& ex) { if (checkPPLostConnection(ex)) { result.result = PP_LOST_CONNECTION; } else { cerr << "InsertPackageProcessor::processPackage: " << ex.what() << endl; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(ex.what()); args.add(""); args.add(""); message.format(args); if (result.result != VB_OVERFLOW_ERROR) { result.result = INSERT_ERROR; result.message = message; errorMsg = ex.what(); } } } catch (...) { cerr << "InsertPackageProcessor::processPackage: caught unknown exception!" << endl; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add("encountered unkown exception"); args.add(""); args.add(""); message.format(args); result.result = INSERT_ERROR; result.message = message; } if (rc == 1) { logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errorMsg); args.add(""); args.add(""); message.format(args); result.result = PP_LOST_CONNECTION; result.message = message; } else if ((rc != 0) && (rc != IDBRANGE_WARNING)) { logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errorMsg); args.add(""); args.add(""); message.format(args); result.result = INSERT_ERROR; result.message = message; } else if (rc == IDBRANGE_WARNING) { logging::Message::Args args; logging::Message message(1); args.add(errorMsg); args.add(""); args.add(""); message.format(args); result.result = IDBRANGE_WARNING; result.message = message; } fWEClient->removeQueue(uniqueId); VERBOSE_INFO("Finished Processing Insert DML Package"); return result; } } // namespace dmlpackageprocessor