1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5263 Add support for ROLLBACK when PP was restarted.

DMLProc starts a ROLLBACK when the SELECT part of an UPDATE fails because the EM facility in PP was restarted.
Unfortunately, this ROLLBACK gets stuck if EM/PP are not yet available.
DMLProc must therefore use a timeout and retry the ROLLBACK.
This commit is contained in:
Denis Khalikov
2022-12-07 17:24:45 +03:00
parent e243a5332b
commit d61780cab1
8 changed files with 280 additions and 221 deletions

View File

@ -37,6 +37,8 @@
#include "we_ddlcommandclient.h" #include "we_ddlcommandclient.h"
#include "oamcache.h" #include "oamcache.h"
#include "liboamcpp.h" #include "liboamcpp.h"
#include "resourcemanager.h"
using namespace std; using namespace std;
using namespace WriteEngine; using namespace WriteEngine;
using namespace dmlpackage; using namespace dmlpackage;
@ -370,7 +372,7 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage(
int weRc = 0; int weRc = 0;
// version rollback, Bulkrollback // version rollback, Bulkrollback
weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg); weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
if (weRc == 0) if (weRc == 0)
{ {
@ -413,7 +415,7 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage(
{ {
std::string errorMsg(""); std::string errorMsg("");
logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;"); logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
int weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg); int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
if (weRc != 0) if (weRc != 0)
{ {

View File

@ -266,6 +266,27 @@ int DMLPackageProcessor::commitTransaction(uint64_t uniqueId, BRM::TxnID txnID)
return rc; return rc;
} }
// Tries to rollback transaction, if network error tries one more time
// MCOL-5263.
int32_t DMLPackageProcessor::tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
string& errorMsg)
{
auto weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg);
if (weRc)
{
weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg);
if (weRc == 0)
{
// Setup connection in WE with PS.
joblist::ResourceManager* rm = joblist::ResourceManager::instance(true);
joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm);
weRc = fEc->Setup();
}
}
return weRc;
}
int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
std::string& errorMsg) std::string& errorMsg)
{ {

View File

@ -249,6 +249,9 @@ class DMLPackageProcessor
EXPORT int rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, EXPORT int rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
std::string& errorMsg); std::string& errorMsg);
EXPORT int32_t tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
std::string& errorMsg);
EXPORT int rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, EXPORT int rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
const uint32_t tableOid, std::string& errorMsg); const uint32_t tableOid, std::string& errorMsg);
/** /**

View File

@ -221,7 +221,7 @@ DistributedEngineComm::~DistributedEngineComm()
fInstance = 0; fInstance = 0;
} }
void DistributedEngineComm::Setup() int32_t DistributedEngineComm::Setup()
{ {
// This is here to ensure that this function does not get invoked multiple times simultaneously. // This is here to ensure that this function does not get invoked multiple times simultaneously.
boost::mutex::scoped_lock setupLock(fSetupMutex); boost::mutex::scoped_lock setupLock(fSetupMutex);
@ -309,10 +309,9 @@ void DistributedEngineComm::Setup()
"Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(),
LOG_TYPE_ERROR); LOG_TYPE_ERROR);
if (newPmCount == 0) if (newPmCount == 0)
{
writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR); writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR);
break;
} return 1;
} }
catch (...) catch (...)
{ {
@ -322,10 +321,8 @@ void DistributedEngineComm::Setup()
writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId),
LOG_TYPE_ERROR); LOG_TYPE_ERROR);
if (newPmCount == 0) if (newPmCount == 0)
{
writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR); writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR);
break; return 1;
}
} }
} }
@ -362,6 +359,7 @@ void DistributedEngineComm::Setup()
newLocks.clear(); newLocks.clear();
newClients.clear(); newClients.clear();
return 0;
} }
int DistributedEngineComm::Close() int DistributedEngineComm::Close()
@ -428,8 +426,8 @@ Error:
decltype(pmCount) originalPMCount = pmCount; decltype(pmCount) originalPMCount = pmCount;
// Re-establish if a remote PM restarted. // Re-establish if a remote PM restarted.
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(3));
Setup(); auto rc = Setup();
if (originalPMCount != pmCount) if (rc || originalPMCount != pmCount)
{ {
ostringstream os; ostringstream os;
os << "DEC: lost connection to " << client->addr2String(); os << "DEC: lost connection to " << client->addr2String();
@ -889,7 +887,7 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
writeToClient(localConnectionId, msg); writeToClient(localConnectionId, msg);
} }
void DistributedEngineComm::write(uint32_t senderID, const SBS& msg) int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
{ {
ISMPacketHeader* ism = (ISMPacketHeader*)msg->buf(); ISMPacketHeader* ism = (ISMPacketHeader*)msg->buf();
uint32_t dest; uint32_t dest;
@ -914,6 +912,7 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
entries in the config file point to unique PMs */ entries in the config file point to unique PMs */
{ {
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max(); uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
int32_t rc = 0;
for (uint32_t i = 0; i < pmCount; ++i) for (uint32_t i = 0; i < pmCount; ++i)
{ {
@ -922,21 +921,25 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
localConnectionId = i; localConnectionId = i;
continue; continue;
} }
writeToClient(i, msg, senderID);
rc =writeToClient(i, msg, senderID);
if (rc)
return rc;
} }
if (localConnectionId < fPmConnections.size()) if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg); rc = writeToClient(localConnectionId, msg);
return rc;
} }
return;
case BATCH_PRIMITIVE_RUN: case BATCH_PRIMITIVE_RUN:
case DICT_TOKEN_BY_SCAN_COMPARE: case DICT_TOKEN_BY_SCAN_COMPARE:
{
// for efficiency, writeToClient() grabs the interleaving factor for the caller, // for efficiency, writeToClient() grabs the interleaving factor for the caller,
// and decides the final connection index because it already grabs the // and decides the final connection index because it already grabs the
// caller's queue information // caller's queue information
dest = ism->Interleave; dest = ism->Interleave;
writeToClient(dest, msg, senderID, true); return writeToClient(dest, msg, senderID, true);
break; }
default: idbassert_s(0, "Unknown message type"); default: idbassert_s(0, "Unknown message type");
} }
@ -946,6 +949,7 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG); writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG);
throw IDBExcept(ERR_NO_PRIMPROC); throw IDBExcept(ERR_NO_PRIMPROC);
} }
return 0;
} }
void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connection) void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connection)
@ -1135,16 +1139,14 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
} }
} }
ClientList::value_type client = fPmConnections[connectionId];
try try
{ {
ClientList::value_type client = fPmConnections[connectionId];
if (!client->isAvailable()) if (!client->isAvailable())
return 0; return 0;
std::lock_guard lk(*(fWlock[connectionId])); std::lock_guard lk(*(fWlock[connectionId]));
client->write(bs, NULL, senderStats); client->write(bs, NULL, senderStats);
return 0;
} }
catch (...) catch (...)
{ {
@ -1163,7 +1165,30 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
map_tok->second->queue.push(sbs); map_tok->second->queue.push(sbs);
} }
int tries = 0;
// Try to setup connection with PS, it could be a situation that PS is starting.
// MCOL-5263.
while (tries < 10 && Setup())
{
++tries;
std::this_thread::sleep_for(std::chrono::seconds(3));
}
lk.unlock(); lk.unlock();
if (tries == 10)
{
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
if (!fIsExeMgr)
abort();
else
throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
}
// Connection was established.
return 1;
/* /*
// reconfig the connection array // reconfig the connection array
ClientList tempConns; ClientList tempConns;
@ -1195,8 +1220,8 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
alarmItem.append(" PrimProc"); alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
*/ */
throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
} }
return 0;
} }
uint32_t DistributedEngineComm::size(uint32_t key) uint32_t DistributedEngineComm::size(uint32_t key)

