You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			418 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			418 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
|    Copyright (C) 2016 MariaDB Corporation
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /***********************************************************************
 | |
|  *   $Id: droppartitionprocessor.cpp 6567 2010-04-27 19:45:29Z rdempsey $
 | |
|  *
 | |
|  *
 | |
|  ***********************************************************************/
 | |
| #include "droppartitionprocessor.h"
 | |
| 
 | |
| #include "messagelog.h"
 | |
| #include "sqllogger.h"
 | |
| #include "cacheutils.h"
 | |
| #include "oamcache.h"
 | |
| #include "logicalpartition.h"
 | |
| 
 | |
| using namespace std;
 | |
| using namespace execplan;
 | |
| using namespace logging;
 | |
| using namespace WriteEngine;
 | |
| using namespace oam;
 | |
| 
 | |
| namespace ddlpackageprocessor
 | |
| {
 | |
| DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackageInternal(
 | |
|     ddlpackage::SqlStatement* sqlStmt)
 | |
| {
 | |
|   SUMMARY_INFO("DropPartitionProcessor::processPackage");
 | |
| 
 | |
|   DDLResult result;
 | |
|   result.result = NO_ERROR;
 | |
|   std::string err;
 | |
|   int rc = 0;
 | |
|   rc = fDbrm->isReadWrite();
 | |
|   BRM::TxnID txnID;
 | |
|   txnID.id = fTxnid.id;
 | |
|   txnID.valid = fTxnid.valid;
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(9);
 | |
|     args.add("Unable to execute the statement due to DBRM is read only");
 | |
|     message.format(args);
 | |
|     result.result = DROP_ERROR;
 | |
|     result.message = message;
 | |
|     fSessionManager.rolledback(txnID);
 | |
|     return result;
 | |
|   }
 | |
| 
 | |
|   auto* dropPartitionStmt = dynamic_cast<ddlpackage::DropPartitionStatement*>(sqlStmt);
 | |
|   if (!dropPartitionStmt)
 | |
|   {
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(9);
 | |
|     args.add("DropPartitionStatement wrong cast");
 | |
|     message.format(args);
 | |
|     result.result = ALTER_ERROR;
 | |
|     result.message = message;
 | |
|     return result;
 | |
|   }
 | |
| 
 | |
|   VERBOSE_INFO(dropPartitionStmt);
 | |
|   // Commit current transaction.
 | |
|   // all DDL statements cause an implicit commit
 | |
|   VERBOSE_INFO("Getting current txnID");
 | |
| 
 | |
|   std::vector<CalpontSystemCatalog::OID> oidList;
 | |
|   CalpontSystemCatalog::OID tableAuxColOid;
 | |
|   CalpontSystemCatalog::RIDList tableColRidList;
 | |
|   CalpontSystemCatalog::DictOIDList dictOIDList;
 | |
|   execplan::CalpontSystemCatalog::ROPair roPair;
 | |
|   uint32_t processID = 0;
 | |
|   uint64_t uniqueID = 0;
 | |
|   uint32_t sessionID = dropPartitionStmt->fSessionID;
 | |
|   std::string processName("DDLProc");
 | |
|   uint64_t uniqueId = 0;
 | |
| 
 | |
|   // Bug 5070. Added exception handling
 | |
|   try
 | |
|   {
 | |
|     uniqueId = fDbrm->getUnique64();
 | |
|   }
 | |
|   catch (std::exception& ex)
 | |
|   {
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(9);
 | |
|     args.add(ex.what());
 | |
|     message.format(args);
 | |
|     result.result = ALTER_ERROR;
 | |
|     result.message = message;
 | |
|     fSessionManager.rolledback(txnID);
 | |
|     return result;
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(9);
 | |
|     args.add("Unknown error occurred while getting unique number.");
 | |
|     message.format(args);
 | |
|     result.result = ALTER_ERROR;
 | |
|     result.message = message;
 | |
|     fSessionManager.rolledback(txnID);
 | |
|     return result;
 | |
|   }
 | |
| 
 | |
|   auto stmt = formatStatementString(dropPartitionStmt->fSql, dropPartitionStmt->fTableName->fSchema,
 | |
|                                     dropPartitionStmt->fTableName->fName, dropPartitionStmt->fPartitions);
 | |
|   SQLLogger logger(stmt, fDDLLoggingId, sessionID, txnID.id);
 | |
|   ostringstream debugLog;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     // check table lock
 | |
|     boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
|         CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt->fSessionID);
 | |
