You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			202 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			202 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /*****************************************************************************
 | |
|  * $Id: cleartablelockthread.cpp 2101 2013-01-21 14:12:52Z rdempsey $
 | |
|  *
 | |
|  ****************************************************************************/
 | |
| #include "cleartablelockthread.h"
 | |
| 
 | |
| #include "bytestream.h"
 | |
| #include "we_messages.h"
 | |
| 
 | |
| #include <stdexcept>
 | |
| #include <sstream>
 | |
| 
 | |
| /*static*/ boost::mutex ClearTableLockThread::fStdOutLock;
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // ClearTableLockThread constructor
 | |
| //------------------------------------------------------------------------------
 | |
| ClearTableLockThread::ClearTableLockThread(messageqcpp::MessageQueueClient* clt,
 | |
|                                            const BRM::TableLockInfo& tInfo, const std::string& tblName,
 | |
|                                            CLRTBLLOCK_MSGTYPE msgType, ClearTableLockStatus* pStatus)
 | |
|  : fTableLockInfo(tInfo), fClt(clt), fTblName(tblName), fMsgType(msgType), fStatus(pStatus)
 | |
| {
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Main entry point for a ClearTableLockThread object, used to forward a
 | |
| // cleartablelock tool command to the WriteEngineServer specified to the ctor.
 | |
| //------------------------------------------------------------------------------
 | |
| void ClearTableLockThread::operator()()
 | |
| {
 | |
|   try
 | |
|   {
 | |
|     if (fMsgType == CLRTBLLOCK_MSGTYPE_ROLLBACK)
 | |
|       executeRollback();
 | |
|     else if (fMsgType == CLRTBLLOCK_MSGTYPE_CLEANUP)
 | |
|       executeFileCleanup();
 | |
|   }
 | |
|   catch (std::exception& ex)
 | |
|   {
 | |
|     setStatus(101, ex.what());
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     std::string errMsg("Unknown exception.");
 | |
|     setStatus(102, errMsg);
 | |
|   }
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Process bulk rollback portion of cleartablelock command.
 | |
| //------------------------------------------------------------------------------
 | |
| void ClearTableLockThread::executeRollback()
 | |
| {
 | |
|   boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | |
|   messageqcpp::ByteStream bsOut;
 | |
| 
 | |
|   const std::string APPLNAME("cleartablelock command");
 | |
| 
 | |
|   //--------------------------------------------------------------------------
 | |
|   // Send rollback msg to the writeengine server connected to fClt.
 | |
|   //--------------------------------------------------------------------------
 | |
|   {
 | |
|     boost::mutex::scoped_lock lk(fStdOutLock);
 | |
|     std::cout << "Sending rollback request to PM" << fStatus->moduleID() << "..." << std::endl;
 | |
|     //		std::cout << "cleartablelock rollback: tableLock-" <<fTableLockInfo.id<<
 | |
|     //			": oid-"  << fTableLockInfo.tableOID <<
 | |
|     //			"; name-" << fTblName                <<
 | |
|     //			"; app-"  << APPLNAME                << std::endl;
 | |
|   }
 | |
| 
 | |
|   bsOut << (messageqcpp::ByteStream::byte)WriteEngine::WE_CLT_SRV_CLEAR_TABLE_LOCK;
 | |
|   bsOut << fTableLockInfo.id;
 | |
|   bsOut << fTableLockInfo.tableOID;
 | |
|   bsOut << fTblName;
 | |
|   bsOut << APPLNAME;
 | |
|   fClt->write(bsOut);
 | |
| 
 | |
|   // Wait for the response, and check for any errors
 | |
|   std::string rollbackErrMsg;
 | |
|   bsIn.reset(new messageqcpp::ByteStream());
 | |
|   bsIn = fClt->read();
 | |
| 
 | |
|   if (bsIn->length() == 0)
 | |
|   {
 | |
|     std::string errMsg("Network error, PM rollback");
 | |
|     setStatus(103, errMsg);
 | |
| 
 | |
|     boost::mutex::scoped_lock lk(fStdOutLock);
 | |
|     std::cout << "No response from PM" << fStatus->moduleID() << std::endl;
 | |
|     return;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     messageqcpp::ByteStream::byte rc;
 | |
|     *bsIn >> rc;
 | |
|     *bsIn >> rollbackErrMsg;
 | |
| 
 | |
|     {
 | |
|       boost::mutex::scoped_lock lk(fStdOutLock);
 | |
| 
 | |
|       if (rc == 0)
 | |
|         std::cout << "Successful rollback response from PM" << fStatus->moduleID() << std::endl;
 | |
|       else
 | |
|         std::cout << "Unsuccessful rollback response from PM" << fStatus->moduleID() << "; " << rollbackErrMsg
 | |
|                   << std::endl;
 | |
| 
 | |
|       //			std::cout << "cleartablelock rollback response; rc-" << (int)rc <<
 | |
|       //				"; retMsg: <" << rollbackErrMsg << '>' << std::endl;
 | |
|     }
 | |
| 
 | |
|     if (rc != 0)
 | |
|     {
 | |
|       std::string errMsg("PM rollback error: ");
 | |
|       errMsg += rollbackErrMsg;
 | |
|       setStatus(104, errMsg);
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Process file cleanup portion of cleartablelock command.
 | |
| //------------------------------------------------------------------------------
 | |
| void ClearTableLockThread::executeFileCleanup()
 | |
| {
 | |
|   boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | |
|   messageqcpp::ByteStream bsOut;
 | |
| 
 | |
|   //--------------------------------------------------------------------------
 | |
|   // Send cleanup msg (to delete rb files) to the we server connected to fClt.
 | |
|   //--------------------------------------------------------------------------
 | |
|   {
 | |
|     boost::mutex::scoped_lock lk(fStdOutLock);
 | |
|     std::cout << "Sending cleanup request to PM" << fStatus->moduleID() << "..." << std::endl;
 | |
|     //		std::cout  << "cleartablelock cleanup: " <<
 | |
|     //			"oid-" << fTableLockInfo.tableOID    << std::endl;
 | |
|   }
 | |
| 
 | |
|   bsOut << (messageqcpp::ByteStream::byte)WriteEngine::WE_CLT_SRV_CLEAR_TABLE_LOCK_CLEANUP;
 | |
|   bsOut << fTableLockInfo.tableOID;
 | |
|   fClt->write(bsOut);
 | |
| 
 | |
|   // Wait for the response, and check for any errors
 | |
|   std::string fileDeleteErrMsg;
 | |
|   bsIn.reset(new messageqcpp::ByteStream());
 | |
|   bsIn = fClt->read();
 | |
| 
 | |
|   if (bsIn->length() == 0)
 | |
|   {
 | |
|     std::string errMsg("Network error; PM rollback cleanup");
 | |
|     setStatus(105, errMsg);
 | |
| 
 | |
|     boost::mutex::scoped_lock lk(fStdOutLock);
 | |
|     std::cout << "No response from PM" << fStatus->moduleID() << std::endl;
 | |
|     return;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     messageqcpp::ByteStream::byte rc;
 | |
|     *bsIn >> rc;
 | |
|     *bsIn >> fileDeleteErrMsg;
 | |
| 
 | |
|     {
 | |
|       boost::mutex::scoped_lock lk(fStdOutLock);
 | |
| 
 | |
|       if (rc == 0)
 | |
|         std::cout << "Successful cleanup response from PM" << fStatus->moduleID() << std::endl;
 | |
|       else
 | |
|         std::cout << "Unsuccessful cleanup response from PM" << fStatus->moduleID() << "; "
 | |
|                   << fileDeleteErrMsg << std::endl;
 | |
| 
 | |
|       //			std::cout << "cleartablelock cleanup response; rc-" << (int)rc <<
 | |
|       //				"; retMsg: <" << fileDeleteErrMsg << '>' << std::endl;
 | |
|     }
 | |
| 
 | |
|     if (rc != 0)
 | |
|     {
 | |
|       std::string errMsg("PM rollback cleanup error: ");
 | |
|       errMsg += fileDeleteErrMsg;
 | |
|       setStatus(106, errMsg);
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| }
 |