mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-21 19:45:56 +03:00
1173 lines
45 KiB
C++
1173 lines
45 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
* Copyright (C) 2016 MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: commandpackageprocessor.cpp 9613 2013-06-12 15:44:24Z dhall $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <ctime>
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <set>
|
|
#include <vector>
|
|
#include <boost/scoped_ptr.hpp>
|
|
|
|
#include "commandpackageprocessor.h"
|
|
#include "messagelog.h"
|
|
#include "dbrm.h"
|
|
#include "sqllogger.h"
|
|
#include "tablelockdata.h"
|
|
#include "we_messages.h"
|
|
#include "we_ddlcommandclient.h"
|
|
#include "oamcache.h"
|
|
#include "liboamcpp.h"
|
|
using namespace std;
|
|
using namespace WriteEngine;
|
|
using namespace dmlpackage;
|
|
using namespace execplan;
|
|
using namespace logging;
|
|
using namespace boost;
|
|
using namespace BRM;
|
|
|
|
namespace dmlpackageprocessor
|
|
{
|
|
// Tracks active cleartablelock commands by storing set of table lock IDs
|
|
/*static*/ std::set<uint64_t>
|
|
CommandPackageProcessor::fActiveClearTableLockCmds;
|
|
/*static*/ boost::mutex
|
|
CommandPackageProcessor::fActiveClearTableLockCmdMutex;
|
|
|
|
DMLPackageProcessor::DMLResult
|
|
CommandPackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage)
|
|
{
|
|
SUMMARY_INFO("CommandPackageProcessor::processPackage");
|
|
|
|
DMLResult result;
|
|
result.result = NO_ERROR;
|
|
|
|
VERBOSE_INFO("Processing Command DML Package...");
|
|
std::string stmt = cpackage.get_DMLStatement();
|
|
boost::algorithm::to_upper(stmt);
|
|
trim(stmt);
|
|
fSessionID = cpackage.get_SessionID();
|
|
BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
|
|
uint64_t uniqueId = 0;
|
|
|
|
//Bug 5070. Added exception handling
|
|
try
|
|
{
|
|
uniqueId = fDbrm->getUnique64();
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(9);
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.result = COMMAND_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnid);
|
|
return result;
|
|
}
|
|
catch ( ... )
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(9);
|
|
args.add("Unknown error occurred while getting unique number.");
|
|
message.format(args);
|
|
result.result = COMMAND_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnid);
|
|
return result;
|
|
}
|
|
|
|
string errorMsg;
|
|
bool queRemoved = false;
|
|
logging::LoggingID lid(20);
|
|
logging::MessageLog ml(lid);
|
|
LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
Logger logger(logid.fSubsysID);
|
|
|
|
if (stmt != "CLEANUP")
|
|
{
|
|
args1.add("Start SQL statement: ");
|
|
args1.add(stmt);
|
|
msg.format( args1 );
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
}
|
|
|
|
//fWEClient->addQueue(uniqueId);
|
|
try
|
|
{
|
|
// set-up the transaction
|
|
if ( (stmt == "COMMIT") || (stmt == "ROLLBACK") )
|
|
{
|
|
//SQLLogger sqlLogger(stmt, DMLLoggingId, cpackage.get_SessionID(), txnid.id);
|
|
|
|
if ((txnid.valid))
|
|
{
|
|
vector<LBID_t> lbidList;
|
|
fDbrm->getUncommittedExtentLBIDs(static_cast<VER_t>(txnid.id), lbidList);
|
|
bool cpInvalidated = false;
|
|
|
|
//cout << "get a valid txnid " << txnid.id << " and stmt is " << stmt << " and isBachinsert is " << cpackage.get_isBatchInsert() << endl;
|
|
if ((stmt == "COMMIT") && (cpackage.get_isBatchInsert()))
|
|
{
|
|
//update syscolumn for the next value
|
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName = systemCatalogPtr->tableName(cpackage.getTableOid());
|
|
|
|
try
|
|
{
|
|
uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
|
|
|
|
if (nextVal != AUTOINCR_SATURATED) //need to update syscolumn
|
|
{
|
|
//get autoincrement column oid
|
|
int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
|
|
//get the current nextVal from controller
|
|
scoped_ptr<DBRM> aDbrm(new DBRM());
|
|
uint64_t nextValInController = 0;
|
|
bool validNextVal = false;
|
|
aDbrm->getAILock(columnOid);
|
|
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); //in case it has changed
|
|
validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
|
|
|
|
if ((validNextVal) && (nextValInController > nextVal))
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
queRemoved = true;
|
|
WE_DDLCommandClient ddlClient;
|
|
uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController);
|
|
//@bug 5894. Need release lock.
|
|
aDbrm->releaseAILock(columnOid);
|
|
|
|
if (rc != 0)
|
|
throw std::runtime_error("Error in UpdateSyscolumnNextval");
|
|
}
|
|
else
|
|
aDbrm->releaseAILock(columnOid);
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
//Rollback transaction
|
|
rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
|
|
fSessionManager.rolledback( txnid );
|
|
throw std::runtime_error(ex.what());
|
|
}
|
|
|
|
//systemCatalogPtr->updateColinfoCache(nextValMap);
|
|
int weRc = 0;
|
|
|
|
if (cpackage.get_isAutocommitOn())
|
|
{
|
|
weRc = commitBatchAutoOnTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
|
|
|
|
if (weRc != 0)
|
|
BRM::errString(weRc, errorMsg);
|
|
|
|
cpInvalidated = true;
|
|
}
|
|
else
|
|
{
|
|
weRc = fDbrm->vbCommit(txnid.id);
|
|
|
|
if (weRc != 0)
|
|
BRM::errString(weRc, errorMsg);
|
|
|
|
//weRc = commitBatchAutoOffTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
|
|
}
|
|
|
|
if (weRc != 0)
|
|
{
|
|
throw std::runtime_error(errorMsg);
|
|
}
|
|
|
|
logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
|
|
fSessionManager.committed( txnid );
|
|
//cout << "releasing transaction id for batchinsert" << txnid.id << endl;
|
|
}
|
|
else if (stmt == "COMMIT")
|
|
{
|
|
//cout << "success in commitTransaction" << endl;
|
|
//update syscolumn for the next value
|
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
|
|
CalpontSystemCatalog::TableName tableName;
|
|
uint32_t tableOid = cpackage.getTableOid();
|
|
std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();
|
|
|
|
if (tableOid == 0) //special case: transaction commit for autocommit off and not following a dml statement immediately
|
|
{
|
|
TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
|
|
TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
|
|
TablelockData::OIDTablelock::iterator iter;
|
|
|
|
if (!tablelockMap.empty())
|
|
{
|
|
for ( unsigned k = 0; k < tableLocks.size(); k++)
|
|
{
|
|
iter = tablelockMap.find(tableLocks[k].tableOID);
|
|
|
|
if ( iter != tablelockMap.end() )
|
|
{
|
|
tableName = systemCatalogPtr->tableName(tableLocks[k].tableOID);
|
|
|
|
try
|
|
{
|
|
uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
|
|
|
|
if (nextVal != AUTOINCR_SATURATED) //neet to update syscolumn
|
|
{
|
|
//get autoincrement column oid
|
|
int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
|
|
//get the current nextVal from controller
|
|
scoped_ptr<DBRM> aDbrm(new DBRM());
|
|
uint64_t nextValInController = 0;
|
|
bool validNextVal = false;
|
|
aDbrm->getAILock(columnOid);
|
|
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); //in case it has changed
|
|
validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
|
|
|
|
if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
|
|
queRemoved = true;
|
|
WE_DDLCommandClient ddlClient;
|
|
uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
|
|
aDbrm->releaseAILock(columnOid);
|
|
|
|
if (rc != 0)
|
|
{
|
|
//for now
|
|
fSessionManager.committed( txnid );
|
|
throw std::runtime_error("Error in UpdateSyscolumnNextval");
|
|
}
|
|
}
|
|
else
|
|
aDbrm->releaseAILock(columnOid);
|
|
}
|
|
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
//Rollback transaction, release tablelock
|
|
rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
|
|
fDbrm->releaseTableLock(iter->second);
|
|
fSessionManager.rolledback( txnid );
|
|
throw std::runtime_error(ex.what());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (tableOid >= 3000)
|
|
{
|
|
tableName = systemCatalogPtr->tableName(tableOid);
|
|
|
|
try
|
|
{
|
|
uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
|
|
|
|
if (nextVal != AUTOINCR_SATURATED) //need to update syscolumn
|
|
{
|
|
//get autoincrement column oid
|
|
int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
|
|
//get the current nextVal from controller
|
|
scoped_ptr<DBRM> aDbrm(new DBRM());
|
|
uint64_t nextValInController = 0;
|
|
bool validNextVal = false;
|
|
aDbrm->getAILock(columnOid);
|
|
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); //in case it has changed
|
|
validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
|
|
|
|
if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
|
|
queRemoved = true;
|
|
WE_DDLCommandClient ddlClient;
|
|
uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
|
|
aDbrm->releaseAILock(columnOid);
|
|
|
|
if (rc != 0)
|
|
{
|
|
//for now
|
|
fSessionManager.committed( txnid );
|
|
throw std::runtime_error("Error in UpdateSyscolumnNextval");
|
|
}
|
|
}
|
|
else
|
|
aDbrm->releaseAILock(columnOid);
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
//Rollback transaction, release tablelock
|
|
rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
|
|
|
|
for ( unsigned k = 0; k < tableLocks.size(); k++)
|
|
{
|
|
if ( tableLocks[k].tableOID == tableOid )
|
|
{
|
|
try
|
|
{
|
|
fDbrm->releaseTableLock(tableLocks[k].id);
|
|
}
|
|
catch (std::exception&)
|
|
{}
|
|
}
|
|
}
|
|
|
|
fSessionManager.rolledback( txnid );
|
|
throw std::runtime_error(ex.what());
|
|
}
|
|
}
|
|
}
|
|
|
|
int weRc = commitTransaction(uniqueId, txnid );
|
|
logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
|
|
|
|
if (weRc != WriteEngine::NO_ERROR)
|
|
{
|
|
//cout << "Got an error in commitTransaction" << endl;
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "COMMIT failed: " << ec.errorString(weRc);
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
|
|
fSessionManager.committed( txnid );
|
|
//cout << "commit releasing transaction id " << txnid.id << endl;
|
|
}
|
|
else if ((stmt == "ROLLBACK") && (cpackage.get_isBatchInsert()))
|
|
{
|
|
int weRc = 0;
|
|
|
|
|
|
//version rollback, Bulkrollback
|
|
weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
|
|
|
|
if (weRc == 0)
|
|
{
|
|
//@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
|
|
fDbrm->invalidateUncommittedExtentLBIDs(0, &lbidList);
|
|
cpInvalidated = true;
|
|
weRc = rollBackBatchAutoOnTransaction(uniqueId, txnid, fSessionID, cpackage.getTableOid(), errorMsg);
|
|
|
|
}
|
|
else
|
|
{
|
|
throw std::runtime_error(errorMsg);
|
|
}
|
|
|
|
logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
|
|
|
|
if (weRc != 0)
|
|
{
|
|
//@Bug 4524. Don't set to readonly. Just error out.
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "ROLLBACK batch insert failed due to: " << "(" << weRc << ")" << ec.errorString(weRc);
|
|
//Log to error log
|
|
logging::Message::Args args1;
|
|
logging::Message message1(2);
|
|
args1.add(oss.str());
|
|
message1.format( args1 );
|
|
ml.logErrorMessage( message1 );
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
|
|
fSessionManager.rolledback( txnid );
|
|
//cout << "batch rollback releasing transaction id " << txnid.id << endl;
|
|
}
|
|
else if (stmt == "ROLLBACK")
|
|
{
|
|
std::string errorMsg("");
|
|
logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
|
|
int weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
|
|
|
|
if (weRc != 0)
|
|
{
|
|
//cout << "Rollback failed" << endl;
|
|
//@Bug 4524. Don't set to readonly. Just error out.
|
|
ostringstream oss;
|
|
oss << "ROLLBACK failed due to: " << errorMsg;
|
|
//Log to error log
|
|
logging::Message::Args args2;
|
|
logging::Message message2(2);
|
|
args2.add(oss.str());
|
|
message2.format( args2 );
|
|
ml.logErrorMessage( message2 );
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
|
|
fSessionManager.rolledback( txnid );
|
|
//cout << "Rollback releasing transaction id " << txnid.id << endl;
|
|
}
|
|
|
|
if (!cpInvalidated)
|
|
{
|
|
fDbrm->invalidateUncommittedExtentLBIDs(0, &lbidList);
|
|
}
|
|
}
|
|
}
|
|
else if (stmt == "CLEANUP")
|
|
{
|
|
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog
|
|
(cpackage.get_SessionID());
|
|
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog
|
|
(cpackage.get_SessionID() | 0x80000000);
|
|
}
|
|
else if (stmt == "VIEWTABLELOCK")
|
|
{
|
|
viewTableLock( cpackage, result );
|
|
}
|
|
else if (stmt == "CLEARTABLELOCK")
|
|
{
|
|
clearTableLock( uniqueId, cpackage, result );
|
|
}
|
|
else if ( !cpackage.get_Logging())
|
|
{
|
|
BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
|
|
logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_DMLStatement() + ";", cpackage.get_SchemaName());
|
|
SQLLogger sqlLogger(cpackage.get_DMLStatement(), DMLLoggingId, fSessionID, txnid.id);
|
|
//cout << "commandpackageprocessor Logging " << cpackage.get_DMLStatement()+ ";" << endl;
|
|
}
|
|
else
|
|
{
|
|
std::string err = "Unknown command.";
|
|
SUMMARY_INFO(err);
|
|
throw std::runtime_error(err);
|
|
}
|
|
|
|
}
|
|
catch ( logging::IDBExcept& noTable) //@Bug 2606 catch no table found exception
|
|
{
|
|
cerr << "CommandPackageProcessor::processPackage: " << noTable.what() << endl;
|
|
|
|
result.result = COMMAND_ERROR;
|
|
result.message = Message(noTable.what());
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl;
|
|
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add( ex.what() );
|
|
args.add("");
|
|
args.add("");
|
|
message.format( args );
|
|
|
|
result.result = COMMAND_ERROR;
|
|
result.message = message;
|
|
}
|
|
catch (...)
|
|
{
|
|
cerr << "CommandPackageProcessor::processPackage: caught unknown exception!" << endl;
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add( "Command Failed: ");
|
|
args.add( "encountered unkown exception" );
|
|
args.add("");
|
|
args.add("");
|
|
message.format( args );
|
|
|
|
result.result = COMMAND_ERROR;
|
|
result.message = message;
|
|
}
|
|
|
|
if (!queRemoved)
|
|
fWEClient->removeQueue(uniqueId);
|
|
|
|
//release table lock
|
|
if ((result.result == NO_ERROR) && ((stmt == "COMMIT") || (stmt == "ROLLBACK")) )
|
|
{
|
|
TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
|
|
TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
|
|
bool lockReleased = true;
|
|
|
|
if (!tablelockMap.empty())
|
|
{
|
|
TablelockData::OIDTablelock::iterator it = tablelockMap.begin();
|
|
|
|
while (it != tablelockMap.end())
|
|
{
|
|
|
|
try
|
|
{
|
|
lockReleased = fDbrm->releaseTableLock(it->second);
|
|
//cout << "releasing tablelock " << it->second << endl;
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add( ex.what() );
|
|
args.add("");
|
|
args.add("");
|
|
message.format( args );
|
|
|
|
result.result = COMMAND_ERROR;
|
|
result.message = message;
|
|
}
|
|
|
|
if (!lockReleased) //log an error
|
|
{
|
|
ostringstream os;
|
|
os << "tablelock for table oid " << it->first << " is not found";
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add( os.str());
|
|
args.add("");
|
|
args.add("");
|
|
message.format( args );
|
|
logging::LoggingID lid(21);
|
|
logging::MessageLog ml(lid);
|
|
|
|
ml.logErrorMessage(message);
|
|
}
|
|
|
|
//cout << "tablelock " << it->second << " is released" << endl;
|
|
it++;
|
|
}
|
|
|
|
//@Bug 3557. Clean tablelock cache after commit/rollback.
|
|
TablelockData::removeTablelockData(cpackage.get_SessionID());
|
|
}
|
|
|
|
}
|
|
|
|
VERBOSE_INFO("Finished processing Command DML Package");
|
|
//LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
|
|
logging::Message::Args args2;
|
|
logging::Message msg1(1);
|
|
|
|
if (stmt != "CLEANUP")
|
|
{
|
|
args2.add("End SQL statement");
|
|
msg1.format( args2 );
|
|
//Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Process viewtablelock command; return table lock information for the
|
|
// specified table.
|
|
// This function closely resembles printTableLocks in viewtablelock.cpp.
|
|
//------------------------------------------------------------------------------
|
|
void CommandPackageProcessor::viewTableLock(
|
|
const dmlpackage::CalpontDMLPackage& cpackage,
|
|
DMLPackageProcessor::DMLResult& result)
|
|
{
|
|
// Initialize System Catalog object used to get table name
|
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
|
CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
|
|
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = cpackage.get_SchemaName();
|
|
tableName.table = cpackage.get_TableName();
|
|
execplan::CalpontSystemCatalog::ROPair roPair;
|
|
|
|
roPair = systemCatalogPtr->tableRID( tableName );
|
|
|
|
// Get list of table locks for the requested table
|
|
std::vector<BRM::TableLockInfo> tableLocks;
|
|
tableLocks = fDbrm->getAllTableLocks();
|
|
|
|
// Make preliminary pass through the table locks in order to determine our
|
|
// output column widths based on the data. Min column widths are based on
|
|
// the width of the column heading (except for the 'state' column).
|
|
uint64_t maxLockID = 0;
|
|
uint32_t maxPID = 0;
|
|
int32_t maxSessionID = 0;
|
|
int32_t minSessionID = 0;
|
|
int32_t maxTxnID = 0;
|
|
|
|
unsigned int lockIDColumnWidth = 6; // "LockID"
|
|
unsigned int ownerColumnWidth = 7; // "Process"
|
|
unsigned int pidColumnWidth = 3; // "PID"
|
|
unsigned int sessionIDColumnWidth = 7; // "Session"
|
|
unsigned int txnIDColumnWidth = 3; // "Txn"
|
|
unsigned int createTimeColumnWidth = 12; // "CreationTime"
|
|
unsigned int pmColumnWidth = 7; // "DBRoots"
|
|
std::vector<std::string> createTimes;
|
|
char cTimeBuffer[1024];
|
|
|
|
for (unsigned idx = 0; idx < tableLocks.size(); idx++)
|
|
{
|
|
if (tableLocks[idx].id > maxLockID)
|
|
maxLockID = tableLocks[idx].id;
|
|
|
|
if (tableLocks[idx].ownerName.length() > ownerColumnWidth)
|
|
ownerColumnWidth = tableLocks[idx].ownerName.length();
|
|
|
|
if (tableLocks[idx].ownerPID > maxPID)
|
|
maxPID = tableLocks[idx].ownerPID;
|
|
|
|
if (tableLocks[idx].ownerSessionID > maxSessionID)
|
|
maxSessionID = tableLocks[idx].ownerSessionID;
|
|
|
|
if (tableLocks[idx].ownerSessionID < minSessionID)
|
|
minSessionID = tableLocks[idx].ownerSessionID;
|
|
|
|
if (tableLocks[idx].ownerTxnID > maxTxnID)
|
|
maxTxnID = tableLocks[idx].ownerTxnID;
|
|
|
|
ctime_r( &tableLocks[idx].creationTime, cTimeBuffer );
|
|
cTimeBuffer[ strlen(cTimeBuffer) - 1 ] = '\0'; // strip trailing '\n'
|
|
std::string cTimeStr( cTimeBuffer );
|
|
|
|
if (cTimeStr.length() > createTimeColumnWidth)
|
|
createTimeColumnWidth = cTimeStr.length();
|
|
|
|
createTimes.push_back( cTimeStr );
|
|
|
|
std::ostringstream pms; //It is dbroots now
|
|
|
|
for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
|
|
{
|
|
if (k > 0)
|
|
pms << ',';
|
|
|
|
pms << tableLocks[idx].dbrootList[k];
|
|
}
|
|
|
|
if (pms.str().length() > pmColumnWidth)
|
|
pmColumnWidth = pms.str().length();
|
|
}
|
|
|
|
ownerColumnWidth += 2;
|
|
pmColumnWidth += 2;
|
|
createTimeColumnWidth += 2;
|
|
|
|
std::ostringstream idString;
|
|
idString << maxLockID;
|
|
|
|
if (idString.str().length() > lockIDColumnWidth)
|
|
lockIDColumnWidth = idString.str().length();
|
|
|
|
lockIDColumnWidth += 2;
|
|
|
|
std::ostringstream pidString;
|
|
pidString << maxPID;
|
|
|
|
if (pidString.str().length() > pidColumnWidth)
|
|
pidColumnWidth = pidString.str().length();
|
|
|
|
pidColumnWidth += 2;
|
|
|
|
const std::string sessionNoneStr("BulkLoad");
|
|
std::ostringstream sessionString;
|
|
sessionString << maxSessionID;
|
|
|
|
if (sessionString.str().length() > sessionIDColumnWidth)
|
|
sessionIDColumnWidth = sessionString.str().length();
|
|
|
|
if ((minSessionID < 0) &&
|
|
(sessionNoneStr.length() > sessionIDColumnWidth))
|
|
sessionIDColumnWidth = sessionNoneStr.length();
|
|
|
|
sessionIDColumnWidth += 2;
|
|
|
|
const std::string txnNoneStr("n/a");
|
|
std::ostringstream txnString;
|
|
txnString << maxTxnID;
|
|
|
|
if (txnString.str().length() > txnIDColumnWidth)
|
|
txnIDColumnWidth = txnString.str().length();
|
|
|
|
txnIDColumnWidth += 2;
|
|
|
|
// Make second pass through the table locks to build our result.
|
|
// Keep in mind the same table could have more than 1 lock
|
|
// (on different PMs), so we don't exit loop after "first" match.
|
|
bool found = false;
|
|
ostringstream os;
|
|
|
|
for (unsigned idx = 0; idx < tableLocks.size(); idx++)
|
|
{
|
|
if (roPair.objnum == (CalpontSystemCatalog::OID)
|
|
tableLocks[idx].tableOID)
|
|
{
|
|
std::ostringstream pms;
|
|
|
|
for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
|
|
{
|
|
if (k > 0)
|
|
pms << ',';
|
|
|
|
pms << tableLocks[idx].dbrootList[k];
|
|
}
|
|
|
|
if (found) // write newline between lines
|
|
{
|
|
os << endl;
|
|
}
|
|
else // write the column headers before the first entry
|
|
{
|
|
os.setf(ios::left, ios::adjustfield);
|
|
os << setw(lockIDColumnWidth) << "LockID" <<
|
|
setw(ownerColumnWidth) << "Process" <<
|
|
setw(pidColumnWidth) << "PID" <<
|
|
setw(sessionIDColumnWidth) << "Session" <<
|
|
setw(txnIDColumnWidth) << "Txn" <<
|
|
setw(createTimeColumnWidth) << "CreationTime" <<
|
|
setw(9) << "State" <<
|
|
setw(pmColumnWidth) << "DBRoots" << endl;
|
|
}
|
|
|
|
os << " " <<
|
|
setw(lockIDColumnWidth) << tableLocks[idx].id <<
|
|
setw(ownerColumnWidth) << tableLocks[idx].ownerName <<
|
|
setw(pidColumnWidth) << tableLocks[idx].ownerPID;
|
|
|
|
// Log session ID, or "BulkLoad" if session is -1
|
|
if (tableLocks[idx].ownerSessionID < 0)
|
|
os << setw(sessionIDColumnWidth) << sessionNoneStr;
|
|
else
|
|
os << setw(sessionIDColumnWidth) <<
|
|
tableLocks[idx].ownerSessionID;
|
|
|
|
// Log txn ID, or "n/a" if txn is -1
|
|
if (tableLocks[idx].ownerTxnID < 0)
|
|
os << setw(txnIDColumnWidth) << txnNoneStr;
|
|
else
|
|
os << setw(txnIDColumnWidth) <<
|
|
tableLocks[idx].ownerTxnID;
|
|
|
|
os << setw(createTimeColumnWidth) <<
|
|
createTimes[idx] <<
|
|
setw(9) << ((tableLocks[idx].state == BRM::LOADING) ?
|
|
"LOADING" : "CLEANUP") <<
|
|
setw(pmColumnWidth) << pms.str();
|
|
|
|
found = true;
|
|
} // end of displaying a table lock match
|
|
} // end of loop through all table locks
|
|
|
|
if (!found)
|
|
{
|
|
os << " Table " << tableName.schema << "." <<
|
|
tableName.table << " is not locked by any process.";
|
|
}
|
|
|
|
result.tableLockInfo = os.str();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Process cleartablelock command; execute bulk rollback and release table lock
|
|
// for the specified table.
|
|
//------------------------------------------------------------------------------
|
|
void CommandPackageProcessor::clearTableLock( uint64_t uniqueId,
|
|
const dmlpackage::CalpontDMLPackage& cpackage,
|
|
DMLPackageProcessor::DMLResult& result)
|
|
{
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = cpackage.get_SchemaName();
|
|
tableName.table = cpackage.get_TableName ();
|
|
|
|
// Get the Table lock ID that is passed in the SQL statement attribute.
|
|
// This is a kludge we may want to consider changing. Could add a table
|
|
// lock ID attribute to the CalpontDMLPackage object.
|
|
std::istringstream lockIDString( cpackage.get_SQLStatement() );
|
|
uint64_t tableLockID;
|
|
lockIDString >> tableLockID;
|
|
|
|
//----------------------------------------------------- start of syslog code
|
|
//
|
|
// Log initiation of cleartablelock to syslog
|
|
//
|
|
const std::string APPLNAME("cleartablelock SQL cmd");
|
|
const int SUBSYSTEM_ID = 21; // dmlpackageproc
|
|
const int INIT_MSG_NUM = logging::M0088;
|
|
logging::Message::Args msgArgs;
|
|
logging::Message logMsg1( INIT_MSG_NUM );
|
|
msgArgs.add( APPLNAME );
|
|
msgArgs.add( tableName.toString() );
|
|
msgArgs.add( tableLockID );
|
|
logMsg1.format( msgArgs );
|
|
logging::LoggingID lid( SUBSYSTEM_ID );
|
|
logging::MessageLog ml( lid );
|
|
ml.logInfoMessage( logMsg1 );
|
|
//------------------------------------------------------- end of syslog code
|
|
|
|
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
|
|
messageqcpp::ByteStream bsOut;
|
|
bool lockGrabbed = false;
|
|
bool bErrFlag = false;
|
|
bool bRemoveMetaErrFlag = false;
|
|
std::ostringstream combinedErrMsg;
|
|
|
|
try
|
|
{
|
|
// Make sure BRM is in READ-WRITE state before starting
|
|
int brmRc = fDbrm->isReadWrite( );
|
|
|
|
if (brmRc != BRM::ERR_OK)
|
|
{
|
|
std::string brmErrMsg;
|
|
BRM::errString( brmRc, brmErrMsg );
|
|
std::ostringstream oss;
|
|
oss << "Failed BRM status check: " << brmErrMsg;
|
|
throw std::runtime_error( oss.str() );
|
|
}
|
|
|
|
BRM::TableLockInfo lockInfo;
|
|
establishTableLockToClear( tableLockID, lockInfo );
|
|
lockGrabbed = true;
|
|
|
|
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|
oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap();
|
|
std::map<int, int>::const_iterator mapIter;
|
|
std::set<int> pmSet;
|
|
|
|
// Construct relevant list of PMs based on the DBRoots associated
|
|
// with the tableLock.
|
|
for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
|
|
{
|
|
mapIter = dbRootPmMap->find( lockInfo.dbrootList[k] );
|
|
|
|
if (mapIter != dbRootPmMap->end())
|
|
{
|
|
int pm = mapIter->second;
|
|
pmSet.insert( pm );
|
|
}
|
|
else
|
|
{
|
|
std::ostringstream oss;
|
|
oss << "DBRoot " << lockInfo.dbrootList[k] <<
|
|
" does not map to a PM. Cannot perform rollback";
|
|
throw std::runtime_error( oss.str() );
|
|
}
|
|
}
|
|
|
|
std::vector<int> pmList;
|
|
|
|
for (std::set<int>::const_iterator setIter = pmSet.begin();
|
|
setIter != pmSet.end();
|
|
++setIter)
|
|
{
|
|
pmList.push_back( *setIter );
|
|
}
|
|
|
|
std::cout << "cleartablelock rollback for table lock " << tableLockID <<
|
|
" being forwarded to PM(s): ";
|
|
|
|
for (unsigned int k = 0; k < pmList.size(); k++)
|
|
{
|
|
if (k > 0)
|
|
std::cout << ", ";
|
|
|
|
std::cout << pmList[k];
|
|
}
|
|
|
|
std::cout << std::endl;
|
|
|
|
// Perform bulk rollback if state is in LOADING state
|
|
if (lockInfo.state == BRM::LOADING)
|
|
{
|
|
fWEClient->addQueue(uniqueId);
|
|
|
|
//------------------------------------------------------------------
|
|
// Send rollback msg to the writeengine server for every PM.
|
|
// We send to each PM, instead of just to PMs in the tablelock's
|
|
// PM list, just in case a DBRoot has been moved to a different PM.
|
|
//------------------------------------------------------------------
|
|
// std::cout << "cleartablelock rollback: tableLock-" << tableLockID <<
|
|
// ": uniqueID-" << uniqueId <<
|
|
// ": oid-" << lockInfo.tableOID <<
|
|
// "; name-" << tableName.toString() <<
|
|
// "; app-" << APPLNAME << std::endl;
|
|
|
|
bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
|
|
bsOut << uniqueId;
|
|
bsOut << tableLockID;
|
|
bsOut << lockInfo.tableOID;
|
|
bsOut << tableName.toString();
|
|
bsOut << APPLNAME;
|
|
|
|
for (unsigned j = 0; j < pmList.size(); j++)
|
|
{
|
|
fWEClient->write(bsOut, pmList[j]);
|
|
}
|
|
|
|
// Wait for "all" the responses, and accumulate any/all errors
|
|
unsigned int pmMsgCnt = 0;
|
|
|
|
while (pmMsgCnt < pmList.size())
|
|
{
|
|
std::string rollbackErrMsg;
|
|
bsIn.reset(new messageqcpp::ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().length() > 0)
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "Network error, PM rollback; ";
|
|
}
|
|
else
|
|
{
|
|
messageqcpp::ByteStream::byte rc;
|
|
uint16_t pmNum;
|
|
*bsIn >> rc;
|
|
*bsIn >> rollbackErrMsg;
|
|
*bsIn >> pmNum;
|
|
|
|
// std::cout << "cleartablelock rollback response from PM"<<
|
|
// pmNum << "; rc-" << (int)rc <<
|
|
// "; errMsg: {" << rollbackErrMsg << '}' << std::endl;
|
|
|
|
if (rc != 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().empty())
|
|
combinedErrMsg << "Rollback error; ";
|
|
else
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "[PM" << pmNum << "] " <<
|
|
rollbackErrMsg;
|
|
}
|
|
}
|
|
|
|
pmMsgCnt++;
|
|
} // end of while loop to process all responses to bulk rollback
|
|
|
|
// If no errors so far, then change state to CLEANUP state.
|
|
// We ignore the return stateChange flag.
|
|
if (!bErrFlag)
|
|
{
|
|
fDbrm->changeState( tableLockID, BRM::CLEANUP );
|
|
}
|
|
} // end of (lockInfo.state == BRM::LOADING)
|
|
|
|
// If no errors so far, then:
|
|
// 1. delete meta data backup rollback files
|
|
// 2. finally release table lock
|
|
if (!bErrFlag)
|
|
{
|
|
bsOut.reset();
|
|
|
|
//------------------------------------------------------------------
|
|
// Delete meta data backup rollback files
|
|
//------------------------------------------------------------------
|
|
// std::cout << "cleartablelock cleanup: uniqueID-" << uniqueId <<
|
|
// ": oid-" << lockInfo.tableOID << std::endl;
|
|
|
|
bsOut << (messageqcpp::ByteStream::byte)
|
|
WE_SVR_DML_BULKROLLBACK_CLEANUP;
|
|
bsOut << uniqueId;
|
|
bsOut << lockInfo.tableOID;
|
|
|
|
for (unsigned j = 0; j < pmList.size(); j++)
|
|
{
|
|
fWEClient->write(bsOut, pmList[j]);
|
|
}
|
|
|
|
// Wait for "all" the responses, and accumulate any/all errors
|
|
unsigned int pmMsgCnt = 0;
|
|
|
|
while (pmMsgCnt < pmList.size())
|
|
{
|
|
std::string fileDeleteErrMsg;
|
|
bsIn.reset(new messageqcpp::ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().length() > 0)
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "Network error, PM rollback cleanup; ";
|
|
}
|
|
else
|
|
{
|
|
messageqcpp::ByteStream::byte rc;
|
|
uint16_t pmNum;
|
|
*bsIn >> rc;
|
|
*bsIn >> fileDeleteErrMsg;
|
|
*bsIn >> pmNum;
|
|
|
|
// std::cout << "cleartablelock cleanup response from PM" <<
|
|
// pmNum << "; rc-" << (int)rc <<
|
|
// "; errMsg: {" << fileDeleteErrMsg << '}' << std::endl;
|
|
|
|
if (rc != 0)
|
|
{
|
|
bRemoveMetaErrFlag = true;
|
|
|
|
if (combinedErrMsg.str().empty())
|
|
combinedErrMsg << "Cleanup error; ";
|
|
else
|
|
combinedErrMsg << std::endl;
|
|
|
|
combinedErrMsg << "[PM" << pmNum << "] " <<
|
|
fileDeleteErrMsg;
|
|
}
|
|
}
|
|
|
|
pmMsgCnt++;
|
|
} // end of while loop to process all responses to rollback cleanup
|
|
|
|
// We ignore return release flag from releaseTableLock().
|
|
if (!bErrFlag)
|
|
{
|
|
fDbrm->releaseTableLock( tableLockID );
|
|
}
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
bErrFlag = true;
|
|
combinedErrMsg << ex.what();
|
|
}
|
|
|
|
if (!bErrFlag)
|
|
{
|
|
std::ostringstream oss;
|
|
oss << "Table lock " << tableLockID << " for table " <<
|
|
tableName.toString() << " is cleared.";
|
|
|
|
//@Bug 4517. Release tablelock if remove meta files failed.
|
|
if (bRemoveMetaErrFlag)
|
|
{
|
|
oss << " Warning: " << combinedErrMsg.str();
|
|
}
|
|
|
|
result.tableLockInfo = oss.str();
|
|
}
|
|
else
|
|
{
|
|
std::ostringstream oss;
|
|
oss << "Table lock " << tableLockID << " for table " <<
|
|
tableName.toString() << " cannot be cleared. " <<
|
|
combinedErrMsg.str();
|
|
result.tableLockInfo = oss.str();
|
|
}
|
|
|
|
// Remove tableLockID out of the active cleartableLock command list
|
|
if (lockGrabbed)
|
|
{
|
|
boost::mutex::scoped_lock lock( fActiveClearTableLockCmdMutex );
|
|
fActiveClearTableLockCmds.erase( tableLockID );
|
|
}
|
|
|
|
//----------------------------------------------------- start of syslog code
|
|
//
|
|
// Log completion of cleartablelock to syslog
|
|
//
|
|
const int COMP_MSG_NUM = logging::M0089;
|
|
msgArgs.reset();
|
|
logging::Message logMsg2( COMP_MSG_NUM );
|
|
msgArgs.add( APPLNAME );
|
|
msgArgs.add( tableName.toString() );
|
|
msgArgs.add( tableLockID );
|
|
std::string finalStatus;
|
|
|
|
if (!bErrFlag)
|
|
{
|
|
finalStatus = "Completed successfully";
|
|
}
|
|
else
|
|
{
|
|
finalStatus = "Encountered errors: ";
|
|
finalStatus += combinedErrMsg.str();
|
|
}
|
|
|
|
msgArgs.add( finalStatus );
|
|
logMsg2.format( msgArgs );
|
|
ml.logInfoMessage( logMsg2 );
|
|
//------------------------------------------------------- end of syslog code
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get/return the current tablelock info for tableLockID, and reassign the table
|
|
// lock to ourselves if we can. If the lock is still in use by another process
|
|
// or by another DML cleartablelock thread, then we error out with exception.
|
|
//------------------------------------------------------------------------------
|
|
void CommandPackageProcessor::establishTableLockToClear( uint64_t tableLockID,
|
|
BRM::TableLockInfo& lockInfo )
|
|
{
|
|
boost::mutex::scoped_lock lock( fActiveClearTableLockCmdMutex );
|
|
|
|
// Get current table lock info
|
|
bool getLockInfo = fDbrm->getTableLockInfo(tableLockID, &lockInfo);
|
|
|
|
if (!getLockInfo)
|
|
{
|
|
throw std::runtime_error( std::string("Lock does not exist.") );
|
|
}
|
|
|
|
std::string processName("DMLProc clearTableLock");
|
|
uint32_t processID = ::getpid();
|
|
|
|
// See if another thread is executing a cleartablelock cmd for this table
|
|
if ((lockInfo.ownerName == processName) &&
|
|
(lockInfo.ownerPID == processID))
|
|
{
|
|
std::set<uint64_t>::const_iterator it =
|
|
fActiveClearTableLockCmds.find( tableLockID );
|
|
|
|
if (it != fActiveClearTableLockCmds.end())
|
|
{
|
|
throw std::runtime_error( std::string( "Lock in use. "
|
|
"DML is executing another cleartablelock MySQL cmd.") );
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Take over ownership of stale lock.
|
|
// Use "DMLProc clearTableLock" as process name to differentiate
|
|
// from a DMLProc lock used for inserts, updates, etc.
|
|
int32_t sessionID = fSessionID;
|
|
int32_t txnid = -1;
|
|
bool ownerChanged = fDbrm->changeOwner(
|
|
tableLockID, processName, processID, sessionID, txnid);
|
|
|
|
if (!ownerChanged)
|
|
{
|
|
throw std::runtime_error( std::string(
|
|
"Unable to grab lock; lock not found or still in use.") );
|
|
}
|
|
}
|
|
|
|
// Add this cleartablelock command to the list of active cleartablelock cmds
|
|
fActiveClearTableLockCmds.insert( tableLockID );
|
|
}
|
|
|
|
} // namespace dmlpackageprocessor
|