|     systemCatalogPtr->identity(CalpontSystemCatalog::EC);
 | |
|     systemCatalogPtr->sessionID(dropPartitionStmt->fSessionID);
 | |
|     CalpontSystemCatalog::TableName tableName;
 | |
|     tableName.schema = dropPartitionStmt->fTableName->fSchema;
 | |
|     tableName.table = dropPartitionStmt->fTableName->fName;
 | |
|     roPair = systemCatalogPtr->tableRID(tableName);
 | |
| 
 | |
|     //@Bug 3054 check for system catalog
 | |
|     if (roPair.objnum < 3000)
 | |
|     {
 | |
|       throw std::runtime_error("Drop partition cannot be operated on Calpont system catalog.");
 | |
|     }
 | |
| 
 | |
|     int i = 0;
 | |
|     processID = ::getpid();
 | |
|     oam::OamCache* oamcache = OamCache::makeOamCache();
 | |
|     std::vector<int> pmList = oamcache->getModuleIds();
 | |
|     std::vector<uint32_t> pms;
 | |
| 
 | |
|     for (unsigned i = 0; i < pmList.size(); i++)
 | |
|     {
 | |
|       pms.push_back((uint32_t)pmList[i]);
 | |
|     }
 | |
| 
 | |
|     try
 | |
|     {
 | |
|       uniqueID = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, (int32_t*)&sessionID,
 | |
|                                      (int32_t*)&txnID.id, BRM::LOADING);
 | |
|     }
 | |
|     catch (std::exception&)
 | |
|     {
 | |
|       result.result = DROP_ERROR;
 | |
|       result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | |
|       // no need to release lock. dbrm un-hold the lock
 | |
|       fSessionManager.rolledback(txnID);
 | |
|       return result;
 | |
|     }
 | |
| 
 | |
|     if (uniqueID == 0)
 | |
|     {
 | |
|       int waitPeriod = 10;
 | |
|       int sleepTime = 100;  // sleep 100 milliseconds between checks
 | |
|       int numTries = 10;    // try 10 times per second
 | |
|       waitPeriod = Config::getWaitPeriod();
 | |
|       numTries = waitPeriod * 10;
 | |
|       struct timespec rm_ts;
 | |
| 
 | |
|       rm_ts.tv_sec = sleepTime / 1000;
 | |
|       rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
 | |
| 
 | |
|       for (; i < numTries; i++)
 | |
|       {
 | |
|         struct timespec abs_ts;
 | |
| 
 | |
|         do
 | |
|         {
 | |
|           abs_ts.tv_sec = rm_ts.tv_sec;
 | |
|           abs_ts.tv_nsec = rm_ts.tv_nsec;
 | |
|         } while (nanosleep(&abs_ts, &rm_ts) < 0);
 | |
| 
 | |
|         // reset
 | |
|         sessionID = dropPartitionStmt->fSessionID;
 | |
|         txnID.id = fTxnid.id;
 | |
|         txnID.valid = fTxnid.valid;
 | |
|         processID = ::getpid();
 | |
|         processName = "DDLProc";
 | |
| 
 | |
|         try
 | |
|         {
 | |
|           uniqueID = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, (int32_t*)&sessionID,
 | |
|                                          (int32_t*)&txnID.id, BRM::LOADING);
 | |
|         }
 | |
|         catch (std::exception&)
 | |
|         {
 | |
|           result.result = DROP_ERROR;
 | |
|           result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | |
|           fSessionManager.rolledback(txnID);
 | |
|           return result;
 | |
|         }
 | |
| 
 | |
|         if (uniqueID > 0)
 | |
|           break;
 | |
|       }
 | |
| 
 | |
|       if (i >= numTries)  // error out
 | |
|       {
 | |
|         result.result = DROP_ERROR;
 | |
|         logging::Message::Args args;
 | |
|         string strOp("drop partition");
 | |
|         args.add(strOp);
 | |
|         args.add(processName);
 | |
|         args.add((uint64_t)processID);
 | |
|         args.add((uint64_t)sessionID);
 | |
|         result.message = Message(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
 | |
|         fSessionManager.rolledback(txnID);
 | |
|         return result;
 | |
|       }
 | |
|     }
 | |
