You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
@ -75,8 +75,9 @@ void cleanPMSysCache()
|
||||
class PackageHandler
|
||||
{
|
||||
public:
|
||||
PackageHandler(QueryTeleClient qtc, DBRM* dbrm, messageqcpp::IOSocket& ios, bool concurrentSupport)
|
||||
: fIos(ios), fDbrm(dbrm), fQtc(qtc), fConcurrentSupport(concurrentSupport)
|
||||
PackageHandler(QueryTeleClient qtc, DBRM* dbrm, messageqcpp::IOSocket& ios, bool concurrentSupport,
|
||||
uint32_t* debugLevel)
|
||||
: fIos(ios), fDbrm(dbrm), fQtc(qtc), fConcurrentSupport(concurrentSupport), fDebugLevel(debugLevel)
|
||||
{
|
||||
}
|
||||
|
||||
@ -490,6 +491,8 @@ class PackageHandler
|
||||
boost::scoped_ptr<CreateTableProcessor> processor(new CreateTableProcessor(fDbrm));
|
||||
processor->fTxnid.id = fTxnid.id;
|
||||
processor->fTxnid.valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
// cout << "create table using txnid " << fTxnid.id << endl;
|
||||
|
||||
QueryTeleStats qts;
|
||||
@ -524,6 +527,8 @@ class PackageHandler
|
||||
boost::scoped_ptr<AlterTableProcessor> processor(new AlterTableProcessor(fDbrm));
|
||||
processor->fTxnid.id = fTxnid.id;
|
||||
processor->fTxnid.valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
|
||||
QueryTeleStats qts;
|
||||
qts.query_uuid = QueryTeleClient::genUUID();
|
||||
@ -560,6 +565,8 @@ class PackageHandler
|
||||
|
||||
processor->fTxnid.id = fTxnid.id;
|
||||
processor->fTxnid.valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
|
||||
QueryTeleStats qts;
|
||||
qts.query_uuid = QueryTeleClient::genUUID();
|
||||
@ -595,6 +602,8 @@ class PackageHandler
|
||||
|
||||
processor->fTxnid.id = fTxnid.id;
|
||||
processor->fTxnid.valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
|
||||
QueryTeleStats qts;
|
||||
qts.query_uuid = QueryTeleClient::genUUID();
|
||||
@ -628,6 +637,9 @@ class PackageHandler
|
||||
boost::scoped_ptr<MarkPartitionProcessor> processor(new MarkPartitionProcessor(fDbrm));
|
||||
(processor->fTxnid).id = fTxnid.id;
|
||||
(processor->fTxnid).valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
|
||||
result = processor->processPackage(&markPartitionStmt);
|
||||
systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID);
|
||||
systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID | 0x80000000);
|
||||
@ -643,6 +655,9 @@ class PackageHandler
|
||||
boost::scoped_ptr<RestorePartitionProcessor> processor(new RestorePartitionProcessor(fDbrm));
|
||||
(processor->fTxnid).id = fTxnid.id;
|
||||
(processor->fTxnid).valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
|
||||
result = processor->processPackage(&restorePartitionStmt);
|
||||
systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID);
|
||||
systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID | 0x80000000);
|
||||
@ -658,12 +673,24 @@ class PackageHandler
|
||||
boost::scoped_ptr<DropPartitionProcessor> processor(new DropPartitionProcessor(fDbrm));
|
||||
(processor->fTxnid).id = fTxnid.id;
|
||||
(processor->fTxnid).valid = true;
|
||||
if (fDebugLevel)
|
||||
processor->setDebugLevel(static_cast<DDLPackageProcessor::DebugLevel>(*fDebugLevel));
|
||||
|
||||
result = processor->processPackage(&dropPartitionStmt);
|
||||
systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID);
|
||||
systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID | 0x80000000);
|
||||
}
|
||||
break;
|
||||
|
||||
case ddlpackage::DDL_DEBUG_STATEMENT:
|
||||
{
|
||||
DebugDDLStatement stmt;
|
||||
stmt.unserialize(fByteStream);
|
||||
if (fDebugLevel)
|
||||
*fDebugLevel = stmt.fDebugLevel;
|
||||
}
|
||||
break;
|
||||
|
||||
default: throw UNRECOGNIZED_PACKAGE_TYPE; break;
|
||||
}
|
||||
|
||||
@ -728,6 +755,7 @@ class PackageHandler
|
||||
QueryTeleClient fQtc;
|
||||
oam::OamCache* fOamCache = nullptr;
|
||||
bool fConcurrentSupport;
|
||||
uint32_t* fDebugLevel{nullptr};
|
||||
};
|
||||
|
||||
} // namespace
|
||||
@ -782,7 +810,7 @@ void DDLProcessor::process()
|
||||
try
|
||||
{
|
||||
ios = fMqServer.accept();
|
||||
fDdlPackagepool.invoke(PackageHandler(fQtc, &dbrm, ios, concurrentSupport));
|
||||
fDdlPackagepool.invoke(PackageHandler(fQtc, &dbrm, ios, concurrentSupport, &debugLevel));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
Reference in New Issue
Block a user