/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: createtableprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $ #include #include using namespace std; #include #include "createtableprocessor.h" #include "ddlpkg.h" using namespace ddlpackage; #include "we_messages.h" using namespace WriteEngine; #include "oamcache.h" using namespace oam; #include "bytestream.h" using namespace messageqcpp; #include "calpontsystemcatalog.h" using namespace execplan; #include "sqllogger.h" #include "messagelog.h" using namespace logging; namespace ddlpackageprocessor { CreateTableProcessor::DDLResult CreateTableProcessor::processPackage( ddlpackage::CreateTableStatement& createTableStmt) { SUMMARY_INFO("CreateTableProcessor::processPackage"); DDLResult result; BRM::TxnID txnID; txnID.id= fTxnid.id; txnID.valid= fTxnid.valid; result.result = NO_ERROR; int rc1 = 0; rc1 = fDbrm->isReadWrite(); if (rc1 != 0 ) { Message::Args args; Message message(9); args.add("Unable to execute the statement due to DBRM is read only"); message.format(args); result.result = CREATE_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } DETAIL_INFO(createTableStmt); ddlpackage::TableDef& tableDef = *createTableStmt.fTableDef; //If schema = CALPONTSYS, do not create table boost::algorithm::to_lower(tableDef.fQualifiedName->fSchema); if (tableDef.fQualifiedName->fSchema == CALPONT_SCHEMA) { //release the transaction fSessionManager.rolledback(txnID); return result; } // Commit current transaction. // all DDL statements cause an implicut commit VERBOSE_INFO("Getting current txnID"); //Check whether the table is existed already boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID); execplan::CalpontSystemCatalog::TableName tableName; tableName.schema = tableDef.fQualifiedName->fSchema; tableName.table = tableDef.fQualifiedName->fName; execplan::CalpontSystemCatalog::ROPair roPair; roPair.objnum = 0; ByteStream::byte rc = 0; /** @Bug 217 */ /** @Bug 225 */ try { roPair = systemCatalogPtr->tableRID(tableName); } catch (IDBExcept &ie) { // TODO: What is and is not an error here? if (ie.errorCode() == ERR_DATA_OFFLINE) { //release transaction fSessionManager.rolledback(txnID); // Return the error for display to user Message::Args args; Message message(9); args.add(ie.what()); message.format(args); result.result = CREATE_ERROR; result.message = message; return result; } else if ( ie.errorCode() == ERR_TABLE_NOT_IN_CATALOG) { roPair.objnum = 0; } else //error out { //release transaction fSessionManager.rolledback(txnID); // Return the error for display to user Message::Args args; Message message(9); args.add(ie.what()); message.format(args); result.result = CREATE_ERROR; result.message = message; return result; } } catch (std::exception& ex) //error out { //release transaction fSessionManager.rolledback(txnID); // Return the error for display to user Message::Args args; Message message(9); args.add(ex.what()); message.format(args); result.result = CREATE_ERROR; result.message = message; return result; } catch (...) //error out { //release transaction fSessionManager.rolledback(txnID); // Return the error for display to user Message::Args args; Message message(9); args.add("Unknown exception caught when checking if the table name is already in use."); message.format(args); result.result = CREATE_ERROR; result.message = message; return result; } //This is a current db bug, it should not turn OID is it cannot find if (roPair.objnum >= 3000) { #ifdef _MSC_VER //FIXME: Why do we need to do this??? systemCatalogPtr->flushCache(); try { roPair = systemCatalogPtr->tableRID(tableName); } catch (...) { roPair.objnum = 0; } if (roPair.objnum < 3000) goto keepGoing; #endif Message::Args args; Message message(9); args.add("Internal create table error for"); args.add(tableName.toString()); args.add(": table already exists"); args.add("(your schema is probably out-of-sync)"); message.format(args); result.result = CREATE_ERROR; result.message = message; //release the transaction fSessionManager.rolledback(txnID); return result; } #ifdef _MSC_VER keepGoing: #endif // Start a new transaction VERBOSE_INFO("Starting a new transaction"); string stmt = createTableStmt.fSql + "|" + tableDef.fQualifiedName->fSchema +"|"; SQLLogger logger(stmt, fDDLLoggingId, createTableStmt.fSessionID, txnID.id); std::string err; execplan::ObjectIDManager fObjectIDManager; OamCache * oamcache = OamCache::makeOamCache(); string errorMsg; //get a unique number uint64_t uniqueId = 0; //Bug 5070. Added exception handling try { uniqueId = fDbrm->getUnique64(); } catch (std::exception& ex) { Message::Args args; Message message(9); args.add(ex.what()); message.format(args); result.result = CREATE_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } catch ( ... ) { Message::Args args; Message message(9); args.add("Unknown error occured while getting unique number."); message.format(args); result.result = CREATE_ERROR; result.message = message; fSessionManager.rolledback(txnID); return result; } fWEClient->addQueue(uniqueId); try { //Allocate tableoid table identification VERBOSE_INFO("Allocating object ID for table"); // Allocate a object ID for each column we are about to create VERBOSE_INFO("Allocating object IDs for columns"); uint32_t numColumns = tableDef.fColumns.size(); uint32_t numDictCols = 0; for (unsigned i=0; i < numColumns; i++) { int dataType; dataType = convertDataType(tableDef.fColumns[i]->fType->fType); if ( (dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) || (dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) || (dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ) numDictCols++; } fStartingColOID = fObjectIDManager.allocOIDs(numColumns+numDictCols+1); //include column, oids,dictionary oids and tableoid #ifdef IDB_DDL_DEBUG cout << "Create table allocOIDs got the stating oid " << fStartingColOID << endl; #endif if (fStartingColOID < 0) { result.result = CREATE_ERROR; errorMsg = "Error in getting objectid from oidmanager."; Message::Args args; Message message(9); args.add("(1)Create table failed due to "); args.add(errorMsg); message.format(args); result.message = message; fSessionManager.rolledback(txnID); return result; } // Write the table metadata to the systemtable VERBOSE_INFO("Writing meta data to SYSTABLE"); ByteStream bytestream; bytestream << (ByteStream::byte)WE_SVR_WRITE_SYSTABLE; bytestream << uniqueId; bytestream << (uint32_t) createTableStmt.fSessionID; bytestream << (uint32_t)txnID.id; bytestream << (uint32_t)fStartingColOID; bytestream << (uint32_t)createTableStmt.fTableWithAutoi; uint16_t dbRoot; BRM::OID_t sysOid = 1001; //Find out where systable is rc = fDbrm->getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) { result.result =(ResultCode) rc; Message::Args args; Message message(9); args.add("Error while calling getSysCatDBRoot "); args.add(errorMsg); message.format(args); result.message = message; //release transaction fSessionManager.rolledback(txnID); return result; } int pmNum = 1; bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); boost::shared_ptr bsIn; boost::shared_ptr > dbRootPMMap = oamcache->getDBRootToPMMap(); pmNum = (*dbRootPMMap)[dbRoot]; try { fWEClient->write(bytestream, (unsigned)pmNum); #ifdef IDB_DDL_DEBUG cout << "create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { errorMsg.clear(); *bsIn >> errorMsg; #ifdef IDB_DDL_DEBUG cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl; #endif } break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "create table got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; #ifdef IDB_DDL_DEBUG cout << "create table got unknown exception" << endl; #endif } if (rc != 0) { result.result =(ResultCode) rc; Message::Args args; Message message(9); args.add("(2)Create table failed due to "); args.add(errorMsg); message.format( args ); result.message = message; if (rc != NETWORK_ERROR) { rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code } //release transaction fSessionManager.rolledback(txnID); return result; } VERBOSE_INFO("Writing meta data to SYSCOLUMN"); bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATE_SYSCOLUMN; bytestream << uniqueId; bytestream << (uint32_t) createTableStmt.fSessionID; bytestream << (uint32_t)txnID.id; bytestream << numColumns; for (unsigned i = 0; i getSysCatDBRoot(sysOid, dbRoot); if (rc != 0) { result.result =(ResultCode) rc; Message::Args args; Message message(9); args.add("Error while calling getSysCatDBRoot "); args.add(errorMsg); message.format(args); result.message = message; //release transaction fSessionManager.rolledback(txnID); return result; } bytestream << (uint32_t)dbRoot; tableDef.serialize(bytestream); pmNum = (*dbRootPMMap)[dbRoot]; try { fWEClient->write(bytestream, (uint32_t)pmNum); #ifdef IDB_DDL_DEBUG cout << "create table sending We_SVR_WRITE_SYSTABLE to pm " << pmNum << endl; #endif while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { errorMsg.clear(); *bsIn >> errorMsg; #ifdef IDB_DDL_DEBUG cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl; #endif } break; } } } catch (runtime_error& ex) //write error { #ifdef IDB_DDL_DEBUG cout << "create table got exception" << ex.what() << endl; #endif rc = NETWORK_ERROR; errorMsg = ex.what(); } catch (...) { rc = NETWORK_ERROR; #ifdef IDB_DDL_DEBUG cout << "create table got unknown exception" << endl; #endif } if (rc != 0) { result.result =(ResultCode) rc; Message::Args args; Message message(9); args.add("(3)Create table failed due to "); args.add(errorMsg); message.format( args ); result.message = message; if (rc != NETWORK_ERROR) { rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID ); //What to do with the error code } //release transaction fSessionManager.rolledback(txnID); return result; } //Get the number of tables in the database, the current table is included. int tableCount = systemCatalogPtr->getTableCount(); //Calculate which dbroot the columns should start DBRootConfigList dbRootList = oamcache->getDBRootNums(); uint16_t useDBRootIndex = tableCount % dbRootList.size(); //Find out the dbroot# corresponding the useDBRootIndex from oam uint16_t useDBRoot = dbRootList[useDBRootIndex]; VERBOSE_INFO("Creating column files"); ColumnDef* colDefPtr; ddlpackage::ColumnDefList tableDefCols = tableDef.fColumns; ColumnDefList::const_iterator iter = tableDefCols.begin(); bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES; bytestream << uniqueId; bytestream << (uint32_t)txnID.id; bytestream << (numColumns + numDictCols); unsigned colNum = 0; unsigned dictNum = 0; while (iter != tableDefCols.end()) { colDefPtr = *iter; CalpontSystemCatalog::ColDataType dataType = convertDataType(colDefPtr->fType->fType); if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL) { if (colDefPtr->fType->fPrecision == -1 || colDefPtr->fType->fPrecision == 0) { colDefPtr->fType->fLength = 8; } else if ((colDefPtr->fType->fPrecision > 0) && (colDefPtr->fType->fPrecision < 3)) { colDefPtr->fType->fLength = 1; } else if (colDefPtr->fType->fPrecision < 5 && (colDefPtr->fType->fPrecision > 2)) { colDefPtr->fType->fLength = 2; } else if (colDefPtr->fType->fPrecision > 4 && colDefPtr->fType->fPrecision < 10) { colDefPtr->fType->fLength = 4; } else if (colDefPtr->fType->fPrecision > 9 && colDefPtr->fType->fPrecision < 19) { colDefPtr->fType->fLength = 8; } } bytestream << (fStartingColOID + (colNum++) + 1); bytestream << (uint8_t) dataType; bytestream << (uint8_t) false; bytestream << (uint32_t) colDefPtr->fType->fLength; bytestream << (uint16_t) useDBRoot; bytestream << (uint32_t) colDefPtr->fType->fCompressiontype; if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) || (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) || (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ) { bytestream << (uint32_t) (fStartingColOID+numColumns+(dictNum++)+1); bytestream << (uint8_t) dataType; bytestream << (uint8_t) true; bytestream << (uint32_t) colDefPtr->fType->fLength; bytestream << (uint16_t) useDBRoot; bytestream << (uint32_t) colDefPtr->fType->fCompressiontype; } ++iter; } //@Bug 4176. save oids to a log file for cleanup after fail over. std::vector oidList; for (unsigned i = 0; i write(bytestream, pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { rc = NETWORK_ERROR; errorMsg = "Lost connection to Write Engine Server while updating SYSTABLES"; break; } else { *bsIn >> rc; if (rc != 0) { errorMsg.clear(); *bsIn >> errorMsg; #ifdef IDB_DDL_DEBUG cout << "Create table We_SVR_WRITE_CREATETABLEFILES: " << errorMsg << endl; #endif } break; } } if (rc != 0) { //drop the newly created files bytestream.restart(); bytestream << (ByteStream::byte) WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; bytestream << (uint32_t)(numColumns+numDictCols); for (unsigned i = 0; i < (numColumns+numDictCols); i++) { bytestream << (uint32_t)(fStartingColOID + i + 1); } fWEClient->write(bytestream, pmNum); while (1) { bsIn.reset(new ByteStream()); fWEClient->read(uniqueId, bsIn); if ( bsIn->length() == 0 ) //read error { break; } else { break; } } //@Bug 5464. Delete from extent map. fDbrm->deleteOIDs(oidList); } } catch (runtime_error&) { errorMsg = "Lost connection to Write Engine Server"; } if (rc != 0) { rollBackTransaction( uniqueId, txnID, createTableStmt.fSessionID); //What to do with the error code fSessionManager.rolledback(txnID); } else { commitTransaction(uniqueId, txnID); fSessionManager.committed(txnID); fWEClient->removeQueue(uniqueId); deleteLogFile(DROPTABLE_LOG, fStartingColOID, uniqueId); } // Log the DDL statement. logDDL(createTableStmt.fSessionID, txnID.id, createTableStmt.fSql, createTableStmt.fOwner); } catch (std::exception& ex) { result.result = CREATE_ERROR; Message::Args args; Message message(9); args.add("(5)Create table failed due to "); args.add(ex.what()); message.format( args ); result.message = message; fSessionManager.rolledback(txnID); fWEClient->removeQueue(uniqueId); return result; } //fWEClient->removeQueue(uniqueId); if (rc !=0) { result.result = CREATE_ERROR; Message::Args args; Message message(9); args.add("(6)Create table failed due to "); args.add(errorMsg); message.format( args ); result.message = message; } return result; } void CreateTableProcessor::rollBackCreateTable(const string& error, BRM::TxnID txnID, int sessionId, ddlpackage::TableDef& tableDef, DDLResult& result) { cerr << "CreatetableProcessor::processPackage: " << error << endl; Message::Args args; Message message(1); args.add("(7)Create table Failed: "); args.add(error); args.add(""); args.add(""); message.format(args); result.result = CREATE_ERROR; result.message = message; fWriteEngine.rollbackTran(txnID.id, sessionId); size_t size = tableDef.fColumns.size(); for (size_t i = 0; i < size; ++i) { fWriteEngine.dropColumn(txnID.id, fStartingColOID + i); } try { execplan::ObjectIDManager fObjectIDManager; fObjectIDManager.returnOID(fTableOID); fObjectIDManager.returnOIDs(fStartingColOID, fStartingColOID + tableDef.fColumns.size() - 1); } catch (std::exception& ex) { Message::Args args; Message message(6); args.add(ex.what()); message.format(args); result.message = message; result.result = CREATE_ERROR; } catch (...) { Message::Args args; Message message(6); args.add("Unknown exception"); message.format(args); result.message = message; result.result = CREATE_ERROR; //cout << "returnOIDs error" << endl; } DictionaryOIDList::const_iterator dictoid_iter = fDictionaryOIDList.begin(); while (dictoid_iter != fDictionaryOIDList.end()) { DictOID dictOID = *dictoid_iter; fWriteEngine.dropDctnry(txnID.id, dictOID.dictOID, dictOID.treeOID, dictOID.listOID); //fObjectIDManager.returnOID(dictOID.dictOID); ++dictoid_iter; } fSessionManager.rolledback(txnID); } } // namespace ddlpackageprocessor // vim:ts=4 sw=4: