1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5352 Properly handle PP restart for DML/DDL operations.

This commit is contained in:
Denis Khalikov
2024-02-01 13:27:01 +00:00
committed by Leonid Fedorov
parent d3896efb0f
commit 7f14dae5c7
31 changed files with 726 additions and 401 deletions

View File

@ -299,8 +299,8 @@ bool comptypesAreCompat(int oldCtype, int newCtype)
namespace ddlpackageprocessor
{
AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
ddlpackage::AlterTableStatement& alterTableStmt)
AlterTableProcessor::DDLResult AlterTableProcessor::processPackageInternal(
ddlpackage::SqlStatement* sqlTableStmt)
{
SUMMARY_INFO("AlterTableProcessor::processPackage");
@ -311,6 +311,20 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
result.result = NO_ERROR;
std::string err;
uint64_t tableLockId = 0;
auto* alterTableStmt = dynamic_cast<AlterTableStatement*>(sqlTableStmt);
if (!alterTableStmt)
{
logging::Message::Args args;
logging::Message message(9);
args.add("AlterTableStatement wrong cast");
message.format(args);
result.result = ALTER_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
DETAIL_INFO(alterTableStmt);
int rc = 0;
rc = fDbrm->isReadWrite();
@ -328,8 +342,8 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
}
//@Bug 4538. Log the sql statement before grabbing tablelock
string stmt = alterTableStmt.fSql + "|" + (alterTableStmt.fTableName)->fSchema + "|";
SQLLogger logger(stmt, fDDLLoggingId, alterTableStmt.fSessionID, txnID.id);
string stmt = alterTableStmt->fSql + "|" + (alterTableStmt->fTableName)->fSchema + "|";
SQLLogger logger(stmt, fDDLLoggingId, alterTableStmt->fSessionID, txnID.id);
VERBOSE_INFO("Getting current txnID");
OamCache* oamcache = OamCache::makeOamCache();
@ -370,18 +384,18 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
{
// check table lock
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt.fSessionID);
CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt->fSessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
systemCatalogPtr->sessionID(alterTableStmt.fSessionID);
systemCatalogPtr->sessionID(alterTableStmt->fSessionID);
CalpontSystemCatalog::TableName tableName;
tableName.schema = (alterTableStmt.fTableName)->fSchema;
tableName.table = (alterTableStmt.fTableName)->fName;
tableName.schema = (alterTableStmt->fTableName)->fSchema;
tableName.table = (alterTableStmt->fTableName)->fName;
execplan::CalpontSystemCatalog::ROPair roPair;
roPair = systemCatalogPtr->tableRID(tableName);
uint32_t processID = ::getpid();
int32_t txnid = txnID.id;
int32_t sessionId = alterTableStmt.fSessionID;
int32_t sessionId = alterTableStmt->fSessionID;
std::string processName("DDLProc");
int i = 0;
@ -428,7 +442,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
{
processID = ::getpid();
txnid = txnID.id;
sessionId = alterTableStmt.fSessionID;
sessionId = alterTableStmt->fSessionID;
;
processName = "DDLProc";
tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid,
@ -455,7 +469,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
}
}
ddlpackage::AlterTableActionList actionList = alterTableStmt.fActions;
ddlpackage::AlterTableActionList actionList = alterTableStmt->fActions;
AlterTableActionList::const_iterator action_iterator = actionList.begin();
while (action_iterator != actionList.end())
@ -480,7 +494,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
columnDefPtr = addColumns.fColumns[0];
}
addColumn(alterTableStmt.fSessionID, txnID.id, result, columnDefPtr, *(alterTableStmt.fTableName),
addColumn(alterTableStmt->fSessionID, txnID.id, result, columnDefPtr, *(alterTableStmt->fTableName),
uniqueId);
if (result.result != NO_ERROR)
@ -492,9 +506,9 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
else if (s.find(AlterActionString[6]) != string::npos)
{
// Drop Column Default
dropColumnDefault(alterTableStmt.fSessionID, txnID.id, result,
dropColumnDefault(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaDropColumnDefault*>(*action_iterator)),
*(alterTableStmt.fTableName), uniqueId);
*(alterTableStmt->fTableName), uniqueId);
if (result.result != NO_ERROR)
{
@ -505,15 +519,16 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
else if (s.find(AlterActionString[3]) != string::npos)
{
// Drop Columns
dropColumns(alterTableStmt.fSessionID, txnID.id, result,
*(dynamic_cast<AtaDropColumns*>(*action_iterator)), *(alterTableStmt.fTableName),
dropColumns(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaDropColumns*>(*action_iterator)), *(alterTableStmt->fTableName),
uniqueId);
}
else if (s.find(AlterActionString[2]) != string::npos)
{
// Drop a column
dropColumn(alterTableStmt.fSessionID, txnID.id, result,
*(dynamic_cast<AtaDropColumn*>(*action_iterator)), *(alterTableStmt.fTableName), uniqueId);
dropColumn(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaDropColumn*>(*action_iterator)), *(alterTableStmt->fTableName),
uniqueId);
}
#if 0
@ -528,9 +543,9 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
else if (s.find(AlterActionString[5]) != string::npos)
{
// Set Column Default
setColumnDefault(alterTableStmt.fSessionID, txnID.id, result,
setColumnDefault(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaSetColumnDefault*>(*action_iterator)),
*(alterTableStmt.fTableName), uniqueId);
*(alterTableStmt->fTableName), uniqueId);
}
#if 0
@ -546,23 +561,23 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
else if (s.find(AlterActionString[8]) != string::npos)
{
// Rename Table
renameTable(alterTableStmt.fSessionID, txnID.id, result,
*(dynamic_cast<AtaRenameTable*>(*action_iterator)), *(alterTableStmt.fTableName),
renameTable(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaRenameTable*>(*action_iterator)), *(alterTableStmt->fTableName),
uniqueId);
}
else if (s.find(AlterActionString[10]) != string::npos)
{
// Rename a Column
renameColumn(alterTableStmt.fSessionID, txnID.id, result,
*(dynamic_cast<AtaRenameColumn*>(*action_iterator)), *(alterTableStmt.fTableName),
renameColumn(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaRenameColumn*>(*action_iterator)), *(alterTableStmt->fTableName),
uniqueId);
}
else if (s.find(AlterActionString[11]) != string::npos)
{
// Table Comment
tableComment(alterTableStmt.fSessionID, txnID.id, result,
*(dynamic_cast<AtaTableComment*>(*action_iterator)), *(alterTableStmt.fTableName),
tableComment(alterTableStmt->fSessionID, txnID.id, result,
*(dynamic_cast<AtaTableComment*>(*action_iterator)), *(alterTableStmt->fTableName),
uniqueId);
}
else
@ -574,7 +589,7 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
}
// Log the DDL statement.
logging::logDDL(alterTableStmt.fSessionID, txnID.id, alterTableStmt.fSql, alterTableStmt.fOwner);
logging::logDDL(alterTableStmt->fSessionID, txnID.id, alterTableStmt->fSql, alterTableStmt->fOwner);
DETAIL_INFO("Commiting transaction");
commitTransaction(uniqueId, txnID);
@ -582,11 +597,43 @@ AlterTableProcessor::DDLResult AlterTableProcessor::processPackage(
}
catch (std::exception& ex)
{
rollBackAlter(ex.what(), txnID, alterTableStmt.fSessionID, result, uniqueId);
if (checkPPLostConnection(ex.what()))
{
if (tableLockId)
{
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
if (result.result == NO_ERROR)
{
logging::Message::Args args;
logging::Message message(1);
args.add("Table lock is not released due to ");
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
args.add("");
args.add("");
message.format(args);
result.result = ALTER_ERROR;
result.message = message;
return result;
}
}
}
result.result = PP_LOST_CONNECTION;
fWEClient->removeQueue(uniqueId);
return result;
}
else
{
rollBackAlter(ex.what(), txnID, alterTableStmt->fSessionID, result, uniqueId);
}
}
catch (...)
{
rollBackAlter("encountered unknown exception. ", txnID, alterTableStmt.fSessionID, result, uniqueId);
rollBackAlter("encountered unknown exception. ", txnID, alterTableStmt->fSessionID, result, uniqueId);
}
// release table lock