1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-12 05:01:56 +03:00
Files
mariadb-columnstore-engine/tools/cleartablelock/cleartablelock.cpp
2025-05-15 19:05:38 +04:00

715 lines
22 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*
* $Id: cleartablelock.cpp 2336 2013-06-25 19:11:36Z rdempsey $
*/
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <sstream>
#include <string>
#include <vector>
#include <set>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "dbrm.h"
#include "calpontsystemcatalog.h"
#include "liboamcpp.h"
#include "writeengine.h"
#include "we_messages.h"
#include "messageobj.h"
#include "messagelog.h"
#include "messageids.h"
#include "cleartablelockthread.h"
namespace
{
//------------------------------------------------------------------------------
// Write the command line synopsis to stdout, one line per statement.
//------------------------------------------------------------------------------
void usage()
{
  std::cout << "Usage: cleartablelock lockID" << std::endl;
  std::cout << " -h to display this menu" << std::endl;
}
//------------------------------------------------------------------------------
// Log initial syslog message
// tableName -
// tableLockID -
//------------------------------------------------------------------------------
void logInitStatus(const std::string& tableName, uint64_t tableLockID)
{
const std::string APPLNAME("cleartablelock command");
const int SUBSYSTEM_ID = 19; // writeengine?
const int INIT_MSG_NUM = logging::M0088;
logging::Message::Args msgArgs;
logging::Message logMsg(INIT_MSG_NUM);
msgArgs.add(APPLNAME);
msgArgs.add(tableName);
msgArgs.add(tableLockID);
logMsg.format(msgArgs);
logging::LoggingID lid(SUBSYSTEM_ID);
logging::MessageLog ml(lid);
ml.logInfoMessage(logMsg);
}
//------------------------------------------------------------------------------
// Log final status syslog message
// tableName - Name of table of interest
// tableLockID - Table lock ID associated with tableName
// statusMsg - Error message to be logged; if empty then success is assumed.
//------------------------------------------------------------------------------
void logFinalStatus(const std::string& tableName, uint64_t tableLockID, const std::string& statusMsg)
{
const std::string APPLNAME("cleartablelock command");
const int SUBSYSTEM_ID = 19; // writeengine?
const int COMP_MSG_NUM = logging::M0089;
logging::Message::Args msgArgs;
logging::Message logMsg(COMP_MSG_NUM);
msgArgs.add(APPLNAME);
msgArgs.add(tableName);
msgArgs.add(tableLockID);
std::string finalStatus;
if (statusMsg.empty())
{
finalStatus = "Completed successfully";
}
else
{
finalStatus = "Encountered errors: ";
finalStatus += statusMsg;
}
msgArgs.add(finalStatus);
logMsg.format(msgArgs);
logging::LoggingID lid(SUBSYSTEM_ID);
logging::MessageLog ml(lid);
ml.logInfoMessage(logMsg);
}
//------------------------------------------------------------------------------
// Construct list of all PMs.
// pmList - (output) List of "all" PMs where msgs are to be sent.
//------------------------------------------------------------------------------
int constructPMList(std::vector<uint32_t>& pmList)
{
try
{
// Get OAM information
oam::Oam oam;
oam::ModuleTypeConfig modTypeConfig;
oam.getSystemConfig("pm", modTypeConfig);
unsigned int pmCount = modTypeConfig.ModuleCount;
for (unsigned i = 0; i < pmCount; i++)
{
std::string devName = modTypeConfig.ModuleNetworkList[i].DeviceName;
int moduleID = ::atoi(devName.substr(oam::MAX_MODULE_TYPE_SIZE, oam::MAX_MODULE_ID_SIZE).c_str());
pmList.push_back(moduleID);
}
}
catch (std::exception& ex)
{
std::cout << "Error getting PM systemConfig. " << ex.what() << std::endl;
return 4;
}
catch (...)
{
std::cout << "Error getting PM systemConfig. Unknown exception." << std::endl;
return 4;
}
return 0;
}
//------------------------------------------------------------------------------
// Construct partial PM list based on specified list of DBRoots.
// dbRootList - List of applicable DBRoots with PMs to be notified.
// pmList - (output) List of PMs to receive msgs, based on dbRootList.
//------------------------------------------------------------------------------
int constructPMList(const std::vector<uint32_t>& dbRootList, std::vector<uint32_t>& pmList)
{
uint32_t dbRoot = 0;
try
{
// Get OAM information
oam::Oam oam;
std::set<uint32_t> pmSet; // used to filter out duplicates
int pm;
for (unsigned j = 0; j < dbRootList.size(); j++)
{
dbRoot = dbRootList[j];
oam.getDbrootPmConfig(dbRootList[j], pm);
pmSet.insert(pm);
}
// Store unique set of PM IDs into output vector
for (std::set<uint32_t>::const_iterator iter = pmSet.begin(); iter != pmSet.end(); ++iter)
{
pmList.push_back(*iter);
}
}
catch (std::exception& ex)
{
std::cout << "Error mapping DBRoot " << dbRoot << " to a PM. " << ex.what()
<< ". Verify that this DBRoot is in Columnstore.xml." << std::endl;
return 5;
}
catch (...)
{
std::cout << "Error mapping DBRoot " << dbRoot << " to a PM. "
<< "Unknown exception. Verify that this DBRoot is in Columnstore.xml" << std::endl;
return 5;
}
return 0;
}
//------------------------------------------------------------------------------
// Create MessageQueueClient objects
// pmList - (input) List of PMs where msgs are to be sent.
// msgQueClts - (output) Vector of created MessageQueueClients.
// errMsg - (output) Any applicable error message if nonzero return
// code is returned.
//------------------------------------------------------------------------------
int createMsgQueClts(const std::vector<uint32_t>& pmList,
std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient> >& msgQueClts,
std::string& errMsg)
{
errMsg.clear();
std::string srvName;
try
{
// Send bulk rollback request to each specified PM.
for (unsigned k = 0; k < pmList.size(); k++)
{
std::ostringstream weServerName;
weServerName << "pm" << pmList[k] << "_WriteEngineServer";
srvName = weServerName.str();
// std::cout << "Connecting to " << srvName << std::endl;
messageqcpp::MessageQueueClient* cl = new messageqcpp::MessageQueueClient(srvName);
msgQueClts.push_back(boost::shared_ptr<messageqcpp::MessageQueueClient>(cl));
}
}
catch (std::exception& ex)
{
std::ostringstream ossStatus;
ossStatus << "Error creating connection to " << srvName << ": " << ex.what();
errMsg = ossStatus.str();
return 3;
}
catch (...)
{
std::ostringstream ossStatus;
ossStatus << "Error creating connection to " << srvName;
errMsg = ossStatus.str();
return 3;
}
return 0;
}
//------------------------------------------------------------------------------
// Run one ClearTableLockThread per client, wait for all of them to finish, and
// combine their results.
// msgQueClts - MessageQueueClients with connections to be used in sending
//              messages.
// pmList     - PM list corresponding to msgQueClts (same order).
// brm        - Unused.
// tInfo      - Table lock of interest.
// tblName    - Table name associated with tInfo.
// msgType    - Message type to be sent to each MessageQueueClient.
// errMsg     - (output) Overwritten only when at least one failing PM supplied
//              a message; one "From PM<id>: <msg>. " line per such PM.
// Returns 0 if every PM reported status 0; otherwise the status of the last
// PM (in list order) that reported a nonzero status.
//------------------------------------------------------------------------------
int sendRequest(const std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient> >& msgQueClts,
                const std::vector<uint32_t>& pmList, BRM::DBRM* /*brm*/, const BRM::TableLockInfo& tInfo,
                const std::string& tblName, ClearTableLockThread::CLRTBLLOCK_MSGTYPE msgType,
                std::string& errMsg)
{
  typedef std::vector<ClearTableLockStatus*> StatusList;

  const unsigned int numClients = msgQueClts.size();
  boost::thread_group workers;
  StatusList results;

  // Start a worker for each connection; each worker is handed its own status
  // object, which is read back once the workers are done.
  for (unsigned idx = 0; idx != numClients; ++idx)
  {
    const int moduleID = pmList[idx];
    ClearTableLockStatus* result = new ClearTableLockStatus(moduleID);
    results.push_back(result);
    workers.create_thread(ClearTableLockThread(msgQueClts[idx].get(), tInfo, tblName, msgType, result));
  }

  // Block until every worker has finished.
  workers.join_all();

  // Fold the per-PM results into a single return code and message, releasing
  // each status object as soon as it has been read.
  int rc = 0;
  std::ostringstream report;
  for (StatusList::iterator it = results.begin(); it != results.end(); ++it)
  {
    ClearTableLockStatus* result = *it;
    const int status = result->retStatus();

    if (status != 0)
    {
      rc = status;

      if (!result->retMsg().empty())
      {
        report << "From PM" << result->moduleID() << ": " << result->retMsg() << ". \n";
      }
    }

    delete result;
  }

  const std::string reportText = report.str();
  if (!reportText.empty())
  {
    errMsg = reportText;
  }

  return rc;
}
//------------------------------------------------------------------------------
// Optionally take ownership of the table lock, then request a bulk rollback.
//
// We send the bulk rollback request to all the PMs and not just the ones
// referenced in the table lock, because there is always the possibility that
// 1 or more DBRoots have been moved since the table lock was instantiated;
// in which case a portion of the table could reside on a PM other than the
// one(s) first listed in the table lock.
// msgQueClts  - MessageQueueClients with connections to be used in sending
//               rollback message.
// pmList      - PM list corresponding to msgQueClts.
// brm         - Handle to use in sending msgs to BRM.
// tInfo       - Table lock of interest.
// tblName     - Table name associated with tInfo.
// rollbackOnly- true -> rollback only; false -> also take over the lock first
// errMsg      - (output) Cleared on entry; holds the error text when a nonzero
//               return code is returned.
// Returns 0 on success (including when no rollback was needed), 2 if the lock
// ownership could not be changed, or the code returned by sendRequest().
//------------------------------------------------------------------------------
int execBulkRollbackReq(const std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient> >& msgQueClts,
                        const std::vector<uint32_t>& pmList, BRM::DBRM* brm, const BRM::TableLockInfo& tInfo,
                        const std::string& tblName, bool rollbackOnly, std::string& errMsg)
{
  errMsg.clear();

  if (!rollbackOnly)
  {
    try
    {
      // Take over ownership of stale lock.
      const std::string processName("clearTableLock");
      const uint32_t processID = ::getpid();
      const int32_t sessionID = -1;
      const int32_t transID = -1;

      if (!brm->changeOwner(tInfo.id, processName, processID, sessionID, transID))
      {
        errMsg = "Unable to grab lock; Lock not found or still in use.";
        return 2;
      }
    }
    catch (const std::exception& ex)
    {
      errMsg = std::string("Error changing table lock owner; ") + ex.what();
      return 2;
    }
    catch (...)
    {
      errMsg = "Unknown error changing table lock owner;";
      return 2;
    }
  }

  // A rollback is only requested for a lock in the LOADING state.
  if (tInfo.state != BRM::LOADING)
  {
    return 0;
  }

  return sendRequest(msgQueClts, pmList, brm, tInfo, tblName,
                     ClearTableLockThread::CLRTBLLOCK_MSGTYPE_ROLLBACK, errMsg);
}
//------------------------------------------------------------------------------
// Request file cleanup from the PMs.
//
// We send the file cleanup request to all the PMs and not just the ones
// referenced in the table lock, because there is always the possibility that
// 1 or more DBRoots have been moved since the table lock was instantiated;
// in which case a portion of the table could reside on a PM other than the
// one(s) first listed in the table lock.
// msgQueClts - MessageQueueClients with connections to be used in sending
//              cleanup message.
// pmList     - PM list corresponding to msgQueClts.
// brm        - Handle to use in sending msgs to BRM.
// tInfo      - Table lock of interest.
// tblName    - Table name associated with tInfo.
// errMsg     - (output) Cleared on entry; holds the error text when a nonzero
//              return code is returned.
// Returns the code returned by sendRequest().
//------------------------------------------------------------------------------
int execFileCleanupReq(const std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient> >& msgQueClts,
                       const std::vector<uint32_t>& pmList, BRM::DBRM* brm, const BRM::TableLockInfo& tInfo,
                       const std::string& tblName, std::string& errMsg)
{
  errMsg.clear();
  return sendRequest(msgQueClts, pmList, brm, tInfo, tblName,
                     ClearTableLockThread::CLRTBLLOCK_MSGTYPE_CLEANUP, errMsg);
}
} // namespace
//------------------------------------------------------------------------------
// Main entry point to this program
//
// Options parsed below:
//   -h           print usage and exit with status 0
//   -l           clear the lock only; the whole "if (!clearLockOnly)" section
//                (rollback, state change, file cleanup) is skipped
//   -r tableOID  roll back the table with the given OID; no table lock is
//                looked up or released in this mode
//   lockID       the single required positional argument
// Giving both -l and -r resets both flags, which selects the default
// "roll back and clear the lock" sequence.
//
// Exit status: 0 on success; 1 for usage errors, BRM/system-not-ready errors,
// lookup failures and state-change/release failures; otherwise the nonzero
// code returned by createMsgQueClts() or execBulkRollbackReq(). A file
// cleanup failure is logged but does not change the exit status.
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
int c;
bool clearLockOnly = false;
bool rollbackOnly = false;
uint32_t tableOID = 0;
// "r:" tells getopt that -r takes an argument (delivered in optarg).
while ((c = getopt(argc, argv, "hlr:")) != EOF)
{
switch (c)
{
case 'l':
clearLockOnly = true;
break;
// Only allow '-r' option for development/debugging
#if 1
case 'r': // hidden option to rollback specified table w/o a lock
rollbackOnly = true;
// NOTE(review): the text is not validated; strtoull returns 0 when no
// digits can be parsed, and the result is narrowed to uint32_t.
tableOID = ::strtoull(optarg, 0, 10);
break;
#endif
case 'h':
case '?':
default:
usage();
// Only an explicit -h counts as success.
return (c == 'h' ? 0 : 1);
break;
}
}
// Exactly one positional argument (the lock ID) must remain.
if ((argc - optind) != 1)
{
usage();
return 1;
}
// Get the table lock ID specified by the user
// NOTE(review): as with -r, non-numeric text silently becomes lock ID 0.
uint64_t lockID = ::strtoull(argv[optind], 0, 10);
// If user specified both clearlock and rollback then we need to do both
if (clearLockOnly && rollbackOnly)
{
clearLockOnly = false;
rollbackOnly = false;
}
//--------------------------------------------------------------------------
// Verify that BRM is up and in a read/write state
//--------------------------------------------------------------------------
BRM::DBRM brm;
int brmRc = brm.isReadWrite();
if (brmRc != BRM::ERR_OK)
{
std::string brmErrMsg;
BRM::errString(brmRc, brmErrMsg);
std::cout << "BRM error: " << brmErrMsg << std::endl;
std::cout << "Table lock " << lockID << " is not cleared." << std::endl;
return 1;
}
if (brm.getSystemReady() < 1)
{
std::cout << "System is not ready. Verify that ColumnStore "
"is up and ready before running this program"
<< std::endl;
return 1;
}
//--------------------------------------------------------------------------
//@Bug 3711 Check whether the table is locked; does the table lock exist.
//--------------------------------------------------------------------------
execplan::CalpontSystemCatalog::TableName tblName;
BRM::TableLockInfo tInfo;
// 'task' names the step in progress so the catch handlers below can report
// which step failed.
std::string task;
std::vector<uint32_t> pmList;
int rc;
const std::string taskSysCat("getting system catalog information");
try
{
if (rollbackOnly)
{
// No lock is looked up in this mode; fill in the fields by hand.
// NOTE(review): tInfo.ownerTxnID is not assigned here but is read further
// down when brm.rolledback() is called; confirm that TableLockInfo
// default-initializes it.
tInfo.id = lockID;
tInfo.tableOID = tableOID;
tInfo.ownerPID = 0;
tInfo.ownerSessionID = 1;
// LOADING is the state for which execBulkRollbackReq() sends the rollback.
tInfo.state = BRM::LOADING;
// Construct PM list using all PMs, since we don't have a table
// lock with the list of applicable DBRoots.
task = "constructing total PM list";
rc = constructPMList(pmList);
if (rc != 0)
{
return 1;
}
}
else
{
task = "getting table lock";
bool lockExists = brm.getTableLockInfo(lockID, &tInfo);
if (!lockExists)
{
std::cout << "Table lock " << lockID << " does not exist." << std::endl;
return 1;
}
// Construct PM list based on list of affected DBRoots
task = "mapping DBRoots to PMs";
rc = constructPMList(tInfo.dbrootList, pmList);
if (rc != 0)
{
return 1;
}
}
// Resolve the table OID to a table name through the system catalog.
uint32_t sessionID = 1;
task = taskSysCat;
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(execplan::CalpontSystemCatalog::EC);
tblName = systemCatalogPtr->tableName(tInfo.tableOID);
}
catch (std::exception& ex)
{
std::cout << "Error " << task << ". " << ex.what() << std::endl;
// A failed catalog lookup is tolerated only in clear-lock-only mode, where
// a placeholder name is used; every other failure aborts.
if (clearLockOnly && (task == taskSysCat))
{
tblName.schema = "[unknown name]";
tblName.table.clear();
std::cout << "Will still try to clear table lock." << std::endl;
}
else
{
std::cout << "Table lock " << lockID << " is not cleared." << std::endl;
return 1;
}
}
catch (...)
{
// Same recovery rule as the std::exception handler above, without the
// exception text.
std::cout << "Error " << task << ". " << std::endl;
if (clearLockOnly && (task == taskSysCat))
{
tblName.schema = "[unknown name]";
tblName.table.clear();
std::cout << "Will still try to clear table lock." << std::endl;
}
else
{
std::cout << "Table lock " << lockID << " is not cleared." << std::endl;
return 1;
}
}
// Record the start of the operation in the system log, then tell the user
// which of the three modes is about to run.
logInitStatus(tblName.toString(), lockID);
if (rollbackOnly)
{
std::cout << "Rolling back table " << tblName.toString() << std::endl << std::endl;
}
else if (clearLockOnly)
{
std::cout << "Clearing table lock " << lockID << " for table " << tblName.toString() << std::endl
<< std::endl;
}
else
{
std::cout << "Rolling back and clearing table lock for table " << tblName.toString() << "; table lock "
<< lockID << std::endl
<< std::endl;
}
//--------------------------------------------------------------------------
// Perform bulk rollback
//--------------------------------------------------------------------------
std::string errMsg;
if (!clearLockOnly)
{
// One client connection per PM in pmList, reused for both the rollback and
// the cleanup requests.
std::vector<boost::shared_ptr<messageqcpp::MessageQueueClient> > msgQueClts;
rc = createMsgQueClts(pmList, msgQueClts, errMsg);
if (rc != 0)
{
logFinalStatus(tblName.toString(), lockID, errMsg);
std::cout << errMsg << std::endl;
std::cout << "Table lock " << lockID << " for table " << tblName.toString() << " is not cleared."
<< std::endl;
return rc;
}
rc = execBulkRollbackReq(msgQueClts, pmList, &brm, tInfo, tblName.toString(), rollbackOnly, errMsg);
// Pass the lock owner's transaction ID to brm.rolledback().
// NOTE(review): this runs before rc is checked, so it is executed even when
// execBulkRollbackReq() failed (including "lock still in use"); confirm
// that this ordering is intended.
BRM::TxnID txn;
txn.id = tInfo.ownerTxnID;
txn.valid = true;
brm.rolledback(txn);
if (rc != 0)
{
logFinalStatus(tblName.toString(), lockID, errMsg);
std::cout << "Rollback error: " << errMsg << std::endl;
std::cout << "Table lock " << lockID << " for table " << tblName.toString() << " is not cleared."
<< std::endl;
return rc;
}
//----------------------------------------------------------------------
// Change the state of the table lock to cleanup state.
// We ignore the return stateChange flag.
//----------------------------------------------------------------------
if (!rollbackOnly)
{
try
{
brm.changeState(tInfo.id, BRM::CLEANUP);
}
catch (std::exception& ex)
{
std::ostringstream oss;
oss << "Error changing state. " << ex.what();
logFinalStatus(tblName.toString(), lockID, oss.str());
std::cout << oss.str() << std::endl;
std::cout << "Table lock " << lockID << " is not cleared." << std::endl;
return 1;
}
catch (...)
{
std::ostringstream oss;
oss << "Error changing state. " << std::endl;
logFinalStatus(tblName.toString(), lockID, oss.str());
std::cout << oss.str() << std::endl;
std::cout << "Table lock " << lockID << " is not cleared." << std::endl;
return 1;
}
}
//----------------------------------------------------------------------
// Delete the meta data files
//----------------------------------------------------------------------
rc = execFileCleanupReq(msgQueClts, pmList, &brm, tInfo, tblName.toString(), errMsg);
if (rc != 0)
{
//@Bug 4517. Release tablelock if remove meta files failed
// The failure is logged with the same M0089 message that logFinalStatus()
// uses, but at warning level, and execution falls through to the lock
// release below instead of returning.
const std::string APPLNAME("cleartablelock command");
const int SUBSYSTEM_ID = 19; // writeengine?
const int COMP_MSG_NUM = logging::M0089;
logging::Message::Args msgArgs;
logging::Message logMsg(COMP_MSG_NUM);
msgArgs.add(APPLNAME);
msgArgs.add(tblName.toString());
msgArgs.add(lockID);
msgArgs.add(errMsg);
logMsg.format(msgArgs);
logging::LoggingID lid(SUBSYSTEM_ID);
logging::MessageLog ml(lid);
ml.logWarningMessage(logMsg);
std::cout << "File cleanup error: " << errMsg << std::endl;
}
} // end of: if (!clearLockOnly)
//--------------------------------------------------------------------------
// Finally, release the actual table lock
//--------------------------------------------------------------------------
std::cout << std::endl;
if (rollbackOnly)
{
// Rollback-only mode never held a lock, so there is nothing to release.
logFinalStatus(tblName.toString(), lockID, std::string());
std::cout << "Bulk Rollback Only for table " << tblName.toString() << " completed successfully."
<< std::endl;
}
else
{
try
{
brm.releaseTableLock(lockID); // ignore boolean return value
}
catch (std::exception& ex)
{
logFinalStatus(tblName.toString(), lockID, ex.what());
std::cout << "Error releasing table lock " << lockID << " for table " << tblName.toString()
<< std::endl;
std::cout << ex.what() << std::endl;
return 1;
}
catch (...)
{
std::string unknownErr("Unknown exception");
logFinalStatus(tblName.toString(), lockID, unknownErr);
std::cout << "Error releasing table lock " << lockID << " for table " << tblName.toString()
<< std::endl;
std::cout << unknownErr << std::endl;
return 1;
}
// An empty status string makes logFinalStatus() report success.
logFinalStatus(tblName.toString(), lockID, std::string());
std::cout << "Table lock " << lockID << " for table " << tblName.toString() << " is cleared."
<< std::endl;
}
return 0;
}