View File

@ -146,7 +146,7 @@ class DistributedEngineComm
* Writes a primitive message to a primitive server. Msg needs to contain an ISMPacketHeader. The * Writes a primitive message to a primitive server. Msg needs to contain an ISMPacketHeader. The
* LBID is extracted from the ISMPacketHeader and used to determine the actual P/M to send to. * LBID is extracted from the ISMPacketHeader and used to determine the actual P/M to send to.
*/ */
EXPORT void write(uint32_t key, const messageqcpp::SBS& msg); EXPORT int32_t write(uint32_t key, const messageqcpp::SBS& msg);
// EXPORT void throttledWrite(const messageqcpp::ByteStream& msg); // EXPORT void throttledWrite(const messageqcpp::ByteStream& msg);
@ -188,7 +188,7 @@ class DistributedEngineComm
*/ */
EXPORT uint32_t size(uint32_t key); EXPORT uint32_t size(uint32_t key);
EXPORT void Setup(); EXPORT int32_t Setup();
EXPORT void addDECEventListener(DECEventListener*); EXPORT void addDECEventListener(DECEventListener*);
EXPORT void removeDECEventListener(DECEventListener*); EXPORT void removeDECEventListener(DECEventListener*);

View File

@ -497,7 +497,6 @@ void WEClients::write_to_all(const messageqcpp::ByteStream& msg)
} }
ClientList::iterator itor = fPmConnections.begin(); ClientList::iterator itor = fPmConnections.begin();
while (itor != fPmConnections.end()) while (itor != fPmConnections.end())
{ {
if (itor->second != NULL) if (itor->second != NULL)

View File

@ -817,7 +817,6 @@ uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{ {
int rc = 0; int rc = 0;
uint32_t sessionID, tmp32; uint32_t sessionID, tmp32;
;
int txnID; int txnID;
bs >> sessionID; bs >> sessionID;
bs >> tmp32; bs >> tmp32;

View File

@ -43,6 +43,7 @@ using namespace boost;
#include "we_dctnrycompress.h" #include "we_dctnrycompress.h"
#include "we_simplesyslog.h" #include "we_simplesyslog.h"
#include "we_config.h" #include "we_config.h"
#include "exceptclasses.h"
#include "IDBDataFile.h" #include "IDBDataFile.h"
#include "IDBPolicy.h" #include "IDBPolicy.h"
using namespace idbdatafile; using namespace idbdatafile;
@ -1134,6 +1135,8 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId)
std::vector<BRM::FileInfo> files; std::vector<BRM::FileInfo> files;
try
{
for (i = 0; i < lbidList.size(); i++) for (i = 0; i < lbidList.size(); i++)
{ {
verID = (VER_t)transID; verID = (VER_t)transID;
@ -1155,8 +1158,8 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId)
// copy buffer back // copy buffer back
// look for the block in extentmap // look for the block in extentmap
// timer.start("lookupLocalEX"); // timer.start("lookupLocalEX");
rc = blockRsltnMgrPtr->lookupLocal(lbidList[i], /*transID*/ verID, false, weOid, weDbRoot, wePartitionNum, rc = blockRsltnMgrPtr->lookupLocal(lbidList[i], /*transID*/ verID, false, weOid, weDbRoot,
weSegmentNum, weFbo); wePartitionNum, weSegmentNum, weFbo);
if (rc != 0) if (rc != 0)
{ {
@ -1176,6 +1179,7 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId)
// timer.stop("lookupLocalEX"); // timer.stop("lookupLocalEX");
Column column; Column column;
execplan::CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(weOid); execplan::CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(weOid);
columnOids[weOid] = weOid; columnOids[weOid] = weOid;
// This must be a dict oid // This must be a dict oid
@ -1214,8 +1218,8 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId)
if (isDebug(DEBUG_3)) if (isDebug(DEBUG_3))
#ifndef __LP64__ #ifndef __LP64__
printf("\n\tuncommitted lbid - lbidList[i]=%lld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", lbidList[i], printf("\n\tuncommitted lbid - lbidList[i]=%lld weOid =%d weFbo=%d verID=%d, weDbRoot=%d",
weOid, weFbo, verID, weDbRoot); lbidList[i], weOid, weFbo, verID, weDbRoot);
#else #else
printf("\n\tuncommitted lbid - lbidList[i]=%ld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", lbidList[i], printf("\n\tuncommitted lbid - lbidList[i]=%ld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", lbidList[i],
@ -1291,10 +1295,10 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId)
targetFileInfo.fPartition = wePartitionNum; targetFileInfo.fPartition = wePartitionNum;
targetFileInfo.fSegment = weSegmentNum; targetFileInfo.fSegment = weSegmentNum;
targetFileInfo.fDbRoot = weDbRoot; targetFileInfo.fDbRoot = weDbRoot;
// printf("\n\tsource file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d", sourceFileInfo.oid, // printf("\n\tsource file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d",
// sourceFileInfo.fPartition, sourceFileInfo.fSegment, sourceFileInfo.fDbRoot); printf("\n\ttarget // sourceFileInfo.oid, sourceFileInfo.fPartition, sourceFileInfo.fSegment, sourceFileInfo.fDbRoot);
// file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d", weOid, wePartitionNum, weSegmentNum, // printf("\n\ttarget file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d", weOid,
// weDbRoot); // wePartitionNum, weSegmentNum, weDbRoot);
// Check whether the file is on this pm. // Check whether the file is on this pm.
if (column.compressionType != 0) if (column.compressionType != 0)
@ -1363,6 +1367,12 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId)
if (rc != NO_ERROR) if (rc != NO_ERROR)
goto cleanup; goto cleanup;
} }
}
// MCOL-5263.
catch (logging::IDBExcept&)
{
return ERR_BRM_NETWORK;
}
// timer.start("vbRollback"); // timer.start("vbRollback");
// rc = blockRsltnMgrPtr->vbRollback(transID, lbidRangeList); // rc = blockRsltnMgrPtr->vbRollback(transID, lbidRangeList);