1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/dbcon/ddlpackageproc/droptableprocessor.cpp
Serguey Zefirov 5aa2a824c2 feat(MCOL-6082): Multiple readers of dbroots using OamCache logic
This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.
2025-07-21 14:32:39 +03:00

1454 lines
38 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: droptableprocessor.cpp 9744 2013-08-07 03:32:19Z bwilkinson $
*
*
***********************************************************************/
#include <unistd.h>
#include <string>
#include <vector>
using namespace std;
#include "droptableprocessor.h"
#include "we_messages.h"
#include "we_ddlcommandclient.h"
using namespace WriteEngine;
#include "cacheutils.h"
using namespace cacheutils;
#include "bytestream.h"
using namespace messageqcpp;
#include "sqllogger.h"
#include "messagelog.h"
using namespace logging;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "oamcache.h"
using namespace oam;
namespace ddlpackageprocessor
{
DropTableProcessor::DDLResult DropTableProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt)
{
SUMMARY_INFO("DropTableProcessor::processPackage");
DDLResult result;
result.result = NO_ERROR;
std::string err;
auto* dropTableStmt = dynamic_cast<ddlpackage::DropTableStatement*>(sqlStmt);
if (!dropTableStmt)
{
Message::Args args;
Message message(9);
args.add("DropTableStatement wrong cast");
message.format(args);
result.result = DROP_ERROR;
result.message = message;
return result;
}
VERBOSE_INFO(dropTableStmt);
// Commit current transaction.
// all DDL statements cause an implicit commit
VERBOSE_INFO("Getting current txnID");
ByteStream::byte rc = 0;
BRM::TxnID txnID;
txnID.id = fTxnid.id;
txnID.valid = fTxnid.valid;
int rc1 = 0;
rc1 = fDbrm->isReadWrite();
if (rc1 != 0)
{
Message::Args args;
Message message(9);
args.add("Unable to execute the statement due to DBRM is read only");
message.format(args);
result.result = DROP_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
string stmt = dropTableStmt->fSql + "|" + dropTableStmt->fTableName->fSchema + "|";
SQLLogger logger(stmt, fDDLLoggingId, dropTableStmt->fSessionID, txnID.id);
std::vector<CalpontSystemCatalog::OID> oidList;
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
execplan::CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::OID tableAUXColOid;
std::string errorMsg;
ByteStream bytestream;
uint64_t uniqueId = 0;
// Bug 5070. Added exception handling
try
{
uniqueId = fDbrm->getUnique64();
}
catch (std::exception& ex)
{
Message::Args args;
Message message(9);
args.add(ex.what());
message.format(args);
result.result = DROP_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
catch (...)
{
Message::Args args;
Message message(9);
args.add("Unknown error occurred while getting unique number.");
message.format(args);
result.result = DROP_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
fWEClient->addQueue(uniqueId);
int pmNum = 1;
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
uint64_t tableLockId = 0;
OamCache* oamcache = OamCache::makeOamCache();
std::vector<int> moduleIds = oamcache->getModuleIds();
// MCOL-66 The DBRM can't handle concurrent DDL
boost::mutex::scoped_lock lk(dbrmMutex);
try
{
// check table lock
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt->fSessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
systemCatalogPtr->sessionID(dropTableStmt->fSessionID);
CalpontSystemCatalog::TableName tableName;
tableName.schema = dropTableStmt->fTableName->fSchema;
tableName.table = dropTableStmt->fTableName->fName;
try
{
roPair = systemCatalogPtr->tableRID(tableName);
if (tableName.schema.compare(execplan::CALPONT_SCHEMA) == 0)
{
tableAUXColOid = 0;
}
else
{
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
}
}
catch (IDBExcept& ie)
{
if (checkPPLostConnection(ie.what()))
{
result.result = PP_LOST_CONNECTION;
return result;
}
else
{
if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
{
Message::Args args;
args.add("Table ");
args.add(tableName.schema + "." + tableName.table);
args.add(" does not exist in ColumnStore.");
if (dropTableStmt->fIfExists) // Check if "IF EXISTS" is specified
{
Message message(1); // Informational log level
message.format(args);
result.result = NO_ERROR; // Success, no error
result.message = message;
fSessionManager.committed(txnID); // No action needed, commit
return result;
}
else
{
Message message(9); // Error log level
message.format(args);
result.result = DROP_TABLE_NOT_IN_CATALOG_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
}
else
{
result.result = DROP_ERROR;
Message::Args args;
Message message(9);
args.add("Drop table ");
args.add(tableName.schema + "." + tableName.table);
args.add(" failed due to ");
args.add(ie.what());
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
}
}
uint32_t processID = ::getpid();
int32_t txnid = txnID.id;
int32_t sessionId = dropTableStmt->fSessionID;
std::string processName("DDLProc");
int i = 0;
std::vector<uint32_t> pms;
for (unsigned i = 0; i < moduleIds.size(); i++)
{
pms.push_back((uint32_t)moduleIds[i]);
}
try
{
tableLockId =
fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING);
}
catch (std::exception&)
{
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
if (tableLockId == 0)
{
int waitPeriod = 10;
int sleepTime = 100; // sleep 100 milliseconds between checks
int numTries = 10; // try 10 times per second
waitPeriod = WriteEngine::Config::getWaitPeriod();
numTries = waitPeriod * 10;
struct timespec rm_ts;
rm_ts.tv_sec = sleepTime / 1000;
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
for (; i < numTries; i++)
{
struct timespec abs_ts;
do
{
abs_ts.tv_sec = rm_ts.tv_sec;
abs_ts.tv_nsec = rm_ts.tv_nsec;
} while (nanosleep(&abs_ts, &rm_ts) < 0);
try
{
processID = ::getpid();
txnid = txnID.id;
sessionId = dropTableStmt->fSessionID;
;
processName = "DDLProc";
tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid,
BRM::LOADING);
}
catch (std::exception&)
{
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
if (tableLockId > 0)
break;
}
if (i >= numTries) // error out
{
Message::Args args;
string strOp("drop table");
args.add(strOp);
args.add(processName);
args.add((uint64_t)processID);
args.add(sessionId);
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
}
}
// 1. Get the OIDs for the columns
// 2. Get the OIDs for the dictionaries
// 3. Save the OIDs to a log file
// 4. Remove the Table from SYSTABLE
// 5. Remove the columns from SYSCOLUMN
// 6. Commit the changes made to systables
// 7. Flush PrimProc Cache
// 8. Update extent map
// 9. Remove the column and dictionary files
// 10.Return the OIDs
CalpontSystemCatalog::TableName userTableName;
userTableName.schema = dropTableStmt->fTableName->fSchema;
userTableName.table = dropTableStmt->fTableName->fName;
tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
Oam oam;
// Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format
for (unsigned i = 0; i < tableColRidList.size(); i++)
{
if (tableColRidList[i].objnum > 3000)
oidList.push_back(tableColRidList[i].objnum);
}
for (unsigned i = 0; i < dictOIDList.size(); i++)
{
if (dictOIDList[i].dictOID > 3000)
oidList.push_back(dictOIDList[i].dictOID);
}
// get a unique number
VERBOSE_INFO("Removing the SYSTABLE meta data");
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl;
#endif
bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE;
bytestream << uniqueId;
bytestream << (uint32_t)dropTableStmt->fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << dropTableStmt->fTableName->fSchema;
bytestream << dropTableStmt->fTableName->fName;
// Find out where systable is
BRM::OID_t sysOid = 1001;
ByteStream::byte rc = 0;
uint16_t dbRoot;
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
if (rc != 0)
{
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("Error while calling getSysCatDBRoot");
args.add(errorMsg);
result.message = message;
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
#endif
// cout << "deleting systable entries with txnid " << txnID.id << endl;
fWEClient->write(bytestream, (uint32_t)pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else
{
*bsIn >> rc;
if (rc != 0)
{
*bsIn >> errorMsg;
}
break;
}
}
}
catch (runtime_error& ex) // write error
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got exception" << endl;
#endif
rc = NETWORK_ERROR;
errorMsg = ex.what();
}
catch (...)
{
rc = NETWORK_ERROR;
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got unknown exception" << endl;
#endif
}
if (rc != 0)
{
if (checkPPLostConnection(errorMsg))
{
result.result = PP_LOST_CONNECTION;
(void)fDbrm->releaseTableLock(tableLockId);
fWEClient->removeQueue(uniqueId);
return result;
}
else
{
cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str()
<< endl;
Message::Args args;
Message message(9);
args.add(fTxnid.id);
args.add(" Error in dropping table from systables.");
args.add(errorMsg);
message.format(args);
result.result = (ResultCode)rc;
result.message = message;
fSessionManager.rolledback(txnID);
(void)fDbrm->releaseTableLock(tableLockId);
fWEClient->removeQueue(uniqueId);
return result;
}
}
// remove from syscolumn
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSCOLUMN;
bytestream << uniqueId;
bytestream << (uint32_t)dropTableStmt->fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << dropTableStmt->fTableName->fSchema;
bytestream << dropTableStmt->fTableName->fName;
// Find out where syscolumn is
sysOid = 1021;
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
if (rc != 0)
{
result.result = (ResultCode)rc;
Message::Args args;
Message message(9);
args.add("Error while calling getSysCatDBRoot");
args.add(errorMsg);
result.message = message;
// release transaction
fSessionManager.rolledback(txnID);
return result;
}
pmNum = oamcache->getOwnerPM(dbRoot);
try
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl;
#endif
fWEClient->write(bytestream, (unsigned)pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else
{
*bsIn >> rc;
if (rc != 0)
{
*bsIn >> errorMsg;
}
break;
}
}
}
catch (runtime_error& ex) // write error
{
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got exception" << endl;
#endif
rc = NETWORK_ERROR;
errorMsg = ex.what();
}
catch (...)
{
rc = NETWORK_ERROR;
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got unknown exception" << endl;
#endif
}
if (rc != 0)
{
cout << fTxnid.id << " Error in dropping column from systables(" << (int)rc << ") " << errorMsg.c_str()
<< endl;
Message::Args args;
Message message(9);
args.add("Error in dropping column from systables.");
args.add(errorMsg);
message.format(args);
result.result = (ResultCode)rc;
result.message = message;
// release table lock and session
fSessionManager.rolledback(txnID);
(void)fDbrm->releaseTableLock(tableLockId);
fWEClient->removeQueue(uniqueId);
return result;
}
rc = commitTransaction(uniqueId, txnID);
if (rc != 0)
{
cout << txnID.id << " rolledback transaction " << " and valid is " << txnID.valid << endl;
fSessionManager.rolledback(txnID);
}
else
{
cout << txnID.id << " commiting transaction " << txnID.id << " and valid is " << txnID.valid << endl;
fSessionManager.committed(txnID);
}
if (rc != 0)
{
Message::Args args;
Message message(9);
ostringstream oss;
oss << " Commit failed with error code " << rc;
args.add(oss.str());
fSessionManager.rolledback(txnID);
(void)fDbrm->releaseTableLock(tableLockId);
message.format(args);
result.result = (ResultCode)rc;
result.message = message;
fWEClient->removeQueue(uniqueId);
return result;
}
// Log the DDL statement
logDDL(dropTableStmt->fSessionID, txnID.id, dropTableStmt->fSql, dropTableStmt->fOwner);
}
catch (std::exception& ex)
{
result.result = DROP_ERROR;
Message::Args args;
Message message(9);
args.add("Drop table failed due to ");
args.add(ex.what());
fSessionManager.rolledback(txnID);
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
message.format(args);
result.message = message;
fWEClient->removeQueue(uniqueId);
return result;
}
catch (...)
{
result.result = DROP_ERROR;
errorMsg = "Error in getting information from system catalog or from dbrm.";
Message::Args args;
Message message(9);
args.add("Drop table failed due to ");
args.add(errorMsg);
fSessionManager.rolledback(txnID);
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
message.format(args);
result.message = message;
fWEClient->removeQueue(uniqueId);
return result;
}
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
result.result = DROP_ERROR;
Message::Args args;
Message message(9);
args.add("Drop table failed due to ");
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
fSessionManager.rolledback(txnID);
message.format(args);
result.message = message;
fWEClient->removeQueue(uniqueId);
return result;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
if (tableAUXColOid > 3000)
{
oidList.push_back(tableAUXColOid);
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = 0;
auxRoPair.objnum = tableAUXColOid;
tableColRidList.push_back(auxRoPair);
}
// Save the oids to a file
try
{
createWriteDropLogFile(roPair.objnum, uniqueId, oidList);
}
catch (std::exception& ex)
{
result.result = WARNING;
Message::Args args;
Message message(9);
args.add("Drop table failed due to ");
args.add(ex.what());
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
// Bug 4208 Drop the PrimProcFDCache before droping the column files
// FOr Windows, this ensures (most likely) that the column files have
// no open handles to hinder the deletion of the files.
rc = cacheutils::dropPrimProcFdCache();
// Drop files
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)oidList.size();
for (unsigned i = 0; i < oidList.size(); i++)
{
bytestream << (uint32_t)oidList[i];
}
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table removing column files" << endl;
#endif
uint32_t msgRecived = 0;
try
{
fWEClient->write_to_all(bytestream);
bsIn.reset(new ByteStream());
ByteStream::byte tmp8;
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
fWEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
fWEClient->removeQueue(uniqueId);
break;
}
else
msgRecived++;
}
}
}
catch (std::exception& ex)
{
result.result = WARNING;
Message::Args args;
Message message(9);
args.add("Drop table failed due to ");
args.add(ex.what());
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
catch (...)
{
result.result = WARNING;
errorMsg = "Error in getting information from system catalog or from dbrm.";
Message::Args args;
Message message(9);
args.add("Drop table failed due to ");
args.add(errorMsg);
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
// Drop PrimProc FD cache
rc = cacheutils::dropPrimProcFdCache();
// Flush primProc cache
rc = cacheutils::flushOIDsFromCache(oidList);
// Delete extents from extent map
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table deleteOIDs" << endl;
#endif
rc = fDbrm->deleteOIDs(oidList);
if (rc != 0)
{
Message::Args args;
Message message(1);
args.add("Table dropped with warning ");
args.add("Remove from extent map failed.");
args.add("");
args.add("");
message.format(args);
result.result = WARNING;
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
// Remove the log file
fWEClient->removeQueue(uniqueId);
deleteLogFile(DROPTABLE_LOG, roPair.objnum, uniqueId);
// release the transaction
// fSessionManager.committed(txnID);
returnOIDs(tableColRidList, dictOIDList);
return result;
}
TruncTableProcessor::DDLResult TruncTableProcessor::processPackageInternal(ddlpackage::SqlStatement* sqlStmt)
{
SUMMARY_INFO("TruncTableProcessor::processPackage");
// 1. lock the table
// 2. Get the OIDs for the columns
// 3. Get the OIDs for the dictionaries
// 4. Save the OIDs
// 5. Disable all partitions
// 6. Remove the column and dictionary files
// 7. Flush PrimProc Cache
// 8. Update extent map
// 9. Use the OIDs to create new column and dictionary files with abbreviate extent
// 10 Update next value if the table has autoincrement column
DDLResult result;
result.result = NO_ERROR;
std::string err;
auto* truncTableStmt = dynamic_cast<ddlpackage::TruncTableStatement*>(sqlStmt);
VERBOSE_INFO(truncTableStmt);
// @Bug 4150. Check dbrm status before doing anything to the table.
int rc = 0;
rc = fDbrm->isReadWrite();
BRM::TxnID txnID;
txnID.id = fTxnid.id;
txnID.valid = fTxnid.valid;
if (rc != 0)
{
Message::Args args;
Message message(9);
args.add("Unable to execute the statement due to DBRM is read only");
message.format(args);
result.result = DROP_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
//@Bug 5765 log the schema.
string stmt = truncTableStmt->fSql + "|" + truncTableStmt->fTableName->fSchema + "|";
SQLLogger logger(stmt, fDDLLoggingId, truncTableStmt->fSessionID, txnID.id);
std::vector<CalpontSystemCatalog::OID> columnOidList;
std::vector<CalpontSystemCatalog::OID> allOidList;
CalpontSystemCatalog::OID tableAuxColOid;
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
execplan::CalpontSystemCatalog::ROPair roPair;
std::string processName("DDLProc");
uint32_t processID = ::getpid();
int32_t txnid = txnID.id;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt->fSessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
systemCatalogPtr->sessionID(truncTableStmt->fSessionID);
CalpontSystemCatalog::TableInfo tableInfo;
uint64_t uniqueId = 0;
// Bug 5070. Added exception handling
try
{
uniqueId = fDbrm->getUnique64();
}
catch (std::exception& ex)
{
Message::Args args;
Message message(9);
args.add(ex.what());
message.format(args);
result.result = DROP_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
catch (...)
{
Message::Args args;
Message message(9);
args.add("Unknown error occurred while getting unique number.");
message.format(args);
result.result = DROP_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
fWEClient->addQueue(uniqueId);
int pmNum = 1;
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
string errorMsg;
uint32_t autoIncColOid = 0;
uint64_t tableLockId = 0;
OamCache* oamcache = OamCache::makeOamCache();
std::vector<int> moduleIds = oamcache->getModuleIds();
try
{
// check table lock
CalpontSystemCatalog::TableName tableName;
tableName.schema = truncTableStmt->fTableName->fSchema;
tableName.table = truncTableStmt->fTableName->fName;
roPair = systemCatalogPtr->tableRID(tableName);
int32_t sessionId = truncTableStmt->fSessionID;
std::string processName("DDLProc");
int i = 0;
std::vector<uint32_t> pms;
for (unsigned i = 0; i < moduleIds.size(); i++)
{
pms.push_back((uint32_t)moduleIds[i]);
}
try
{
tableLockId =
fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid, BRM::LOADING);
}
catch (std::exception&)
{
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
if (tableLockId == 0)
{
int waitPeriod = 10;
int sleepTime = 100; // sleep 100 milliseconds between checks
int numTries = 10; // try 10 times per second
waitPeriod = Config::getWaitPeriod();
numTries = waitPeriod * 10;
struct timespec rm_ts;
rm_ts.tv_sec = sleepTime / 1000;
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
for (; i < numTries; i++)
{
struct timespec abs_ts;
do
{
abs_ts.tv_sec = rm_ts.tv_sec;
abs_ts.tv_nsec = rm_ts.tv_nsec;
} while (nanosleep(&abs_ts, &rm_ts) < 0);
try
{
processID = ::getpid();
txnid = txnID.id;
sessionId = truncTableStmt->fSessionID;
processName = "DDLProc";
tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnid,
BRM::LOADING);
}
catch (std::exception&)
{
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
if (tableLockId > 0)
break;
}
if (i >= numTries) // error out
{
Message::Args args;
args.add(processName);
args.add((uint64_t)processID);
args.add(sessionId);
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
}
}
CalpontSystemCatalog::TableName userTableName;
userTableName.schema = truncTableStmt->fTableName->fSchema;
userTableName.table = truncTableStmt->fTableName->fName;
tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName);
dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
for (unsigned i = 0; i < tableColRidList.size(); i++)
{
if (tableColRidList[i].objnum > 3000)
{
columnOidList.push_back(tableColRidList[i].objnum);
allOidList.push_back(tableColRidList[i].objnum);
}
}
if (tableAuxColOid > 3000)
{
columnOidList.push_back(tableAuxColOid);
allOidList.push_back(tableAuxColOid);
}
for (unsigned i = 0; i < dictOIDList.size(); i++)
{
if (dictOIDList[i].dictOID > 3000)
allOidList.push_back(dictOIDList[i].dictOID);
}
// Check whether the table has autoincrement column
tableInfo = systemCatalogPtr->tableInfo(userTableName);
}
catch (std::exception& ex)
{
if (checkPPLostConnection(ex.what()))
{
if (tableLockId != 0)
{
Message::Args args;
Message message(9);
args.add("Truncate table failed: ");
args.add(ex.what());
args.add("");
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
fSessionManager.rolledback(txnID);
message.format(args);
fWEClient->removeQueue(uniqueId);
result.result = TRUNC_ERROR;
return result;
}
}
fWEClient->removeQueue(uniqueId);
result.result = PP_LOST_CONNECTION;
return result;
}
else
{
cerr << "TruncateTableProcessor::processPackage: " << ex.what() << endl;
Message::Args args;
Message message(9);
args.add("Truncate table failed: ");
args.add(ex.what());
args.add("");
fSessionManager.rolledback(txnID);
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
fWEClient->removeQueue(uniqueId);
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
return result;
}
}
catch (...)
{
cerr << "TruncateTableProcessor::processPackage: caught unknown exception!" << endl;
Message::Args args;
Message message(1);
args.add("Truncate table failed: ");
args.add("encountered unkown exception");
args.add("");
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
}
fWEClient->removeQueue(uniqueId);
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
return result;
}
// Save the oids to a file
try
{
createWriteTruncateTableLogFile(roPair.objnum, uniqueId, allOidList);
}
catch (std::exception& ex)
{
Message::Args args;
Message message(9);
args.add("Truncate table failed due to ");
args.add(ex.what());
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
message.format(args);
//@bug 4515 Release the tablelock as nothing has done to this table.
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
}
result.result = TRUNC_ERROR;
result.message = message;
return result;
}
ByteStream bytestream;
ByteStream::byte tmp8;
// MCOL-66 The DBRM can't handle concurrent DDL
boost::mutex::scoped_lock lk(dbrmMutex);
try
{
// Disable extents first
int rc1 = fDbrm->markAllPartitionForDeletion(allOidList);
if (rc1 != 0)
{
string errMsg;
BRM::errString(rc, errMsg);
throw std::runtime_error(errMsg);
}
// Bug 4208 Drop the PrimProcFDCache before droping the column files
// FOr Windows, this ensures (most likely) that the column files have
// no open handles to hinder the deletion of the files.
rc = cacheutils::dropPrimProcFdCache();
VERBOSE_INFO("Removing files");
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)allOidList.size();
for (unsigned i = 0; i < allOidList.size(); i++)
{
bytestream << (uint32_t)allOidList[i];
}
uint32_t msgRecived = 0;
try
{
fWEClient->write_to_all(bytestream);
bsIn.reset(new ByteStream());
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
fWEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
fWEClient->removeQueue(uniqueId);
break;
}
else
msgRecived++;
}
}
}
catch (std::exception& ex)
{
Message::Args args;
Message message(9);
args.add("Truncate table failed due to ");
args.add(ex.what());
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
return result;
}
catch (...)
{
result.result = DROP_ERROR;
errorMsg = "Error in getting information from system catalog or from dbrm.";
Message::Args args;
Message message(9);
args.add("Truncate table failed due to ");
args.add(errorMsg);
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
return result;
}
// Drop PrimProc FD cache
rc = cacheutils::dropPrimProcFdCache();
// Flush primProc cache
rc = cacheutils::flushOIDsFromCache(allOidList);
// Delete extents from extent map
rc = fDbrm->deleteOIDs(allOidList);
if (rc != 0)
{
Message::Args args;
Message message(1);
args.add("Table truncated with warning ");
args.add("Remove from extent map failed.");
args.add("");
args.add("");
message.format(args);
result.result = WARNING;
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
// Get the number of tables in the database, the current table is included.
int tableCount = systemCatalogPtr->getTableCount();
Oam oam;
// Calculate which dbroot the columns should start
DBRootConfigList dbRootList = oamcache->getDBRootNums();
uint16_t useDBRootIndex = tableCount % dbRootList.size();
// Find out the dbroot# corresponding the useDBRootIndex from oam
uint16_t useDBRoot = dbRootList[useDBRootIndex];
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
bytestream << uniqueId;
bytestream << (uint32_t)txnID.id;
uint32_t numOids = columnOidList.size() + dictOIDList.size();
bytestream << numOids;
CalpontSystemCatalog::ColType colType;
for (unsigned col = 0; col < columnOidList.size(); col++)
{
colType = systemCatalogPtr->colType(columnOidList[col]);
if (colType.autoincrement)
autoIncColOid = colType.columnOID;
bytestream << (uint32_t)columnOidList[col];
bytestream << (uint8_t)colType.colDataType;
bytestream << (uint8_t) false;
bytestream << (uint32_t)colType.colWidth;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)colType.compressionType;
}
for (unsigned col = 0; col < dictOIDList.size(); col++)
{
colType = systemCatalogPtr->colTypeDct(dictOIDList[col].dictOID);
bytestream << (uint32_t)dictOIDList[col].dictOID;
bytestream << (uint8_t)colType.colDataType;
bytestream << (uint8_t) true;
bytestream << (uint32_t)colType.colWidth;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)colType.compressionType;
}
pmNum = oamcache->getOwnerPM(useDBRoot);
try
{
#ifdef IDB_DDL_DEBUG
cout << "Truncate table sending We_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
#endif
fWEClient->write(bytestream, pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
}
break;
}
}
if (rc != 0)
{
// drop the newly created files
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)(allOidList.size());
for (unsigned i = 0; i < (allOidList.size()); i++)
{
bytestream << (uint32_t)(allOidList[i]);
}
fWEClient->write(bytestream, pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
break;
}
else
{
*bsIn >> tmp8;
// rc = tmp8;
break;
}
}
Message::Args args;
Message message(1);
args.add("Truncate table failed.");
args.add(errorMsg);
args.add("");
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
// rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID,
// processName, false );
fSessionManager.rolledback(txnID);
return result;
}
}
catch (runtime_error&)
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server";
}
}
catch (std::exception& ex)
{
Message::Args args;
Message message(1);
args.add("Truncate table failed.");
args.add(ex.what());
args.add("");
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
// rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName,
// false );
fSessionManager.rolledback(txnID);
return result;
}
catch (...)
{
Message::Args args;
Message message(1);
args.add("Truncate table failed: ");
args.add("Remove column files failed.");
args.add("");
args.add("");
message.format(args);
result.result = TRUNC_ERROR;
result.message = message;
// rc = fSessionManager.setTableLock( roPair.objnum, truncTableStmt.fSessionID, processID, processName,
// false );
fSessionManager.rolledback(txnID);
return result;
}
if (rc != 0)
{
rollBackTransaction(uniqueId, txnID, truncTableStmt->fSessionID); // What to do with the error code
fSessionManager.rolledback(txnID);
}
// Check whether the table has autoincrement column
if (tableInfo.tablewithautoincr == 1)
{
// reset nextvalue to 1
WE_DDLCommandClient commandClient;
rc = commandClient.UpdateSyscolumnNextval(autoIncColOid, 1);
}
// Log the DDL statement
logDDL(truncTableStmt->fSessionID, txnID.id, truncTableStmt->fSql, truncTableStmt->fOwner);
try
{
(void)fDbrm->releaseTableLock(tableLockId);
}
catch (std::exception&)
{
Message::Args args;
Message message(1);
args.add("Table truncated with warning ");
args.add("Release table failed.");
args.add(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
args.add("");
message.format(args);
result.result = WARNING;
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
}
// release the transaction
fSessionManager.committed(txnID);
fWEClient->removeQueue(uniqueId);
// Remove the log file
try
{
deleteLogFile(TRUNCATE_LOG, roPair.objnum, uniqueId);
}
catch (...)
{
}
return result;
}
} // namespace ddlpackageprocessor