1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2016-06-01 14:54:11 -05:00

739 lines
20 KiB
C++

/* Copyright (C) 2016 MariaDB, Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: createtableprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $
#include <unistd.h>
#include <string>
using namespace std;
#include <boost/algorithm/string/case_conv.hpp>
#include "createtableprocessor.h"
#include "ddlpkg.h"
using namespace ddlpackage;
#include "we_messages.h"
using namespace WriteEngine;
#include "oamcache.h"
using namespace oam;
#include "bytestream.h"
using namespace messageqcpp;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "sqllogger.h"
#include "messagelog.h"
using namespace logging;
namespace ddlpackageprocessor
{
CreateTableProcessor::DDLResult CreateTableProcessor::processPackage(
ddlpackage::CreateTableStatement& createTableStmt)
{
SUMMARY_INFO("CreateTableProcessor::processPackage");
DDLResult result;
BRM::TxnID txnID;
txnID.id= fTxnid.id;
txnID.valid= fTxnid.valid;
result.result = NO_ERROR;
int rc1 = 0;
rc1 = fDbrm->isReadWrite();
if (rc1 != 0 )
{
Message::Args args;
Message message(9);
args.add("Unable to execute the statement due to DBRM is read only");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
DETAIL_INFO(createTableStmt);
ddlpackage::TableDef& tableDef = *createTableStmt.fTableDef;
//If schema = CALPONTSYS, do not create table
boost::algorithm::to_lower(tableDef.fQualifiedName->fSchema);
if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA)
{
//release the transaction
fSessionManager.rolledback(txnID);
return result;
}
// Commit current transaction.
// all DDL statements cause an implicut commit
VERBOSE_INFO("Getting current txnID");
//Check whether the table is existed already
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID);
execplan::CalpontSystemCatalog::TableName tableName;
tableName.schema = tableDef.fQualifiedName->fSchema;
tableName.table = tableDef.fQualifiedName->fName;
execplan::CalpontSystemCatalog::ROPair roPair;
roPair.objnum = 0;
ByteStream::byte rc = 0;
/** @Bug 217 */
/** @Bug 225 */
try
{
roPair = systemCatalogPtr->tableRID(tableName);
}
catch (IDBExcept &ie)
{
// TODO: What is and is not an error here?
if (ie.errorCode() == ERR_DATA_OFFLINE)
{
//release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add(ie.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
else if ( ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
{
roPair.objnum = 0;
}
else //error out
{
//release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add(ie.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
}
catch (std::exception& ex) //error out
{
//release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add(ex.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
catch (...) //error out
{
//release transaction
fSessionManager.rolledback(txnID);
// Return the error for display to user
Message::Args args;
Message message(9);
args.add("Unknown exception caught when checking if the table name is already in use.");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
return result;
}
//This is a current db bug, it should not turn OID is it cannot find
if (roPair.objnum >= 3000)
{
#ifdef _MSC_VER
//FIXME: Why do we need to do this???
systemCatalogPtr->flushCache();
try { roPair = systemCatalogPtr->tableRID(tableName); }
catch (...) { roPair.objnum = 0; }
if (roPair.objnum < 3000)
goto keepGoing;
#endif
Message::Args args;
Message message(9);
args.add("Internal create table error for");
args.add(tableName.toString());
args.add(": table already exists");
args.add("(your schema is probably out-of-sync)");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
//release the transaction
fSessionManager.rolledback(txnID);
return result;
}
#ifdef _MSC_VER
keepGoing:
#endif
// Start a new transaction
VERBOSE_INFO("Starting a new transaction");
string stmt = createTableStmt.fSql + "|" + tableDef.fQualifiedName->fSchema +"|";
SQLLogger logger(stmt, fDDLLoggingId, createTableStmt.fSessionID, txnID.id);
std::string err;
execplan::ObjectIDManager fObjectIDManager;
OamCache * oamcache = OamCache::makeOamCache();
string errorMsg;
//get a unique number
uint64_t uniqueId = 0;
//Bug 5070. Added exception handling
try {
uniqueId = fDbrm->getUnique64();
}
catch (std::exception& ex)
{
Message::Args args;
Message message(9);
args.add(ex.what());
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
catch ( ... )
{
Message::Args args;
Message message(9);
args.add("Unknown error occured while getting unique number.");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
fWEClient->addQueue(uniqueId);
try
{
//Allocate tableoid table identification
VERBOSE_INFO("Allocating object ID for table");
// Allocate a object ID for each column we are about to create
VERBOSE_INFO("Allocating object IDs for columns");
uint32_t numColumns = tableDef.fColumns.size();
uint32_t numDictCols = 0;
for (unsigned i=0; i < numColumns; i++)
{
int dataType;
dataType = convertDataType(tableDef.fColumns[i]->fType->fType);
if ( (dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) )
numDictCols++;
}
fStartingColOID = fObjectIDManager.allocOIDs(numColumns+numDictCols+1); //include column, oids,dictionary oids and tableoid
#ifdef IDB_DDL_DEBUG
cout << "Create table allocOIDs got the stating oid " << fStartingColOID << endl;
#endif
if (fStartingColOID < 0)
{
result.result = CREATE_ERROR;
errorMsg = "Error in getting objectid from oidmanager.";
Message::Args args;
Message message(9);
args.add("(1)Create table failed due to ");
args.add(errorMsg);
message.format(args);
result.message = message;
fSessionManager.rolledback(txnID);
return result;
}
// Write the table metadata to the systemtable
VERBOSE_INFO("Writing meta data to SYSTABLE");
ByteStream bytestream;
bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE;
bytestream << uniqueId;
bytestream << (uint32_t) createTableStmt.fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << (uint32_t)fStartingColOID;
bytestream << (uint32_t)createTableStmt.fTableWithAutoi;
uint16_t dbRoot;
BRM::OID_t sysOid = 1001;
//Find out where systable is
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
if (rc != 0)
{
result.result =(ResultCode) rc;
Message::Args args;
Message message(9);
args.add("Error while calling getSysCatDBRoot ");
args.add(errorMsg);
message.format(args);
result.message = message;
//release transaction
fSessionManager.rolledback(txnID);
return result;
}
int pmNum = 1;
bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
pmNum = (*dbRootPMMap)[dbRoot];
try
{
fWEClient->write(bytestream, (unsigned)pmNum);
#ifdef IDB_DDL_DEBUG
cout << "create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl;
#endif
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if ( bsIn->length() == 0 ) //read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else {
*bsIn >> rc;
if (rc != 0) {
errorMsg.clear();
*bsIn >> errorMsg;
#ifdef IDB_DDL_DEBUG
cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
}
break;
}
}
}
catch (runtime_error& ex) //write error
{
#ifdef IDB_DDL_DEBUG
cout << "create table got exception" << ex.what() << endl;
#endif
rc = NETWORK_ERROR;
errorMsg = ex.what();
}
catch (...)
{
rc = NETWORK_ERROR;
#ifdef IDB_DDL_DEBUG
cout << "create table got unknown exception" << endl;
#endif
}
if (rc != 0)
{
result.result =(ResultCode) rc;
Message::Args args;
Message message(9);
args.add("(2)Create table failed due to ");
args.add(errorMsg);
message.format( args );
result.message = message;
if (rc != NETWORK_ERROR)
{
rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code
}
//release transaction
fSessionManager.rolledback(txnID);
return result;
}
VERBOSE_INFO("Writing meta data to SYSCOLUMN");
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN;
bytestream << uniqueId;
bytestream << (uint32_t) createTableStmt.fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << numColumns;
for (unsigned i = 0; i <numColumns; ++i) {
bytestream << (uint32_t)(fStartingColOID+i+1);
}
bytestream << numDictCols;
for (unsigned i = 0; i <numDictCols; ++i) {
bytestream << (uint32_t)(fStartingColOID+numColumns+i+1);
}
uint8_t alterFlag = 0;
int colPos = 0;
bytestream << (ByteStream::byte)alterFlag;
bytestream << (uint32_t)colPos;
sysOid = 1021;
//Find out where syscolumn is
rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot);
if (rc != 0)
{
result.result =(ResultCode) rc;
Message::Args args;
Message message(9);
args.add("Error while calling getSysCatDBRoot ");
args.add(errorMsg);
message.format(args);
result.message = message;
//release transaction
fSessionManager.rolledback(txnID);
return result;
}
bytestream << (uint32_t)dbRoot;
tableDef.serialize(bytestream);
pmNum = (*dbRootPMMap)[dbRoot];
try
{
fWEClient->write(bytestream, (uint32_t)pmNum);
#ifdef IDB_DDL_DEBUG
cout << "create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl;
#endif
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if ( bsIn->length() == 0 ) //read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else {
*bsIn >> rc;
if (rc != 0) {
errorMsg.clear();
*bsIn >> errorMsg;
#ifdef IDB_DDL_DEBUG
cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
}
break;
}
}
}
catch (runtime_error& ex) //write error
{
#ifdef IDB_DDL_DEBUG
cout << "create table got exception" << ex.what() << endl;
#endif
rc = NETWORK_ERROR;
errorMsg = ex.what();
}
catch (...)
{
rc = NETWORK_ERROR;
#ifdef IDB_DDL_DEBUG
cout << "create table got unknown exception" << endl;
#endif
}
if (rc != 0)
{
result.result =(ResultCode) rc;
Message::Args args;
Message message(9);
args.add("(3)Create table failed due to ");
args.add(errorMsg);
message.format( args );
result.message = message;
if (rc != NETWORK_ERROR)
{
rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code
}
//release transaction
fSessionManager.rolledback(txnID);
return result;
}
//Get the number of tables in the database, the current table is included.
int tableCount = systemCatalogPtr->getTableCount();
//Calculate which dbroot the columns should start
DBRootConfigList dbRootList = oamcache->getDBRootNums();
uint16_t useDBRootIndex = tableCount % dbRootList.size();
//Find out the dbroot# corresponding the useDBRootIndex from oam
uint16_t useDBRoot = dbRootList[useDBRootIndex];
VERBOSE_INFO("Creating column files");
ColumnDef* colDefPtr;
ddlpackage::ColumnDefList tableDefCols = tableDef.fColumns;
ColumnDefList::const_iterator iter = tableDefCols.begin();
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
bytestream << uniqueId;
bytestream << (uint32_t)txnID.id;
bytestream << (numColumns + numDictCols);
unsigned colNum = 0;
unsigned dictNum = 0;
while (iter != tableDefCols.end())
{
colDefPtr = *iter;
CalpontSystemCatalog::ColDataType dataType = convertDataType(colDefPtr->fType->fType);
if (dataType == CalpontSystemCatalog::DECIMAL ||
dataType == CalpontSystemCatalog::UDECIMAL)
{
if (colDefPtr->fType->fPrecision == -1 || colDefPtr->fType->fPrecision == 0)
{
colDefPtr->fType->fLength = 8;
}
else if ((colDefPtr->fType->fPrecision > 0) && (colDefPtr->fType->fPrecision < 3))
{
colDefPtr->fType->fLength = 1;
}
else if (colDefPtr->fType->fPrecision < 5 && (colDefPtr->fType->fPrecision > 2))
{
colDefPtr->fType->fLength = 2;
}
else if (colDefPtr->fType->fPrecision > 4 && colDefPtr->fType->fPrecision < 10)
{
colDefPtr->fType->fLength = 4;
}
else if (colDefPtr->fType->fPrecision > 9 && colDefPtr->fType->fPrecision < 19)
{
colDefPtr->fType->fLength = 8;
}
}
bytestream << (fStartingColOID + (colNum++) + 1);
bytestream << (uint8_t) dataType;
bytestream << (uint8_t) false;
bytestream << (uint32_t) colDefPtr->fType->fLength;
bytestream << (uint16_t) useDBRoot;
bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) )
{
bytestream << (uint32_t) (fStartingColOID+numColumns+(dictNum++)+1);
bytestream << (uint8_t) dataType;
bytestream << (uint8_t) true;
bytestream << (uint32_t) colDefPtr->fType->fLength;
bytestream << (uint16_t) useDBRoot;
bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
}
++iter;
}
//@Bug 4176. save oids to a log file for cleanup after fail over.
std::vector <CalpontSystemCatalog::OID> oidList;
for (unsigned i = 0; i <numColumns; ++i)
{
oidList.push_back(fStartingColOID+i+1);
}
bytestream << numDictCols;
for (unsigned i = 0; i <numDictCols; ++i)
{
oidList.push_back(fStartingColOID+numColumns+i+1);
}
try {
createWriteDropLogFile( fStartingColOID, uniqueId, oidList );
}
catch (std::exception& ex)
{
result.result =(ResultCode) rc;
Message::Args args;
Message message(9);
args.add("(4)Create table failed due to ");
args.add(ex.what());
message.format( args );
result.message = message;
if (rc != NETWORK_ERROR)
{
rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code
}
//release transaction
fSessionManager.rolledback(txnID);
return result;
}
pmNum = (*dbRootPMMap)[useDBRoot];
try
{
fWEClient->write(bytestream, pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if ( bsIn->length() == 0 ) //read error
{
rc = NETWORK_ERROR;
errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES";
break;
}
else {
*bsIn >> rc;
if (rc != 0) {
errorMsg.clear();
*bsIn >> errorMsg;
#ifdef IDB_DDL_DEBUG
cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl;
#endif
}
break;
}
}
if (rc != 0) {
//drop the newly created files
bytestream.restart();
bytestream << (ByteStream::byte) WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)(numColumns+numDictCols);
for (unsigned i = 0; i < (numColumns+numDictCols); i++)
{
bytestream << (uint32_t)(fStartingColOID + i + 1);
}
fWEClient->write(bytestream, pmNum);
while (1)
{
bsIn.reset(new ByteStream());
fWEClient->read(uniqueId, bsIn);
if ( bsIn->length() == 0 ) //read error
{
break;
}
else {
break;
}
}
//@Bug 5464. Delete from extent map.
fDbrm->deleteOIDs(oidList);
}
}
catch (runtime_error&)
{
errorMsg = "Lost connection to Write Engine Server";
}
if (rc != 0)
{
rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID); //What to do with the error code
fSessionManager.rolledback(txnID);
}
else
{
commitTransaction(uniqueId, txnID);
fSessionManager.committed(txnID);
fWEClient->removeQueue(uniqueId);
deleteLogFile(DROPTABLE_LOG, fStartingColOID, uniqueId);
}
// Log the DDL statement.
logDDL(createTableStmt.fSessionID, txnID.id, createTableStmt.fSql, createTableStmt.fOwner);
}
catch (std::exception& ex)
{
result.result = CREATE_ERROR;
Message::Args args;
Message message(9);
args.add("(5)Create table failed due to ");
args.add(ex.what());
message.format( args );
result.message = message;
fSessionManager.rolledback(txnID);
fWEClient->removeQueue(uniqueId);
return result;
}
//fWEClient->removeQueue(uniqueId);
if (rc !=0)
{
result.result = CREATE_ERROR;
Message::Args args;
Message message(9);
args.add("(6)Create table failed due to ");
args.add(errorMsg);
message.format( args );
result.message = message;
}
return result;
}
void CreateTableProcessor::rollBackCreateTable(const string& error, BRM::TxnID txnID, int sessionId,
ddlpackage::TableDef& tableDef, DDLResult& result)
{
cerr << "CreatetableProcessor::processPackage: " << error << endl;
Message::Args args;
Message message(1);
args.add("(7)Create table Failed: ");
args.add(error);
args.add("");
args.add("");
message.format(args);
result.result = CREATE_ERROR;
result.message = message;
fWriteEngine.rollbackTran(txnID.id, sessionId);
size_t size = tableDef.fColumns.size();
for (size_t i = 0; i < size; ++i)
{
fWriteEngine.dropColumn(txnID.id, fStartingColOID + i);
}
try
{
execplan::ObjectIDManager fObjectIDManager;
fObjectIDManager.returnOID(fTableOID);
fObjectIDManager.returnOIDs(fStartingColOID,
fStartingColOID + tableDef.fColumns.size() - 1);
}
catch (std::exception& ex)
{
Message::Args args;
Message message(6);
args.add(ex.what());
message.format(args);
result.message = message;
result.result = CREATE_ERROR;
}
catch (...)
{
Message::Args args;
Message message(6);
args.add("Unknown exception");
message.format(args);
result.message = message;
result.result = CREATE_ERROR;
//cout << "returnOIDs error" << endl;
}
DictionaryOIDList::const_iterator dictoid_iter = fDictionaryOIDList.begin();
while (dictoid_iter != fDictionaryOIDList.end())
{
DictOID dictOID = *dictoid_iter;
fWriteEngine.dropDctnry(txnID.id, dictOID.dictOID, dictOID.treeOID, dictOID.listOID);
//fObjectIDManager.returnOID(dictOID.dictOID);
++dictoid_iter;
}
fSessionManager.rolledback(txnID);
}
} // namespace ddlpackageprocessor
// vim:ts=4 sw=4: