mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
1162 lines
38 KiB
C++
1162 lines
38 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
* Copyright (C) 2016 MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: commandpackageprocessor.cpp 9613 2013-06-12 15:44:24Z dhall $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <ctime>
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <set>
|
|
#include <vector>
|
|
#include <boost/scoped_ptr.hpp>
|
|
|
|
#include "commandpackageprocessor.h"
|
|
#include "messagelog.h"
|
|
#include "dbrm.h"
|
|
#include "sqllogger.h"
|
|
#include "tablelockdata.h"
|
|
#include "we_messages.h"
|
|
#include "we_ddlcommandclient.h"
|
|
#include "oamcache.h"
|
|
#include "liboamcpp.h"
|
|
#include "resourcemanager.h"
|
|
|
|
using namespace std;
|
|
using namespace WriteEngine;
|
|
using namespace dmlpackage;
|
|
using namespace execplan;
|
|
using namespace logging;
|
|
using namespace boost;
|
|
using namespace BRM;
|
|
|
|
namespace dmlpackageprocessor
|
|
{
|
|
// Tracks active cleartablelock commands by storing set of table lock IDs
|
|
/*static*/ std::set<uint64_t> CommandPackageProcessor::fActiveClearTableLockCmds;
|
|
/*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex;
|
|
|
|
//------------------------------------------------------------------------------
// Process a "command" DML package.  The upper-cased, trimmed statement text
// selects the action:
//   COMMIT / ROLLBACK - finish the session's transaction (batch-insert and
//                       regular flavours), invalidate casual-partition info for
//                       the uncommitted extents, then release the session's
//                       table locks.
//   CLEANUP           - drop the cached system catalog objects of the session.
//   VIEWTABLELOCK     - forwarded to viewTableLock().
//   CLEARTABLELOCK    - forwarded to clearTableLock().
//   anything else     - logged as a DML statement when get_Logging() is false,
//                       otherwise rejected as "Unknown command.".
//
// cpackage - the command package (statement, session id, table oid, flags).
// Returns a DMLResult; result.result is NO_ERROR on success, COMMAND_ERROR or
// PP_LOST_CONNECTION otherwise, with result.message describing the failure.
// Exceptions raised while processing are caught here and reported through the
// result rather than propagated.
//------------------------------------------------------------------------------
DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal(
    dmlpackage::CalpontDMLPackage& cpackage)
{
  // Scope guard for the auto-increment lock of one column.  The lock is taken
  // in the constructor; release() performs the normal, explicit release and
  // lets a failure propagate.  If the guard goes out of scope while the lock is
  // still held (i.e. an exception is unwinding), the destructor releases it on
  // a best-effort basis so the lock is not left behind.
  struct AILockGuard
  {
    AILockGuard(DBRM& brm, int32_t columnOid) : fBrm(brm), fColumnOid(columnOid), fHeld(false)
    {
      fBrm.getAILock(fColumnOid);
      fHeld = true;
    }

    ~AILockGuard()
    {
      if (!fHeld)
        return;

      try
      {
        fBrm.releaseAILock(fColumnOid);
      }
      catch (...)
      {
        // Destructors must not throw; the original exception is more useful.
      }
    }

    void release()
    {
      fHeld = false;
      fBrm.releaseAILock(fColumnOid);
    }

   private:
    AILockGuard(const AILockGuard&);
    AILockGuard& operator=(const AILockGuard&);

    DBRM& fBrm;
    int32_t fColumnOid;
    bool fHeld;
  };

  // Shared implementation of "update syscolumn for the next value", which is
  // needed by three COMMIT paths.  It is a local class (rather than a new
  // member function) so that the class declaration does not have to change.
  struct AutoIncrSync
  {
    // If the table has a usable auto-increment column and the controller's
    // next value is ahead of the one in the system catalog, write the
    // controller's value to SYSCOLUMN.
    //   passSessionID          - pass fSessionID to UpdateSyscolumnNextval
    //                            (the batch-insert path does not).
    //   markCommittedOnFailure - call fSessionManager.committed(txnid) before
    //                            throwing when the update fails (the
    //                            batch-insert path does not).
    //   queRemoved             - set to true once the WE client queue for
    //                            uniqueId has been removed.
    // Throws std::runtime_error when UpdateSyscolumnNextval reports an error.
    static void run(CommandPackageProcessor& self, CalpontSystemCatalog& catalog,
                    CalpontSystemCatalog::TableName& tableName, uint64_t uniqueId, BRM::TxnID& txnid,
                    bool passSessionID, bool markCommittedOnFailure, bool& queRemoved)
    {
      uint64_t nextVal = catalog.nextAutoIncrValue(tableName);

      if (nextVal == AUTOINCR_SATURATED)  // no need to update syscolumn
        return;

      // get autoincrement column oid
      int32_t columnOid = catalog.autoColumOid(tableName);
      // get the current nextVal from controller
      scoped_ptr<DBRM> aDbrm(new DBRM());
      uint64_t nextValInController = 0;
      AILockGuard aiLock(*aDbrm, columnOid);
      nextVal = catalog.nextAutoIncrValue(tableName);  // in case it has changed
      const bool validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);

      if (!validNextVal || nextValInController <= nextVal)
      {
        aiLock.release();
        return;
      }

      self.fWEClient->removeQueue(uniqueId);
      queRemoved = true;
      WE_DDLCommandClient ddlClient;
      uint8_t rc = 0;

      if (passSessionID)
        rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, self.fSessionID);
      else
        rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController);

      //@bug 5894. Need release lock.
      aiLock.release();

      if (rc != 0)
      {
        if (markCommittedOnFailure)
          self.fSessionManager.committed(txnid);  // for now

        throw std::runtime_error("Error in UpdateSyscolumnNextval");
      }
    }
  };

  SUMMARY_INFO("CommandPackageProcessor::processPackage");

  DMLResult result;
  result.result = NO_ERROR;

  VERBOSE_INFO("Processing Command DML Package...");
  std::string stmt = cpackage.get_DMLStatement();
  boost::algorithm::to_upper(stmt);
  trim(stmt);
  fSessionID = cpackage.get_SessionID();
  BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
  uint64_t uniqueId = 0;

  // Bug 5070. Added exception handling
  try
  {
    uniqueId = fDbrm->getUnique64();
  }
  catch (std::exception& ex)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add(ex.what());
    message.format(args);
    result.result = COMMAND_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }
  catch (...)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unknown error occurred while getting unique number.");
    message.format(args);
    result.result = COMMAND_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }

  string errorMsg;
  bool queRemoved = false;
  logging::LoggingID lid(20);
  logging::MessageLog ml(lid);
  LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
  logging::Message::Args args1;
  logging::Message msg(1);
  Logger logger(logid.fSubsysID);

  if (stmt != "CLEANUP")
  {
    args1.add("Start SQL statement: ");
    args1.add(stmt);
    msg.format(args1);
    logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
  }

  try
  {
    if ((stmt == "COMMIT") || (stmt == "ROLLBACK"))
    {
      // Nothing to finish unless the session actually has a transaction.
      if (txnid.valid)
      {
        vector<LBID_t> lbidList;
        fDbrm->getUncommittedExtentLBIDs(static_cast<VER_t>(txnid.id), lbidList);
        bool cpInvalidated = false;

        if ((stmt == "COMMIT") && (cpackage.get_isBatchInsert()))
        {
          // update syscolumn for the next value
          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
              CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
          CalpontSystemCatalog::TableName tableName;
          tableName = systemCatalogPtr->tableName(cpackage.getTableOid());

          try
          {
            AutoIncrSync::run(*this, *systemCatalogPtr, tableName, uniqueId, txnid, false, false,
                              queRemoved);
          }
          catch (std::exception& ex)
          {
            // Rollback transaction
            rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
            fSessionManager.rolledback(txnid);
            throw std::runtime_error(ex.what());
          }

          int weRc = 0;

          if (cpackage.get_isAutocommitOn())
          {
            weRc = commitBatchAutoOnTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
            cpInvalidated = true;
          }
          else
          {
            weRc = fDbrm->vbCommit(txnid.id);
          }

          if (weRc != 0)
          {
            BRM::errString(weRc, errorMsg);
            throw std::runtime_error(errorMsg);
          }

          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
          fSessionManager.committed(txnid);
        }
        else if (stmt == "COMMIT")
        {
          // update syscolumn for the next value
          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
              CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
          CalpontSystemCatalog::TableName tableName;
          uint32_t tableOid = cpackage.getTableOid();
          std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();

          if (tableOid == 0)
          {
            // special case: transaction commit for autocommit off and not
            // following a dml statement immediately.  Visit every locked table
            // that this session holds a lock on.
            TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
            TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();

            if (!tablelockMap.empty())
            {
              for (unsigned k = 0; k < tableLocks.size(); k++)
              {
                TablelockData::OIDTablelock::iterator iter = tablelockMap.find(tableLocks[k].tableOID);

                if (iter == tablelockMap.end())
                  continue;

                tableName = systemCatalogPtr->tableName(tableLocks[k].tableOID);

                try
                {
                  AutoIncrSync::run(*this, *systemCatalogPtr, tableName, uniqueId, txnid, true, true,
                                    queRemoved);
                }
                catch (std::exception& ex)
                {
                  // Rollback transaction, release tablelock
                  rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
                  fDbrm->releaseTableLock(iter->second);
                  fSessionManager.rolledback(txnid);
                  throw std::runtime_error(ex.what());
                }
              }
            }
          }
          else if (tableOid >= 3000)
          {
            tableName = systemCatalogPtr->tableName(tableOid);

            try
            {
              AutoIncrSync::run(*this, *systemCatalogPtr, tableName, uniqueId, txnid, true, true,
                                queRemoved);
            }
            catch (std::exception& ex)
            {
              // Rollback transaction, release tablelock
              rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

              for (unsigned k = 0; k < tableLocks.size(); k++)
              {
                if (tableLocks[k].tableOID != tableOid)
                  continue;

                try
                {
                  fDbrm->releaseTableLock(tableLocks[k].id);
                }
                catch (std::exception&)
                {
                  // Best effort: keep releasing the remaining locks.
                }
              }

              fSessionManager.rolledback(txnid);
              throw std::runtime_error(ex.what());
            }
          }

          int weRc = commitTransaction(uniqueId, txnid);
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");

          if (weRc != WriteEngine::NO_ERROR)
          {
            WErrorCodes ec;
            ostringstream oss;
            oss << "COMMIT failed: " << ec.errorString(weRc);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.committed(txnid);
        }
        else if ((stmt == "ROLLBACK") && (cpackage.get_isBatchInsert()))
        {
          // version rollback, Bulkrollback
          int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);

          if (weRc != 0)
            throw std::runtime_error(errorMsg);

          // MCOL-5021
          fDbrm->addToLBIDList(fSessionID, lbidList);

          //@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
          fDbrm->invalidateUncommittedExtentLBIDs(0, true, &lbidList);
          cpInvalidated = true;
          weRc = rollBackBatchAutoOnTransaction(uniqueId, txnid, fSessionID, cpackage.getTableOid(),
                                                errorMsg);

          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");

          if (weRc != 0)
          {
            //@Bug 4524. Don't set to readonly. Just error out.
            WErrorCodes ec;
            ostringstream oss;
            oss << "ROLLBACK batch insert failed due to: "
                << "(" << weRc << ")" << ec.errorString(weRc);
            // Log to error log
            logging::Message::Args errArgs;
            logging::Message errMessage(2);
            errArgs.add(oss.str());
            errMessage.format(errArgs);
            ml.logErrorMessage(errMessage);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.rolledback(txnid);
        }
        else if (stmt == "ROLLBACK")
        {
          std::string rollbackErr("");
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
          int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, rollbackErr);

          if (weRc != 0)
          {
            //@Bug 4524. Don't set to readonly. Just error out.
            ostringstream oss;
            oss << "ROLLBACK failed due to: " << rollbackErr;
            // Log to error log
            logging::Message::Args errArgs;
            logging::Message errMessage(2);
            errArgs.add(oss.str());
            errMessage.format(errArgs);
            ml.logErrorMessage(errMessage);
            throw std::runtime_error(oss.str());
          }

          fSessionManager.rolledback(txnid);
        }

        // Paths that did not already invalidate the casual-partition info for
        // the uncommitted extents do it here.
        if (!cpInvalidated)
        {
          // MCOL-5021
          if (stmt == "ROLLBACK")
          {
            fDbrm->addToLBIDList(fSessionID, lbidList);
          }

          fDbrm->invalidateUncommittedExtentLBIDs(0, stmt == "ROLLBACK", &lbidList);
        }
      }
    }
    else if (stmt == "CLEANUP")
    {
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID());
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID() | 0x80000000);
    }
    else if (stmt == "VIEWTABLELOCK")
    {
      viewTableLock(cpackage, result);
    }
    else if (stmt == "CLEARTABLELOCK")
    {
      clearTableLock(uniqueId, cpackage, result);
    }
    else if (!cpackage.get_Logging())
    {
      BRM::TxnID loggedTxn = fSessionManager.getTxnID(cpackage.get_SessionID());
      logging::logDML(cpackage.get_SessionID(), loggedTxn.id, cpackage.get_DMLStatement() + ";",
                      cpackage.get_SchemaName());
      SQLLogger sqlLogger(cpackage.get_DMLStatement(), DMLLoggingId, fSessionID, loggedTxn.id);
    }
    else
    {
      std::string err = "Unknown command.";
      SUMMARY_INFO(err);
      throw std::runtime_error(err);
    }
  }
  catch (logging::IDBExcept& noTable)  //@Bug 2606 catch no table found exception
  {
    cerr << "CommandPackageProcessor::processPackage: " << noTable.what() << endl;

    result.result = COMMAND_ERROR;
    result.message = Message(noTable.what());
  }
  catch (std::exception& ex)
  {
    if (checkPPLostConnection(ex))
    {
      result.result = PP_LOST_CONNECTION;
    }
    else
    {
      cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl;

      logging::Message::Args args;
      logging::Message message(1);
      args.add(ex.what());
      args.add("");
      args.add("");
      message.format(args);

      result.result = COMMAND_ERROR;
      result.message = message;
    }
  }
  catch (...)
  {
    cerr << "CommandPackageProcessor::processPackage: caught unknown exception!" << endl;
    logging::Message::Args args;
    logging::Message message(1);
    args.add("Command Failed: ");
    args.add("encountered unkown exception");
    args.add("");
    args.add("");
    message.format(args);

    result.result = COMMAND_ERROR;
    result.message = message;
  }

  // The auto-increment update removes the queue itself; do it here otherwise.
  if (!queRemoved)
    fWEClient->removeQueue(uniqueId);

  // release table lock
  if ((result.result == NO_ERROR) && ((stmt == "COMMIT") || (stmt == "ROLLBACK")))
  {
    TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
    TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
    bool lockReleased = true;

    if (!tablelockMap.empty())
    {
      for (TablelockData::OIDTablelock::iterator it = tablelockMap.begin(); it != tablelockMap.end(); ++it)
      {
        try
        {
          lockReleased = fDbrm->releaseTableLock(it->second);
        }
        catch (std::exception& ex)
        {
          logging::Message::Args args;
          logging::Message message(1);
          args.add(ex.what());
          args.add("");
          args.add("");
          message.format(args);

          result.result = COMMAND_ERROR;
          result.message = message;
        }

        if (!lockReleased)  // log an error
        {
          ostringstream os;
          os << "tablelock for table oid " << it->first << " is not found";
          logging::Message::Args args;
          logging::Message message(1);
          args.add(os.str());
          args.add("");
          args.add("");
          message.format(args);
          logging::LoggingID lockLogId(21);
          logging::MessageLog lockLog(lockLogId);

          lockLog.logErrorMessage(message);
        }
      }

      //@Bug 3557. Clean tablelock cache after commit/rollback.
      TablelockData::removeTablelockData(cpackage.get_SessionID());
    }
  }

  VERBOSE_INFO("Finished processing Command DML Package");
  logging::Message::Args args2;
  logging::Message msg1(1);

  if (stmt != "CLEANUP")
  {
    args2.add("End SQL statement");
    msg1.format(args2);
    logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
  }

  return result;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Process viewtablelock command; return table lock information for the
|
|
// specified table.
|
|
// This function closely resembles printTableLocks in viewtablelock.cpp.
|
|
//------------------------------------------------------------------------------
|
|
void CommandPackageProcessor::viewTableLock(const dmlpackage::CalpontDMLPackage& cpackage,
|
|
DMLPackageProcessor::DMLResult& result)
|
|
{
|
|
// Initialize System Catalog object used to get table name
|
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
|
CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
|
|
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = cpackage.get_SchemaName();
|
|
tableName.table = cpackage.get_TableName();
|
|
execplan::CalpontSystemCatalog::ROPair roPair;
|
|
|
|
roPair = systemCatalogPtr->tableRID(tableName);
|
|
|
|
// Get list of table locks for the requested table
|
|
std::vector<BRM::TableLockInfo> tableLocks;
|
|
tableLocks = fDbrm->getAllTableLocks();
|
|
|
|
// Make preliminary pass through the table locks in order to determine our
|
|
// output column widths based on the data. Min column widths are based on
|
|
// the width of the column heading (except for the 'state' column).
|
|
uint64_t maxLockID = 0;
|
|
uint32_t maxPID = 0;
|
|
int32_t maxSessionID = 0;
|
|
int32_t minSessionID = 0;
|
|
int32_t maxTxnID = 0;
|
|
|
|
unsigned int lockIDColumnWidth = 6; // "LockID"
|
|
unsigned int ownerColumnWidth = 7; // "Process"
|
|
unsigned int pidColumnWidth = 3; // "PID"
|
|
unsigned int sessionIDColumnWidth = 7; // "Session"
|
|
unsigned int txnIDColumnWidth = 3; // "Txn"
|
|
unsigned int createTimeColumnWidth = 12; // "CreationTime"
|
|
unsigned int pmColumnWidth = 7; // "DBRoots"
|
|
std::vector<std::string> createTimes;
|
|
char cTimeBuffer[1024];
|
|
|
|
for (unsigned idx = 0; idx < tableLocks.size(); idx++)
|
|
{
|
|
if (tableLocks[idx].id > maxLockID)
|
|
maxLockID = tableLocks[idx].id;
|
|
|
|
if (tableLocks[idx].ownerName.length() > ownerColumnWidth)
|
|
ownerColumnWidth = tableLocks[idx].ownerName.length();
|
|
|
|
if (tableLocks[idx].ownerPID > maxPID)
|
|
maxPID = tableLocks[idx].ownerPID;
|
|
|
|
if (tableLocks[idx].ownerSessionID > maxSessionID)
|
|
maxSessionID = tableLocks[idx].ownerSessionID;
|
|
|
|
if (tableLocks[idx].ownerSessionID < minSessionID)
|
|
minSessionID = tableLocks[idx].ownerSessionID;
|
|
|
|
if (tableLocks[idx].ownerTxnID > maxTxnID)
|
|
maxTxnID = tableLocks[idx].ownerTxnID;
|
|
|
|
ctime_r(&tableLocks[idx].creationTime, cTimeBuffer);
|
|
cTimeBuffer[strlen(cTimeBuffer) - 1] = '\0'; // strip trailing '\n'
|
|
std::string cTimeStr(cTimeBuffer);
|
|
|
|
if (cTimeStr.length() > createTimeColumnWidth)
|
|
createTimeColumnWidth = cTimeStr.length();
|
|
|
|
createTimes.push_back(cTimeStr);
|
|
|
|
std::ostringstream pms; // It is dbroots now
|
|
|
|
for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
|
|
{
|
|
if (k > 0)
|
|
pms << ',';
|
|
|
|
pms << tableLocks[idx].dbrootList[k];
|
|
}
|
|
|
|
if (pms.str().length() > pmColumnWidth)
|
|
pmColumnWidth = pms.str().length();
|
|
}
|
|
|
|
ownerColumnWidth += 2;
|
|
pmColumnWidth += 2;
|
|
createTimeColumnWidth += 2;
|
|
|
|
std::ostringstream idString;
|
|
idString << maxLockID;
|
|
|
|
if (idString.str().length() > lockIDColumnWidth)
|
|
lockIDColumnWidth = idString.str().length();
|
|
|
|
lockIDColumnWidth += 2;
|
|
|
|
std::ostringstream pidString;
|
|
pidString << maxPID;
|
|
|
|
if (pidString.str().length() > pidColumnWidth)
|
|
pidColumnWidth = pidString.str().length();
|
|
|
|
pidColumnWidth += 2;
|
|
|
|
const std::string sessionNoneStr("BulkLoad");
|
|
std::ostringstream sessionString;
|
|
sessionString << maxSessionID;
|
|
|
|
if (sessionString.str().length() > sessionIDColumnWidth)
|
|
sessionIDColumnWidth = sessionString.str().length();
|
|
|
|
if ((minSessionID < 0) && (sessionNoneStr.length() > sessionIDColumnWidth))
|
|
sessionIDColumnWidth = sessionNoneStr.length();
|
|
|
|
sessionIDColumnWidth += 2;
|
|
|
|
const std::string txnNoneStr("n/a");
|
|
std::ostringstream txnString;
|
|
txnString << maxTxnID;
|
|
|
|
if (txnString.str().length() > txnIDColumnWidth)
|
|
txnIDColumnWidth = txnString.str().length();
|
|
|
|
txnIDColumnWidth += 2;
|
|
|
|
// Make second pass through the table locks to build our result.
|
|
// Keep in mind the same table could have more than 1 lock
|
|
// (on different PMs), so we don't exit loop after "first" match.
|
|
bool found = false;
|
|
ostringstream os;
|
|
|
|
for (unsigned idx = 0; idx < tableLocks.size(); idx++)
|
|
{
|
|
if (roPair.objnum == (CalpontSystemCatalog::OID)tableLocks[idx].tableOID)
|
|
{
|
|
std::ostringstream pms;
|
|
|
|
for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
|
|
{
|
|
if (k > 0)
|
|
pms << ',';
|
|
|
|
pms << tableLocks[idx].dbrootList[k];
|
|
}
|
|
|
|
if (found) // write newline between lines
|
|
{
|
|
os << endl;
|
|
}
|
|
else // write the column headers before the first entry
|
|
{
|
|
os.setf(ios::left, ios::adjustfield);
|
|
os << setw(lockIDColumnWidth) << "LockID" << setw(ownerColumnWidth) << "Process"
|
|
<< setw(pidColumnWidth) << "PID" << setw(sessionIDColumnWidth) << "Session"
|
|
<< setw(txnIDColumnWidth) << "Txn" << setw(createTimeColumnWidth) << "CreationTime" << setw(9)
|
|
<< "State" << setw(pmColumnWidth) << "DBRoots" << endl;
|
|
}
|
|
|
|
os << " " << setw(lockIDColumnWidth) << tableLocks[idx].id << setw(ownerColumnWidth)
|
|
<< tableLocks[idx].ownerName << setw(pidColumnWidth) << tableLocks[idx].ownerPID;
|
|
|
|
// Log session ID, or "BulkLoad" if session is -1
|
|
if (tableLocks[idx].ownerSessionID < 0)
|
|
os << setw(sessionIDColumnWidth) << sessionNoneStr;
|
|
else
|
|
os << setw(sessionIDColumnWidth) << tableLocks[idx].ownerSessionID;
|
|
|
|
// Log txn ID, or "n/a" if txn is -1
|
|
if (tableLocks[idx].ownerTxnID < 0)
|
|
os << setw(txnIDColumnWidth) << txnNoneStr;
|
|
else
|
|
os << setw(txnIDColumnWidth) << tableLocks[idx].ownerTxnID;
|
|
|
|
os << setw(createTimeColumnWidth) << createTimes[idx] << setw(9)
|
|
<< ((tableLocks[idx].state == BRM::LOADING) ? "LOADING" : "CLEANUP") << setw(pmColumnWidth)
|
|
<< pms.str();
|
|
|
|
found = true;
|
|
} // end of displaying a table lock match
|
|
} // end of loop through all table locks
|
|
|
|
if (!found)
|
|
{
|
|
os << " Table " << tableName.schema << "." << tableName.table << " is not locked by any process.";
|
|
}
|
|
|
|
result.tableLockInfo = os.str();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Process cleartablelock command; execute bulk rollback and release table lock
|
|
// for the specified table.
|
|
//------------------------------------------------------------------------------
|
|
void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage::CalpontDMLPackage& cpackage,
|
|
DMLPackageProcessor::DMLResult& result)
|
|
{
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = cpackage.get_SchemaName();
|
|
tableName.table = cpackage.get_TableName();
|
|
|
|
// Get the Table lock ID that is passed in the SQL statement attribute.
|
|
// This is a kludge we may want to consider changing. Could add a table
|
|
// lock ID attribute to the CalpontDMLPackage object.
|
|
std::istringstream lockIDString(cpackage.get_SQLStatement());
|
|
uint64_t tableLockID;
|
|
lockIDString >> tableLockID;
|
|
|
|
//----------------------------------------------------- start of syslog code
|
|
//
|
|
// Log initiation of cleartablelock to syslog
|
|
//
|
|
const std::string APPLNAME("cleartablelock SQL cmd");
|
|
const int SUBSYSTEM_ID = 21; // dmlpackageproc
|
|
const int INIT_MSG_NUM = logging::M0088;
|
|
logging::Message::Args msgArgs;
|
|
logging::Message logMsg1(INIT_MSG_NUM);
|
|
msgArgs.add(APPLNAME);
|
|
msgArgs.add(tableName.toString());
|
|
msgArgs.add(tableLockID);
|
|
logMsg1.format(msgArgs);
|
|
logging::LoggingID lid(SUBSYSTEM_ID);
|
|
logging::MessageLog ml(lid);
|
|
ml.logInfoMessage(logMsg1);
|
|
//------------------------------------------------------- end of syslog code
|
|
|
|
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
|
|
messageqcpp::ByteStream bsOut;
|
|
bool lockGrabbed = false;
|
|
bool bErrFlag = false;
|
|
bool bRemoveMetaErrFlag = false;
|
|
std::ostringstream combinedErrMsg;
|
|
|
|
try
|
|
{
|
|
// Make sure BRM is in READ-WRITE state before starting
|
|
int brmRc = fDbrm->isReadWrite();
|
|
|
|
if (brmRc != BRM::ERR_OK)
|
|
{
|
|
std::string brmErrMsg;
|
|
BRM::errString(brmRc, brmErrMsg);
|
|
std::ostringstream oss;
|
|
oss << "Failed BRM status check: " << brmErrMsg;
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
|
|
BRM::TableLockInfo lockInfo;
|
|
establishTableLockToClear(tableLockID, lockInfo);
|
|
lockGrabbed = true;
|
|
|
|
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|
oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap();
|
|
std::map<int, int>::const_iterator mapIter;
|
|
std::set<int> pmSet;
|
|
|
|
// Construct relevant list of PMs based on the DBRoots associated
|
|
// with the tableLock.
|
|
for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
|
|
{
|
|
mapIter = dbRootPmMap->find(lockInfo.dbrootList[k]);
|
|
|
|
if (mapIter != dbRootPmMap->end())
|
|
{
|
|
int pm = mapIter->second;
|
|
pmSet.insert(pm);
|
|
}
|
|
else
|
|
{
|
|
std::ostringstream oss;
|
|
oss << "DBRoot " << lockInfo.dbrootList[k] << " does not map to a PM. Cannot perform rollback";
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
}
|
|
|
|
std::vector<int> pmList;
|
|
|
|
for (std::set<int>::const_iterator setIter = pmSet.begin(); setIter != pmSet.end(); ++setIter)
|
|
{
|
|
pmList.push_back(*setIter);
|
|
}
|
|
|
|
std::cout << "cleartablelock rollback for table lock " << tableLockID << " being forwarded to PM(s): ";
|
|
|
|
for (unsigned int k = 0; k < pmList.size(); k++)
|
|
{
|
|
if (k > 0)
|
|
std::cout << ", ";
|
|
|
|
std::cout << pmList[k];
|
|
}
|
|
|
|
std::cout << std::endl;
|
|
|
|
// Perform bulk rollback if state is in LOADING state
|
|
if (lockInfo.state == BRM::LOADING)
|
|
{
|
|
fWEClient->addQueue(uniqueId);
|
|
|
|
//------------------------------------------------------------------
|
|
// Send rollback msg to the writeengine server for every PM.
|
|
// We send to each PM, instead of just to PMs in the tablelock's
|
|
// PM list, just in case a DBRoot has been moved to a different PM.
|
|
//------------------------------------------------------------------
|
|
// std::cout << "cleartablelock rollback: tableLock-" << tableLockID <<
|
|
// ": uniqueID-" << uniqueId <<
|
|
// ": oid-" << lockInfo.tableOID <<
|
|
// "; name-" << tableName.toString() <<
|
|
// "; app-" << APPLNAME << std::endl;
|
|
|
|
bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
|
|
bsOut << uniqueId;
|
|
bsOut << tableLockID;
|
|
bsOut << lockInfo.tableOID;
|
|
bsOut << tableName.toString();
|
|
bsOut << APPLNAME;
|
|
|
|
for (unsigned j = 0; j < pmList.size(); j++)
|
|
{
|
|
fWEClient->write(bsOut, pmList[j]);
|
|
}
|
|
|
|
// Wait for "all" the responses, and accumulate any/all errors
|
|
unsigned int pmMsgCnt = 0;
|
|
|
|
while (pmMsgCnt < pmList.size())
|
|
{
|
|
std::string rollbackErrMsg;
|
|
bsIn.reset(new messageqcpp::ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().length() > 0)
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "Network error, PM rollback; ";
|
|
}
|
|
else
|
|
{
|
|
messageqcpp::ByteStream::byte rc;
|
|
uint16_t pmNum;
|
|
*bsIn >> rc;
|
|
*bsIn >> rollbackErrMsg;
|
|
*bsIn >> pmNum;
|
|
|
|
// std::cout << "cleartablelock rollback response from PM"<<
|
|
// pmNum << "; rc-" << (int)rc <<
|
|
// "; errMsg: {" << rollbackErrMsg << '}' << std::endl;
|
|
|
|
if (rc != 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().empty())
|
|
combinedErrMsg << "Rollback error; ";
|
|
else
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "[PM" << pmNum << "] " << rollbackErrMsg;
|
|
}
|
|
}
|
|
|
|
pmMsgCnt++;
|
|
} // end of while loop to process all responses to bulk rollback
|
|
|
|
// If no errors so far, then change state to CLEANUP state.
|
|
// We ignore the return stateChange flag.
|
|
if (!bErrFlag)
|
|
{
|
|
fDbrm->changeState(tableLockID, BRM::CLEANUP);
|
|
}
|
|
} // end of (lockInfo.state == BRM::LOADING)
|
|
|
|
// If no errors so far, then:
|
|
// 1. delete meta data backup rollback files
|
|
// 2. finally release table lock
|
|
if (!bErrFlag)
|
|
{
|
|
bsOut.reset();
|
|
|
|
//------------------------------------------------------------------
|
|
// Delete meta data backup rollback files
|
|
//------------------------------------------------------------------
|
|
// std::cout << "cleartablelock cleanup: uniqueID-" << uniqueId <<
|
|
// ": oid-" << lockInfo.tableOID << std::endl;
|
|
|
|
bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP;
|
|
bsOut << uniqueId;
|
|
bsOut << lockInfo.tableOID;
|
|
|
|
for (unsigned j = 0; j < pmList.size(); j++)
|
|
{
|
|
fWEClient->write(bsOut, pmList[j]);
|
|
}
|
|
|
|
// Wait for "all" the responses, and accumulate any/all errors
|
|
unsigned int pmMsgCnt = 0;
|
|
|
|
while (pmMsgCnt < pmList.size())
|
|
{
|
|
std::string fileDeleteErrMsg;
|
|
bsIn.reset(new messageqcpp::ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().length() > 0)
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "Network error, PM rollback cleanup; ";
|
|
}
|
|
else
|
|
{
|
|
messageqcpp::ByteStream::byte rc;
|
|
uint16_t pmNum;
|
|
*bsIn >> rc;
|
|
*bsIn >> fileDeleteErrMsg;
|
|
*bsIn >> pmNum;
|
|
|
|
// std::cout << "cleartablelock cleanup response from PM" <<
|
|
// pmNum << "; rc-" << (int)rc <<
|
|
// "; errMsg: {" << fileDeleteErrMsg << '}' << std::endl;
|
|
|
|
if (rc != 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().empty())
|
|
combinedErrMsg << "Cleanup error; ";
|
|
else
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "[PM" << pmNum << "] " << fileDeleteErrMsg;
|
|
}
|
|
}
|
|
|
|
pmMsgCnt++;
|
|
} // end of while loop to process all responses to rollback cleanup
|
|
|
|
// We ignore return release flag from releaseTableLock().
|
|
if (!bErrFlag)
|
|
{
|
|
fDbrm->releaseTableLock(tableLockID);
|
|
}
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
bErrFlag = true;
|
|
combinedErrMsg << ex.what();
|
|
}
|
|
|
|
if (!bErrFlag)
|
|
{
|
|
std::ostringstream oss;
|
|
oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " is cleared.";
|
|
|
|
//@Bug 4517. Release tablelock if remove meta files failed.
|
|
if (bRemoveMetaErrFlag)
|
|
{
|
|
oss << " Warning: " << combinedErrMsg.str();
|
|
}
|
|
|
|
result.tableLockInfo = oss.str();
|
|
}
|
|
else
|
|
{
|
|
std::ostringstream oss;
|
|
oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " cannot be cleared. "
|
|
<< combinedErrMsg.str();
|
|
result.tableLockInfo = oss.str();
|
|
}
|
|
|
|
// Remove tableLockID out of the active cleartableLock command list
|
|
if (lockGrabbed)
|
|
{
|
|
boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex);
|
|
fActiveClearTableLockCmds.erase(tableLockID);
|
|
}
|
|
|
|
//----------------------------------------------------- start of syslog code
|
|
//
|
|
// Log completion of cleartablelock to syslog
|
|
//
|
|
const int COMP_MSG_NUM = logging::M0089;
|
|
msgArgs.reset();
|
|
logging::Message logMsg2(COMP_MSG_NUM);
|
|
msgArgs.add(APPLNAME);
|
|
msgArgs.add(tableName.toString());
|
|
msgArgs.add(tableLockID);
|
|
std::string finalStatus;
|
|
|
|
if (!bErrFlag)
|
|
{
|
|
finalStatus = "Completed successfully";
|
|
}
|
|
else
|
|
{
|
|
finalStatus = "Encountered errors: ";
|
|
finalStatus += combinedErrMsg.str();
|
|
}
|
|
|
|
msgArgs.add(finalStatus);
|
|
logMsg2.format(msgArgs);
|
|
ml.logInfoMessage(logMsg2);
|
|
//------------------------------------------------------- end of syslog code
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get/return the current tablelock info for tableLockID, and reassign the table
|
|
// lock to ourselves if we can. If the lock is still in use by another process
|
|
// or by another DML cleartablelock thread, then we error out with exception.
|
|
//------------------------------------------------------------------------------
|
|
void CommandPackageProcessor::establishTableLockToClear(uint64_t tableLockID, BRM::TableLockInfo& lockInfo)
|
|
{
|
|
boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex);
|
|
|
|
// Get current table lock info
|
|
bool getLockInfo = fDbrm->getTableLockInfo(tableLockID, &lockInfo);
|
|
|
|
if (!getLockInfo)
|
|
{
|
|
throw std::runtime_error(std::string("Lock does not exist."));
|
|
}
|
|
|
|
std::string processName("DMLProc clearTableLock");
|
|
uint32_t processID = ::getpid();
|
|
|
|
// See if another thread is executing a cleartablelock cmd for this table
|
|
if ((lockInfo.ownerName == processName) && (lockInfo.ownerPID == processID))
|
|
{
|
|
std::set<uint64_t>::const_iterator it = fActiveClearTableLockCmds.find(tableLockID);
|
|
|
|
if (it != fActiveClearTableLockCmds.end())
|
|
{
|
|
throw std::runtime_error(
|
|
std::string("Lock in use. "
|
|
"DML is executing another cleartablelock MySQL cmd."));
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Take over ownership of stale lock.
|
|
// Use "DMLProc clearTableLock" as process name to differentiate
|
|
// from a DMLProc lock used for inserts, updates, etc.
|
|
int32_t sessionID = fSessionID;
|
|
int32_t txnid = -1;
|
|
bool ownerChanged = fDbrm->changeOwner(tableLockID, processName, processID, sessionID, txnid);
|
|
|
|
if (!ownerChanged)
|
|
{
|
|
throw std::runtime_error(std::string("Unable to grab lock; lock not found or still in use."));
|
|
}
|
|
}
|
|
|
|
// Add this cleartablelock command to the list of active cleartablelock cmds
|
|
fActiveClearTableLockCmds.insert(tableLockID);
|
|
}
|
|
|
|
} // namespace dmlpackageprocessor
|