|     debugLog << "DropPartitionProcessor: got table lock (id) " << uniqueID << " for table (OID)"
 | |
|              << roPair.objnum << endl;
 | |
| 
 | |
|     // 1. Get the OIDs for the columns
 | |
|     // 2. Get the OIDs for the dictionaries
 | |
|     // 3. Save the OIDs to a log file
 | |
|     // 4. Disable the extents from extentmap for the partition
 | |
|     // 5. Remove the column and dictionary  files for the partition
 | |
|     // 6. Flush PrimProc Cache
 | |
|     // 7. Remove the extents from extentmap for the partition
 | |
| 
 | |
|     CalpontSystemCatalog::TableName userTableName;
 | |
|     userTableName.schema = dropPartitionStmt->fTableName->fSchema;
 | |
|     userTableName.table = dropPartitionStmt->fTableName->fName;
 | |
| 
 | |
|     tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
 | |
|     tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName);
 | |
|     dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
 | |
| 
 | |
|     debugLog << "DropPartitionProcessor:" << endl;
 | |
|     debugLog << "column RIDS: ";
 | |
|     for (const auto& rid : tableColRidList)
 | |
|       debugLog << rid.objnum << " ";
 | |
|     debugLog << endl;
 | |
|     debugLog << "dict OIDS: ";
 | |
|     for (const auto& dictOid : dictOIDList)
 | |
|       debugLog << dictOid.dictOID << " ";
 | |
|     debugLog << endl;
 | |
| 
 | |
|     // Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format
 | |
|     for (unsigned i = 0; i < tableColRidList.size(); i++)
 | |
|     {
 | |
|       if (tableColRidList[i].objnum > 3000)
 | |
|         oidList.push_back(tableColRidList[i].objnum);
 | |
|     }
 | |
| 
 | |
|     if (tableAuxColOid > 3000)
 | |
|     {
 | |
|       oidList.push_back(tableAuxColOid);
 | |
|     }
 | |
| 
 | |
|     for (unsigned i = 0; i < dictOIDList.size(); i++)
 | |
|     {
 | |
|       if (dictOIDList[i].dictOID > 3000)
 | |
|         oidList.push_back(dictOIDList[i].dictOID);
 | |
|     }
 | |
| 
 | |
|     // Mark the partition disabled from extent map
 | |
|     string emsg;
 | |
|     rc = fDbrm->markPartitionForDeletion(oidList, dropPartitionStmt->fPartitions, emsg);
 | |
| 
 | |
|     if (rc != 0 && rc != BRM::ERR_PARTITION_DISABLED && rc != BRM::ERR_INVALID_OP_LAST_PARTITION &&
 | |
|         rc != BRM::ERR_NOT_EXIST_PARTITION)
 | |
|     {
 | |
|       throw std::runtime_error(emsg);
 | |
|     }
 | |
|     debugLog << "DropPartitionProcessor: marked partitions for deletion:" << endl;
 | |
|     for (const auto& partition : dropPartitionStmt->fPartitions)
 | |
|       debugLog << "dbroot: " << partition.dbroot << " physical parition: " << partition.pp
 | |
|                << " segment: " << partition.seg << endl;
 | |
|     VERBOSE_INFO(debugLog.str());
 | |
|     debugLog.clear();
 | |
| 
 | |
|     set<BRM::LogicalPartition> markedPartitions;
 | |
|     set<BRM::LogicalPartition> outOfServicePartitions;
 | |
| 
 | |
|     // only log partitions that are successfully marked disabled.
 | |
|     rc = fDbrm->getOutOfServicePartitions(oidList[0], outOfServicePartitions);
 | |
| 
 | |
|     if (rc != 0)
 | |
|     {
 | |
|       string errorMsg;
 | |
|       BRM::errString(rc, errorMsg);
 | |
|       ostringstream oss;
 | |
|       oss << "getOutOfServicePartitions failed  due to " << errorMsg;
 | |
|       throw std::runtime_error(oss.str());
 | |
|     }
 | |
| 
 | |
|     set<BRM::LogicalPartition>::iterator it;
 | |
| 
 | |
|     for (it = dropPartitionStmt->fPartitions.begin(); it != dropPartitionStmt->fPartitions.end(); ++it)
 | |
