/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: droppartitionprocessor.cpp 6567 2010-04-27 19:45:29Z rdempsey $ * * ***********************************************************************/ #include "droppartitionprocessor.h" #include "messagelog.h" #include "sqllogger.h" #include "cacheutils.h" #include "oamcache.h" #include "logicalpartition.h" using namespace std; using namespace execplan; using namespace logging; using namespace WriteEngine; using namespace oam; namespace ddlpackageprocessor { DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackageInternal( ddlpackage::SqlStatement* sqlStmt) { SUMMARY_INFO("DropPartitionProcessor::processPackage"); DDLResult result; result.result = NO_ERROR; std::string err; int rc = 0; rc = fDbrm->isReadWrite(); BRM::TxnID txnID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; if (rc != 0) { logging::Message::Args args; logging::Message message(9); args.add("Unable to execute the statement due to DBRM is read only"); message.format(args); result.result = DROP_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } auto* dropPartitionStmt = dynamic_cast(sqlStmt); if (!dropPartitionStmt) { logging::Message::Args args; logging::Message message(9); args.add("DropPartitionStatement wrong cast"); message.format(args); result.result = ALTER_ERROR; result.message = message; return result; } VERBOSE_INFO(dropPartitionStmt); // Commit current transaction. // all DDL statements cause an implicit commit VERBOSE_INFO("Getting current txnID"); std::vector oidList; CalpontSystemCatalog::OID tableAuxColOid; CalpontSystemCatalog::RIDList tableColRidList; CalpontSystemCatalog::DictOIDList dictOIDList; execplan::CalpontSystemCatalog::ROPair roPair; uint32_t processID = 0; uint64_t uniqueID = 0; uint32_t sessionID = dropPartitionStmt->fSessionID; std::string processName("DDLProc"); uint64_t uniqueId = 0; // Bug 5070. Added exception handling try { uniqueId = fDbrm->getUnique64(); } catch (std::exception& ex) { logging::Message::Args args; logging::Message message(9); args.add(ex.what()); message.format(args); result.result = ALTER_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } catch (...) { logging::Message::Args args; logging::Message message(9); args.add("Unknown error occurred while getting unique number."); message.format(args); result.result = ALTER_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } auto stmt = formatStatementString(dropPartitionStmt->fSql, dropPartitionStmt->fTableName->fSchema, dropPartitionStmt->fTableName->fName, dropPartitionStmt->fPartitions); SQLLogger logger(stmt, fDDLLoggingId, sessionID, txnID.id); ostringstream debugLog; try { // check table lock boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt->fSessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); systemCatalogPtr->sessionID(dropPartitionStmt->fSessionID); CalpontSystemCatalog::TableName tableName; tableName.schema = dropPartitionStmt->fTableName->fSchema; tableName.table = dropPartitionStmt->fTableName->fName; roPair = systemCatalogPtr->tableRID(tableName); //@Bug 3054 check for system catalog if (roPair.objnum < 3000) { throw std::runtime_error("Drop partition cannot be operated on Calpont system catalog."); } int i = 0; processID = ::getpid(); oam::OamCache* oamcache = OamCache::makeOamCache(); std::vector pmList = oamcache->getModuleIds(); std::vector pms; for (unsigned i = 0; i < pmList.size(); i++) { pms.push_back((uint32_t)pmList[i]); } try { uniqueID = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, (int32_t*)&sessionID, (int32_t*)&txnID.id, BRM::LOADING); } catch (std::exception&) { result.result = DROP_ERROR; result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE); // no need to release lock. dbrm un-hold the lock fSessionManager.rolledback(txnID); return result; } if (uniqueID == 0) { int waitPeriod = 10; int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second waitPeriod = Config::getWaitPeriod(); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; for (; i < numTries; i++) { struct timespec abs_ts; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); // reset sessionID = dropPartitionStmt->fSessionID; txnID.id = fTxnid.id; txnID.valid = fTxnid.valid; processID = ::getpid(); processName = "DDLProc"; try { uniqueID = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, (int32_t*)&sessionID, (int32_t*)&txnID.id, BRM::LOADING); } catch (std::exception&) { result.result = DROP_ERROR; result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE); fSessionManager.rolledback(txnID); return result; } if (uniqueID > 0) break; } if (i >= numTries) // error out { result.result = DROP_ERROR; logging::Message::Args args; string strOp("drop partition"); args.add(strOp); args.add(processName); args.add((uint64_t)processID); args.add((uint64_t)sessionID); result.message = Message(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args)); fSessionManager.rolledback(txnID); return result; } } debugLog << "DropPartitionProcessor: got table lock (id) " << uniqueID << " for table (OID)" << roPair.objnum << endl; // 1. Get the OIDs for the columns // 2. Get the OIDs for the dictionaries // 3. Save the OIDs to a log file // 4. Disable the extents from extentmap for the partition // 5. Remove the column and dictionary files for the partition // 6. Flush PrimProc Cache // 7. Remove the extents from extentmap for the partition CalpontSystemCatalog::TableName userTableName; userTableName.schema = dropPartitionStmt->fTableName->fSchema; userTableName.table = dropPartitionStmt->fTableName->fName; tableColRidList = systemCatalogPtr->columnRIDs(userTableName); tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName); dictOIDList = systemCatalogPtr->dictOIDs(userTableName); debugLog << "DropPartitionProcessor:" << endl; debugLog << "column RIDS: "; for (const auto& rid : tableColRidList) debugLog << rid.objnum << " "; debugLog << endl; debugLog << "dict OIDS: "; for (const auto& dictOid : dictOIDList) debugLog << dictOid.dictOID << " "; debugLog << endl; // Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format for (unsigned i = 0; i < tableColRidList.size(); i++) { if (tableColRidList[i].objnum > 3000) oidList.push_back(tableColRidList[i].objnum); } if (tableAuxColOid > 3000) { oidList.push_back(tableAuxColOid); } for (unsigned i = 0; i < dictOIDList.size(); i++) { if (dictOIDList[i].dictOID > 3000) oidList.push_back(dictOIDList[i].dictOID); } // Mark the partition disabled from extent map string emsg; rc = fDbrm->markPartitionForDeletion(oidList, dropPartitionStmt->fPartitions, emsg); if (rc != 0 && rc != BRM::ERR_PARTITION_DISABLED && rc != BRM::ERR_INVALID_OP_LAST_PARTITION && rc != BRM::ERR_NOT_EXIST_PARTITION) { throw std::runtime_error(emsg); } debugLog << "DropPartitionProcessor: marked partitions for deletion:" << endl; for (const auto& partition : dropPartitionStmt->fPartitions) debugLog << "dbroot: " << partition.dbroot << " physical parition: " << partition.pp << " segment: " << partition.seg << endl; VERBOSE_INFO(debugLog.str()); debugLog.clear(); set markedPartitions; set outOfServicePartitions; // only log partitions that are successfully marked disabled. rc = fDbrm->getOutOfServicePartitions(oidList[0], outOfServicePartitions); if (rc != 0) { string errorMsg; BRM::errString(rc, errorMsg); ostringstream oss; oss << "getOutOfServicePartitions failed due to " << errorMsg; throw std::runtime_error(oss.str()); } set::iterator it; for (it = dropPartitionStmt->fPartitions.begin(); it != dropPartitionStmt->fPartitions.end(); ++it) { if (outOfServicePartitions.find(*it) != outOfServicePartitions.end()) markedPartitions.insert(*it); } // Save the oids to a file createWritePartitionLogFile(roPair.objnum, markedPartitions, oidList, uniqueId); VERBOSE_INFO("Removing parition files"); removePartitionFiles(oidList, markedPartitions, uniqueId); // Flush PrimProc cache for those lbids VERBOSE_INFO("Flushing paritions"); rc = cacheutils::flushPartition(oidList, markedPartitions); // Remove the partition from extent map emsg.clear(); rc = fDbrm->deletePartition(oidList, dropPartitionStmt->fPartitions, emsg); VERBOSE_INFO("DropPartitionProcessor: partitions removed"); if (rc != 0) throw std::runtime_error(emsg); } catch (exception& ex) { cerr << "DropPartitionProcessor::processPackage: " << ex.what() << endl; logging::Message::Args args; logging::Message message(ex.what()); if (rc == BRM::ERR_TABLE_NOT_LOCKED) result.result = USER_ERROR; else if (rc == BRM::ERR_NOT_EXIST_PARTITION || rc == BRM::ERR_INVALID_OP_LAST_PARTITION) result.result = PARTITION_WARNING; else if (rc == BRM::ERR_NO_PARTITION_PERFORMED) result.result = WARN_NO_PARTITION; else result.result = DROP_ERROR; result.message = message; try { fDbrm->releaseTableLock(uniqueID); } catch (std::exception&) { result.result = DROP_ERROR; result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE); } fSessionManager.rolledback(txnID); return result; } catch (...) { cerr << "DropPartitionProcessor::processPackage: caught unknown exception!" << endl; logging::Message::Args args; logging::Message message(1); args.add("Drop partition failed: "); args.add("encountered unkown exception"); args.add(""); args.add(""); message.format(args); result.result = DROP_ERROR; result.message = message; try { fDbrm->releaseTableLock(uniqueID); } catch (std::exception&) { result.result = DROP_ERROR; result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE); } fSessionManager.rolledback(txnID); return result; } // Log the DDL statement logging::logDDL(dropPartitionStmt->fSessionID, txnID.id, dropPartitionStmt->fSql, dropPartitionStmt->fOwner); // Remove the log file // release the transaction try { fDbrm->releaseTableLock(uniqueID); deleteLogFile(DROPPART_LOG, roPair.objnum, uniqueId); } catch (std::exception&) { result.result = DROP_ERROR; result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE); fSessionManager.rolledback(txnID); return result; } fSessionManager.committed(txnID); VERBOSE_INFO("DropPartitionProcessor:: commited"); return result; } } // namespace ddlpackageprocessor