mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
2025 lines
72 KiB
C++
2025 lines
72 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2016 MariaDB Corporation
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: dmlprocessor.cpp 1024 2013-07-26 16:23:59Z chao $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
/** @file */
|
|
#include "configcpp.h"
|
|
#include <signal.h>
|
|
#include <ctime>
|
|
|
|
//#define SERIALIZE_DDL_DML_CPIMPORT 1
|
|
#include <boost/thread/mutex.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
#include <boost/scoped_array.hpp>
|
|
#include <boost/shared_ptr.hpp>
|
|
using namespace boost;
|
|
|
|
#include "cacheutils.h"
|
|
#include "vss.h"
|
|
#include "dbrm.h"
|
|
#include "brmtypes.h"
|
|
#include "idberrorinfo.h"
|
|
#include "errorids.h"
|
|
#include "batchinsertprocessor.h"
|
|
#include "tablelockdata.h"
|
|
#include "oamcache.h"
|
|
#include "messagelog.h"
|
|
#include "sqllogger.h"
|
|
#include "we_messages.h"
|
|
#include "dmlprocessor.h"
|
|
using namespace BRM;
|
|
using namespace config;
|
|
using namespace execplan;
|
|
using namespace std;
|
|
using namespace messageqcpp;
|
|
using namespace dmlpackage;
|
|
using namespace dmlpackageprocessor;
|
|
using namespace joblist;
|
|
using namespace logging;
|
|
using namespace oam;
|
|
using namespace WriteEngine;
|
|
|
|
#include "querytele.h"
|
|
using namespace querytele;
|
|
|
|
extern boost::mutex mute;
|
|
extern boost::condition_variable cond;
|
|
|
|
#define MCOL_140 // Undefine to test VSS for out of order transactions
|
|
|
|
namespace
|
|
{
|
|
[[maybe_unused]] const std::string myname = "DMLProc";
|
|
}
|
|
|
|
namespace dmlprocessor
|
|
{
|
|
// Map to store the package handler objects so we can set flags during execution
|
|
// for things like ctrl+c
|
|
DMLProcessor::PackageHandlerMap_t DMLProcessor::packageHandlerMap;
|
|
boost::mutex DMLProcessor::packageHandlerMapLock;
|
|
|
|
// Map to store the BatchInsertProc object
|
|
std::map<uint32_t, BatchInsertProc*> DMLProcessor::batchinsertProcessorMap;
|
|
boost::mutex DMLProcessor::batchinsertProcessorMapLock;
|
|
|
|
// MCOL-140 Map to hold table oids for tables being changed.
|
|
std::map<uint32_t, PackageHandler::tableAccessQueue_t> PackageHandler::tableOidMap;
|
|
boost::condition_variable PackageHandler::tableOidCond;
|
|
boost::mutex PackageHandler::tableOidMutex;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// A thread to periodically call dbrm to see if a user is
|
|
// shutting down the system or has put the system into write
|
|
// suspend mode. DBRM has 2 flags to check in this case, the
|
|
// ROLLBACK flag, and the FORCE flag. These flags will be
|
|
// reported when we ask for the Shutdown Pending flag (which we
|
|
// ignore at this point). Even if the user is putting the system
|
|
// into write suspend mode, this call will return the flags we
|
|
// are interested in. If ROLLBACK is set, we cancel normally.
|
|
// If FORCE is set, we can't rollback.
|
|
struct CancellationThread
|
|
{
|
|
CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer)
|
|
{
|
|
}
|
|
void operator()()
|
|
{
|
|
bool bDoingRollback = false;
|
|
bool bRollback = false;
|
|
bool bForce = false;
|
|
ostringstream oss;
|
|
std::vector<BRM::TableLockInfo> tableLocks;
|
|
BRM::TxnID txnId;
|
|
DMLProcessor::PackageHandlerMap_t::iterator phIter;
|
|
uint32_t sessionID;
|
|
int rc = 0;
|
|
|
|
while (true)
|
|
{
|
|
usleep(1000000); // 1 seconds
|
|
// Check to see if someone has ordered a shutdown or suspend with rollback.
|
|
(void)fDbrm->getSystemShutdownPending(bRollback, bForce);
|
|
|
|
if (bForce)
|
|
break;
|
|
|
|
if (bDoingRollback && bRollback)
|
|
{
|
|
continue;
|
|
// We've already started the rollbacks. Don't start again.
|
|
}
|
|
|
|
bDoingRollback = false;
|
|
|
|
if (bRollback)
|
|
{
|
|
RollbackTransactionProcessor rollbackProcessor(fDbrm);
|
|
SessionManager sessionManager;
|
|
uint64_t uniqueId = fDbrm->getUnique64();
|
|
std::string errorMsg;
|
|
int activeTransCount = 0;
|
|
int idleTransCount = 0;
|
|
bDoingRollback = true;
|
|
ostringstream oss;
|
|
oss << "DMLProc has been told to rollback all DML transactions.";
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
|
|
// Tell any active processors to stop working and return an error
|
|
// The front end will respond with a ROLLBACK command.
|
|
// Mark all active processors to rollback
|
|
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
|
|
|
|
for (phIter = DMLProcessor::packageHandlerMap.begin();
|
|
phIter != DMLProcessor::packageHandlerMap.end(); ++phIter)
|
|
{
|
|
ostringstream oss;
|
|
oss << "DMLProc will rollback active session " << phIter->second->getSessionID() << " Transaction "
|
|
<< phIter->second->getTxnid();
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
|
|
|
|
++activeTransCount;
|
|
phIter->second->rollbackPending();
|
|
}
|
|
|
|
if (activeTransCount > 0)
|
|
{
|
|
ostringstream oss1;
|
|
oss1 << "DMLProc is rolling back back " << activeTransCount << " active transactions.";
|
|
DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
|
|
// WIP Need to set cluster to read-only via CMAPI before shutting the cluster down.
|
|
if (fDbrm->isReadWrite())
|
|
{
|
|
continue;
|
|
}
|
|
|
|
// Check for any open DML transactions that don't currently have
|
|
// a processor
|
|
tableLocks = fDbrm->getAllTableLocks();
|
|
|
|
if (tableLocks.size() > 0)
|
|
{
|
|
for (uint32_t i = 0; i < tableLocks.size(); ++i)
|
|
{
|
|
sessionID = tableLocks[i].ownerSessionID;
|
|
phIter = DMLProcessor::packageHandlerMap.find(sessionID);
|
|
|
|
if (phIter == DMLProcessor::packageHandlerMap.end())
|
|
{
|
|
// We have found an active transaction without a packagehandler.
|
|
// This means that a transaction is open with autocommit turned
|
|
// off, but there's no current activity on the transaction. We
|
|
// need to roll it back if it's a DML transaction.
|
|
// If ownerName == "DMLProc" then it's a DML transaction.
|
|
if (tableLocks[i].ownerName == "DMLProc")
|
|
{
|
|
// OK, we know this is an idle DML transaction, so roll it back.
|
|
++idleTransCount;
|
|
txnId.id = tableLocks[i].ownerTxnID;
|
|
txnId.valid = true;
|
|
rc = rollbackProcessor.rollBackTransaction(uniqueId, txnId, sessionID, errorMsg);
|
|
|
|
if (rc == 0)
|
|
{
|
|
fDbrm->invalidateUncommittedExtentLBIDs(txnId.id, false);
|
|
|
|
//@Bug 4524. In case it is batchinsert, call bulkrollback.
|
|
rc = rollbackProcessor.rollBackBatchAutoOnTransaction(uniqueId, txnId, sessionID,
|
|
tableLocks[i].tableOID, errorMsg);
|
|
|
|
if (rc == 0)
|
|
{
|
|
logging::logCommand(0, tableLocks[i].ownerTxnID, "ROLLBACK;");
|
|
|
|
bool lockReleased = true;
|
|
|
|
try
|
|
{
|
|
lockReleased = fDbrm->releaseTableLock(tableLocks[i].id);
|
|
TablelockData::removeTablelockData(sessionID);
|
|
}
|
|
catch (std::exception&)
|
|
{
|
|
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
|
}
|
|
|
|
if (lockReleased)
|
|
{
|
|
sessionManager.rolledback(txnId);
|
|
ostringstream oss;
|
|
oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID
|
|
<< " and table lock id " << tableLocks[i].id << " is released.";
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID
|
|
<< " and tble lock id " << tableLocks[i].id << " is not released.";
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << " problem with bulk rollback of idle transaction " << tableLocks[i].ownerTxnID
|
|
<< "and DBRM is setting to readonly and table lock is not released: " << errorMsg;
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL);
|
|
rc = fDbrm->setReadOnly(true);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << " problem with rollback of idle transaction " << tableLocks[i].ownerTxnID
|
|
<< "and DBRM is setting to readonly and table lock is not released: " << errorMsg;
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL);
|
|
rc = fDbrm->setReadOnly(true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// If there are any abandonded transactions without locks
|
|
// release them.
|
|
int len;
|
|
std::shared_ptr<BRM::SIDTIDEntry[]> activeTxns = sessionManager.SIDTIDMap(len);
|
|
|
|
for (int i = 0; i < len; i++)
|
|
{
|
|
// If there isn't a table lock for this transaction, roll it back. Otherwise, assume
|
|
// it has an active processor or is not DML initiated and leave it alone. It's someone
|
|
// else's concern.
|
|
bool bFoundit = false;
|
|
|
|
for (uint32_t j = 0; j < tableLocks.size(); ++j)
|
|
{
|
|
if (tableLocks[j].ownerTxnID == activeTxns[i].txnid.id)
|
|
{
|
|
bFoundit = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!bFoundit && activeTxns[i].txnid.valid)
|
|
{
|
|
rollbackProcessor.rollBackTransaction(uniqueId, activeTxns[i].txnid, activeTxns[i].sessionid,
|
|
errorMsg);
|
|
sessionManager.rolledback(activeTxns[i].txnid);
|
|
++idleTransCount;
|
|
ostringstream oss;
|
|
oss << "DMLProc rolled back idle transaction with no tablelock" << tableLocks[i].ownerTxnID;
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
}
|
|
|
|
if (idleTransCount > 0)
|
|
{
|
|
ostringstream oss2;
|
|
oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions.";
|
|
DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
// Here is the end of the rollback if so DMLProc rollbacks what it can.
|
|
break;
|
|
}
|
|
}
|
|
// Setting the flag to tell DMLServer to exit.
|
|
fServer.startShutdown();
|
|
}
|
|
DBRM* fDbrm;
|
|
DMLServer& fServer;
|
|
};
|
|
|
|
PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios,
|
|
boost::shared_ptr<messageqcpp::ByteStream> bs, uint8_t packageType,
|
|
joblist::DistributedEngineComm* ec, bool concurrentSupport,
|
|
uint64_t maxDeleteRows, uint32_t sessionID,
|
|
execplan::CalpontSystemCatalog::SCN txnId, DBRM* aDbrm,
|
|
const QueryTeleClient& qtc,
|
|
boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
|
|
: fIos(ios)
|
|
, fByteStream(bs)
|
|
, fPackageType(packageType)
|
|
, fEC(ec)
|
|
, fConcurrentSupport(concurrentSupport)
|
|
, fMaxDeleteRows(maxDeleteRows)
|
|
, fSessionID(sessionID)
|
|
, fTableOid(0)
|
|
, fTxnid(txnId)
|
|
, fDbrm(aDbrm)
|
|
, fQtc(qtc)
|
|
, fcsc(csc)
|
|
{
|
|
}
|
|
|
|
PackageHandler::~PackageHandler()
|
|
{
|
|
// cout << "In destructor" << endl;
|
|
}
|
|
|
|
// MCOL-140
|
|
// Blocks a thread if there is another trx working on the same fTableOid
|
|
// return 1 when thread should continue.
|
|
// return 0 if error. Right now, no error detection is implemented.
|
|
//
|
|
// txnid was being created before the call to this function. This caused race conditions
|
|
// so creation is delayed until we're inside the lock here. Nothing needs it before
|
|
// this point in the execution.
|
|
//
|
|
// The algorithm is this. When the first txn for a given fTableOid arrives, start a queue
|
|
// containing a list of waiting or working txnId. Put this txnId into the queue (working)
|
|
// Put the queue into a map keyed on fTableOid.
|
|
//
|
|
// When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself,
|
|
// then waits for condition.
|
|
// When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is
|
|
// empty, it removes the entry from the map.
|
|
// Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released
|
|
// to do work. Otherwise it goes back to wait.
|
|
//
|
|
// There's a chance (CTRL+C) for instance, that the txn is no longer in the queue. Release it to work.
|
|
// Rollback will most likely be next.
|
|
//
|
|
// A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid.
|
|
int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage)
|
|
{
|
|
// MCOL-140 Wait for any other DML using this table.
|
|
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
|
|
boost::unique_lock<boost::mutex> lock(tableOidMutex);
|
|
BRM::TxnID txnid;
|
|
|
|
if (fPackageType != dmlpackage::DML_COMMAND)
|
|
{
|
|
txnid = sessionManager.getTxnID(fSessionID);
|
|
|
|
if (!txnid.valid)
|
|
{
|
|
txnid = sessionManager.newTxnID(fSessionID, true);
|
|
|
|
if (!txnid.valid)
|
|
{
|
|
throw std::runtime_error(std::string("Unable to start a transaction. Check critical log."));
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
txnid = sessionManager.getTxnID(fSessionID);
|
|
}
|
|
|
|
fTxnid = txnid.id;
|
|
|
|
if ((it = tableOidMap.find(fTableOid)) != tableOidMap.end())
|
|
{
|
|
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
|
|
|
|
// There's at least one working txn on this table. We may be the same txn.
|
|
if (fTxnid == tableOidQueue.front())
|
|
{
|
|
return 1; // We're next in line or the same as the last. Keep working
|
|
}
|
|
|
|
tableOidQueue.push(fTxnid); // Get on the waiting list.
|
|
|
|
// We need to wait
|
|
// tableOidQueue here is the queue holding the waitng transactions for this fTableOid
|
|
while (true)
|
|
{
|
|
// Log something that we're waiting
|
|
LoggingID logid(21, fSessionID, fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
ostringstream oss;
|
|
oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |"
|
|
<< dmlPackage->get_SchemaName() << "|";
|
|
args1.add(oss.str());
|
|
args1.add((uint64_t)fTableOid);
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
|
|
tableOidCond.wait(lock);
|
|
// In case of CTRL+C, the tableOidQueue could be invalidated
|
|
if ((tableOidMap.find(fTableOid))->second != tableOidQueue)
|
|
{
|
|
break;
|
|
}
|
|
if (tableOidQueue.front() == fTxnid)
|
|
{
|
|
// We're up next. Let's go do stuff.
|
|
break;
|
|
}
|
|
|
|
if (tableOidQueue.empty())
|
|
{
|
|
// If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now.
|
|
// Empty queues must be erased from the map.
|
|
tableOidMap.erase(fTableOid);
|
|
break;
|
|
}
|
|
|
|
// If we're not in the queue at all, then continue. CTRL+C was probably hit.
|
|
PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid);
|
|
|
|
if (c_it == tableOidQueue.end())
|
|
{
|
|
break;
|
|
}
|
|
|
|
// We're still in the queue and not on top. Go back and wait some more.
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// We're the first for this tableoid. Start a new queue.
|
|
tableAccessQueue_t tableOidQueue;
|
|
tableOidQueue.push(fTxnid);
|
|
tableOidMap[fTableOid] = tableOidQueue;
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
|
|
// MCOL-140 Called when it's time to release the next thread for this tablOid
|
|
int PackageHandler::releaseTableAccess()
|
|
{
|
|
// take us out of the queue
|
|
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
|
|
boost::lock_guard<boost::mutex> lock(tableOidMutex);
|
|
|
|
if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
|
|
{
|
|
return 2; // For now, return codes are not used
|
|
}
|
|
|
|
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
|
|
|
|
if (tableOidQueue.front() != fTxnid)
|
|
{
|
|
// This is a severe error. The front should be the working thread. If we're here,
|
|
// we're the working thread and should be front().
|
|
cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front()
|
|
<< endl;
|
|
LoggingID logid(21, fSessionID, fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add(
|
|
"ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid");
|
|
args1.add((uint64_t)fTableOid);
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_ERROR, msg, logid);
|
|
}
|
|
else
|
|
{
|
|
if (!tableOidQueue.empty())
|
|
tableOidQueue.pop(); // Get off the waiting list.
|
|
|
|
if (tableOidQueue.empty())
|
|
{
|
|
// remove the queue from the map.
|
|
tableOidMap.erase(fTableOid);
|
|
}
|
|
}
|
|
|
|
// release the condition
|
|
tableOidCond.notify_all();
|
|
return 1;
|
|
}
|
|
|
|
int PackageHandler::forceReleaseTableAccess()
|
|
{
|
|
// By removing the txnid from the queue, the logic after the wait in
|
|
// synchTableAccess() will release the thread and clean up if needed.
|
|
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
|
|
boost::lock_guard<boost::mutex> lock(tableOidMutex);
|
|
|
|
if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end())
|
|
{
|
|
// This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
|
|
return 2;
|
|
}
|
|
|
|
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
|
|
tableOidQueue.erase(fTxnid);
|
|
if (tableOidQueue.empty())
|
|
{
|
|
// remove the queue from the map.
|
|
tableOidMap.erase(fTableOid);
|
|
}
|
|
// release the condition
|
|
tableOidCond.notify_all();
|
|
return 1;
|
|
}
|
|
|
|
// static
|
|
// Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung,
|
|
// though some may be because they never returned from PrimProc and will leave the table lock on.
|
|
int PackageHandler::clearTableAccess()
|
|
{
|
|
tableOidMap.clear();
|
|
return 1;
|
|
}
|
|
|
|
CalpontSystemCatalog::ROPair PackageHandler::getTableRID(
|
|
boost::shared_ptr<execplan::CalpontSystemCatalog> fcsc,
|
|
execplan::CalpontSystemCatalog::TableName& tableName)
|
|
{
|
|
execplan::CalpontSystemCatalog::ROPair roPair;
|
|
try
|
|
{
|
|
roPair = fcsc->tableRID(tableName);
|
|
}
|
|
catch (...)
|
|
{
|
|
if (setupDec())
|
|
throw;
|
|
roPair = fcsc->tableRID(tableName);
|
|
}
|
|
|
|
return roPair;
|
|
}
|
|
|
|
void PackageHandler::run()
|
|
{
|
|
ResourceManager* frm = ResourceManager::instance();
|
|
dmlpackageprocessor::DMLPackageProcessor::DMLResult result;
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::NO_ERROR;
|
|
// cout << "PackageHandler handling ";
|
|
std::string stmt;
|
|
unsigned DMLLoggingId = 21;
|
|
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|
SynchTable synchTable;
|
|
|
|
try
|
|
{
|
|
switch (fPackageType)
|
|
{
|
|
case dmlpackage::DML_INSERT:
|
|
{
|
|
// build an InsertDMLPackage from the bytestream
|
|
// cout << "an INSERT package" << endl;
|
|
dmlpackage::InsertDMLPackage insertPkg;
|
|
// boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream);
|
|
messageqcpp::ByteStream bsSave = *(fByteStream.get());
|
|
insertPkg.readMetaData(*(fByteStream.get()));
|
|
#ifdef MCOL_140
|
|
|
|
if (fConcurrentSupport)
|
|
{
|
|
fTableOid = insertPkg.getTableOid();
|
|
|
|
// Single Insert has no start like bulk does, so insertPkg.getTableOid()
|
|
// isn't set. Go get it now.
|
|
if (fTableOid == 0)
|
|
{
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = insertPkg.get_Table()->get_SchemaName();
|
|
tableName.table = insertPkg.get_Table()->get_TableName();
|
|
CalpontSystemCatalog::ROPair roPair = getTableRID(fcsc, tableName);
|
|
fTableOid = roPair.objnum;
|
|
}
|
|
synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid
|
|
}
|
|
|
|
#endif
|
|
QueryTeleStats qts;
|
|
qts.query_uuid = QueryTeleClient::genUUID();
|
|
qts.msg_type = QueryTeleStats::QT_START;
|
|
qts.start_time = QueryTeleClient::timeNowms();
|
|
qts.session_id = fSessionID;
|
|
qts.query_type = "INSERT";
|
|
qts.query = insertPkg.get_SQLStatement();
|
|
qts.system_name = oamCache->getSystemName();
|
|
qts.module_name = oamCache->getModuleName();
|
|
qts.schema_name = insertPkg.get_SchemaName();
|
|
fQtc.postQueryTele(qts);
|
|
|
|
// cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl;
|
|
if (insertPkg.get_isBatchInsert())
|
|
{
|
|
fByteStream->reset();
|
|
// cout << "This is batch insert " << endl;
|
|
BatchInsertProc* batchProcessor = NULL;
|
|
{
|
|
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
|
|
|
|
std::map<uint32_t, BatchInsertProc*>::iterator batchIter =
|
|
DMLProcessor::batchinsertProcessorMap.find(fSessionID);
|
|
|
|
if (batchIter == DMLProcessor::batchinsertProcessorMap.end())
|
|
{
|
|
batchProcessor =
|
|
new BatchInsertProc(insertPkg.get_isAutocommitOn(), insertPkg.getTableOid(), fTxnid, fDbrm);
|
|
DMLProcessor::batchinsertProcessorMap[fSessionID] = batchProcessor;
|
|
// cout << "batchProcessor is created " << batchProcessor << endl;
|
|
}
|
|
else
|
|
{
|
|
batchProcessor = batchIter->second;
|
|
// cout << "Found batchProcessor " << batchProcessor << endl;
|
|
}
|
|
}
|
|
|
|
if (insertPkg.get_Logging())
|
|
{
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("Start SQL statement: ");
|
|
|
|
if (!insertPkg.get_isCacheInsert())
|
|
{
|
|
ostringstream oss;
|
|
oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|";
|
|
args1.add(oss.str());
|
|
}
|
|
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
TablelockData* tablelockData = TablelockData::makeTablelockData(insertPkg.get_SessionID());
|
|
uint64_t tableLockId = tablelockData->getTablelockId(insertPkg.getTableOid());
|
|
|
|
// cout << "Processing table oid " << insertPkg.getTableOid() << " for transaction "<< (int)fTxnid
|
|
// << endl;
|
|
if (tableLockId == 0)
|
|
{
|
|
// cout << "Grabing tablelock for batchProcessor " << batchProcessor << endl;
|
|
tableLockId = batchProcessor->grabTableLock(insertPkg.get_SessionID());
|
|
|
|
if (tableLockId == 0)
|
|
{
|
|
BRM::TxnID brmTxnID;
|
|
brmTxnID.id = fTxnid;
|
|
brmTxnID.valid = true;
|
|
sessionManager.rolledback(brmTxnID);
|
|
string errMsg;
|
|
int rc = 0;
|
|
batchProcessor->getError(rc, errMsg);
|
|
result.result = DMLPackageProcessor::TABLE_LOCK_ERROR;
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add(errMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.message = message;
|
|
break;
|
|
}
|
|
|
|
if (tableLockId > 0)
|
|
tablelockData->setTablelock(insertPkg.getTableOid(), tableLockId);
|
|
}
|
|
}
|
|
|
|
if (insertPkg.get_Logending() && insertPkg.get_Logging()) // only one batch need to be processed.
|
|
{
|
|
// cout << "dmlprocessor add last pkg" << endl;
|
|
// need to add error handling.
|
|
batchProcessor->addPkg(bsSave);
|
|
batchProcessor->sendFirstBatch();
|
|
batchProcessor->receiveOutstandingMsg();
|
|
//@Bug 5162. Get the correct error message before the last message.
|
|
string errMsg;
|
|
int rc = 0;
|
|
batchProcessor->getError(rc, errMsg);
|
|
batchProcessor->sendlastBatch();
|
|
batchProcessor->receiveAllMsg();
|
|
|
|
if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
|
|
{
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement with warnings");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add(errMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.message = message;
|
|
}
|
|
else if (rc != 0)
|
|
{
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
cout << "Got error in the end of one batchinsert." << endl;
|
|
args.add("Insert Failed: ");
|
|
args.add(errMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.message = message;
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement with error");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
}
|
|
else
|
|
{
|
|
// if (!insertPkg.get_isAutocommitOn())
|
|
// {
|
|
// batchProcessor->setHwm();
|
|
// }
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
}
|
|
|
|
// remove the batch insert object
|
|
{
|
|
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
|
|
|
|
std::map<uint32_t, BatchInsertProc*>::iterator batchIter =
|
|
DMLProcessor::batchinsertProcessorMap.find(fSessionID);
|
|
|
|
if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
|
|
{
|
|
delete batchIter->second;
|
|
DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
|
|
}
|
|
}
|
|
}
|
|
else if (insertPkg.get_Logending()) // Last batch
|
|
{
|
|
int rc = 0;
|
|
string errMsg;
|
|
batchProcessor->getError(rc, errMsg);
|
|
|
|
// cout <<"dmlprocessor received last pkg from mysql rc == " << rc << endl;
|
|
if ((rc == 0) || (rc == DMLPackageProcessor::IDBRANGE_WARNING))
|
|
{
|
|
// cout << " rc = " << rc << endl;
|
|
batchProcessor->addPkg(bsSave);
|
|
batchProcessor->sendNextBatch();
|
|
batchProcessor->receiveOutstandingMsg();
|
|
//@Bug 5162. Get the correct error message before the last message.
|
|
batchProcessor->getError(rc, errMsg);
|
|
batchProcessor->sendlastBatch();
|
|
batchProcessor->receiveAllMsg();
|
|
|
|
if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
|
|
{
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement with warnings");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add(errMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.message = message;
|
|
}
|
|
else if (rc != 0)
|
|
{
|
|
// cout << "Got error in the end of last batchinsert. error message is " << errMsg << endl;
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add(errMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.message = message;
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement with error");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
}
|
|
else
|
|
{
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
}
|
|
|
|
// cout << "finished batch insert" << endl;
|
|
}
|
|
else
|
|
{
|
|
// error occurred. Receive all outstanding messages before erroring out.
|
|
batchProcessor->receiveOutstandingMsg();
|
|
batchProcessor->sendlastBatch(); // needs to flush files
|
|
batchProcessor->receiveAllMsg();
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
|
|
// cout << "Got error in the end of batchinsert2. error msg is " << errMsg<< endl;
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add(errMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.message = message;
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement with error");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";",
|
|
insertPkg.get_SchemaName());
|
|
}
|
|
|
|
// remove from map
|
|
{
|
|
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
|
|
std::map<uint32_t, BatchInsertProc*>::iterator batchIter =
|
|
DMLProcessor::batchinsertProcessorMap.find(fSessionID);
|
|
|
|
if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
|
|
{
|
|
// cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl;
|
|
delete batchIter->second;
|
|
DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
int rc = 0;
|
|
string errMsg;
|
|
batchProcessor->getError(rc, errMsg);
|
|
|
|
if (rc == DMLPackageProcessor::IDBRANGE_WARNING)
|
|
{
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
|
|
}
|
|
else if (rc != 0)
|
|
{
|
|
result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
|
|
//@Bug
|
|
// cout << "Got error during batchinsert. with message " << errMsg << endl;
|
|
logging::Message::Args args;
|
|
logging::Message message(6);
|
|
args.add(errMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
batchProcessor->receiveOutstandingMsg();
|
|
batchProcessor->sendlastBatch(); // needs to flush files
|
|
// cout << "Last batch is sent to WES." << endl;
|
|
batchProcessor->receiveAllMsg();
|
|
LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement with error");
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
// remove from map
|
|
{
|
|
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
|
|
std::map<uint32_t, BatchInsertProc*>::iterator batchIter =
|
|
DMLProcessor::batchinsertProcessorMap.find(fSessionID);
|
|
|
|
if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
|
|
{
|
|
// cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl;
|
|
delete batchIter->second;
|
|
DMLProcessor::batchinsertProcessorMap.erase(fSessionID);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
batchProcessor->addPkg(bsSave);
|
|
batchProcessor->sendNextBatch();
|
|
break;
|
|
}
|
|
}
|
|
else // Single Insert
|
|
{
|
|
// make sure insertPkg.readMetaData() is called before
|
|
// this on fByteStream!
|
|
// TODO: Similar to batch inserts, don't
|
|
// deserialize the row data here for single inserts.
|
|
insertPkg.readRowData(*(fByteStream.get()));
|
|
insertPkg.set_TxnID(fTxnid);
|
|
fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID()));
|
|
result = fProcessor->processPackage(insertPkg);
|
|
}
|
|
|
|
qts.msg_type = QueryTeleStats::QT_SUMMARY;
|
|
qts.max_mem_pct = result.stats.fMaxMemPct;
|
|
qts.num_files = result.stats.fNumFiles;
|
|
qts.phy_io = result.stats.fPhyIO;
|
|
qts.cache_io = result.stats.fCacheIO;
|
|
qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
|
|
qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
|
|
qts.msg_bytes_in = result.stats.fMsgBytesIn;
|
|
qts.msg_bytes_out = result.stats.fMsgBytesOut;
|
|
qts.rows = result.stats.fRows;
|
|
qts.end_time = QueryTeleClient::timeNowms();
|
|
qts.blocks_changed = result.stats.fBlocksChanged;
|
|
fQtc.postQueryTele(qts);
|
|
}
|
|
break;
|
|
|
|
case dmlpackage::DML_UPDATE:
|
|
{
|
|
// build an UpdateDMLPackage from the bytestream
|
|
// cout << "an UPDATE package" << endl;
|
|
boost::scoped_ptr<dmlpackage::UpdateDMLPackage> updatePkg(new dmlpackage::UpdateDMLPackage());
|
|
updatePkg->read(*(fByteStream.get()));
|
|
#ifdef MCOL_140
|
|
|
|
if (fConcurrentSupport)
|
|
{
|
|
fTableOid = updatePkg->getTableOid();
|
|
|
|
// Update generally doesn't set fTableOid in updatePkg. Go get it now.
|
|
if (fTableOid == 0)
|
|
{
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = updatePkg->get_Table()->get_SchemaName();
|
|
tableName.table = updatePkg->get_Table()->get_TableName();
|
|
CalpontSystemCatalog::ROPair roPair = getTableRID(fcsc, tableName);
|
|
fTableOid = roPair.objnum;
|
|
}
|
|
synchTable.setPackage(this,
|
|
updatePkg.get()); // Blocks if another DML thread is using this fTableOid
|
|
}
|
|
|
|
#endif
|
|
updatePkg->set_TxnID(fTxnid);
|
|
QueryTeleStats qts;
|
|
qts.query_uuid = updatePkg->uuid();
|
|
qts.msg_type = QueryTeleStats::QT_START;
|
|
qts.start_time = QueryTeleClient::timeNowms();
|
|
qts.session_id = fSessionID;
|
|
qts.query_type = "UPDATE";
|
|
qts.query = updatePkg->get_SQLStatement();
|
|
qts.system_name = oamCache->getSystemName();
|
|
qts.module_name = oamCache->getModuleName();
|
|
qts.schema_name = updatePkg->get_SchemaName();
|
|
fQtc.postQueryTele(qts);
|
|
// process it
|
|
//@Bug 1341. Don't remove calpontsystemcatalog from this
|
|
// session to take advantage of cache.
|
|
fProcessor.reset(new dmlpackageprocessor::UpdatePackageProcessor(fDbrm, updatePkg->get_SessionID()));
|
|
fProcessor->setEngineComm(fEC);
|
|
fProcessor->setRM(frm);
|
|
idbassert(fTxnid != 0);
|
|
result = fProcessor->processPackage(*(updatePkg.get()));
|
|
qts.msg_type = QueryTeleStats::QT_SUMMARY;
|
|
qts.max_mem_pct = result.stats.fMaxMemPct;
|
|
qts.num_files = result.stats.fNumFiles;
|
|
qts.phy_io = result.stats.fPhyIO;
|
|
qts.cache_io = result.stats.fCacheIO;
|
|
qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
|
|
qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
|
|
qts.msg_bytes_in = result.stats.fMsgBytesIn;
|
|
qts.msg_bytes_out = result.stats.fMsgBytesOut;
|
|
qts.rows = result.stats.fRows;
|
|
qts.end_time = QueryTeleClient::timeNowms();
|
|
qts.blocks_changed = result.stats.fBlocksChanged;
|
|
fQtc.postQueryTele(qts);
|
|
}
|
|
break;
|
|
|
|
case dmlpackage::DML_DELETE:
|
|
{
|
|
boost::scoped_ptr<dmlpackage::DeleteDMLPackage> deletePkg(new dmlpackage::DeleteDMLPackage());
|
|
deletePkg->read(*(fByteStream.get()));
|
|
#ifdef MCOL_140
|
|
|
|
if (fConcurrentSupport)
|
|
{
|
|
fTableOid = deletePkg->getTableOid();
|
|
|
|
// Delete generally doesn't set fTableOid in updatePkg. Go get it now.
|
|
if (fTableOid == 0)
|
|
{
|
|
CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = deletePkg->get_Table()->get_SchemaName();
|
|
tableName.table = deletePkg->get_Table()->get_TableName();
|
|
CalpontSystemCatalog::ROPair roPair = getTableRID(fcsc, tableName);
|
|
fTableOid = roPair.objnum;
|
|
}
|
|
synchTable.setPackage(this,
|
|
deletePkg.get()); // Blocks if another DML thread is using this fTableOid
|
|
}
|
|
|
|
#endif
|
|
deletePkg->set_TxnID(fTxnid);
|
|
QueryTeleStats qts;
|
|
qts.query_uuid = deletePkg->uuid();
|
|
qts.msg_type = QueryTeleStats::QT_START;
|
|
qts.start_time = QueryTeleClient::timeNowms();
|
|
qts.session_id = fSessionID;
|
|
qts.query_type = "DELETE";
|
|
qts.query = deletePkg->get_SQLStatement();
|
|
qts.system_name = oamCache->getSystemName();
|
|
qts.module_name = oamCache->getModuleName();
|
|
qts.schema_name = deletePkg->get_SchemaName();
|
|
fQtc.postQueryTele(qts);
|
|
// process it
|
|
//@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache.
|
|
fProcessor.reset(new dmlpackageprocessor::DeletePackageProcessor(fDbrm, deletePkg->get_SessionID()));
|
|
fProcessor->setEngineComm(fEC);
|
|
fProcessor->setRM(frm);
|
|
idbassert(fTxnid != 0);
|
|
result = fProcessor->processPackage(*(deletePkg.get()));
|
|
qts.msg_type = QueryTeleStats::QT_SUMMARY;
|
|
qts.max_mem_pct = result.stats.fMaxMemPct;
|
|
qts.num_files = result.stats.fNumFiles;
|
|
qts.phy_io = result.stats.fPhyIO;
|
|
qts.cache_io = result.stats.fCacheIO;
|
|
qts.msg_rcv_cnt = result.stats.fMsgRcvCnt;
|
|
qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped;
|
|
qts.msg_bytes_in = result.stats.fMsgBytesIn;
|
|
qts.msg_bytes_out = result.stats.fMsgBytesOut;
|
|
qts.rows = result.stats.fRows;
|
|
qts.end_time = QueryTeleClient::timeNowms();
|
|
qts.blocks_changed = result.stats.fBlocksChanged;
|
|
fQtc.postQueryTele(qts);
|
|
}
|
|
break;
|
|
|
|
case dmlpackage::DML_COMMAND:
|
|
{
|
|
// build a CommandDMLPackage from the bytestream
|
|
// cout << "a COMMAND package" << endl;
|
|
dmlpackage::CommandDMLPackage commandPkg;
|
|
commandPkg.read(*(fByteStream.get()));
|
|
stmt = commandPkg.get_DMLStatement();
|
|
boost::algorithm::to_upper(stmt);
|
|
trim(stmt);
|
|
|
|
if (stmt == "CLEANUP")
|
|
{
|
|
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID());
|
|
execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID() | 0x80000000);
|
|
}
|
|
else
|
|
{
|
|
// process it
|
|
//@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache.
|
|
fProcessor.reset(
|
|
new dmlpackageprocessor::CommandPackageProcessor(fDbrm, commandPkg.get_SessionID()));
|
|
|
|
// cout << "got command " << stmt << " for session " << commandPkg.get_SessionID() << endl;
|
|
result = fProcessor->processPackage(commandPkg);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
// Log errors
|
|
if ((result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) &&
|
|
(result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) &&
|
|
(result.result != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR) &&
|
|
(result.result != dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR))
|
|
{
|
|
logging::LoggingID lid(21);
|
|
logging::MessageLog ml(lid);
|
|
|
|
ml.logErrorMessage(result.message);
|
|
}
|
|
else if (result.result == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
|
|
{
|
|
logging::LoggingID lid(21);
|
|
logging::MessageLog ml(lid);
|
|
|
|
ml.logWarningMessage(result.message);
|
|
}
|
|
}
|
|
catch (std::exception& e)
|
|
{
|
|
cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType
|
|
<< ") exception: " << e.what() << endl;
|
|
logging::LoggingID lid(21);
|
|
logging::MessageLog ml(lid);
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("dmlprocessor.cpp PackageHandler::run() package type");
|
|
args.add((uint64_t)fPackageType);
|
|
args.add(e.what());
|
|
message.format(args);
|
|
ml.logErrorMessage(message);
|
|
result.result = DMLPackageProcessor::COMMAND_ERROR;
|
|
result.message = message;
|
|
}
|
|
catch (...)
|
|
{
|
|
logging::LoggingID lid(21);
|
|
logging::MessageLog ml(lid);
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("dmlprocessor.cpp PackageHandler::run() ... exception package type");
|
|
args.add((uint64_t)fPackageType);
|
|
message.format(args);
|
|
ml.logErrorMessage(message);
|
|
result.result = DMLPackageProcessor::COMMAND_ERROR;
|
|
result.message = message;
|
|
}
|
|
|
|
// We put the packageHandler into a map so that if we receive a
|
|
// message to affect the previous command, we can find it.
|
|
// We need to remove it from the list before sending the response back.
|
|
// If we remove it after sending the results, it's possible for a commit
|
|
// or rollback be sent and get processed before it is removed, and that
|
|
// will fail.
|
|
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
|
|
DMLProcessor::packageHandlerMap.erase(getSessionID());
|
|
lk2.unlock();
|
|
|
|
// send back the results
|
|
messageqcpp::ByteStream results;
|
|
messageqcpp::ByteStream::octbyte rowCount = result.rowCount;
|
|
messageqcpp::ByteStream::byte retval = result.result;
|
|
results << retval;
|
|
results << rowCount;
|
|
results << result.message.msg();
|
|
results << result.tableLockInfo; // ? connector does not get
|
|
// query stats
|
|
results << result.queryStats;
|
|
results << result.extendedStats;
|
|
results << result.miniStats;
|
|
result.stats.serialize(results);
|
|
fIos.write(results);
|
|
// Bug 5226. dmlprocessor thread will close the socket to mysqld.
|
|
// if (stmt == "CLEANUP")
|
|
// fIos.close();
|
|
}
|
|
|
|
void PackageHandler::rollbackPending()
|
|
{
|
|
if (fProcessor.get() == NULL)
|
|
{
|
|
// This happens when batch insert
|
|
return;
|
|
}
|
|
|
|
fProcessor->setRollbackPending(true);
|
|
|
|
// Force a release of the processing from MCOL-140
|
|
#ifdef MCOL_140
|
|
if (fConcurrentSupport)
|
|
{
|
|
// MCOL-140 We're not necessarily the next in line.
|
|
// This forces this thread to be released anyway.
|
|
forceReleaseTableAccess();
|
|
}
|
|
|
|
#endif
|
|
|
|
ostringstream oss;
|
|
oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
|
|
}
|
|
|
|
void added_a_pm(int)
|
|
{
|
|
DistributedEngineComm* dec;
|
|
ResourceManager* rm = ResourceManager::instance();
|
|
dec = DistributedEngineComm::instance(rm);
|
|
dec->Setup();
|
|
// MCOL-140 clear the waiting queue as all transactions are probably going to fail
|
|
PackageHandler::clearTableAccess();
|
|
}
|
|
|
|
DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm)
|
|
: fPackageMaxThreads(packageMaxThreads)
|
|
, fPackageWorkQueueSize(packageWorkQueueSize)
|
|
, fDbrm(dbrm)
|
|
, fShutdownFlag(false)
|
|
{
|
|
fMqServer.reset(new MessageQueueServer("DMLProc"));
|
|
|
|
fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
|
|
fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
|
|
fDmlPackagepool.setName("DmlPackagepool");
|
|
}
|
|
|
|
int DMLServer::start()
|
|
{
|
|
messageqcpp::IOSocket ios;
|
|
uint32_t nextID = 1;
|
|
|
|
try
|
|
{
|
|
// CancellationThread is for telling all active transactions
|
|
// to quit working because the system is either going down
|
|
// or going into write suspend mode
|
|
CancellationThread cancelObject(fDbrm, *this);
|
|
boost::thread cancelThread(cancelObject);
|
|
|
|
cout << "DMLProc is ready..." << endl;
|
|
|
|
const static struct timespec timeout = {1, 100}; // roughly 1 second TO
|
|
for (;;)
|
|
{
|
|
ios = fMqServer->accept(&timeout);
|
|
// MCS polls in a loop watching for a pending shutdown
|
|
// that is signalled via fShutdownFlag set in a
|
|
// CancellationThread. CT sets the flag if a cluster state
|
|
// has SS_SHUTDOWNPENDING value set.
|
|
while (!ios.hasSocketDescriptor() && !pendingShutdown())
|
|
ios = fMqServer->accept(&timeout);
|
|
|
|
if (pendingShutdown())
|
|
break;
|
|
ios.setSockID(nextID++);
|
|
fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm));
|
|
}
|
|
|
|
cancelThread.join();
|
|
return EXIT_SUCCESS;
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
cerr << ex.what() << endl;
|
|
logging::LoggingID lid(21);
|
|
Message::Args args;
|
|
Message message(8);
|
|
args.add("DMLProc init caught exception: ");
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
logging::Logger logger(lid.fSubsysID);
|
|
logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid);
|
|
return EXIT_FAILURE;
|
|
}
|
|
catch (...)
|
|
{
|
|
cerr << "Caught unknown exception!" << endl;
|
|
logging::LoggingID lid(21);
|
|
Message::Args args;
|
|
Message message(8);
|
|
args.add("DMLProc init caught unknown exception");
|
|
message.format(args);
|
|
logging::Logger logger(lid.fSubsysID);
|
|
logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid);
|
|
return EXIT_FAILURE;
|
|
}
|
|
}
|
|
|
|
DMLProcessor::DMLProcessor(messageqcpp::IOSocket ios, BRM::DBRM* aDbrm) : fIos(ios), fDbrm(aDbrm)
|
|
{
|
|
csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
|
|
csc->identity(CalpontSystemCatalog::EC);
|
|
string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
|
|
|
|
if (!teleServerHost.empty())
|
|
{
|
|
int teleServerPort =
|
|
config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
|
|
|
|
if (teleServerPort > 0)
|
|
{
|
|
fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
|
|
}
|
|
}
|
|
}
|
|
|
|
void DMLProcessor::operator()()
|
|
{
|
|
bool bIsDbrmUp = true;
|
|
|
|
try
|
|
{
|
|
boost::shared_ptr<messageqcpp::ByteStream> bs1(new messageqcpp::ByteStream());
|
|
// messageqcpp::ByteStream bs;
|
|
uint8_t packageType;
|
|
|
|
ResourceManager* rm = ResourceManager::instance();
|
|
DistributedEngineComm* fEC = DistributedEngineComm::instance(rm);
|
|
|
|
uint64_t maxDeleteRows = rm->getDMLMaxDeleteRows();
|
|
|
|
fConcurrentSupport = true;
|
|
string concurrentTranStr =
|
|
config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
|
|
|
|
if (concurrentTranStr.length() != 0)
|
|
{
|
|
if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
|
|
fConcurrentSupport = false;
|
|
}
|
|
|
|
struct sigaction ign;
|
|
memset(&ign, 0, sizeof(ign));
|
|
ign.sa_handler = added_a_pm;
|
|
sigaction(SIGHUP, &ign, 0);
|
|
fEC->Open();
|
|
|
|
for (;;)
|
|
{
|
|
// cout << "DMLProc is waiting for a Calpont DML Package on " << fIos.getSockID() << endl;
|
|
try
|
|
{
|
|
bs1.reset(new messageqcpp::ByteStream(fIos.read()));
|
|
// cout << "received from mysql socket " << fIos.getSockID() << endl;
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
// This is an I/O error from InetStreamSocket::read(), just close and move on...
|
|
cout << "runtime error during read on " << fIos.getSockID() << " " << ex.what() << endl;
|
|
bs1->reset();
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "... error during read " << fIos.getSockID() << endl;
|
|
// all this throw does is cause this thread to silently go away. I doubt this is the right
|
|
// thing to do...
|
|
throw;
|
|
}
|
|
|
|
if (!bs1 || bs1->length() == 0)
|
|
{
|
|
cout << "Read 0 bytes. Closing connection " << fIos.getSockID() << endl;
|
|
fIos.close();
|
|
break;
|
|
}
|
|
|
|
uint32_t sessionID;
|
|
*bs1 >> sessionID;
|
|
*bs1 >> packageType;
|
|
// cout << "DMLProc received pkg. sessionid:type = " << sessionID <<":"<<(int)packageType << endl;
|
|
uint32_t stateFlags;
|
|
messageqcpp::ByteStream::byte status = 255;
|
|
messageqcpp::ByteStream::octbyte rowCount = 0;
|
|
|
|
if (fDbrm->getSystemState(stateFlags) >
|
|
0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents
|
|
{
|
|
messageqcpp::ByteStream results;
|
|
std::string responseMsg;
|
|
bool bReject = false;
|
|
|
|
// Check to see if we're in write suspended mode
|
|
// If so, we can't process the request.
|
|
if (stateFlags & SessionManagerServer::SS_SUSPENDED)
|
|
{
|
|
status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES;
|
|
responseMsg = "Writing to the database is disabled.";
|
|
bReject = true;
|
|
}
|
|
|
|
// Check to see if we're in write suspend or shutdown pending mode
|
|
if (packageType != dmlpackage::DML_COMMAND) // Not a commit or rollback
|
|
{
|
|
if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING ||
|
|
stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
|
|
{
|
|
if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
|
|
{
|
|
responseMsg = "Writing to the database is disabled.";
|
|
}
|
|
else
|
|
{
|
|
responseMsg = "The database is being shut down.";
|
|
}
|
|
|
|
// Refuse all non active tranasactions
|
|
// Check the rollback flag
|
|
// -- Set: Rollback active transactions.
|
|
// -- Not set: Allow active transactions.
|
|
if (sessionManager.isTransactionActive(sessionID, bIsDbrmUp))
|
|
{
|
|
if (stateFlags & SessionManagerServer::SS_ROLLBACK)
|
|
{
|
|
status = DMLPackageProcessor::JOB_CANCELED;
|
|
bReject = true;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES;
|
|
bReject = true;
|
|
}
|
|
}
|
|
|
|
// MCOL-4988 Check if DBRM is in READ ONLY mode
|
|
if (fDbrm->isReadWrite() == BRM::ERR_READONLY)
|
|
{
|
|
BRM::errString(BRM::ERR_READONLY, responseMsg);
|
|
status = DMLPackageProcessor::DBRM_READ_ONLY;
|
|
bReject = true;
|
|
}
|
|
|
|
if (bReject)
|
|
{
|
|
// For batch insert, we need to send a lastpkg message
|
|
// to batchInsertProcessor so it can clean things up.
|
|
if (packageType == dmlpackage::DML_INSERT)
|
|
{
|
|
// build an InsertDMLPackage from the bytestream
|
|
// We need the flags from the package to know what
|
|
// type of package we're dealing with before we can
|
|
// take special action for the last package of a
|
|
// batch insert.
|
|
dmlpackage::InsertDMLPackage insertPkg;
|
|
messageqcpp::ByteStream bsSave = *(bs1.get());
|
|
insertPkg.read(*(bs1.get()));
|
|
BatchInsertProc* batchInsertProcessor = NULL;
|
|
|
|
if (insertPkg.get_isBatchInsert() && insertPkg.get_Logending())
|
|
{
|
|
{
|
|
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
|
|
std::map<uint32_t, BatchInsertProc*>::iterator batchIter =
|
|
DMLProcessor::batchinsertProcessorMap.find(sessionID);
|
|
|
|
if (batchIter !=
|
|
DMLProcessor::batchinsertProcessorMap.end()) // The first batch, no need to do anything
|
|
{
|
|
batchInsertProcessor = batchIter->second;
|
|
batchInsertProcessor->addPkg(bsSave);
|
|
|
|
batchInsertProcessor->sendlastBatch();
|
|
batchInsertProcessor->receiveAllMsg();
|
|
|
|
if (!insertPkg.get_isAutocommitOn())
|
|
{
|
|
batchInsertProcessor->setHwm();
|
|
}
|
|
|
|
batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID);
|
|
|
|
if (batchIter != DMLProcessor::batchinsertProcessorMap.end())
|
|
{
|
|
DMLProcessor::batchinsertProcessorMap.erase(sessionID);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
results << status;
|
|
results << rowCount;
|
|
logging::Message::Args args;
|
|
logging::Message message(2);
|
|
args.add(responseMsg);
|
|
message.format(args);
|
|
results << message.msg();
|
|
fIos.write(results);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// This section is to check to see if the user hit CTRL+C while the
|
|
// DML was processing If so, the sessionID will be found in
|
|
// packageHandlerMap and we can set rollbackPending in the
|
|
// associated packageHandler. Other than CTRL+C, we should never
|
|
// find our own sessionID in the map.
|
|
// This mechanism may prove useful for other things, so the above
|
|
// comment may change.
|
|
{
|
|
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
|
|
DMLProcessor::PackageHandlerMap_t::iterator phIter = packageHandlerMap.find(sessionID);
|
|
|
|
if (phIter != packageHandlerMap.end())
|
|
{
|
|
if (packageType == dmlpackage::DML_COMMAND)
|
|
{
|
|
// MCOL-66 It's possible for a commit or rollback to get here if
|
|
// the timing is just right. Don't destroy its data
|
|
messageqcpp::ByteStream bsctrlc(bs1);
|
|
dmlpackage::CommandDMLPackage commandPkg;
|
|
commandPkg.read(bsctrlc);
|
|
std::string stmt = commandPkg.get_DMLStatement();
|
|
boost::algorithm::to_upper(stmt);
|
|
trim(stmt);
|
|
|
|
if (stmt == "CTRL+C")
|
|
{
|
|
phIter->second->rollbackPending();
|
|
fIos.close();
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// If there's a PackageHandler already working for this
|
|
// sessionID, we have a problem. Reject this package
|
|
messageqcpp::ByteStream results;
|
|
ostringstream oss;
|
|
oss << "Received a DML command for session " << sessionID
|
|
<< " while still processing a command for the same sessionID";
|
|
results << static_cast<messageqcpp::ByteStream::byte>(DMLPackageProcessor::DEAD_LOCK_ERROR);
|
|
results << static_cast<messageqcpp::ByteStream::octbyte>(0); // rowcount
|
|
logging::Message::Args args;
|
|
logging::Message message(2);
|
|
args.add(oss.str());
|
|
message.format(args);
|
|
logging::LoggingID lid(20);
|
|
logging::MessageLog ml(lid);
|
|
ml.logErrorMessage(message);
|
|
results << message.msg();
|
|
fIos.write(results);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// cout << " got a ";
|
|
switch (packageType)
|
|
{
|
|
case dmlpackage::DML_INSERT:
|
|
// cout << "DML_INSERT";
|
|
break;
|
|
|
|
case dmlpackage::DML_UPDATE:
|
|
// cout << "DML_UPDATE";
|
|
break;
|
|
|
|
case dmlpackage::DML_DELETE:
|
|
// cout << "DML_DELETE";
|
|
break;
|
|
|
|
case dmlpackage::DML_COMMAND:
|
|
// cout << "DML_COMMAND";
|
|
break;
|
|
|
|
case dmlpackage::DML_INVALID_TYPE:
|
|
// cout << "DML_INVALID_TYPE";
|
|
break;
|
|
|
|
default:
|
|
// cout << "UNKNOWN";
|
|
break;
|
|
}
|
|
|
|
// cout << " package" << endl;
|
|
|
|
BRM::TxnID txnid;
|
|
|
|
if (!fConcurrentSupport)
|
|
{
|
|
// Check if any other active transaction
|
|
bool anyOtherActiveTransaction = true;
|
|
BRM::SIDTIDEntry blockingsid;
|
|
|
|
// For logout commit trigger
|
|
if (packageType == dmlpackage::DML_COMMAND)
|
|
{
|
|
anyOtherActiveTransaction = false;
|
|
}
|
|
|
|
int i = 0;
|
|
int waitPeriod = 10;
|
|
//@Bug 2487 Check transaction map every 1/10 second
|
|
|
|
int sleepTime = 100; // sleep 100 milliseconds between checks
|
|
int numTries = 10; // try 10 times per second
|
|
|
|
string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
|
|
|
|
if (waitPeriodStr.length() != 0)
|
|
waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
|
|
|
|
numTries = waitPeriod * 10;
|
|
struct timespec rm_ts;
|
|
|
|
rm_ts.tv_sec = sleepTime / 1000;
|
|
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
|
|
|
|
// cout << "starting i = " << i << endl;
|
|
// txnid = sessionManager.getTxnID(sessionID);
|
|
while (anyOtherActiveTransaction)
|
|
{
|
|
anyOtherActiveTransaction =
|
|
sessionManager.checkActiveTransaction(sessionID, bIsDbrmUp, blockingsid);
|
|
|
|
// cout << "session " << sessionID << " with package type " << (int)packageType << " got
|
|
// anyOtherActiveTransaction " << anyOtherActiveTransaction << endl;
|
|
if (anyOtherActiveTransaction)
|
|
{
|
|
for (; i < numTries; i++)
|
|
{
|
|
struct timespec abs_ts;
|
|
|
|
// cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
|
|
do
|
|
{
|
|
abs_ts.tv_sec = rm_ts.tv_sec;
|
|
abs_ts.tv_nsec = rm_ts.tv_nsec;
|
|
} while (nanosleep(&abs_ts, &rm_ts) < 0);
|
|
|
|
anyOtherActiveTransaction =
|
|
sessionManager.checkActiveTransaction(sessionID, bIsDbrmUp, blockingsid);
|
|
|
|
if (!anyOtherActiveTransaction)
|
|
{
|
|
txnid = sessionManager.getTxnID(sessionID);
|
|
|
|
// cout << "Ready to process type " << (int)packageType << " with txd " << txnid << endl;
|
|
if (!txnid.valid)
|
|
{
|
|
txnid = sessionManager.newTxnID(sessionID, true);
|
|
|
|
if (txnid.valid)
|
|
{
|
|
// cout << "Ready to process type " << (int)packageType << " for session "<< sessionID <<
|
|
// " with new txnid " << txnid.id << endl;
|
|
anyOtherActiveTransaction = false;
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
anyOtherActiveTransaction = true;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
anyOtherActiveTransaction = false;
|
|
// cout << "already have transaction to process type " << (int)packageType << " for session
|
|
// "<< sessionID <<" with existing txnid " << txnid.id << endl;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// cout << "ending i = " << i << endl;
|
|
}
|
|
else
|
|
{
|
|
// cout << "Ready to process type " << (int)packageType << endl;
|
|
txnid = sessionManager.getTxnID(sessionID);
|
|
|
|
if (!txnid.valid)
|
|
{
|
|
txnid = sessionManager.newTxnID(sessionID, true);
|
|
|
|
if (txnid.valid)
|
|
{
|
|
// cout << "later Ready to process type " << (int)packageType << " for session "<< sessionID
|
|
// << " with new txnid " << txnid.id << endl;
|
|
anyOtherActiveTransaction = false;
|
|
}
|
|
else
|
|
{
|
|
anyOtherActiveTransaction = true;
|
|
// cout << "Cannot get txnid for process type " << (int)packageType << " for session "<<
|
|
// sessionID << endl;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
anyOtherActiveTransaction = false;
|
|
// cout << "already have transaction to process type " << (int)packageType << " for session "<<
|
|
// sessionID <<" with txnid " << txnid.id << endl;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if ((anyOtherActiveTransaction) && (i >= numTries))
|
|
{
|
|
// cout << " Erroring out on package type " << (int)packageType << " for session " << sessionID <<
|
|
// endl;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (anyOtherActiveTransaction && (i >= numTries))
|
|
{
|
|
// cout << " again Erroring out on package type " << (int)packageType << endl;
|
|
messageqcpp::ByteStream results;
|
|
//@Bug 2681 set error code for active transaction
|
|
status = DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR;
|
|
rowCount = 0;
|
|
results << status;
|
|
results << rowCount;
|
|
Message::Args args;
|
|
args.add(static_cast<uint64_t>(blockingsid.sessionid));
|
|
results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
|
|
//@Bug 3854 Log to debug.log
|
|
LoggingID logid(20, 0, 0);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
|
|
msg.format(args1);
|
|
logging::Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
|
|
fIos.write(results);
|
|
}
|
|
else
|
|
{
|
|
// cout << "starting processing package type " << (int) packageType << " for session " << sessionID
|
|
// << " with id " << txnid.id << endl;
|
|
boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
|
|
fConcurrentSupport, maxDeleteRows,
|
|
sessionID, txnid.id, fDbrm, fQtc, csc));
|
|
// We put the packageHandler into a map so that if we receive a
|
|
// message to affect the previous command, we can find it.
|
|
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
|
|
|
|
lk2.lock();
|
|
packageHandlerMap[sessionID] = php;
|
|
lk2.unlock();
|
|
|
|
php->run(); // Operates in this thread.
|
|
|
|
// Move this to the end of PackageHandler so it is removed from the map before the response is sent
|
|
// lk2.lock();
|
|
// packageHandlerMap.erase(sessionID);
|
|
// lk2.unlock();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
#if 0
|
|
|
|
if (packageType != dmlpackage::DML_COMMAND)
|
|
{
|
|
txnid = sessionManager.getTxnID(sessionID);
|
|
|
|
if ( !txnid.valid )
|
|
{
|
|
txnid = sessionManager.newTxnID(sessionID, true);
|
|
|
|
if (!txnid.valid)
|
|
{
|
|
throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
txnid = sessionManager.getTxnID(sessionID);
|
|
}
|
|
|
|
#endif
|
|
boost::shared_ptr<PackageHandler> php(new PackageHandler(
|
|
fIos, bs1, packageType, fEC, fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc));
|
|
// We put the packageHandler into a map so that if we receive a
|
|
// message to affect the previous command, we can find it.
|
|
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock);
|
|
|
|
lk2.lock();
|
|
packageHandlerMap[sessionID] = php;
|
|
lk2.unlock();
|
|
|
|
php->run(); // Operates in this thread.
|
|
|
|
// Move this to the end of PackageHandler so it is removed from the map before the response is sent
|
|
// lk2.lock();
|
|
// packageHandlerMap.erase(sessionID);
|
|
// lk2.unlock();
|
|
}
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
ostringstream oss;
|
|
oss << "DMLProcessor failed on: " << ex.what();
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
|
|
fIos.close();
|
|
}
|
|
catch (...)
|
|
{
|
|
ostringstream oss;
|
|
oss << "DMLProcessor failed on: processing DMLPackage.";
|
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
|
|
cerr << "Caught unknown exception! " << oss.str();
|
|
fIos.close();
|
|
}
|
|
}
|
|
|
|
void RollbackTransactionProcessor::processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm,
|
|
uint64_t uniqueId,
|
|
OamCache::dbRootPMMap_t& dbRootPMMap,
|
|
bool& lockReleased)
|
|
{
|
|
// Take over ownership of stale lock.
|
|
// Use "DMLProc" as process name, session id and transaction id -1 to distinguish from real DMLProc rollback
|
|
int32_t sessionID = -1;
|
|
int32_t txnid = -1;
|
|
std::string processName("DMLProc");
|
|
uint32_t processID = ::getpid();
|
|
bool ownerChanged = true;
|
|
lockReleased = true;
|
|
|
|
try
|
|
{
|
|
ownerChanged = dbrm->changeOwner(lockInfo.id, processName, processID, sessionID, txnid);
|
|
}
|
|
catch (std::exception&)
|
|
{
|
|
lockReleased = false;
|
|
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
|
}
|
|
|
|
if (!ownerChanged)
|
|
{
|
|
lockReleased = false;
|
|
throw std::runtime_error(std::string("Unable to grab lock; lock not found or still in use."));
|
|
}
|
|
|
|
// send to all PMs
|
|
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
|
|
messageqcpp::ByteStream bsOut;
|
|
string tableName("");
|
|
fWEClient->addQueue(uniqueId);
|
|
// find the PMs need to send the message to
|
|
std::set<int> pmSet;
|
|
int pmId;
|
|
|
|
for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++)
|
|
{
|
|
pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]];
|
|
pmSet.insert(pmId);
|
|
}
|
|
|
|
if (lockInfo.state == BRM::LOADING)
|
|
{
|
|
bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
|
|
bsOut << uniqueId;
|
|
bsOut << lockInfo.id;
|
|
bsOut << lockInfo.tableOID;
|
|
bsOut << tableName;
|
|
bsOut << processName;
|
|
std::set<int>::const_iterator iter = pmSet.begin();
|
|
|
|
while (iter != pmSet.end())
|
|
{
|
|
fWEClient->write(bsOut, *iter);
|
|
iter++;
|
|
}
|
|
|
|
// Wait for "all" the responses, and accumulate any/all errors
|
|
unsigned int pmMsgCnt = 0;
|
|
|
|
while (pmMsgCnt < pmSet.size())
|
|
{
|
|
std::string rollbackErrMsg;
|
|
bsIn.reset(new messageqcpp::ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0)
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
lockReleased = false;
|
|
throw std::runtime_error("Network error, PM rollback; ");
|
|
}
|
|
else
|
|
{
|
|
messageqcpp::ByteStream::byte rc;
|
|
uint16_t pmNum;
|
|
*bsIn >> rc;
|
|
*bsIn >> rollbackErrMsg;
|
|
*bsIn >> pmNum;
|
|
|
|
if (rc != 0)
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
lockReleased = false;
|
|
throw std::runtime_error(rollbackErrMsg);
|
|
}
|
|
}
|
|
|
|
pmMsgCnt++;
|
|
} // end of while loop to process all responses to bulk rollback
|
|
|
|
// If no errors so far, then change state to CLEANUP state.
|
|
// We ignore the return stateChange flag.
|
|
dbrm->changeState(lockInfo.id, BRM::CLEANUP);
|
|
} // end of (lockInfo.state == BRM::LOADING)
|
|
|
|
// delete meta data backup rollback files
|
|
bsOut.reset();
|
|
|
|
bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP;
|
|
bsOut << uniqueId;
|
|
bsOut << lockInfo.tableOID;
|
|
std::set<int>::const_iterator iter = pmSet.begin();
|
|
|
|
while (iter != pmSet.end())
|
|
{
|
|
fWEClient->write(bsOut, *iter);
|
|
iter++;
|
|
}
|
|
|
|
// Wait for "all" the responses, and accumulate any/all errors
|
|
unsigned int pmMsgCnt = 0;
|
|
//@Bug 4517 Release tablelock when it is in CLEANUP state
|
|
uint32_t rcCleanup = 0;
|
|
std::string fileDeleteErrMsg;
|
|
|
|
while (pmMsgCnt < pmSet.size())
|
|
{
|
|
bsIn.reset(new messageqcpp::ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0)
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
rcCleanup = 1;
|
|
fileDeleteErrMsg = "Network error, PM clean up; ";
|
|
}
|
|
else
|
|
{
|
|
messageqcpp::ByteStream::byte rc;
|
|
uint16_t pmNum;
|
|
*bsIn >> rc;
|
|
*bsIn >> fileDeleteErrMsg;
|
|
*bsIn >> pmNum;
|
|
|
|
if ((rc != 0) && (rcCleanup == 0))
|
|
{
|
|
fWEClient->removeQueue(uniqueId);
|
|
rcCleanup = rc;
|
|
}
|
|
}
|
|
|
|
pmMsgCnt++;
|
|
} // end of while loop to process all responses to rollback cleanup
|
|
|
|
fWEClient->removeQueue(uniqueId);
|
|
// We ignore return release flag from releaseTableLock().
|
|
dbrm->releaseTableLock(lockInfo.id);
|
|
|
|
if (rcCleanup != 0)
|
|
throw std::runtime_error(fileDeleteErrMsg);
|
|
}
|
|
|
|
void DMLProcessor::log(const std::string& msg, logging::LOG_TYPE level)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(2);
|
|
args.add(msg);
|
|
message.format(args);
|
|
logging::LoggingID lid(20);
|
|
logging::MessageLog ml(lid);
|
|
|
|
switch (level)
|
|
{
|
|
case LOG_TYPE_DEBUG: ml.logDebugMessage(message); break;
|
|
|
|
case LOG_TYPE_INFO: ml.logInfoMessage(message); break;
|
|
|
|
case LOG_TYPE_WARNING: ml.logWarningMessage(message); break;
|
|
|
|
case LOG_TYPE_ERROR: ml.logErrorMessage(message); break;
|
|
|
|
case LOG_TYPE_CRITICAL: ml.logCriticalMessage(message); break;
|
|
}
|
|
}
|
|
|
|
} // namespace dmlprocessor
|