You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
This patch introduces centralized logic of selecting what dbroot is accessible in PrimProc on what node. The logic is in OamCache for time being and can be moved later.
448 lines
13 KiB
C++
448 lines
13 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2016 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: insertpackageprocessor.cpp 9642 2013-06-24 14:57:42Z rdempsey $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <iostream>
|
|
#include "insertpackageprocessor.h"
|
|
#include "autoincrementdata.h"
|
|
#include <vector>
|
|
#include <algorithm>
|
|
#include <sstream>
|
|
#include "messagelog.h"
|
|
#include "sqllogger.h"
|
|
#include <boost/algorithm/string/case_conv.hpp>
|
|
#include "oamcache.h"
|
|
#include "bytestream.h"
|
|
#include <map>
|
|
#include <boost/shared_ptr.hpp>
|
|
#include <boost/thread.hpp>
|
|
#include "we_messages.h"
|
|
#include "tablelockdata.h"
|
|
|
|
using namespace boost::algorithm;
|
|
using namespace std;
|
|
using namespace WriteEngine;
|
|
using namespace dmlpackage;
|
|
using namespace execplan;
|
|
using namespace dataconvert;
|
|
using namespace logging;
|
|
using namespace oam;
|
|
using namespace messageqcpp;
|
|
|
|
namespace dmlpackageprocessor
|
|
{
|
|
DMLPackageProcessor::DMLResult InsertPackageProcessor::processPackageInternal(
|
|
dmlpackage::CalpontDMLPackage& cpackage)
|
|
{
|
|
SUMMARY_INFO("InsertPackageProcessor::processPackage");
|
|
|
|
DMLResult result;
|
|
result.result = NO_ERROR;
|
|
BRM::TxnID txnid;
|
|
// set-up the transaction
|
|
txnid.id = cpackage.get_TxnID();
|
|
txnid.valid = true;
|
|
fSessionID = cpackage.get_SessionID();
|
|
DMLTable* tablePtr = cpackage.get_Table();
|
|
|
|
LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("Start SQL statement: ");
|
|
ostringstream oss;
|
|
oss << cpackage.get_SQLStatement() << "; |" << tablePtr->get_SchemaName() << "|";
|
|
args1.add(oss.str());
|
|
|
|
msg.format(args1);
|
|
Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
// WriteEngine::ChunkManager* cm = cpackage.get_ChunkManager();
|
|
// fWriteEngine.setChunkManager(cm);
|
|
// std::map<uint32_t,uint32_t> oids;
|
|
VERBOSE_INFO("Processing Insert DML Package...");
|
|
uint64_t uniqueId = 0;
|
|
|
|
// Bug 5070. Added exception handling
|
|
try
|
|
{
|
|
uniqueId = fDbrm->getUnique64();
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(9);
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.result = INSERT_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnid);
|
|
return result;
|
|
}
|
|
catch (...)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(9);
|
|
args.add("Unknown error occurred while getting unique number.");
|
|
message.format(args);
|
|
result.result = INSERT_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnid);
|
|
return result;
|
|
}
|
|
|
|
uint64_t tableLockId = 0;
|
|
int rc = 0;
|
|
std::string errorMsg;
|
|
OamCache* oamcache = OamCache::makeOamCache();
|
|
std::vector<int> moduleIds = oamcache->getModuleIds();
|
|
std::vector<uint32_t> pms;
|
|
|
|
try
|
|
{
|
|
for (unsigned int i = 0; i < moduleIds.size(); i++)
|
|
{
|
|
pms.push_back((uint32_t)moduleIds[i]);
|
|
}
|
|
|
|
// cout << "single insert get transaction id " << txnid.id << endl;
|
|
// get the table object from the package
|
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
|
CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
|
|
// cout << "DMLProc using syscatptr:sessionid = " << systemCatalogPtr <<":" << fSessionID<< endl;
|
|
CalpontSystemCatalog::TableName tableName;
|
|
execplan::CalpontSystemCatalog::ROPair roPair;
|
|
TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
|
|
|
|
if (0 != tablePtr)
|
|
{
|
|
// check table lock
|
|
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
|
systemCatalogPtr->sessionID(fSessionID);
|
|
tableName.schema = tablePtr->get_SchemaName();
|
|
tableName.table = tablePtr->get_TableName();
|
|
roPair = systemCatalogPtr->tableRID(tableName);
|
|
|
|
tableLockId = tablelockData->getTablelockId(
|
|
roPair.objnum); // check whether this table is locked already for this session
|
|
|
|
if (tableLockId == 0)
|
|
{
|
|
// cout << "tablelock is not found in cache, getting from dbrm" << endl;
|
|
uint32_t processID = ::getpid();
|
|
int32_t txnId = txnid.id;
|
|
int32_t sessionId = fSessionID;
|
|
std::string processName("DMLProc");
|
|
int i = 0;
|
|
|
|
try
|
|
{
|
|
tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId,
|
|
BRM::LOADING);
|
|
}
|
|
catch (std::exception&)
|
|
{
|
|
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
|
}
|
|
|
|
if (tableLockId == 0)
|
|
{
|
|
int waitPeriod = 10;
|
|
int sleepTime = 100; // sleep 100 milliseconds between checks
|
|
int numTries = 10; // try 10 times per second
|
|
waitPeriod = Config::getWaitPeriod();
|
|
numTries = waitPeriod * 10;
|
|
struct timespec rm_ts;
|
|
|
|
rm_ts.tv_sec = sleepTime / 1000;
|
|
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
|
|
|
|
for (; i < numTries; i++)
|
|
{
|
|
struct timespec abs_ts;
|
|
|
|
do
|
|
{
|
|
abs_ts.tv_sec = rm_ts.tv_sec;
|
|
abs_ts.tv_nsec = rm_ts.tv_nsec;
|
|
} while (nanosleep(&abs_ts, &rm_ts) < 0);
|
|
|
|
try
|
|
{
|
|
processID = ::getpid();
|
|
txnId = txnid.id;
|
|
sessionId = fSessionID;
|
|
processName = "DMLProc";
|
|
tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId,
|
|
&txnId, BRM::LOADING);
|
|
}
|
|
catch (std::exception&)
|
|
{
|
|
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
|
}
|
|
|
|
if (tableLockId > 0)
|
|
break;
|
|
}
|
|
|
|
if (i >= numTries) // error out
|
|
{
|
|
result.result = INSERT_ERROR;
|
|
logging::Message::Args args;
|
|
string strOp("insert");
|
|
args.add(strOp);
|
|
args.add(processName);
|
|
args.add((uint64_t)processID);
|
|
args.add(sessionId);
|
|
throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
|
|
}
|
|
}
|
|
}
|
|
|
|
// cout << " tablelock is obtained with id " << tableLockId << endl;
|
|
tablelockData->setTablelock(roPair.objnum, tableLockId);
|
|
|
|
int pmNum = 0;
|
|
|
|
// Select PM to receive the row.
|
|
// 1. Get BRM information
|
|
// 2. Find the DBRoot with the fewest in-service blocks.
|
|
// DBRoots having no blocks are excluded
|
|
// 3. Map the selected DBRoot to the corresponding PM
|
|
CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName, true);
|
|
std::vector<BRM::EmDbRootHWMInfo_v> allInfo(pms.size());
|
|
|
|
for (unsigned i = 0; i < pms.size(); i++)
|
|
{
|
|
rc = fDbrm->getDbRootHWMInfo((ridList[0].objnum), pms[i], allInfo[i]);
|
|
|
|
if (rc != 0) //@Bug 4760.
|
|
{
|
|
result.result = INSERT_ERROR;
|
|
ostringstream oss;
|
|
oss << "Error getting extent information for table " << tableName.table;
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
}
|
|
|
|
// Find DBRoot with fewest blocks; if all DBRoots
|
|
// have 0 blocks, then we select the first DBRoot
|
|
BRM::EmDbRootHWMInfo tmp;
|
|
bool tmpSet = false;
|
|
|
|
for (unsigned i = 0; i < allInfo.size(); i++)
|
|
{
|
|
BRM::EmDbRootHWMInfo_v emDbRootHWMInfos = allInfo[i];
|
|
|
|
for (unsigned j = 0; j < emDbRootHWMInfos.size(); j++)
|
|
{
|
|
if (!tmpSet)
|
|
{
|
|
tmp = emDbRootHWMInfos[j];
|
|
tmpSet = true;
|
|
}
|
|
else if (emDbRootHWMInfos[j].totalBlocks > 0)
|
|
{
|
|
if ((emDbRootHWMInfos[j].totalBlocks < tmp.totalBlocks) || (tmp.totalBlocks == 0))
|
|
{
|
|
tmp = emDbRootHWMInfos[j];
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Select the PM to receive the row
|
|
uint32_t dbroot;
|
|
|
|
if (tmpSet)
|
|
{
|
|
dbroot = tmp.dbRoot;
|
|
pmNum = oamcache->getOwnerPM(dbroot);
|
|
|
|
//@Bug 4760. validate pm value
|
|
if (pmNum == 0)
|
|
{
|
|
result.result = INSERT_ERROR;
|
|
ostringstream oss;
|
|
oss << "Error mapping extent/DBRoot to PM for table " << tableName.table;
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
}
|
|
else
|
|
{
|
|
result.result = INSERT_ERROR;
|
|
ostringstream oss;
|
|
oss << "There is no extent information for table " << tableName.table;
|
|
throw std::runtime_error(oss.str());
|
|
}
|
|
|
|
// This is for single insert only. Batch insert is handled in dmlprocessor.
|
|
// cout << "fWEClient = " << fWEClient << endl;
|
|
fWEClient->addQueue(uniqueId);
|
|
ByteStream bytestream;
|
|
bytestream << (uint8_t)WE_SVR_SINGLE_INSERT;
|
|
bytestream << uniqueId;
|
|
bytestream << (uint32_t)txnid.id;
|
|
bytestream << dbroot;
|
|
cpackage.write(bytestream);
|
|
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
|
|
|
|
ByteStream::byte rc1;
|
|
|
|
try
|
|
{
|
|
fWEClient->write(bytestream, (uint32_t)pmNum);
|
|
#ifdef IDB_DML_DEBUG
|
|
cout << "Single insert sending WE_SVR_SINGLE_INSERT to pm " << pmNum << endl;
|
|
#endif
|
|
|
|
bsIn.reset(new ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0) // read error
|
|
{
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
|
|
}
|
|
else
|
|
{
|
|
*bsIn >> rc1;
|
|
|
|
if (rc1 != 0)
|
|
{
|
|
*bsIn >> errorMsg;
|
|
rc = rc1;
|
|
}
|
|
}
|
|
}
|
|
catch (runtime_error& ex) // write error
|
|
{
|
|
#ifdef IDB_DML_DEBUG
|
|
cout << "Single insert got exception" << ex.what() << endl;
|
|
#endif
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = ex.what();
|
|
}
|
|
catch (...)
|
|
{
|
|
errorMsg = "Caught ... exception during single row insert";
|
|
rc = NETWORK_ERROR;
|
|
#ifdef IDB_DML_DEBUG
|
|
cout << "Single insert got unknown exception" << endl;
|
|
#endif
|
|
}
|
|
|
|
// Log the insert statement.
|
|
LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
args1.add("End SQL statement");
|
|
msg.format(args1);
|
|
Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_SQLStatement() + ";",
|
|
cpackage.get_SchemaName());
|
|
}
|
|
}
|
|
catch (exception& ex)
|
|
{
|
|
if (checkPPLostConnection(ex))
|
|
{
|
|
result.result = PP_LOST_CONNECTION;
|
|
}
|
|
else
|
|
{
|
|
cerr << "InsertPackageProcessor::processPackage: " << ex.what() << endl;
|
|
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add(ex.what());
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
|
|
if (result.result != VB_OVERFLOW_ERROR)
|
|
{
|
|
result.result = INSERT_ERROR;
|
|
result.message = message;
|
|
errorMsg = ex.what();
|
|
}
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
cerr << "InsertPackageProcessor::processPackage: caught unknown exception!" << endl;
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add("encountered unkown exception");
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
|
|
result.result = INSERT_ERROR;
|
|
result.message = message;
|
|
}
|
|
|
|
if (rc == 1)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add(errorMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.result = PP_LOST_CONNECTION;
|
|
result.message = message;
|
|
}
|
|
else if ((rc != 0) && (rc != IDBRANGE_WARNING))
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add("Insert Failed: ");
|
|
args.add(errorMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.result = INSERT_ERROR;
|
|
result.message = message;
|
|
}
|
|
else if (rc == IDBRANGE_WARNING)
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(1);
|
|
args.add(errorMsg);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
result.result = IDBRANGE_WARNING;
|
|
result.message = message;
|
|
}
|
|
|
|
fWEClient->removeQueue(uniqueId);
|
|
VERBOSE_INFO("Finished Processing Insert DML Package");
|
|
return result;
|
|
}
|
|
|
|
} // namespace dmlpackageprocessor
|