You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			774 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			774 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /***********************************************************************
 | |
|  *   $Id: ddlprocessor.cpp 6 2006-06-23 17:58:51Z rcraighead $
 | |
|  *
 | |
|  *
 | |
|  ***********************************************************************/
 | |
| 
 | |
| #include <map>
 | |
| #include <boost/scoped_ptr.hpp>
 | |
| using namespace std;
 | |
| 
 | |
| #include "ddlpkg.h"
 | |
| #include "ddlprocessor.h"
 | |
| #include "createtableprocessor.h"
 | |
| #include "altertableprocessor.h"
 | |
| #include "droptableprocessor.h"
 | |
| #include "calpontsystemcatalog.h"
 | |
| #include "sqlparser.h"
 | |
| #include "configcpp.h"
 | |
| #include "markpartitionprocessor.h"
 | |
| #include "restorepartitionprocessor.h"
 | |
| #include "droppartitionprocessor.h"
 | |
| //#define 	SERIALIZE_DDL_DML_CPIMPORT 	1
 | |
| 
 | |
| #include "cacheutils.h"
 | |
| #include "vss.h"
 | |
| #include "dbrm.h"
 | |
| #include "idberrorinfo.h"
 | |
| #include "errorids.h"
 | |
| #include "we_messages.h"
 | |
| using namespace BRM;
 | |
| 
 | |
| using namespace config;
 | |
| using namespace messageqcpp;
 | |
| using namespace ddlpackageprocessor;
 | |
| using namespace ddlpackage;
 | |
| using namespace execplan;
 | |
| using namespace logging;
 | |
| using namespace WriteEngine;
 | |
| 
 | |
| #include "querytele.h"
 | |
| using namespace querytele;
 | |
| 
 | |
| #include "oamcache.h"
 | |
| 
 | |
| namespace
 | |
