You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
This patch introduces centralized logic of selecting what dbroot is accessible in PrimProc on what node. The logic is in OamCache for time being and can be moved later.
858 lines
24 KiB
C++
858 lines
24 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2016 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: createtableprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $
|
|
|
|
#include <unistd.h>
|
|
#include <string>
|
|
using namespace std;
|
|
|
|
#include <boost/algorithm/string/case_conv.hpp>
|
|
|
|
#include "createtableprocessor.h"
|
|
|
|
#include "ddlpkg.h"
|
|
using namespace ddlpackage;
|
|
|
|
#include "we_messages.h"
|
|
using namespace WriteEngine;
|
|
|
|
#include "oamcache.h"
|
|
using namespace oam;
|
|
|
|
#include "bytestream.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "sqllogger.h"
|
|
#include "messagelog.h"
|
|
using namespace logging;
|
|
|
|
namespace ddlpackageprocessor
|
|
{
|
|
CreateTableProcessor::DDLResult CreateTableProcessor::processPackageInternal(
|
|
ddlpackage::SqlStatement* sqlStmt)
|
|
{
|
|
SUMMARY_INFO("CreateTableProcessor::processPackage");
|
|
|
|
DDLResult result;
|
|
BRM::TxnID txnID;
|
|
txnID.id = fTxnid.id;
|
|
txnID.valid = fTxnid.valid;
|
|
result.result = NO_ERROR;
|
|
int rc1 = 0;
|
|
rc1 = fDbrm->isReadWrite();
|
|
|
|
if (rc1 != 0)
|
|
{
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("Unable to execute the statement due to DBRM is read only");
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
ddlpackage::CreateTableStatement* createTableStmt =
|
|
dynamic_cast<ddlpackage::CreateTableStatement*>(sqlStmt);
|
|
|
|
if (!createTableStmt)
|
|
{
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("CreateTableStatement wrong cast");
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
return result;
|
|
}
|
|
|
|
DETAIL_INFO(createTableStmt);
|
|
ddlpackage::TableDef& tableDef = *(createTableStmt->fTableDef);
|
|
// If schema = CALPONTSYS, do not create table
|
|
|
|
if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA)
|
|
{
|
|
// release the transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
// Commit current transaction.
|
|
// all DDL statements cause an implicut commit
|
|
VERBOSE_INFO("Getting current txnID");
|
|
|
|
// Check whether the table is existed already
|
|
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
|
CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt->fSessionID);
|
|
execplan::CalpontSystemCatalog::TableName tableName;
|
|
tableName.schema = tableDef.fQualifiedName->fSchema;
|
|
tableName.table = tableDef.fQualifiedName->fName;
|
|
execplan::CalpontSystemCatalog::ROPair roPair;
|
|
roPair.objnum = 0;
|
|
ByteStream::byte rc = 0;
|
|
|
|
/** @Bug 217 */
|
|
/** @Bug 225 */
|
|
try
|
|
{
|
|
roPair = systemCatalogPtr->tableRID(tableName);
|
|
}
|
|
catch (IDBExcept& ie)
|
|
{
|
|
if (checkPPLostConnection(ie.what()))
|
|
{
|
|
result.result = NETWORK_ERROR;
|
|
return result;
|
|
}
|
|
else
|
|
{
|
|
// TODO: What is and is not an error here?
|
|
if (ie.errorCode() == ERR_DATA_OFFLINE)
|
|
{
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
// Return the error for display to user
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add(ie.what());
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
return result;
|
|
}
|
|
else if (ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
|
|
{
|
|
roPair.objnum = 0;
|
|
}
|
|
else // error out
|
|
{
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
// Return the error for display to user
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add(ie.what());
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
return result;
|
|
}
|
|
}
|
|
}
|
|
catch (std::exception& ex) // error out
|
|
{
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
// Return the error for display to user
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
return result;
|
|
}
|
|
catch (...) // error out
|
|
{
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
// Return the error for display to user
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("Unknown exception caught when checking if the table name is already in use.");
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
return result;
|
|
}
|
|
|
|
// This is a current db bug, it should not turn OID is it cannot find
|
|
if (roPair.objnum >= 3000)
|
|
{
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("Internal create table error for");
|
|
args.add(tableName.toString());
|
|
args.add(": table already exists");
|
|
args.add("(your schema is probably out-of-sync)");
|
|
message.format(args);
|
|
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
// release the transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
// Start a new transaction
|
|
VERBOSE_INFO("Starting a new transaction");
|
|
|
|
string stmt = createTableStmt->fSql + "|" + tableDef.fQualifiedName->fSchema + "|";
|
|
SQLLogger logger(stmt, fDDLLoggingId, createTableStmt->fSessionID, txnID.id);
|
|
|
|
std::string err;
|
|
execplan::ObjectIDManager fObjectIDManager;
|
|
OamCache* oamcache = OamCache::makeOamCache();
|
|
string errorMsg;
|
|
// get a unique number
|
|
uint64_t uniqueId = 0;
|
|
|
|
// Bug 5070. Added exception handling
|
|
try
|
|
{
|
|
uniqueId = fDbrm->getUnique64();
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
catch (...)
|
|
{
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("Unknown error occurred while getting unique number.");
|
|
message.format(args);
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
fWEClient->addQueue(uniqueId);
|
|
|
|
try
|
|
{
|
|
// Allocate tableoid table identification
|
|
VERBOSE_INFO("Allocating object ID for table");
|
|
// Allocate a object ID for each column we are about to create
|
|
VERBOSE_INFO("Allocating object IDs for columns");
|
|
uint32_t numColumns = tableDef.fColumns.size();
|
|
uint32_t numDictCols = 0;
|
|
|
|
for (unsigned i = 0; i < numColumns; i++)
|
|
{
|
|
int dataType;
|
|
dataType = convertDataType(tableDef.fColumns[i]->fType->fType);
|
|
|
|
if ((dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) ||
|
|
(dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) ||
|
|
(dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ||
|
|
(dataType == CalpontSystemCatalog::BLOB && tableDef.fColumns[i]->fType->fLength > 7) ||
|
|
(dataType == CalpontSystemCatalog::TEXT && tableDef.fColumns[i]->fType->fLength > 7))
|
|
numDictCols++;
|
|
}
|
|
|
|
// Include column oids, dictionary oids, tableoid, and
|
|
// also include AUX oid as of MCOL-5021
|
|
fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 2);
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl;
|
|
#endif
|
|
|
|
uint32_t numColumnOids = numColumns + numDictCols;
|
|
numColumnOids += 1; // MCOL-5021
|
|
|
|
if (fStartingColOID < 0)
|
|
{
|
|
result.result = CREATE_ERROR;
|
|
errorMsg = "Error in getting objectid from oidmanager.";
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("(1)Create table failed due to ");
|
|
args.add(errorMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
// Write the table metadata to the systemtable
|
|
VERBOSE_INFO("Writing meta data to SYSTABLE");
|
|
ByteStream bytestream;
|
|
bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE;
|
|
bytestream << uniqueId;
|
|
bytestream << (uint32_t)createTableStmt->fSessionID;
|
|
bytestream << (uint32_t)txnID.id;
|
|
bytestream << (uint32_t)fStartingColOID;
|
|
bytestream << (uint32_t)(fStartingColOID + numColumnOids);
|
|
bytestream << (uint32_t)createTableStmt->fTableWithAutoi;
|
|
uint16_t dbRoot;
|
|
BRM::OID_t sysOid = 1001;
|
|
// Find out where systable is
|
|
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
|
|
|
|
if (rc != 0)
|
|
{
|
|
result.result = (ResultCode)rc;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("Error while calling getSysCatDBRoot ");
|
|
args.add(errorMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
int pmNum = 1;
|
|
bytestream << (uint32_t)dbRoot;
|
|
tableDef.serialize(bytestream);
|
|
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
|
|
pmNum = oamcache->getOwnerPM(dbRoot);
|
|
// MCOL-66 The DBRM can't handle concurrent DDL
|
|
boost::mutex::scoped_lock lk(dbrmMutex);
|
|
|
|
try
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl;
|
|
#endif
|
|
fWEClient->write(bytestream, (unsigned)pmNum);
|
|
|
|
while (1)
|
|
{
|
|
bsIn.reset(new ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0) // read error
|
|
{
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
*bsIn >> rc;
|
|
|
|
if (rc != 0)
|
|
{
|
|
errorMsg.clear();
|
|
*bsIn >> errorMsg;
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
|
|
#endif
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (runtime_error& ex) // write error
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " create table got exception" << ex.what() << endl;
|
|
#endif
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = ex.what();
|
|
}
|
|
catch (...)
|
|
{
|
|
rc = NETWORK_ERROR;
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << "create table got unknown exception" << endl;
|
|
#endif
|
|
}
|
|
|
|
if (rc != 0)
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
|
|
#endif
|
|
result.result = (ResultCode)rc;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("(2)Create table failed due to ");
|
|
args.add(errorMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
|
|
if (rc != NETWORK_ERROR)
|
|
{
|
|
rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code
|
|
}
|
|
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
VERBOSE_INFO("Writing meta data to SYSCOLUMN");
|
|
bytestream.restart();
|
|
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN;
|
|
bytestream << uniqueId;
|
|
bytestream << (uint32_t)createTableStmt->fSessionID;
|
|
bytestream << (uint32_t)txnID.id;
|
|
bytestream << numColumns;
|
|
|
|
for (unsigned i = 0; i < numColumns; ++i)
|
|
{
|
|
bytestream << (uint32_t)(fStartingColOID + i + 1);
|
|
}
|
|
|
|
bytestream << numDictCols;
|
|
|
|
for (unsigned i = 0; i < numDictCols; ++i)
|
|
{
|
|
bytestream << (uint32_t)(fStartingColOID + numColumns + i + 1);
|
|
}
|
|
|
|
uint8_t alterFlag = 0;
|
|
int colPos = 0;
|
|
bytestream << (ByteStream::byte)alterFlag;
|
|
bytestream << (uint32_t)colPos;
|
|
|
|
sysOid = 1021;
|
|
// Find out where syscolumn is
|
|
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
|
|
|
|
if (rc != 0)
|
|
{
|
|
result.result = (ResultCode)rc;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("Error while calling getSysCatDBRoot ");
|
|
args.add(errorMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
bytestream << (uint32_t)dbRoot;
|
|
tableDef.serialize(bytestream);
|
|
pmNum = oamcache->getOwnerPM(dbRoot);
|
|
|
|
try
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATE_SYSCOLUMN to pm " << pmNum << endl;
|
|
#endif
|
|
fWEClient->write(bytestream, (uint32_t)pmNum);
|
|
|
|
while (1)
|
|
{
|
|
bsIn.reset(new ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0) // read error
|
|
{
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
*bsIn >> rc;
|
|
|
|
if (rc != 0)
|
|
{
|
|
errorMsg.clear();
|
|
*bsIn >> errorMsg;
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
|
|
#endif
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (runtime_error& ex) // write error
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " create table got exception" << ex.what() << endl;
|
|
#endif
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = ex.what();
|
|
}
|
|
catch (...)
|
|
{
|
|
rc = NETWORK_ERROR;
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " create table got unknown exception" << endl;
|
|
#endif
|
|
}
|
|
|
|
if (rc != 0)
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg << endl;
|
|
#endif
|
|
if (checkPPLostConnection(errorMsg))
|
|
{
|
|
result.result = PP_LOST_CONNECTION;
|
|
return result;
|
|
}
|
|
else
|
|
{
|
|
result.result = (ResultCode)rc;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("(3)Create table failed due to ");
|
|
args.add(errorMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
|
|
if (rc != NETWORK_ERROR)
|
|
{
|
|
rollBackTransaction(uniqueId, txnID,
|
|
createTableStmt->fSessionID); // What to do with the error code
|
|
}
|
|
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
}
|
|
|
|
// Get the number of tables in the database, the current table is included.
|
|
int tableCount = systemCatalogPtr->getTableCount();
|
|
|
|
// Calculate which dbroot the columns should start
|
|
DBRootConfigList dbRootList = oamcache->getDBRootNums();
|
|
|
|
uint16_t useDBRootIndex = tableCount % dbRootList.size();
|
|
// Find out the dbroot# corresponding the useDBRootIndex from oam
|
|
uint16_t useDBRoot = dbRootList[useDBRootIndex];
|
|
|
|
VERBOSE_INFO("Creating column files");
|
|
ColumnDef* colDefPtr;
|
|
ddlpackage::ColumnDefList tableDefCols = tableDef.fColumns;
|
|
ColumnDefList::const_iterator iter = tableDefCols.begin();
|
|
bytestream.restart();
|
|
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
|
|
bytestream << uniqueId;
|
|
bytestream << (uint32_t)txnID.id;
|
|
bytestream << numColumnOids;
|
|
unsigned colNum = 0;
|
|
unsigned dictNum = 0;
|
|
|
|
while (iter != tableDefCols.end())
|
|
{
|
|
colDefPtr = *iter;
|
|
|
|
CalpontSystemCatalog::ColDataType dataType = convertDataType(colDefPtr->fType->fType);
|
|
|
|
if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
|
|
{
|
|
if (colDefPtr->fType->fPrecision == -1 || colDefPtr->fType->fPrecision == 0)
|
|
{
|
|
colDefPtr->fType->fLength = 8;
|
|
}
|
|
else if ((colDefPtr->fType->fPrecision > 0) && (colDefPtr->fType->fPrecision < 3))
|
|
{
|
|
colDefPtr->fType->fLength = 1;
|
|
}
|
|
|
|
else if (colDefPtr->fType->fPrecision < 5 && (colDefPtr->fType->fPrecision > 2))
|
|
{
|
|
colDefPtr->fType->fLength = 2;
|
|
}
|
|
else if (colDefPtr->fType->fPrecision > 4 && colDefPtr->fType->fPrecision < 10)
|
|
{
|
|
colDefPtr->fType->fLength = 4;
|
|
}
|
|
else if (colDefPtr->fType->fPrecision > 9 && colDefPtr->fType->fPrecision < 19)
|
|
{
|
|
colDefPtr->fType->fLength = 8;
|
|
}
|
|
else if (colDefPtr->fType->fPrecision > 18 && colDefPtr->fType->fPrecision < 39)
|
|
{
|
|
colDefPtr->fType->fLength = 16;
|
|
}
|
|
}
|
|
|
|
bytestream << (fStartingColOID + (colNum++) + 1);
|
|
bytestream << (uint8_t)dataType;
|
|
bytestream << (uint8_t) false;
|
|
|
|
bytestream << (uint32_t)colDefPtr->fType->fLength;
|
|
bytestream << (uint16_t)useDBRoot;
|
|
bytestream << (uint32_t)colDefPtr->fType->fCompressiontype;
|
|
|
|
if ((dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
|
|
(dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
|
|
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
|
|
(dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
|
|
(dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7))
|
|
{
|
|
bytestream << (uint32_t)(fStartingColOID + numColumns + (dictNum++) + 1);
|
|
bytestream << (uint8_t)dataType;
|
|
bytestream << (uint8_t) true;
|
|
bytestream << (uint32_t)colDefPtr->fType->fLength;
|
|
bytestream << (uint16_t)useDBRoot;
|
|
bytestream << (uint32_t)colDefPtr->fType->fCompressiontype;
|
|
}
|
|
|
|
++iter;
|
|
}
|
|
|
|
bytestream << (fStartingColOID + numColumnOids);
|
|
bytestream << (uint8_t)execplan::AUX_COL_DATATYPE;
|
|
bytestream << (uint8_t) false;
|
|
bytestream << (uint32_t)execplan::AUX_COL_WIDTH;
|
|
bytestream << (uint16_t)useDBRoot;
|
|
bytestream << (uint32_t)execplan::AUX_COL_COMPRESSION_TYPE;
|
|
|
|
//@Bug 4176. save oids to a log file for cleanup after fail over.
|
|
std::vector<CalpontSystemCatalog::OID> oidList;
|
|
|
|
for (unsigned i = 0; i < numColumns; ++i)
|
|
{
|
|
oidList.push_back(fStartingColOID + i + 1);
|
|
}
|
|
|
|
bytestream << numDictCols;
|
|
|
|
for (unsigned i = 0; i < numDictCols; ++i)
|
|
{
|
|
oidList.push_back(fStartingColOID + numColumns + i + 1);
|
|
}
|
|
|
|
// MCOL-5021
|
|
oidList.push_back(fStartingColOID + numColumnOids);
|
|
|
|
try
|
|
{
|
|
createWriteDropLogFile(fStartingColOID, uniqueId, oidList);
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
result.result = (ResultCode)rc;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("(4)Create table failed due to ");
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.message = message;
|
|
|
|
if (rc != NETWORK_ERROR)
|
|
{
|
|
rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code
|
|
}
|
|
|
|
// release transaction
|
|
fSessionManager.rolledback(txnID);
|
|
return result;
|
|
}
|
|
|
|
pmNum = oamcache->getOwnerPM(useDBRoot);
|
|
|
|
try
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " create table sending WE_SVR_WRITE_CREATETABLEFILES to pm " << pmNum << endl;
|
|
#endif
|
|
fWEClient->write(bytestream, pmNum);
|
|
|
|
while (1)
|
|
{
|
|
bsIn.reset(new ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0) // read error
|
|
{
|
|
rc = NETWORK_ERROR;
|
|
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
*bsIn >> rc;
|
|
|
|
if (rc != 0)
|
|
{
|
|
errorMsg.clear();
|
|
*bsIn >> errorMsg;
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
|
|
#endif
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (rc != 0)
|
|
{
|
|
// drop the newly created files
|
|
bytestream.restart();
|
|
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
|
|
bytestream << uniqueId;
|
|
bytestream << (uint32_t)numColumnOids;
|
|
|
|
for (unsigned i = 0; i < numColumnOids; i++)
|
|
{
|
|
bytestream << (uint32_t)(fStartingColOID + i + 1);
|
|
}
|
|
|
|
fWEClient->write(bytestream, pmNum);
|
|
|
|
while (1)
|
|
{
|
|
bsIn.reset(new ByteStream());
|
|
fWEClient->read(uniqueId, bsIn);
|
|
|
|
if (bsIn->length() == 0) // read error
|
|
{
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
//@Bug 5464. Delete from extent map.
|
|
fDbrm->deleteOIDs(oidList);
|
|
}
|
|
}
|
|
catch (runtime_error&)
|
|
{
|
|
errorMsg = "Lost connection to Write Engine Server";
|
|
}
|
|
|
|
if (rc != 0)
|
|
{
|
|
#ifdef IDB_DDL_DEBUG
|
|
cout << fTxnid.id << " Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
|
|
#endif
|
|
rollBackTransaction(uniqueId, txnID, createTableStmt->fSessionID); // What to do with the error code
|
|
fSessionManager.rolledback(txnID);
|
|
}
|
|
else
|
|
{
|
|
commitTransaction(uniqueId, txnID);
|
|
fSessionManager.committed(txnID);
|
|
fWEClient->removeQueue(uniqueId);
|
|
deleteLogFile(DROPTABLE_LOG, fStartingColOID, uniqueId);
|
|
}
|
|
|
|
// Log the DDL statement.
|
|
logDDL(createTableStmt->fSessionID, txnID.id, createTableStmt->fSql, createTableStmt->fOwner);
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
result.result = CREATE_ERROR;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("(5)Create table failed due to ");
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.message = message;
|
|
fSessionManager.rolledback(txnID);
|
|
fWEClient->removeQueue(uniqueId);
|
|
return result;
|
|
}
|
|
|
|
// fWEClient->removeQueue(uniqueId);
|
|
if (rc != 0)
|
|
{
|
|
result.result = CREATE_ERROR;
|
|
Message::Args args;
|
|
Message message(9);
|
|
args.add("(6)Create table failed due to ");
|
|
args.add(errorMsg);
|
|
message.format(args);
|
|
result.message = message;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
void CreateTableProcessor::rollBackCreateTable(const string& error, BRM::TxnID txnID, int sessionId,
|
|
ddlpackage::TableDef& tableDef, DDLResult& result)
|
|
{
|
|
cerr << "CreatetableProcessor::processPackage: " << error << endl;
|
|
|
|
Message::Args args;
|
|
Message message(1);
|
|
args.add("(7)Create table Failed: ");
|
|
args.add(error);
|
|
args.add("");
|
|
args.add("");
|
|
message.format(args);
|
|
|
|
result.result = CREATE_ERROR;
|
|
result.message = message;
|
|
|
|
fWriteEngine.rollbackTran(txnID.id, sessionId);
|
|
|
|
size_t size = tableDef.fColumns.size();
|
|
|
|
for (size_t i = 0; i < size; ++i)
|
|
{
|
|
fWriteEngine.dropColumn(txnID.id, fStartingColOID + i);
|
|
}
|
|
|
|
try
|
|
{
|
|
execplan::ObjectIDManager fObjectIDManager;
|
|
fObjectIDManager.returnOID(fTableOID);
|
|
fObjectIDManager.returnOIDs(fStartingColOID, fStartingColOID + tableDef.fColumns.size() - 1);
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
Message::Args args;
|
|
Message message(6);
|
|
args.add(ex.what());
|
|
message.format(args);
|
|
result.message = message;
|
|
result.result = CREATE_ERROR;
|
|
}
|
|
catch (...)
|
|
{
|
|
Message::Args args;
|
|
Message message(6);
|
|
args.add("Unknown exception");
|
|
message.format(args);
|
|
result.message = message;
|
|
result.result = CREATE_ERROR;
|
|
// cout << "returnOIDs error" << endl;
|
|
}
|
|
|
|
DictionaryOIDList::const_iterator dictoid_iter = fDictionaryOIDList.begin();
|
|
|
|
while (dictoid_iter != fDictionaryOIDList.end())
|
|
{
|
|
DictOID dictOID = *dictoid_iter;
|
|
fWriteEngine.dropDctnry(txnID.id, dictOID.dictOID, dictOID.treeOID, dictOID.listOID);
|
|
// fObjectIDManager.returnOID(dictOID.dictOID);
|
|
|
|
++dictoid_iter;
|
|
}
|
|
|
|
fSessionManager.rolledback(txnID);
|
|
}
|
|
|
|
} // namespace ddlpackageprocessor
|