mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-10 01:22:48 +03:00
mariadb-columnstore-engine/dbcon/dmlpackageproc/commandpackageprocessor.cpp
Sergey Zefirov 4545a86a80 Porting old MCOL-2044-update...: new interface for writeColRecs
Progress keep and test commit

Progress keep and test commit

Progress keep and test commit

Again, trying to pinpoint problematic part of a change

Revert "Again, trying to pinpoint problematic part of a change"

This reverts commit 71874e7c0d7e4eeed0c201b12d306b583c07b9e2.

Revert "Progress keep and test commit"

This reverts commit 63c7bc67ae55bdb81433ca58bbd239d6171a1031.

Revert "Progress keep and test commit"

This reverts commit 121c09febd78dacd37158caeab9ac70f65b493df.

Small steps - I walk minefield here

Propagating changes - now CPInfo in convertValArray

Progress keep commit

Restoring old functionality

Progress keep commit

Small steps to avoid/better locate old problem with the write engine.

Progress keeping commit

Thread the CPInfo up to convertValArray call in writeColumnRec

About to test changes - I should get no regression and no updates in
ranges either.

Testing out why I get a regression

Investigating source of regression

Debugging prints

Fix compile error

Debugging print - debug regression

I clearly see calls to writeColumnRec and prints there added to discern
between these.

Fix warning error

Possible culprit

Add forgotten default parameter for convertValArray

New logic to test

Max/min gets updated during value conversion

To test results of updates

Debug logs

Debug logs

An attempt to provide proper sequence index

Debug logs

An attempt to provide proper sequence index - now magic for resetting

Debug logs

Debug logs

Debug logs

Trying to perform correct updates

Trying to perform correct updates - seqNum woes fight

COMMIT after INSERT performs 'mark extent as invalid' operation - investigating

To test: cut setting of CPInfo upon commit from DML processor

It may be superfluous as write engine does that too

Debug logs

Debug logs

Better interface for CPMaxMin

Old interface forgot to set isBinaryColumn field

Possible fix for the problems

I forgot to reassign the value in cpinfoList

Debug logs

Computation of 'binary' column property

logs indicated that it was not set in getExtentCPMaxMin, and it was impossible to compute there so I had to move that into writeengine.

To test: code to allow cross-extent insertion

To test: removed another assertion for probable cause of errors

Debug logs

Dropped excessive logs

Better reset code

Again, trying to fix ordering

Fixing order of rowids for LBID computation

Debug logs

Remove update of second LBID in split insert

I have to validate incorrect behaviour for this test

Restoring the case where everything almost worked

Tracking changes in newly created extents

Progress keeping commit

Fixing build errors with recent server

An ability to get old values from blocks we update

Progress keeping commit

Adding analysis of old values to write engine code.

It is needed for updates and deletes.

Progress keeping commit

Moving max/min range update from convertValArray into separate function with simpler logic.

To test and debug - logic is there

Fix build errors

Update logic to debug

There is a suspicious write engine method updateColumnRecs which
receives a vector of column types but does not iterate over them
(otherwise it will be identical to updateColumnRec in logic).

Other than that, the updateColumnRec looks like the center of all
updates - deleteRow calls it, for example, dml processor also calls it.

Debug logs for insert bookkeeping regression

Set up operation type in externally-callable interface

Internal operations depend on the operation type and consistency is what matters there.

Debug logs

Fix for extent range update failure during update operation

Fix build error

Debug logs

Fix for update on deletion

I am not completely sure in it - to debug.

Debug log

writeColumnRec cannot set m_opType to UPDATE unconditionally

It is called from deleteRow

Better diagnostics

Debug logs

Fixed search condition

Debug logs

Debugging invalid LBID appearance

Debug logs - fixed condition

Fix problems with std::vector reallocation during growth

Fix growing std::vector data dangling access error

Still fixing indexing errors

Make in-range update to work

Correct sequence numbers

Debug logs

Debug logs

Remove range drop from DML part of write engine

A hack to test the culprit of range non-keeping

Tests - no results for now

MTR-style comments

Empty test results

To be filled with actual results.

Special database and result selects for all tests

Pleasing MTR with better folder name

Pleasing MTR - testing test result comparison

Pleasing MTR by disabling warnings

All test results

Cleaning up result files

Reset ranges before update

Remove comments from results - point of failure in MTR

Remove empty line from result - another MTR failure point

Probably fix for deletes

Possible fix for remaining failed delete test

Fix a bug in writeRows

It should not affect delete-with-range test case, yet it is a bug.

Debug logs

Debug logs

Tests reorganization and description

Support for unsigned integer for new tests

Fix type omission

Fix test failure due to warnings on clean installation

Support for bigint to test

Fix for failed signed bigint test

Set proper unsignedness flag

Removed that assignment during refactoring.

Tests for types with column width 1 and 2

Support for types in new tests

Remove trailing empty lines from results

Tests had failed because of extra empty lines.

Remove debug logs

Update README with info about new tests

Move tests for easier testing

Add task tag to tests

Fix invalid unsaigned range check

Fix for signed types

Fix regressions - progress keeping commit

Do not set invalid ranges into valid state

A possible fix for mcs81_self_join test

MCOL 2044 test database cleanup

Missing expected results

Delete extraneous assignment to m_opType

nullptr instead of NULL

Refactor extended CPInfo with TypeHandler

Better handling of ranges - safer types, less copy-paste

Fix logic error related to typo

Fix logic error related to typo

Trying to figure out why invalid ranges aren't displayed as NULL..NULL

Debug logs

Debug logs

Debug logs

Debug logs for worker node

Debug logs for worker node in extent map

Debugging virtual table fill operation

Debugging virtual table fill operation

Fix for invalid range  computation

Remove debug logs

Change handling of invalid ranges

They are also set, but to invalid state.

Complete change

Fix typo

Remove unused code

"Fix" for tests - -1..0 instead of NULL..NULL for invalid unsigned ranges

Not a good change, yet I cannot do better for now.

MTR output requires tabs instead of spaces

Debug logs

Debug logs

Debug logs - fix build

Debug logs and logic error fix

Fix for clearly incorrect firstLBID in CPInfo being set - to test

Fix for system catalog operations suppot

Better interface to fix build errors

Delete tests we cannot satisfy due to extent rescan due to WHERE

Tests for wide decimals

Testing support for wide decimals

Fix for wide decimals tests

Fix for delete within range

Memory leak fix and, possible, double free fix

Dispatch on CalpontSystemCatalog::ColDataType is more robust

Add support for forgotten MEDINT type

Add forgottent BIGINT

empty() instead of size() > 0

Better layout

Remove confusing comment

Sensible names for special values of seqNum field

Tests for wide decimal support

Addressing concerns of drrtuy

Remove test we cannot satisfy

Final touches for PR

Remove unused result file
2021-04-05 14:18:22 +03:00
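The commit above threads per-extent min/max ranges (CPInfo) through the write engine. Ranges are widened as values are converted, and a range that has been marked invalid must not be put back into a valid state. Below is a minimal sketch of that bookkeeping. It assumes a signed 64-bit column, and `ExtentRange` with its members are hypothetical names, not the BRM `CPInfo`/`CPMaxMin` structures.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical per-extent min/max range, for illustration only.
// It is NOT the BRM CPInfo/CPMaxMin structure used by ColumnStore.
struct ExtentRange
{
    // An empty range starts inverted so that the first update sets both ends.
    int64_t min = std::numeric_limits<int64_t>::max();
    int64_t max = std::numeric_limits<int64_t>::min();
    bool valid = true;

    // Widen the range so it covers `value`.
    // An invalid range stays invalid: new writes cannot make it trustworthy again.
    void update(int64_t value)
    {
        if (!valid)
            return;
        if (value < min) min = value;
        if (value > max) max = value;
    }

    // Mark the range unusable, e.g. after a ROLLBACK of uncommitted writes.
    void invalidate() { valid = false; }

    // A predicate "col = v" may skip this extent only when the range is
    // valid and v lies outside [min, max].
    bool canSkipEquals(int64_t v) const
    {
        return valid && (v < min || v > max);
    }
};
```

Keeping `valid` separate from the bounds means an invalidated extent is never skipped, no matter what values are written to it later.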

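On COMMIT, `processPackage` takes the auto-increment lock and re-reads the catalog's next value. It calls `UpdateSyscolumnNextval` only when the controller's value is larger. Each branch releases the lock by hand, so an exception thrown between `getAILock` and `releaseAILock` appears to leave the lock held. The sketch below performs the same check with an RAII guard. `FakeController`, `AILockGuard`, and `syncNextValue` are hypothetical stand-ins, not the `DBRM` or `WE_DDLCommandClient` interfaces.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for the controller side; not the real DBRM class.
struct FakeController
{
    bool locked = false;
    bool hasValue = true;
    uint64_t value = 0;

    void getAILock() { locked = true; }
    void releaseAILock() { locked = false; }
    bool getAIValue(uint64_t* out) const
    {
        if (hasValue)
            *out = value;
        return hasValue;
    }
};

// RAII guard: the lock is released on every exit path, including exceptions.
class AILockGuard
{
public:
    explicit AILockGuard(FakeController& c) : c_(c) { c_.getAILock(); }
    ~AILockGuard() { c_.releaseAILock(); }
    AILockGuard(const AILockGuard&) = delete;
    AILockGuard& operator=(const AILockGuard&) = delete;

private:
    FakeController& c_;
};

// Returns true if the catalog's next value was raised to the controller's value.
// readCatalogNext is called only after the lock is held ("in case it has changed").
// writeCatalogNext returns 0 on success, like the rc checked in processPackage.
bool syncNextValue(FakeController& ctrl,
                   const std::function<uint64_t()>& readCatalogNext,
                   const std::function<int(uint64_t)>& writeCatalogNext)
{
    AILockGuard guard(ctrl);
    uint64_t catalogNext = readCatalogNext();
    uint64_t controllerNext = 0;
    if (ctrl.getAIValue(&controllerNext) && controllerNext > catalogNext)
    {
        if (writeCatalogNext(controllerNext) != 0)
            throw std::runtime_error("Error in UpdateSyscolumnNextval");
        return true;
    }
    return false;
}
```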
1180 lines
45 KiB
C++
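`clearTableLock` maps the DBRoots recorded in a table lock to a list of distinct PMs and forwards the bulk-rollback message to those PMs. If any DBRoot has no PM, it fails. Below is a standalone sketch of that step. It assumes the DBRoot-to-PM map is a plain `std::map<int, int>` (the real code gets a pointer to it from `oam::OamCache::getDBRootToPMMap()`), and `pmsForDbRoots` is a hypothetical helper name.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <vector>

// Hypothetical helper: map each DBRoot to its PM and return the distinct PMs
// in ascending order. Throws if a DBRoot has no PM, like clearTableLock does.
std::vector<int> pmsForDbRoots(const std::vector<int>& dbroots,
                               const std::map<int, int>& dbRootToPm)
{
    std::set<int> pmSet;  // std::set drops duplicate PMs and keeps them sorted
    for (int root : dbroots)
    {
        auto it = dbRootToPm.find(root);
        if (it == dbRootToPm.end())
        {
            std::ostringstream oss;
            oss << "DBRoot " << root << " does not map to a PM. Cannot perform rollback";
            throw std::runtime_error(oss.str());
        }
        pmSet.insert(it->second);
    }
    return std::vector<int>(pmSet.begin(), pmSet.end());
}
```

Because the result is deduplicated, a PM that hosts several of the lock's DBRoots receives one rollback request and sends one reply. That matches the response loop, which waits for exactly `pmList.size()` messages.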

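`viewTableLock` makes two passes over the table locks. The first pass sizes each column as the longer of its heading and its widest value, plus two characters of padding. The second pass prints left-justified fields with `std::setw`. The sketch below reduces this to string cells. `columnWidth` and `renderTable` are hypothetical helpers, and the sketch omits the fixed-width State column and the "BulkLoad"/"n/a" substitutions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>

// Width of one column: the longer of the heading and the widest cell,
// plus two characters of padding.
std::size_t columnWidth(const std::string& heading,
                        const std::vector<std::string>& cells)
{
    std::size_t width = heading.size();
    for (const auto& cell : cells)
        width = std::max(width, cell.size());
    return width + 2;
}

// First pass computes widths; second pass writes left-justified fields.
std::string renderTable(const std::vector<std::string>& headings,
                        const std::vector<std::vector<std::string>>& rows)
{
    std::vector<std::size_t> widths;
    for (std::size_t c = 0; c < headings.size(); ++c)
    {
        std::vector<std::string> column;
        for (const auto& row : rows)
            column.push_back(row[c]);
        widths.push_back(columnWidth(headings[c], column));
    }

    std::ostringstream os;
    os << std::left;  // adjustfield persists; setw applies to one output only
    for (std::size_t c = 0; c < headings.size(); ++c)
        os << std::setw(static_cast<int>(widths[c])) << headings[c];
    for (const auto& row : rows)
    {
        os << '\n';
        for (std::size_t c = 0; c < row.size(); ++c)
            os << std::setw(static_cast<int>(widths[c])) << row[c];
    }
    return os.str();
}
```

For non-negative numbers, the widest value is simply the one with the most digits. This is why the original code formats only `maxLockID`, `maxPID`, and similar maxima instead of every value.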
/* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: commandpackageprocessor.cpp 9613 2013-06-12 15:44:24Z dhall $
*
*
***********************************************************************/
#include <cstring>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <set>
#include <vector>
#include <boost/algorithm/string.hpp>
#include <boost/scoped_ptr.hpp>
#include "commandpackageprocessor.h"
#include "messagelog.h"
#include "dbrm.h"
#include "sqllogger.h"
#include "tablelockdata.h"
#include "we_messages.h"
#include "we_ddlcommandclient.h"
#include "oamcache.h"
#include "liboamcpp.h"
using namespace std;
using namespace WriteEngine;
using namespace dmlpackage;
using namespace execplan;
using namespace logging;
using namespace boost;
using namespace BRM;
namespace dmlpackageprocessor
{
// Tracks active cleartablelock commands by storing set of table lock IDs
/*static*/ std::set<uint64_t>
CommandPackageProcessor::fActiveClearTableLockCmds;
/*static*/ boost::mutex
CommandPackageProcessor::fActiveClearTableLockCmdMutex;
DMLPackageProcessor::DMLResult
CommandPackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage)
{
SUMMARY_INFO("CommandPackageProcessor::processPackage");
DMLResult result;
result.result = NO_ERROR;
VERBOSE_INFO("Processing Command DML Package...");
std::string stmt = cpackage.get_DMLStatement();
boost::algorithm::to_upper(stmt);
trim(stmt);
fSessionID = cpackage.get_SessionID();
BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
uint64_t uniqueId = 0;
//Bug 5070. Added exception handling
try
{
uniqueId = fDbrm->getUnique64();
}
catch (std::exception& ex)
{
logging::Message::Args args;
logging::Message message(9);
args.add(ex.what());
message.format(args);
result.result = COMMAND_ERROR;
result.message = message;
fSessionManager.rolledback(txnid);
return result;
}
catch ( ... )
{
logging::Message::Args args;
logging::Message message(9);
args.add("Unknown error occurred while getting unique number.");
message.format(args);
result.result = COMMAND_ERROR;
result.message = message;
fSessionManager.rolledback(txnid);
return result;
}
string errorMsg;
bool queRemoved = false;
logging::LoggingID lid(20);
logging::MessageLog ml(lid);
LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
logging::Message::Args args1;
logging::Message msg(1);
Logger logger(logid.fSubsysID);
if (stmt != "CLEANUP")
{
args1.add("Start SQL statement: ");
args1.add(stmt);
msg.format( args1 );
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
}
//fWEClient->addQueue(uniqueId);
try
{
// set-up the transaction
if ( (stmt == "COMMIT") || (stmt == "ROLLBACK") )
{
//SQLLogger sqlLogger(stmt, DMLLoggingId, cpackage.get_SessionID(), txnid.id);
if ((txnid.valid))
{
vector<LBID_t> lbidList;
fDbrm->getUncommittedExtentLBIDs(static_cast<VER_t>(txnid.id), lbidList);
bool cpInvalidated = false;
//cout << "get a valid txnid " << txnid.id << " and stmt is " << stmt << " and isBatchInsert is " << cpackage.get_isBatchInsert() << endl;
if ((stmt == "COMMIT") && (cpackage.get_isBatchInsert()))
{
//update syscolumn for the next value
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
CalpontSystemCatalog::TableName tableName;
tableName = systemCatalogPtr->tableName(cpackage.getTableOid());
try
{
uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
if (nextVal != AUTOINCR_SATURATED) //need to update syscolumn
{
//get autoincrement column oid
int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
//get the current nextVal from controller
scoped_ptr<DBRM> aDbrm(new DBRM());
uint64_t nextValInController = 0;
bool validNextVal = false;
aDbrm->getAILock(columnOid);
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); //in case it has changed
validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
if ((validNextVal) && (nextValInController > nextVal))
{
fWEClient->removeQueue(uniqueId);
queRemoved = true;
WE_DDLCommandClient ddlClient;
uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController);
//@bug 5894. Need release lock.
aDbrm->releaseAILock(columnOid);
if (rc != 0)
throw std::runtime_error("Error in UpdateSyscolumnNextval");
}
else
aDbrm->releaseAILock(columnOid);
}
}
catch (std::exception& ex)
{
//Rollback transaction
rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
fSessionManager.rolledback( txnid );
throw std::runtime_error(ex.what());
}
//systemCatalogPtr->updateColinfoCache(nextValMap);
int weRc = 0;
if (cpackage.get_isAutocommitOn())
{
weRc = commitBatchAutoOnTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
if (weRc != 0)
BRM::errString(weRc, errorMsg);
cpInvalidated = true;
}
else
{
weRc = fDbrm->vbCommit(txnid.id);
if (weRc != 0)
BRM::errString(weRc, errorMsg);
//weRc = commitBatchAutoOffTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
}
if (weRc != 0)
{
throw std::runtime_error(errorMsg);
}
logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
fSessionManager.committed( txnid );
//cout << "releasing transaction id for batchinsert" << txnid.id << endl;
}
else if (stmt == "COMMIT")
{
//cout << "success in commitTransaction" << endl;
//update syscolumn for the next value
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
CalpontSystemCatalog::TableName tableName;
uint32_t tableOid = cpackage.getTableOid();
std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();
if (tableOid == 0) //special case: transaction commit for autocommit off and not following a dml statement immediately
{
TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
TablelockData::OIDTablelock::iterator iter;
if (!tablelockMap.empty())
{
for ( unsigned k = 0; k < tableLocks.size(); k++)
{
iter = tablelockMap.find(tableLocks[k].tableOID);
if ( iter != tablelockMap.end() )
{
tableName = systemCatalogPtr->tableName(tableLocks[k].tableOID);
try
{
uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
if (nextVal != AUTOINCR_SATURATED) //need to update syscolumn
{
//get autoincrement column oid
int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
//get the current nextVal from controller
scoped_ptr<DBRM> aDbrm(new DBRM());
uint64_t nextValInController = 0;
bool validNextVal = false;
aDbrm->getAILock(columnOid);
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); //in case it has changed
validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
{
fWEClient->removeQueue(uniqueId);
queRemoved = true;
WE_DDLCommandClient ddlClient;
uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
aDbrm->releaseAILock(columnOid);
if (rc != 0)
{
//for now
fSessionManager.committed( txnid );
throw std::runtime_error("Error in UpdateSyscolumnNextval");
}
}
else
aDbrm->releaseAILock(columnOid);
}
}
catch (std::exception& ex)
{
//Rollback transaction, release tablelock
rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
fDbrm->releaseTableLock(iter->second);
fSessionManager.rolledback( txnid );
throw std::runtime_error(ex.what());
}
}
}
}
}
else
{
if (tableOid >= 3000)
{
tableName = systemCatalogPtr->tableName(tableOid);
try
{
uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
if (nextVal != AUTOINCR_SATURATED) //need to update syscolumn
{
//get autoincrement column oid
int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
//get the current nextVal from controller
scoped_ptr<DBRM> aDbrm(new DBRM());
uint64_t nextValInController = 0;
bool validNextVal = false;
aDbrm->getAILock(columnOid);
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); //in case it has changed
validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
{
fWEClient->removeQueue(uniqueId);
queRemoved = true;
WE_DDLCommandClient ddlClient;
uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
aDbrm->releaseAILock(columnOid);
if (rc != 0)
{
//for now
fSessionManager.committed( txnid );
throw std::runtime_error("Error in UpdateSyscolumnNextval");
}
}
else
aDbrm->releaseAILock(columnOid);
}
}
catch (std::exception& ex)
{
//Rollback transaction, release tablelock
rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
for ( unsigned k = 0; k < tableLocks.size(); k++)
{
if ( tableLocks[k].tableOID == tableOid )
{
try
{
fDbrm->releaseTableLock(tableLocks[k].id);
}
catch (std::exception&)
{}
}
}
fSessionManager.rolledback( txnid );
throw std::runtime_error(ex.what());
}
}
}
int weRc = commitTransaction(uniqueId, txnid );
logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
if (weRc != WriteEngine::NO_ERROR)
{
//cout << "Got an error in commitTransaction" << endl;
WErrorCodes ec;
ostringstream oss;
oss << "COMMIT failed: " << ec.errorString(weRc);
throw std::runtime_error(oss.str());
}
fSessionManager.committed( txnid );
//cout << "commit releasing transaction id " << txnid.id << endl;
}
else if ((stmt == "ROLLBACK") && (cpackage.get_isBatchInsert()))
{
int weRc = 0;
//version rollback, Bulkrollback
weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
if (weRc == 0)
{
//@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
fDbrm->invalidateUncommittedExtentLBIDs(0, &lbidList);
cpInvalidated = true;
weRc = rollBackBatchAutoOnTransaction(uniqueId, txnid, fSessionID, cpackage.getTableOid(), errorMsg);
}
else
{
throw std::runtime_error(errorMsg);
}
logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
if (weRc != 0)
{
//@Bug 4524. Don't set to readonly. Just error out.
WErrorCodes ec;
ostringstream oss;
oss << "ROLLBACK batch insert failed due to: " << "(" << weRc << ")" << ec.errorString(weRc);
//Log to error log
logging::Message::Args args1;
logging::Message message1(2);
args1.add(oss.str());
message1.format( args1 );
ml.logErrorMessage( message1 );
throw std::runtime_error(oss.str());
}
fSessionManager.rolledback( txnid );
//cout << "batch rollback releasing transaction id " << txnid.id << endl;
}
else if (stmt == "ROLLBACK")
{
std::string errorMsg("");
logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
int weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
if (weRc != 0)
{
//cout << "Rollback failed" << endl;
//@Bug 4524. Don't set to readonly. Just error out.
ostringstream oss;
oss << "ROLLBACK failed due to: " << errorMsg;
//Log to error log
logging::Message::Args args2;
logging::Message message2(2);
args2.add(oss.str());
message2.format( args2 );
ml.logErrorMessage( message2 );
throw std::runtime_error(oss.str());
}
fSessionManager.rolledback( txnid );
//cout << "Rollback releasing transaction id " << txnid.id << endl;
}
if (!cpInvalidated)
{
// The code below assumes that, on COMMIT, the ranges for all touched LBIDs
// have already been either correctly set or correctly reset.
// It also assumes that ROLLBACK, or any operation other than COMMIT, may not
// restore ranges to a correct state. This is why we invalidate extents when we are not committing.
if (stmt != "COMMIT")
{
fDbrm->invalidateUncommittedExtentLBIDs(0, &lbidList);
}
}
}
}
else if (stmt == "CLEANUP")
{
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog
(cpackage.get_SessionID());
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog
(cpackage.get_SessionID() | 0x80000000);
}
else if (stmt == "VIEWTABLELOCK")
{
viewTableLock( cpackage, result );
}
else if (stmt == "CLEARTABLELOCK")
{
clearTableLock( uniqueId, cpackage, result );
}
else if ( !cpackage.get_Logging())
{
BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_DMLStatement() + ";", cpackage.get_SchemaName());
SQLLogger sqlLogger(cpackage.get_DMLStatement(), DMLLoggingId, fSessionID, txnid.id);
//cout << "commandpackageprocessor Logging " << cpackage.get_DMLStatement()+ ";" << endl;
}
else
{
std::string err = "Unknown command.";
SUMMARY_INFO(err);
throw std::runtime_error(err);
}
}
catch ( logging::IDBExcept& noTable) //@Bug 2606 catch no table found exception
{
cerr << "CommandPackageProcessor::processPackage: " << noTable.what() << endl;
result.result = COMMAND_ERROR;
result.message = Message(noTable.what());
}
catch (std::exception& ex)
{
cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl;
logging::Message::Args args;
logging::Message message(1);
args.add( ex.what() );
args.add("");
args.add("");
message.format( args );
result.result = COMMAND_ERROR;
result.message = message;
}
catch (...)
{
cerr << "CommandPackageProcessor::processPackage: caught unknown exception!" << endl;
logging::Message::Args args;
logging::Message message(1);
args.add( "Command Failed: ");
args.add( "encountered unknown exception" );
args.add("");
args.add("");
message.format( args );
result.result = COMMAND_ERROR;
result.message = message;
}
if (!queRemoved)
fWEClient->removeQueue(uniqueId);
//release table lock
if ((result.result == NO_ERROR) && ((stmt == "COMMIT") || (stmt == "ROLLBACK")) )
{
TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
bool lockReleased = true;
if (!tablelockMap.empty())
{
TablelockData::OIDTablelock::iterator it = tablelockMap.begin();
while (it != tablelockMap.end())
{
try
{
lockReleased = fDbrm->releaseTableLock(it->second);
//cout << "releasing tablelock " << it->second << endl;
}
catch (std::exception& ex)
{
logging::Message::Args args;
logging::Message message(1);
args.add( ex.what() );
args.add("");
args.add("");
message.format( args );
result.result = COMMAND_ERROR;
result.message = message;
}
if (!lockReleased) //log an error
{
ostringstream os;
os << "tablelock for table oid " << it->first << " is not found";
logging::Message::Args args;
logging::Message message(1);
args.add( os.str());
args.add("");
args.add("");
message.format( args );
logging::LoggingID lid(21);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
}
//cout << "tablelock " << it->second << " is released" << endl;
it++;
}
//@Bug 3557. Clean tablelock cache after commit/rollback.
TablelockData::removeTablelockData(cpackage.get_SessionID());
}
}
VERBOSE_INFO("Finished processing Command DML Package");
//LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
logging::Message::Args args2;
logging::Message msg1(1);
if (stmt != "CLEANUP")
{
args2.add("End SQL statement");
msg1.format( args2 );
//Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
}
return result;
}
//------------------------------------------------------------------------------
// Process viewtablelock command; return table lock information for the
// specified table.
// This function closely resembles printTableLocks in viewtablelock.cpp.
//------------------------------------------------------------------------------
void CommandPackageProcessor::viewTableLock(
const dmlpackage::CalpontDMLPackage& cpackage,
DMLPackageProcessor::DMLResult& result)
{
// Initialize System Catalog object used to get table name
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::TableName tableName;
tableName.schema = cpackage.get_SchemaName();
tableName.table = cpackage.get_TableName();
execplan::CalpontSystemCatalog::ROPair roPair;
roPair = systemCatalogPtr->tableRID( tableName );
// Get list of table locks for the requested table
std::vector<BRM::TableLockInfo> tableLocks;
tableLocks = fDbrm->getAllTableLocks();
// Make preliminary pass through the table locks in order to determine our
// output column widths based on the data. Min column widths are based on
// the width of the column heading (except for the 'state' column).
uint64_t maxLockID = 0;
uint32_t maxPID = 0;
int32_t maxSessionID = 0;
int32_t minSessionID = 0;
int32_t maxTxnID = 0;
unsigned int lockIDColumnWidth = 6; // "LockID"
unsigned int ownerColumnWidth = 7; // "Process"
unsigned int pidColumnWidth = 3; // "PID"
unsigned int sessionIDColumnWidth = 7; // "Session"
unsigned int txnIDColumnWidth = 3; // "Txn"
unsigned int createTimeColumnWidth = 12; // "CreationTime"
unsigned int pmColumnWidth = 7; // "DBRoots"
std::vector<std::string> createTimes;
char cTimeBuffer[1024];
for (unsigned idx = 0; idx < tableLocks.size(); idx++)
{
if (tableLocks[idx].id > maxLockID)
maxLockID = tableLocks[idx].id;
if (tableLocks[idx].ownerName.length() > ownerColumnWidth)
ownerColumnWidth = tableLocks[idx].ownerName.length();
if (tableLocks[idx].ownerPID > maxPID)
maxPID = tableLocks[idx].ownerPID;
if (tableLocks[idx].ownerSessionID > maxSessionID)
maxSessionID = tableLocks[idx].ownerSessionID;
if (tableLocks[idx].ownerSessionID < minSessionID)
minSessionID = tableLocks[idx].ownerSessionID;
if (tableLocks[idx].ownerTxnID > maxTxnID)
maxTxnID = tableLocks[idx].ownerTxnID;
ctime_r( &tableLocks[idx].creationTime, cTimeBuffer );
cTimeBuffer[ strlen(cTimeBuffer) - 1 ] = '\0'; // strip trailing '\n'
std::string cTimeStr( cTimeBuffer );
if (cTimeStr.length() > createTimeColumnWidth)
createTimeColumnWidth = cTimeStr.length();
createTimes.push_back( cTimeStr );
std::ostringstream pms; //It is dbroots now
for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
{
if (k > 0)
pms << ',';
pms << tableLocks[idx].dbrootList[k];
}
if (pms.str().length() > pmColumnWidth)
pmColumnWidth = pms.str().length();
}
ownerColumnWidth += 2;
pmColumnWidth += 2;
createTimeColumnWidth += 2;
std::ostringstream idString;
idString << maxLockID;
if (idString.str().length() > lockIDColumnWidth)
lockIDColumnWidth = idString.str().length();
lockIDColumnWidth += 2;
std::ostringstream pidString;
pidString << maxPID;
if (pidString.str().length() > pidColumnWidth)
pidColumnWidth = pidString.str().length();
pidColumnWidth += 2;
const std::string sessionNoneStr("BulkLoad");
std::ostringstream sessionString;
sessionString << maxSessionID;
if (sessionString.str().length() > sessionIDColumnWidth)
sessionIDColumnWidth = sessionString.str().length();
if ((minSessionID < 0) &&
(sessionNoneStr.length() > sessionIDColumnWidth))
sessionIDColumnWidth = sessionNoneStr.length();
sessionIDColumnWidth += 2;
const std::string txnNoneStr("n/a");
std::ostringstream txnString;
txnString << maxTxnID;
if (txnString.str().length() > txnIDColumnWidth)
txnIDColumnWidth = txnString.str().length();
txnIDColumnWidth += 2;
// Make second pass through the table locks to build our result.
// Keep in mind the same table could have more than 1 lock
// (on different PMs), so we don't exit loop after "first" match.
bool found = false;
ostringstream os;
for (unsigned idx = 0; idx < tableLocks.size(); idx++)
{
if (roPair.objnum == (CalpontSystemCatalog::OID)
tableLocks[idx].tableOID)
{
std::ostringstream pms;
for (unsigned k = 0; k < tableLocks[idx].dbrootList.size(); k++)
{
if (k > 0)
pms << ',';
pms << tableLocks[idx].dbrootList[k];
}
if (found) // write newline between lines
{
os << endl;
}
else // write the column headers before the first entry
{
os.setf(ios::left, ios::adjustfield);
os << setw(lockIDColumnWidth) << "LockID" <<
setw(ownerColumnWidth) << "Process" <<
setw(pidColumnWidth) << "PID" <<
setw(sessionIDColumnWidth) << "Session" <<
setw(txnIDColumnWidth) << "Txn" <<
setw(createTimeColumnWidth) << "CreationTime" <<
setw(9) << "State" <<
setw(pmColumnWidth) << "DBRoots" << endl;
}
os << " " <<
setw(lockIDColumnWidth) << tableLocks[idx].id <<
setw(ownerColumnWidth) << tableLocks[idx].ownerName <<
setw(pidColumnWidth) << tableLocks[idx].ownerPID;
// Log session ID, or "BulkLoad" if session is -1
if (tableLocks[idx].ownerSessionID < 0)
os << setw(sessionIDColumnWidth) << sessionNoneStr;
else
os << setw(sessionIDColumnWidth) <<
tableLocks[idx].ownerSessionID;
// Log txn ID, or "n/a" if txn is -1
if (tableLocks[idx].ownerTxnID < 0)
os << setw(txnIDColumnWidth) << txnNoneStr;
else
os << setw(txnIDColumnWidth) <<
tableLocks[idx].ownerTxnID;
os << setw(createTimeColumnWidth) <<
createTimes[idx] <<
setw(9) << ((tableLocks[idx].state == BRM::LOADING) ?
"LOADING" : "CLEANUP") <<
setw(pmColumnWidth) << pms.str();
found = true;
} // end of displaying a table lock match
} // end of loop through all table locks
if (!found)
{
os << " Table " << tableName.schema << "." <<
tableName.table << " is not locked by any process.";
}
result.tableLockInfo = os.str();
}
//------------------------------------------------------------------------------
// Process cleartablelock command; execute bulk rollback and release table lock
// for the specified table.
//------------------------------------------------------------------------------
void CommandPackageProcessor::clearTableLock( uint64_t uniqueId,
const dmlpackage::CalpontDMLPackage& cpackage,
DMLPackageProcessor::DMLResult& result)
{
CalpontSystemCatalog::TableName tableName;
tableName.schema = cpackage.get_SchemaName();
tableName.table = cpackage.get_TableName();
// Get the Table lock ID that is passed in the SQL statement attribute.
// This is a kludge we may want to consider changing. Could add a table
// lock ID attribute to the CalpontDMLPackage object.
std::istringstream lockIDString( cpackage.get_SQLStatement() );
uint64_t tableLockID;
lockIDString >> tableLockID;
//----------------------------------------------------- start of syslog code
//
// Log initiation of cleartablelock to syslog
//
const std::string APPLNAME("cleartablelock SQL cmd");
const int SUBSYSTEM_ID = 21; // dmlpackageproc
const int INIT_MSG_NUM = logging::M0088;
logging::Message::Args msgArgs;
logging::Message logMsg1( INIT_MSG_NUM );
msgArgs.add( APPLNAME );
msgArgs.add( tableName.toString() );
msgArgs.add( tableLockID );
logMsg1.format( msgArgs );
logging::LoggingID lid( SUBSYSTEM_ID );
logging::MessageLog ml( lid );
ml.logInfoMessage( logMsg1 );
//------------------------------------------------------- end of syslog code
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
messageqcpp::ByteStream bsOut;
bool lockGrabbed = false;
bool bErrFlag = false;
bool bRemoveMetaErrFlag = false;
std::ostringstream combinedErrMsg;
try
{
// Make sure BRM is in READ-WRITE state before starting
int brmRc = fDbrm->isReadWrite( );
if (brmRc != BRM::ERR_OK)
{
std::string brmErrMsg;
BRM::errString( brmRc, brmErrMsg );
std::ostringstream oss;
oss << "Failed BRM status check: " << brmErrMsg;
throw std::runtime_error( oss.str() );
}
BRM::TableLockInfo lockInfo;
establishTableLockToClear( tableLockID, lockInfo );
lockGrabbed = true;
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
oam::OamCache::dbRootPMMap_t dbRootPmMap = oamCache->getDBRootToPMMap();
std::map<int, int>::const_iterator mapIter;
std::set<int> pmSet;
// Construct relevant list of PMs based on the DBRoots associated
// with the tableLock.
for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
{
mapIter = dbRootPmMap->find( lockInfo.dbrootList[k] );
if (mapIter != dbRootPmMap->end())
{
int pm = mapIter->second;
pmSet.insert( pm );
}
else
{
std::ostringstream oss;
oss << "DBRoot " << lockInfo.dbrootList[k] <<
" does not map to a PM. Cannot perform rollback";
throw std::runtime_error( oss.str() );
}
}
std::vector<int> pmList;
for (std::set<int>::const_iterator setIter = pmSet.begin();
setIter != pmSet.end();
++setIter)
{
pmList.push_back( *setIter );
}
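//------------------------------------------------------------------
// Sketch (excluded from the build by #if 0): the two loops above,
// restated with standard containers only. Each of the lock's DBRoots
// is looked up in a DBRoot->PM map, an unmapped DBRoot is an error,
// and std::set drops duplicate PMs and keeps them sorted. buildPmList
// and its plain std::map / std::vector<int> parameters are hypothetical
// stand-ins for OamCache::dbRootPMMap_t and lockInfo.dbrootList.
//------------------------------------------------------------------
#if 0
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <vector>

// Hypothetical helper: map each DBRoot to its owning PM and return the
// distinct PMs in ascending order. Throws if a DBRoot has no PM mapping.
std::vector<int> buildPmList(const std::map<int, int>& dbRootToPm,
                             const std::vector<int>& dbRoots)
{
    std::set<int> pmSet;
    for (int dbRoot : dbRoots)
    {
        std::map<int, int>::const_iterator it = dbRootToPm.find(dbRoot);
        if (it == dbRootToPm.end())
        {
            std::ostringstream oss;
            oss << "DBRoot " << dbRoot << " does not map to a PM.";
            throw std::runtime_error(oss.str());
        }
        pmSet.insert(it->second);  // duplicates are ignored by std::set
    }
    // std::set iterates in sorted order, so the result is sorted as well.
    return std::vector<int>(pmSet.begin(), pmSet.end());
}
#endif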
std::cout << "cleartablelock rollback for table lock " << tableLockID <<
" being forwarded to PM(s): ";
for (unsigned int k = 0; k < pmList.size(); k++)
{
if (k > 0)
std::cout << ", ";
std::cout << pmList[k];
}
std::cout << std::endl;
// Perform a bulk rollback only if the lock is in LOADING state
if (lockInfo.state == BRM::LOADING)
{
fWEClient->addQueue(uniqueId);
//------------------------------------------------------------------
// Send a rollback msg to the WriteEngine server on every PM in pmList.
// pmList is built from the lock's DBRoots using the current
// DBRoot-to-PM map, so a DBRoot that has been moved to a different
// PM is still reached.
//------------------------------------------------------------------
// std::cout << "cleartablelock rollback: tableLock-" << tableLockID <<
// ": uniqueID-" << uniqueId <<
// ": oid-" << lockInfo.tableOID <<
// "; name-" << tableName.toString() <<
// "; app-" << APPLNAME << std::endl;
bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
bsOut << uniqueId;
bsOut << tableLockID;
bsOut << lockInfo.tableOID;
bsOut << tableName.toString();
bsOut << APPLNAME;
for (unsigned j = 0; j < pmList.size(); j++)
{
fWEClient->write(bsOut, pmList[j]);
}
// Wait for a response from every PM in pmList, and accumulate any errors
unsigned int pmMsgCnt = 0;
while (pmMsgCnt < pmList.size())
{
std::string rollbackErrMsg;
bsIn.reset(new messageqcpp::ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0)
{
// A failed rollback is fatal: the lock must not move to CLEANUP or
// be released, so it is recorded in bErrFlag, not bRemoveMetaErrFlag.
bErrFlag = true;
if (combinedErrMsg.str().length() > 0)
combinedErrMsg << std::endl;
combinedErrMsg << "Network error, PM rollback; ";
}
else
{
messageqcpp::ByteStream::byte rc;
uint16_t pmNum;
*bsIn >> rc;
*bsIn >> rollbackErrMsg;
*bsIn >> pmNum;
// std::cout << "cleartablelock rollback response from PM"<<
// pmNum << "; rc-" << (int)rc <<
// "; errMsg: {" << rollbackErrMsg << '}' << std::endl;
if (rc != 0)
{
bErrFlag = true;
if (combinedErrMsg.str().empty())
combinedErrMsg << "Rollback error; ";
else
combinedErrMsg << std::endl;
combinedErrMsg << "[PM" << pmNum << "] " <<
rollbackErrMsg;
}
}
pmMsgCnt++;
} // end of while loop to process all responses to bulk rollback
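//------------------------------------------------------------------
// Sketch (excluded from the build by #if 0): the reply-collection loop
// above as a standalone function, so the message format is easy to
// see. PmReply and accumulateReplies are hypothetical; the real loop
// decodes rc, errMsg and pmNum from the ByteStream returned by
// fWEClient->read(). A missing reply appends the network-error text;
// a nonzero rc appends "[PM<n>] <errMsg>", and errPrefix is written
// only when nothing has been recorded yet.
//------------------------------------------------------------------
#if 0
#include <cstdint>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for one decoded PM reply.
struct PmReply
{
    bool received;       // false: read() returned an empty ByteStream
    uint8_t rc;          // 0 means the PM reported success
    std::string errMsg;
    uint16_t pmNum;
};

// Folds the replies into combinedErrMsg with the same formatting as the
// loop above. Returns true if any reply was missing or had rc != 0.
bool accumulateReplies(const std::vector<PmReply>& replies,
                       const std::string& networkErrText,
                       const std::string& errPrefix,
                       std::ostringstream& combinedErrMsg)
{
    bool anyError = false;
    for (const PmReply& reply : replies)
    {
        if (!reply.received)
        {
            anyError = true;
            if (combinedErrMsg.str().length() > 0)
                combinedErrMsg << std::endl;
            combinedErrMsg << networkErrText;
        }
        else if (reply.rc != 0)
        {
            anyError = true;
            if (combinedErrMsg.str().empty())
                combinedErrMsg << errPrefix;
            else
                combinedErrMsg << std::endl;
            combinedErrMsg << "[PM" << reply.pmNum << "] " << reply.errMsg;
        }
    }
    return anyError;
}
#endif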
// If no errors so far, change the lock state to CLEANUP.
// The return value of changeState() is ignored.
if (!bErrFlag)
{
fDbrm->changeState( tableLockID, BRM::CLEANUP );
}
} // end of (lockInfo.state == BRM::LOADING)
// If no errors so far, then:
// 1. delete meta data backup rollback files
// 2. finally release table lock
if (!bErrFlag)
{
bsOut.reset();
//------------------------------------------------------------------
// Delete meta data backup rollback files
//------------------------------------------------------------------
// std::cout << "cleartablelock cleanup: uniqueID-" << uniqueId <<
// ": oid-" << lockInfo.tableOID << std::endl;
bsOut << (messageqcpp::ByteStream::byte)
WE_SVR_DML_BULKROLLBACK_CLEANUP;
bsOut << uniqueId;
bsOut << lockInfo.tableOID;
for (unsigned j = 0; j < pmList.size(); j++)
{
fWEClient->write(bsOut, pmList[j]);
}
// Wait for a response from every PM in pmList, and accumulate any errors
unsigned int pmMsgCnt = 0;
while (pmMsgCnt < pmList.size())
{
std::string fileDeleteErrMsg;
bsIn.reset(new messageqcpp::ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0)
{
bRemoveMetaErrFlag = true;
if (combinedErrMsg.str().length() > 0)
combinedErrMsg << std::endl;
combinedErrMsg << "Network error, PM rollback cleanup; ";
}
else
{
messageqcpp::ByteStream::byte rc;
uint16_t pmNum;
*bsIn >> rc;
*bsIn >> fileDeleteErrMsg;
*bsIn >> pmNum;
// std::cout << "cleartablelock cleanup response from PM" <<
// pmNum << "; rc-" << (int)rc <<
// "; errMsg: {" << fileDeleteErrMsg << '}' << std::endl;
if (rc != 0)
{
bRemoveMetaErrFlag = true;
if (combinedErrMsg.str().empty())
combinedErrMsg << "Cleanup error; ";
else
combinedErrMsg << std::endl;
combinedErrMsg << "[PM" << pmNum << "] " <<
fileDeleteErrMsg;
}
}
pmMsgCnt++;
} // end of while loop to process all responses to rollback cleanup
// The return value of releaseTableLock() is ignored.
if (!bErrFlag)
{
fDbrm->releaseTableLock( tableLockID );
}
}
}
catch (const std::exception& ex)
{
bErrFlag = true;
combinedErrMsg << ex.what();
}
if (!bErrFlag)
{
std::ostringstream oss;
oss << "Table lock " << tableLockID << " for table " <<
tableName.toString() << " is cleared.";
//@Bug 4517. The table lock is still released if removing the meta files
// failed; that failure is reported as a warning.
if (bRemoveMetaErrFlag)
{
oss << " Warning: " << combinedErrMsg.str();
}
result.tableLockInfo = oss.str();
}
else
{
std::ostringstream oss;
oss << "Table lock " << tableLockID << " for table " <<
tableName.toString() << " cannot be cleared. " <<
combinedErrMsg.str();
result.tableLockInfo = oss.str();
}
// Remove tableLockID from the list of active cleartablelock commands
if (lockGrabbed)
{
boost::mutex::scoped_lock lock( fActiveClearTableLockCmdMutex );
fActiveClearTableLockCmds.erase( tableLockID );
}
//----------------------------------------------------- start of syslog code
//
// Log completion of cleartablelock to syslog
//
const int COMP_MSG_NUM = logging::M0089;
msgArgs.reset();
logging::Message logMsg2( COMP_MSG_NUM );
msgArgs.add( APPLNAME );
msgArgs.add( tableName.toString() );
msgArgs.add( tableLockID );
std::string finalStatus;
if (!bErrFlag)
{
finalStatus = "Completed successfully";
}
else
{
finalStatus = "Encountered errors: ";
finalStatus += combinedErrMsg.str();
}
msgArgs.add( finalStatus );
logMsg2.format( msgArgs );
ml.logInfoMessage( logMsg2 );
//------------------------------------------------------- end of syslog code
}
//------------------------------------------------------------------------------
// Get/return the current tablelock info for tableLockID, and reassign the table
// lock to ourselves if we can. If the lock is still in use by another process
// or by another DML cleartablelock thread, a std::runtime_error is thrown.
//------------------------------------------------------------------------------
void CommandPackageProcessor::establishTableLockToClear( uint64_t tableLockID,
BRM::TableLockInfo& lockInfo )
{
boost::mutex::scoped_lock lock( fActiveClearTableLockCmdMutex );
// Get current table lock info
bool getLockInfo = fDbrm->getTableLockInfo(tableLockID, &lockInfo);
if (!getLockInfo)
{
throw std::runtime_error( std::string("Lock does not exist.") );
}
std::string processName("DMLProc clearTableLock");
uint32_t processID = ::getpid();
// See if another thread is executing a cleartablelock cmd for this table
if ((lockInfo.ownerName == processName) &&
(lockInfo.ownerPID == processID))
{
std::set<uint64_t>::const_iterator it =
fActiveClearTableLockCmds.find( tableLockID );
if (it != fActiveClearTableLockCmds.end())
{
throw std::runtime_error( std::string( "Lock in use. "
"DML is executing another cleartablelock MySQL cmd.") );
}
}
else
{
// Take over ownership of stale lock.
// Use "DMLProc clearTableLock" as process name to differentiate
// from a DMLProc lock used for inserts, updates, etc.
int32_t sessionID = fSessionID;
int32_t txnid = -1;
bool ownerChanged = fDbrm->changeOwner(
tableLockID, processName, processID, sessionID, txnid);
if (!ownerChanged)
{
throw std::runtime_error( std::string(
"Unable to grab lock; lock not found or still in use.") );
}
}
// Add this cleartablelock command to the list of active cleartablelock cmds
fActiveClearTableLockCmds.insert( tableLockID );
}
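//------------------------------------------------------------------------------
// Sketch (excluded from the build by #if 0): the ownership decision made by
// establishTableLockToClear(), separated from BRM so it can be checked in
// isolation. ClearLockAction and decideClearLockAction are hypothetical names.
// TryChangeOwner corresponds to the fDbrm->changeOwner() call, which can still
// fail ("lock not found or still in use").
//------------------------------------------------------------------------------
#if 0
#include <cstdint>
#include <set>
#include <string>

// Hypothetical outcome of the ownership checks.
enum class ClearLockAction { ReuseOwnLock, RejectInUse, TryChangeOwner };

ClearLockAction decideClearLockAction(const std::string& ownerName,
                                      uint32_t ownerPID,
                                      const std::string& myName,
                                      uint32_t myPID,
                                      uint64_t tableLockID,
                                      const std::set<uint64_t>& activeCmds)
{
    if (ownerName == myName && ownerPID == myPID)
    {
        // This process already owns the lock; proceed only if no other
        // cleartablelock thread is working on the same tableLockID.
        return activeCmds.count(tableLockID) != 0
                   ? ClearLockAction::RejectInUse
                   : ClearLockAction::ReuseOwnLock;
    }
    // Owned by another process or a stale owner: try to take it over.
    return ClearLockAction::TryChangeOwner;
}
#endif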
} // namespace dmlpackageprocessor