|     {
 | |
|       if (outOfServicePartitions.find(*it) != outOfServicePartitions.end())
 | |
|         markedPartitions.insert(*it);
 | |
|     }
 | |
| 
 | |
|     // Save the oids to a file
 | |
|     createWritePartitionLogFile(roPair.objnum, markedPartitions, oidList, uniqueId);
 | |
| 
 | |
|     VERBOSE_INFO("Removing parition files");
 | |
|     removePartitionFiles(oidList, markedPartitions, uniqueId);
 | |
|     // Flush PrimProc cache for those lbids
 | |
|     VERBOSE_INFO("Flushing paritions");
 | |
|     rc = cacheutils::flushPartition(oidList, markedPartitions);
 | |
| 
 | |
|     // Remove the partition from extent map
 | |
|     emsg.clear();
 | |
|     rc = fDbrm->deletePartition(oidList, dropPartitionStmt->fPartitions, emsg);
 | |
|     VERBOSE_INFO("DropPartitionProcessor: partitions removed");
 | |
| 
 | |
|     if (rc != 0)
 | |
|       throw std::runtime_error(emsg);
 | |
|   }
 | |
|   catch (exception& ex)
 | |
|   {
 | |
|     cerr << "DropPartitionProcessor::processPackage: " << ex.what() << endl;
 | |
| 
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(ex.what());
 | |
| 
 | |
|     if (rc == BRM::ERR_TABLE_NOT_LOCKED)
 | |
|       result.result = USER_ERROR;
 | |
|     else if (rc == BRM::ERR_NOT_EXIST_PARTITION || rc == BRM::ERR_INVALID_OP_LAST_PARTITION)
 | |
|       result.result = PARTITION_WARNING;
 | |
|     else if (rc == BRM::ERR_NO_PARTITION_PERFORMED)
 | |
|       result.result = WARN_NO_PARTITION;
 | |
|     else
 | |
|       result.result = DROP_ERROR;
 | |
| 
 | |
|     result.message = message;
 | |
| 
 | |
|     try
 | |
|     {
 | |
|       fDbrm->releaseTableLock(uniqueID);
 | |
|     }
 | |
|     catch (std::exception&)
 | |
|     {
 | |
|       result.result = DROP_ERROR;
 | |
|       result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | |
|     }
 | |
| 
 | |
|     fSessionManager.rolledback(txnID);
 | |
|     return result;
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     cerr << "DropPartitionProcessor::processPackage: caught unknown exception!" << endl;
 | |
| 
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(1);
 | |
|     args.add("Drop partition failed: ");
 | |
|     args.add("encountered unkown exception");
 | |
|     args.add("");
 | |
|     args.add("");
 | |
|     message.format(args);
 | |
| 
 | |
|     result.result = DROP_ERROR;
 | |
|     result.message = message;
 | |
| 
 | |
|     try
 | |
|     {
 | |
|       fDbrm->releaseTableLock(uniqueID);
 | |
|     }
 | |
|     catch (std::exception&)
 | |
|     {
 | |
|       result.result = DROP_ERROR;
 | |
|       result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | |
|     }
 | |
| 
 | |
|     fSessionManager.rolledback(txnID);
 | |
|     return result;
 | |
|   }
 | |
| 
 | |
|   // Log the DDL statement
 | |
|   logging::logDDL(dropPartitionStmt->fSessionID, txnID.id, dropPartitionStmt->fSql,
 | |
|                   dropPartitionStmt->fOwner);
 | |
| 
 | |
|   // Remove the log file
 | |
|   // release the transaction
 | |
|   try
 | |
|   {
 | |
|     fDbrm->releaseTableLock(uniqueID);
 | |
|     deleteLogFile(DROPPART_LOG, roPair.objnum, uniqueId);
 | |
|   }
 | |
|   catch (std::exception&)
 | |
|   {
 | |
|     result.result = DROP_ERROR;
 | |
|     result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | |
|     fSessionManager.rolledback(txnID);
 | |
|     return result;
 | |
|   }
 | |
| 
 | |
|   fSessionManager.committed(txnID);
 | |
|   VERBOSE_INFO("DropPartitionProcessor:: commited");
 | |
|   return result;
 | |
| }
 | |
| 
 | |
| }  // namespace ddlpackageprocessor
 |