| {
 | |
| typedef messageqcpp::ByteStream::quadbyte quadbyte;
 | |
| 
 | |
| const quadbyte UNRECOGNIZED_PACKAGE_TYPE = 100;
 | |
| const quadbyte NO_PKNAME_AVAILABLE = 101;
 | |
| 
 | |
| const std::string DDLProcName = "DDLProc";
 | |
| 
 | |
| 
 | |
| void cleanPMSysCache()
 | |
| {
 | |
| 	vector<BRM::OID_t> oidList = getAllSysCatOIDs();
 | |
| 	cacheutils::flushOIDsFromCache ( oidList );
 | |
| }
 | |
| 
 | |
| struct PackageHandler
 | |
| {
 | |
|     void operator ()()
 | |
|     {
 | |
| 
 | |
|         DDLPackageProcessor::DDLResult result;
 | |
|         result.result = DDLPackageProcessor::NO_ERROR;
 | |
|         //boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
 | |
|                
 | |
|         try
 | |
|         {
 | |
| 			//cout << "DDLProc received package " << fPackageType << endl;
 | |
|             switch( fPackageType )
 | |
|             {
 | |
|                 case ddlpackage::DDL_CREATE_TABLE_STATEMENT:
 | |
|                     {
 | |
|                         CreateTableStatement createTableStmt;
 | |
|                         createTableStmt.unserialize(fByteStream);
 | |
| 						boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
| 							CalpontSystemCatalog::makeCalpontSystemCatalog(createTableStmt.fSessionID );
 | |
| 						boost::scoped_ptr<CreateTableProcessor> processor(new CreateTableProcessor(fDbrm));	
 | |
| 						processor->fTxnid.id = fTxnid.id;
 | |
| 						processor->fTxnid.valid = true;
 | |
| 						//cout << "create table using txnid " << fTxnid.id << endl;
 | |
| 
 | |
| 						QueryTeleStats qts;
 | |
| 						qts.query_uuid = QueryTeleClient::genUUID();
 | |
| 						qts.msg_type = QueryTeleStats::QT_START;
 | |
| 						qts.start_time = QueryTeleClient::timeNowms();
 | |
| 						qts.session_id = createTableStmt.fSessionID;
 | |
| 						qts.query_type = "CREATE";
 | |
| 						qts.query = createTableStmt.fSql;
 | |
| 						qts.system_name = fOamCache->getSystemName();
 | |
| 						qts.module_name = fOamCache->getModuleName();
 | |
| 						qts.schema_name = createTableStmt.schemaName();
 | |
| 						fQtc.postQueryTele(qts);
 | |
| 
 | |
|                         result = processor->processPackage(createTableStmt);                        
 | |
|                         
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog( createTableStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog( createTableStmt.fSessionID | 0x80000000);
 | |
| 
 | |
| 						qts.msg_type = QueryTeleStats::QT_SUMMARY;
 | |
| 						qts.end_time = QueryTeleClient::timeNowms();
 | |
| 						fQtc.postQueryTele(qts);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
|                 case ddlpackage::DDL_ALTER_TABLE_STATEMENT:
 | |
|                     {
 | |
|                     	AlterTableStatement alterTableStmt;
 | |
|                     	alterTableStmt.unserialize(fByteStream);
 | |
|                     	boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
| 							CalpontSystemCatalog::makeCalpontSystemCatalog(alterTableStmt.fSessionID );
 | |
| 						boost::scoped_ptr<AlterTableProcessor> processor(new AlterTableProcessor(fDbrm));						
 | |
| 						processor->fTxnid.id = fTxnid.id;
 | |
| 						processor->fTxnid.valid = true;
 | |
| 
 | |
| 						QueryTeleStats qts;
 | |
| 						qts.query_uuid = QueryTeleClient::genUUID();
 | |
| 						qts.msg_type = QueryTeleStats::QT_START;
 | |
| 						qts.start_time = QueryTeleClient::timeNowms();
 | |
| 						qts.session_id = alterTableStmt.fSessionID;
 | |
| 						qts.query_type = "ALTER";
 | |
| 						qts.query = alterTableStmt.fSql;
 | |
| 						qts.system_name = fOamCache->getSystemName();
 | |
| 						qts.module_name = fOamCache->getModuleName();
 | |
| 						qts.schema_name = alterTableStmt.schemaName();
 | |
| 						fQtc.postQueryTele(qts);
 | |
| 
 | |
|                     	result = processor->processPackage(alterTableStmt);                        
 | |
|                     	
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog( alterTableStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog( alterTableStmt.fSessionID | 0x80000000);
 | |
| 
 | |
| 						qts.msg_type = QueryTeleStats::QT_SUMMARY;
 | |
| 						qts.end_time = QueryTeleClient::timeNowms();
 | |
| 						fQtc.postQueryTele(qts);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
|                 case ddlpackage::DDL_DROP_TABLE_STATEMENT:
 | |
|                     {
 | |
|                         DropTableStatement dropTableStmt;
 | |
|                         dropTableStmt.unserialize(fByteStream);
 | |
| 						boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
| 							CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID );
 | |
|                         boost::scoped_ptr<DropTableProcessor> processor(new DropTableProcessor(fDbrm));
 | |
| 
 | |
| 						processor->fTxnid.id = fTxnid.id;
 | |
| 						processor->fTxnid.valid = true;
 | |
| 
 | |
| 						QueryTeleStats qts;
 | |
| 						qts.query_uuid = QueryTeleClient::genUUID();
 | |
| 						qts.msg_type = QueryTeleStats::QT_START;
 | |
| 						qts.start_time = QueryTeleClient::timeNowms();
 | |
| 						qts.session_id = dropTableStmt.fSessionID;
 | |
| 						qts.query_type = "DROP";
 | |
| 						qts.query = dropTableStmt.fSql;
 | |
| 						qts.system_name = fOamCache->getSystemName();
 | |
| 						qts.module_name = fOamCache->getModuleName();
 | |
| 						qts.schema_name = dropTableStmt.schemaName();
 | |
| 						fQtc.postQueryTele(qts);
 | |
| 
 | |
| 						//cout << "Drop table using txnid " << fTxnid.id << endl;
 | |
|                         result = processor->processPackage(dropTableStmt);
 | |
| 
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog( dropTableStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog( dropTableStmt.fSessionID | 0x80000000);
 | |
| 
 | |
| 						qts.msg_type = QueryTeleStats::QT_SUMMARY;
 | |
| 						qts.end_time = QueryTeleClient::timeNowms();
 | |
| 						fQtc.postQueryTele(qts);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
|                 case ddlpackage::DDL_TRUNC_TABLE_STATEMENT:
 | |
|                     {
 | |
|                         TruncTableStatement truncTableStmt;
 | |
|                         truncTableStmt.unserialize(fByteStream);
 | |
|                         boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
| 						CalpontSystemCatalog::makeCalpontSystemCatalog(truncTableStmt.fSessionID);
 | |
|                         boost::scoped_ptr<TruncTableProcessor> processor(new TruncTableProcessor(fDbrm));
 | |
| 
 | |
| 						processor->fTxnid.id = fTxnid.id;
 | |
| 						processor->fTxnid.valid = true;
 | |
| 
 | |
| 						QueryTeleStats qts;
 | |
| 						qts.query_uuid = QueryTeleClient::genUUID();
 | |
| 						qts.msg_type = QueryTeleStats::QT_START;
 | |
| 						qts.start_time = QueryTeleClient::timeNowms();
 | |
| 						qts.session_id = truncTableStmt.fSessionID;
 | |
| 						qts.query_type = "TRUNCATE";
 | |
| 						qts.query = truncTableStmt.fSql;
 | |
| 						qts.system_name = fOamCache->getSystemName();
 | |
| 						qts.module_name = fOamCache->getModuleName();
 | |
| 						qts.schema_name = truncTableStmt.schemaName();
 | |
| 						fQtc.postQueryTele(qts);
 | |
| 
 | |
|                         result = processor->processPackage(truncTableStmt);
 | |
| 
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(truncTableStmt.fSessionID | 0x80000000);
 | |
| 
 | |
| 						qts.msg_type = QueryTeleStats::QT_SUMMARY;
 | |
| 						qts.end_time = QueryTeleClient::timeNowms();
 | |
| 						fQtc.postQueryTele(qts);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
| 				case ddlpackage::DDL_MARK_PARTITION_STATEMENT:
 | |
|                     {
 | |
|                         MarkPartitionStatement markPartitionStmt;
 | |
|                         markPartitionStmt.unserialize(fByteStream);
 | |
|                         boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
|                         CalpontSystemCatalog::makeCalpontSystemCatalog(markPartitionStmt.fSessionID);
 | |
|                         boost::scoped_ptr<MarkPartitionProcessor> processor(new MarkPartitionProcessor(fDbrm));
 | |
| 						(processor->fTxnid).id = fTxnid.id;
 | |
| 						(processor->fTxnid).valid = true;
 | |
|                         result = processor->processPackage(markPartitionStmt);
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(markPartitionStmt.fSessionID | 0x80000000);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
| 				case ddlpackage::DDL_RESTORE_PARTITION_STATEMENT:
 | |
|                     {
 | |
|                         RestorePartitionStatement restorePartitionStmt;
 | |
|                         restorePartitionStmt.unserialize(fByteStream);
 | |
|                         boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
| 						CalpontSystemCatalog::makeCalpontSystemCatalog(restorePartitionStmt.fSessionID);
 | |
|                         boost::scoped_ptr<RestorePartitionProcessor> processor(new RestorePartitionProcessor(fDbrm));
 | |
| 						(processor->fTxnid).id = fTxnid.id;
 | |
| 						(processor->fTxnid).valid = true;
 | |
|                         result = processor->processPackage(restorePartitionStmt);
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(restorePartitionStmt.fSessionID | 0x80000000);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
| 				case ddlpackage::DDL_DROP_PARTITION_STATEMENT:
 | |
|                     {
 | |
|                         DropPartitionStatement dropPartitionStmt;
 | |
|                         dropPartitionStmt.unserialize(fByteStream);
 | |
|                         boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | |
| 						CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt.fSessionID);
 | |
|                         boost::scoped_ptr<DropPartitionProcessor> processor(new DropPartitionProcessor(fDbrm));
 | |
| 						(processor->fTxnid).id = fTxnid.id;
 | |
| 						(processor->fTxnid).valid = true;
 | |
|                         result = processor->processPackage(dropPartitionStmt);
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID );
 | |
|                         systemCatalogPtr->removeCalpontSystemCatalog(dropPartitionStmt.fSessionID | 0x80000000);
 | |
|                     }
 | |
|                     break;
 | |
| 
 | |
|                 default:
 | |
|                     throw UNRECOGNIZED_PACKAGE_TYPE;
 | |
| 					break;
 | |
|             }
 | |
| 
 | |
|             //@Bug 3427. No need to log user rror, just return the message to user.
 | |
|             //Log errors
 | |
|             if ((result.result != DDLPackageProcessor::NO_ERROR) && (result.result != DDLPackageProcessor::USER_ERROR))
 | |
|             {
 | |
|                 logging::LoggingID lid(23);
 | |
|                 logging::MessageLog ml(lid);
 | |
| 
 | |
|                 ml.logErrorMessage( result.message );
 | |
|             }
 | |
| 			string hdfstest = config::Config::makeConfig()->getConfig("Installation", "DBRootStorageType");
 | |
| 			if (hdfstest == "hdfs" || hdfstest == "HDFS")
 | |
| 				cleanPMSysCache();
 | |
|             messageqcpp::ByteStream results;
 | |
|             messageqcpp::ByteStream::byte status =  result.result;
 | |
|             results << status;
 | |
|             results << result.message.msg();
 | |
| 		
 | |
|             fIos.write(results);
 | |
| 
 | |
|             fIos.close();
 | |
|         }
 | |
|         catch(quadbyte& /*foo*/)
 | |
|         {
 | |
|             fIos.close();
 | |
|             cout << "Unrecognized package type" << endl;
 | |
|         }
 | |
| 		catch(logging::IDBExcept& idbEx)
 | |
| 		{
 | |
| 			cleanPMSysCache();
 | |
|             messageqcpp::ByteStream results;
 | |
| 			messageqcpp::ByteStream::byte status = DDLPackageProcessor::CREATE_ERROR;
 | |
|             results << status;
 | |
| 			results << string(idbEx.what());
 | |
| 		
 | |
|             fIos.write(results);
 | |
| 
 | |
|             fIos.close();
 | |
| 		}
 | |
|         catch(...)
 | |
|         {
 | |
|             fIos.close();
 | |
|         }
 | |
| 
 | |
|     }
 | |
| 
 | |
|     messageqcpp::IOSocket fIos;
 | |
|     messageqcpp::ByteStream fByteStream;
 | |
|     messageqcpp::ByteStream::quadbyte fPackageType;
 | |
|     BRM::TxnID fTxnid;
 | |
|     BRM::DBRM* fDbrm;
 | |
|     QueryTeleClient fQtc;
 | |
|     oam::OamCache* fOamCache;
 | |
| };
 | |
| 
 | |
| }
 | |
| 
 | |
| namespace ddlprocessor
 | |
| {
 | |
| 
 | |
| DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize )
 | |
|         :fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize),
 | |
|          fMqServer(DDLProcName)
 | |
| {
 | |
|     fDdlPackagepool.setMaxThreads(fPackageMaxThreads);
 | |
|     fDdlPackagepool.setQueueSize(fPackageWorkQueueSize);
 | |
|     fDdlPackagepool.setName("DdlPackagepool");
 | |
| 	csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 	csc->identity(CalpontSystemCatalog::EC);
 | |
| 	string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
 | |
| 	if (!teleServerHost.empty())
 | |
| 	{
 | |
| 		int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));
 | |
| 		if (teleServerPort > 0)
 | |
| 		{
 | |
| 			fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| DDLProcessor::~DDLProcessor()
 | |
| {
 | |
| 
 | |
| }
 | |
| void DDLProcessor::process()
 | |
| {
 | |
|     DBRM dbrm;
 | |
|     messageqcpp::IOSocket ios;
 | |
|     messageqcpp::ByteStream bs;
 | |
|     PackageHandler handler;
 | |
|     messageqcpp::ByteStream::quadbyte packageType;
 | |
| 	bool concurrentSupport = true;
 | |
| 	string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
 | |
| 	if ( concurrentTranStr.length() != 0 )
 | |
| 	{
 | |
| 		if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
 | |
| 			concurrentSupport = false;
 | |
| 	}
 | |
| 	cout << "DDLProc is ready..." << endl;
 | |
| 	
 | |
|     try
 | |
|     {
 | |
|         for (;;)
 | |
|         {
 | |
|             ios = fMqServer.accept();
 | |
|             bs = ios.read();
 | |
|             uint32_t sessionID;
 | |
|             bs >> sessionID;
 | |
|             bs >> packageType;
 | |
| 
 | |
|             uint32_t stateFlags;
 | |
| 			if (dbrm.getSystemState(stateFlags) > 0)		// > 0 implies succesful retrieval. It doesn't imply anything about the contents
 | |
|             {
 | |
|                 messageqcpp::ByteStream results;
 | |
|                 const char* responseMsg=0;
 | |
|                 messageqcpp::ByteStream::byte status;
 | |
|                 bool bReject = false;
 | |
|                 // Check to see if we're in write suspended mode
 | |
|                 // If so, we can't process the request.
 | |
|                 if (stateFlags & SessionManagerServer::SS_SUSPENDED)
 | |
|                 {
 | |
|                     status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
 | |
|                     responseMsg = "Writing to the database is disabled.";
 | |
|                     bReject = true;
 | |
|                 }
 | |
|                 // Check to see if we're in write suspend or shutdown pending mode
 | |
|                 if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING
 | |
|                  || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
 | |
|                 {
 | |
|                     if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
 | |
|                     {
 | |
|                         responseMsg = "Writing to the database is disabled.";
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         responseMsg = "The database is being shut down.";
 | |
|                     }
 | |
|                     status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
 | |
|                     bReject = true;
 | |
|                 }
 | |
|                 if (bReject)
 | |
|                 {
 | |
|                     results << status;
 | |
|                     //@bug 266
 | |
|                     MessageLog logger(LoggingID(27));
 | |
|                     logging::Message::Args args;
 | |
|                     logging::Message message(2);
 | |
|                     args.add(responseMsg);
 | |
|                     message.format( args );
 | |
|                     results << message.msg();
 | |
|                     ios.write(results);
 | |
|                     std::cout << responseMsg << endl;
 | |
|                     std::cout << "Command rejected. Status " << (int)status << message.msg() << endl;
 | |
|                     continue;
 | |
|                 }
 | |
|             }
 | |
|  
 | |
|             //check whether the system is ready to process statement.
 | |
|             if (dbrm.getSystemReady() < 1)
 | |
|             {
 | |
|                 messageqcpp::ByteStream results;
 | |
|                 messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
 | |
|             
 | |
|                 results << status;
 | |
|                 string msg ("System is not ready yet. Please try again." );
 | |
| 
 | |
|                 results << msg;
 | |
|                 ios.write(results);
 | |
| 
 | |
|                 ios.close();
 | |
|                 continue;
 | |
|             }
 | |
| 			
 | |
| 			BRM::TxnID txnid;
 | |
| 			int rc = 0;
 | |
| 			if (!concurrentSupport)
 | |
| 			{           
 | |
| 				//Check if any other active transaction
 | |
| 				bool bIsDbrmUp = true;
 | |
| 				bool anyOtherActiveTransaction = true;
 | |
| 				execplan::SessionManager sessionManager; 
 | |
| 				BRM::SIDTIDEntry blockingsid;        
 | |
| 				int i = 0;
 | |
| 				int waitPeriod = 10;
 | |
| 				//@Bug 2487 Check transaction map every 1/10 second
 | |
| 
 | |
| 				int sleepTime = 100; // sleep 100 milliseconds between checks
 | |
| 				int numTries = 10;  // try 10 times per second
 | |
|             
 | |
| 				string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
 | |
| 				if ( waitPeriodStr.length() != 0 )
 | |
| 					waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
 | |
|                 
 | |
| 				numTries = 	waitPeriod * 10;
 | |
| 				struct timespec rm_ts;
 | |
| 
 | |
| 				rm_ts.tv_sec = sleepTime/1000; 
 | |
| 				rm_ts.tv_nsec = sleepTime%1000 *1000000;
 | |
| 				//cout << "starting i = " << i << endl;
 | |
| 				//anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp );	
 | |
| 				while (anyOtherActiveTransaction)
 | |
| 				{
 | |
| 					anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
 | |
| 						blockingsid );
 | |
| 					if (anyOtherActiveTransaction) 
 | |
| 					{
 | |
| 						for ( ; i < numTries; i++ )
 | |
| 						{
 | |
| #ifdef _MSC_VER
 | |
| 							Sleep(rm_ts.tv_sec * 1000);
 | |
| #else
 | |
| 							struct timespec abs_ts;
 | |
| 							//cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
 | |
| 							do
 | |
| 							{
 | |
| 								abs_ts.tv_sec = rm_ts.tv_sec; 
 | |
| 								abs_ts.tv_nsec = rm_ts.tv_nsec;
 | |
| 							} 
 | |
| 							while(nanosleep(&abs_ts,&rm_ts) < 0);
 | |
| #endif
 | |
| 							anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
 | |
| 								blockingsid );
 | |
| 							if ( !anyOtherActiveTransaction )
 | |
| 							{
 | |
| 								//cout << "Ready to process type " << (int)packageType << endl;
 | |
| 								txnid = sessionManager.getTxnID(sessionID);
 | |
| 								if ( !txnid.valid )
 | |
| 								{
 | |
| 									txnid = sessionManager.newTxnID(sessionID, true, true);
 | |
| 									if (txnid.valid) {
 | |
| 										//cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
 | |
| 										anyOtherActiveTransaction = false;
 | |
| 										break;
 | |
| 									}
 | |
| 									else
 | |
| 									{
 | |
| 										anyOtherActiveTransaction = true;
 | |
| 									}
 | |
| 								}
 | |
| 								else
 | |
| 								{
 | |
| 									string errorMsg;
 | |
| 									rc = commitTransaction(txnid.id, errorMsg);
 | |
| 									if ( rc != 0)
 | |
| 										throw std::runtime_error(errorMsg);
 | |
|                                 //need unlock the table.
 | |
| 									std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
 | |
| 									bool lockReleased = true;
 | |
| 									for (unsigned k = 0; k < tableLocks.size(); k++)
 | |
| 									{
 | |
| 										if (tableLocks[k].ownerTxnID == txnid.id)
 | |
| 										{
 | |
| 											lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
 | |
| 											if (!lockReleased)
 | |
| 											{
 | |
| 												ostringstream os;
 | |
| 												os << "tablelock id " << tableLocks[k].id << " is not found";
 | |
| 												throw std::runtime_error(os.str());
 | |
| 											}
 | |
| 										}
 | |
| 									}
 | |
| 									dbrm.committed(txnid);
 | |
| 									txnid = dbrm.newTxnID(sessionID, true, true);
 | |
| 									if (txnid.valid) {
 | |
| 										//cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
 | |
| 										anyOtherActiveTransaction = false;
 | |
| 										break;
 | |
| 									}
 | |
| 									else
 | |
| 									{
 | |
| 										anyOtherActiveTransaction = true;
 | |
| 									}
 | |
| 								}
 | |
| 							}
 | |
| 						}
 | |
|                         //cout << "ending i = " << i << endl;
 | |
| 					}
 | |
| 					else
 | |
| 					{
 | |
| 						//cout << "Ready to process type " << (int)packageType << endl;
 | |
| 						txnid = sessionManager.getTxnID(sessionID);
 | |
| 						if ( !txnid.valid )
 | |
| 						{
 | |
| 							txnid = sessionManager.newTxnID(sessionID, true, true);
 | |
| 							if (!txnid.valid) {
 | |
| 								//cout << "cannot get txnid " << (int)packageType << " for session " << sessionID <<  endl;
 | |
| 								anyOtherActiveTransaction = true;
 | |
| 							}
 | |
| 							else
 | |
| 							{
 | |
| 								anyOtherActiveTransaction = false;
 | |
| 							}
 | |
| 						}
 | |
| 						else 
 | |
| 						{
 | |
| 							string errorMsg;
 | |
| 							rc = commitTransaction(txnid.id, errorMsg);
 | |
| 							if ( rc != 0)
 | |
| 								throw std::runtime_error(errorMsg);
 | |
| 							//need unlock the table.
 | |
| 							std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
 | |
| 							bool lockReleased = true;
 | |
| 							for (unsigned k = 0; k < tableLocks.size(); k++)
 | |
| 							{
 | |
| 								if (tableLocks[k].ownerTxnID == txnid.id)
 | |
| 								{
 | |
| 									lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
 | |
| 									if (!lockReleased)
 | |
| 									{
 | |
| 										ostringstream os;
 | |
| 										os << "tablelock id " << tableLocks[k].id << " is not found";
 | |
| 										throw std::runtime_error(os.str());
 | |
| 									}
 | |
| 								}
 | |
| 							}	
 | |
| 							sessionManager.committed(txnid);
 | |
| 							txnid = sessionManager.newTxnID(sessionID, true, true);
 | |
| 							if (!txnid.valid) {
 | |
|                             //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID <<  endl;
 | |
| 								anyOtherActiveTransaction = true;
 | |
| 							}
 | |
| 							else
 | |
| 							{
 | |
| 								anyOtherActiveTransaction = false;
 | |
| 							}
 | |
| 						}
 | |
| 					}
 | |
|                     
 | |
| 					if ((anyOtherActiveTransaction) && (i >= numTries))
 | |
| 					{
 | |
|                     //cout << " Erroring out on package type " << (int)packageType << endl;
 | |
| 						break;  
 | |
| 					}
 | |
| 				}
 | |
| 	
 | |
| 				if ((anyOtherActiveTransaction) && (i >= numTries))
 | |
| 				{
 | |
| 					messageqcpp::ByteStream results;
 | |
| 					messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
 | |
|             
 | |
| 					results << status;
 | |
| 					Message::Args args;
 | |
| 					args.add(static_cast<uint64_t>(blockingsid.sessionid));
 | |
| 
 | |
| 					results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
 | |
| 					//@Bug 3854 Log to debug.log
 | |
| 					LoggingID logid(15, 0, 0);
 | |
| 					logging::Message::Args args1;
 | |
| 					logging::Message msg(1);
 | |
| 					args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
 | |
| 					msg.format( args1 );
 | |
| 					logging::Logger logger(logid.fSubsysID);
 | |
| 					logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
 | |
| 				
 | |
| 					ios.write(results);
 | |
| 
 | |
| 					ios.close();
 | |
| 				}
 | |
| 				else
 | |
| 				{
 | |
| 					handler.fIos = ios;
 | |
| 					handler.fByteStream = bs;
 | |
| 					handler.fPackageType = packageType;
 | |
| 					handler.fTxnid = txnid;
 | |
| 					handler.fDbrm = &dbrm;
 | |
| 					handler.fQtc = fQtc;
 | |
| 					handler.fOamCache = oam::OamCache::makeOamCache();
 | |
| 					fDdlPackagepool.invoke(handler);
 | |
|         
 | |
| 				}
 | |
| 			}
 | |
| 			else
 | |
| 			{
 | |
| 				txnid = dbrm.getTxnID(sessionID);
 | |
| 				if ( !txnid.valid )
 | |
| 				{
 | |
| 					txnid = dbrm.newTxnID(sessionID, true, true);
 | |
| 					if (!txnid.valid) {
 | |
| 					throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
 | |
| 					}
 | |
| 				}
 | |
| 				else 
 | |
| 				{
 | |
| 					string errorMsg;
 | |
| 					rc = commitTransaction(txnid.id, errorMsg);
 | |
| 					if ( rc != 0)
 | |
| 						throw std::runtime_error(errorMsg);
 | |
| 					//need unlock the table.
 | |
| 					std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
 | |
| 					bool lockReleased = true;
 | |
| 					for (unsigned k = 0; k < tableLocks.size(); k++)
 | |
| 					{
 | |
| 						if (tableLocks[k].ownerTxnID == txnid.id)
 | |
| 						{
 | |
| 							lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
 | |
| 							if (!lockReleased)
 | |
| 							{
 | |
| 								ostringstream os;
 | |
| 								os << "tablelock id " << tableLocks[k].id << " is not found";
 | |
| 								throw std::runtime_error(os.str());
 | |
| 							}
 | |
| 						}
 | |
| 					}	
 | |
| 					dbrm.committed(txnid);
 | |
| 					txnid = dbrm.newTxnID(sessionID, true, true);
 | |
| 					if (!txnid.valid) {
 | |
| 						throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
 | |
| 					}	
 | |
| 				}
 | |
| 				handler.fIos = ios;
 | |
| 				handler.fByteStream = bs;
 | |
| 				handler.fPackageType = packageType;
 | |
| 				handler.fTxnid = txnid;
 | |
| 				handler.fDbrm = &dbrm;
 | |
| 				handler.fQtc = fQtc;
 | |
| 				handler.fOamCache = oam::OamCache::makeOamCache();
 | |
| 				fDdlPackagepool.invoke(handler);
 | |
| 			}
 | |
|         }
 | |
|     }
 | |
|     catch (exception& ex)
 | |
|     {
 | |
|         cerr << ex.what() << endl;
 | |
| 		messageqcpp::ByteStream results;
 | |
|         messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
 | |
|                 
 | |
|         results << status;
 | |
|         results << ex.what();
 | |
|         ios.write(results);
 | |
| 
 | |
|         ios.close();
 | |
|     }
 | |
|     catch(...)
 | |
|     {
 | |
|         cerr << "Caught unknown exception!" << endl;
 | |
| 		messageqcpp::ByteStream results;
 | |
|         messageqcpp::ByteStream::byte status =  DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
 | |
|                 
 | |
|         results << status;
 | |
|         results << "Caught unknown exception!";
 | |
|         ios.write(results);
 | |
| 
 | |
|         ios.close();
 | |
|     }
 | |
| 
 | |
|     // wait for all the threads to exit
 | |
|     fDdlPackagepool.wait();
 | |
| }
 | |
| 
 | |
| int DDLProcessor::commitTransaction(uint32_t txnID, std::string & errorMsg)
 | |
| {
 | |
| 	fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DDLPROC);
 | |
| 	fPMCount = fWEClient->getPmCount();
 | |
| 	ByteStream bytestream;
 | |
| 	DBRM dbrm;
 | |
| 	uint64_t uniqueId = dbrm.getUnique64();
 | |
| 	fWEClient->addQueue(uniqueId);
 | |
| 	bytestream << (ByteStream::byte)WE_SVR_COMMIT_VERSION;
 | |
| 	bytestream << uniqueId;
 | |
| 	bytestream << txnID;
 | |
| 	uint32_t msgRecived = 0;
 | |
| 	fWEClient->write_to_all(bytestream);
 | |
| 	boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | |
| 	bsIn.reset(new ByteStream());
 | |
| 	int rc = 0;
 | |
| 	ByteStream::byte tmp8;
 | |
| 	while (1)
 | |
| 	{
 | |
| 		if (msgRecived == fPMCount)
 | |
| 			break;
 | |
| 		fWEClient->read(uniqueId, bsIn);
 | |
| 		if ( bsIn->length() == 0 ) //read error
 | |
| 		{
 | |
| 			rc = 1;
 | |
| 			errorMsg = "DDL cannot communicate with WES"; 
 | |
| 			fWEClient->removeQueue(uniqueId);
 | |
| 			break;
 | |
| 		}			
 | |
| 		else {
 | |
| 			*bsIn >> tmp8;
 | |
| 			rc = tmp8;
 | |
| 			if (rc != 0) {
 | |
| 				*bsIn >> errorMsg;
 | |
| 				fWEClient->removeQueue(uniqueId);
 | |
| 				break;
 | |
| 			}
 | |
| 			else
 | |
| 				msgRecived++;						
 | |
| 		}
 | |
| 	}
 | |
| 	delete fWEClient;
 | |
| 	fWEClient = 0;
 | |
| 	return rc;
 | |
| }
 | |
| }  // namespace ddlprocessor
 | |
| // vim:ts=4 sw=4:
 | |
| 
 |