You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			458 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			458 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| #include <queue>
 | |
| #include <sstream>
 | |
| #include <fstream>
 | |
| using namespace std;
 | |
| 
 | |
| #define BOOST_DISABLE_ASSERTS
 | |
| #include <boost/thread.hpp>
 | |
| 
 | |
| #pragma GCC diagnostic push
 | |
| #pragma GCC diagnostic ignored "-Wsuggest-override"
 | |
| #include "thrift/transport/TSocket.h"
 | |
| #include "thrift/transport/TBufferTransports.h"
 | |
| namespace att = apache::thrift::transport;
 | |
| 
 | |
| #include "thrift/protocol/TBinaryProtocol.h"
 | |
| namespace atp = apache::thrift::protocol;
 | |
| #pragma GCC diagnostic pop
 | |
| 
 | |
| #include "atomicops.h"
 | |
| 
 | |
| #include "queryteleserverparms.h"
 | |
| #include "querytele_types.h"
 | |
| #include "QueryTeleService.h"
 | |
| 
 | |
| #include "queryteleprotoimpl.h"
 | |
| 
 | |
| namespace
 | |
| {
 | |
| const size_t MaxQueueElems = 1000;
 | |
| 
 | |
| template <class T>
 | |
| struct TsTeleQueue
 | |
| {
 | |
|   typedef std::queue<T> TeleQueue;
 | |
| 
 | |
|   TeleQueue queue;
 | |
|   boost::mutex queueMtx;
 | |
| };
 | |
| 
 | |
| TsTeleQueue<querytele::StepTele> stQueue;
 | |
| TsTeleQueue<querytele::QueryTele> qtQueue;
 | |
| TsTeleQueue<querytele::ImportTele> itQueue;
 | |
| 
 | |
| volatile bool isInited = false;
 | |
| boost::mutex initMux;
 | |
| 
 | |
| std::shared_ptr<att::TSocket> fSocket;
 | |
| std::shared_ptr<att::TBufferedTransport> fTransport;
 | |
| std::shared_ptr<atp::TBinaryProtocol> fProtocol;
 | |
| 
 | |
| querytele::StepTele gLastStep;
 | |
| 
 | |
| struct QStats
 | |
| {
 | |
|   int qtqueuedrops;
 | |
|   int stqueuedrops;
 | |
|   int stqueuedups;
 | |
|   int itqueuedrops;
 | |
|   QStats() : qtqueuedrops(0), stqueuedrops(0), stqueuedups(0), itqueuedrops(0)
 | |
|   {
 | |
|     ;
 | |
|   }
 | |
| };
 | |
| 
 | |
| QStats fQStats;
 | |
| 
 | |
| #ifdef QUERY_TELE_DEBUG
 | |
| string get_trace_file()
 | |
| {
 | |
|   ostringstream oss;
 | |
|   pid_t pid = getpid();
 | |
|   pthread_t threadid = pthread_self();
 | |
|   oss << "/tmp/qt-consumer-" << pid << "-" << threadid;
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| void log_query(const querytele::QueryTele& qtdata)
 | |
| {
 | |
|   ofstream trace(get_trace_file().c_str(), ios::out | ios::app);
 | |
|   trace << "Query," << qtdata.query_uuid << "," << ",";  // skip step uuid
 | |
| 
 | |
|   if (qtdata.msg_type == querytele::QTType::QT_SUMMARY)
 | |
|     trace << "SUMMARY,";
 | |
|   else if (qtdata.msg_type == querytele::QTType::QT_START)
 | |
|     trace << "START,";
 | |
|   else
 | |
|     trace << "PROGRESS,";
 | |
| 
 | |
|   trace << ",";  // sktp step type
 | |
| 
 | |
|   trace << qtdata.start_time << ",";
 | |
|   trace << qtdata.end_time << ",";
 | |
| 
 | |
|   trace << qtdata.cache_io << ",";
 | |
|   trace << qtdata.msg_rcv_cnt << ",";
 | |
|   trace << qtdata.rows << ",";
 | |
|   trace << qtdata.max_mem_pct << ",";
 | |
| 
 | |
|   trace << qtdata.query_type << ",";
 | |
|   trace << qtdata.schema_name << ",";
 | |
|   trace << qtdata.query << ",";
 | |
|   trace << qtdata.system_name;
 | |
|   trace << endl;
 | |
|   trace.close();
 | |
| }
 | |
| #endif
 | |
| 
 | |
| #ifdef QUERY_TELE_DEBUG
 | |
| const string st2str(enum querytele::StepType::type t)
 | |
| {
 | |
|   switch (t)
 | |
|   {
 | |
|     case querytele::StepType::T_HJS: return "HJS";
 | |
| 
 | |
|     case querytele::StepType::T_DSS: return "DSS";
 | |
| 
 | |
|     case querytele::StepType::T_CES: return "CES";
 | |
| 
 | |
|     case querytele::StepType::T_SQS: return "SQS";
 | |
| 
 | |
|     case querytele::StepType::T_TAS: return "TAS";
 | |
| 
 | |
|     case querytele::StepType::T_TNS: return "TNS";
 | |
| 
 | |
|     case querytele::StepType::T_BPS: return "BPS";
 | |
| 
 | |
|     case querytele::StepType::T_TCS: return "TCS";
 | |
| 
 | |
|     case querytele::StepType::T_HVS: return "HVS";
 | |
| 
 | |
|     case querytele::StepType::T_WFS: return "WFS";
 | |
| 
 | |
|     case querytele::StepType::T_SAS: return "SAS";
 | |
| 
 | |
|     case querytele::StepType::T_TUN: return "TUN";
 | |
| 
 | |
|     default: return "INV";
 | |
|   }
 | |
| 
 | |
|   return "INV";
 | |
| }
 | |
| #endif
 | |
| 
 | |
| #ifdef QUERY_TELE_DEBUG
 | |
| void log_step(const querytele::StepTele& stdata)
 | |
| {
 | |
|   ofstream trace(get_trace_file().c_str(), ios::out | ios::app);
 | |
| 
 | |
|   trace << "Step," << stdata.query_uuid << "," << stdata.step_uuid << ",";
 | |
| 
 | |
|   if (stdata.msg_type == querytele::STType::ST_SUMMARY)
 | |
|     trace << "SUMMARY,";
 | |
|   else if (stdata.msg_type == querytele::STType::ST_START)
 | |
|     trace << "START,";
 | |
|   else
 | |
|     trace << "PROGRESS,";
 | |
| 
 | |
|   trace << st2str(stdata.step_type) << ",";
 | |
| 
 | |
|   trace << stdata.start_time << ",";
 | |
|   trace << stdata.end_time << ",";
 | |
| 
 | |
|   trace << stdata.cache_io << ",";
 | |
|   trace << stdata.msg_rcv_cnt << ",";
 | |
|   trace << stdata.rows << ",";
 | |
| 
 | |
|   if (stdata.total_units_of_work > 0)
 | |
|     trace << stdata.units_of_work_completed * 100 / stdata.total_units_of_work << ",";
 | |
|   else
 | |
|     trace << "-1,";
 | |
| 
 | |
|   trace << ",,,";  // skip qtype, schemo, etc.
 | |
|   trace << fQStats.stqueuedrops << "," << fQStats.stqueuedups << "," << stQueue.queue.size();
 | |
|   trace << endl;
 | |
|   trace.close();
 | |
| }
 | |
| #endif
 | |
| 
 | |
| void TeleConsumer()
 | |
| {
 | |
|   bool didSomeWork = false;
 | |
|   boost::mutex::scoped_lock itlk(itQueue.queueMtx, boost::defer_lock);
 | |
|   boost::mutex::scoped_lock qtlk(qtQueue.queueMtx, boost::defer_lock);
 | |
|   boost::mutex::scoped_lock stlk(stQueue.queueMtx, boost::defer_lock);
 | |
|   querytele::QueryTeleServiceClient client(fProtocol);
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     for (;;)
 | |
|     {
 | |
|       didSomeWork = false;
 | |
| 
 | |
|       itlk.lock();
 | |
| 
 | |
|       // Empty the import queue first...
 | |
|       while (!itQueue.queue.empty())
 | |
|       {
 | |
|         querytele::ImportTele itdata = itQueue.queue.front();
 | |
|         itQueue.queue.pop();
 | |
|         itlk.unlock();
 | |
| 
 | |
|         try
 | |
|         {
 | |
|           fTransport->open();
 | |
|           client.postImport(itdata);
 | |
|           fTransport->close();
 | |
|         }
 | |
|         catch (...)
 | |
|         {
 | |
|           try
 | |
|           {
 | |
|             fTransport->close();
 | |
|           }
 | |
|           catch (...)
 | |
|           {
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         didSomeWork = true;
 | |
|         itlk.lock();
 | |
|       }
 | |
| 
 | |
|       itlk.unlock();
 | |
| 
 | |
|       qtlk.lock();
 | |
| 
 | |
|       // Now empty the query queue...
 | |
|       while (!qtQueue.queue.empty())
 | |
|       {
 | |
|         querytele::QueryTele qtdata = qtQueue.queue.front();
 | |
|         qtQueue.queue.pop();
 | |
|         qtlk.unlock();
 | |
| 
 | |
|         try
 | |
|         {
 | |
|           fTransport->open();
 | |
| #ifdef QUERY_TELE_DEBUG
 | |
|           log_query(qtdata);
 | |
| #endif
 | |
|           client.postQuery(qtdata);
 | |
|           fTransport->close();
 | |
|         }
 | |
|         catch (...)
 | |
|         {
 | |
|           try
 | |
|           {
 | |
|             fTransport->close();
 | |
|           }
 | |
|           catch (...)
 | |
|           {
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         didSomeWork = true;
 | |
|         qtlk.lock();
 | |
|       }
 | |
| 
 | |
|       qtlk.unlock();
 | |
| 
 | |
|       stlk.lock();
 | |
| 
 | |
|       // Finally empty the step queue...
 | |
|       // @bug6088 - Added check for query queue and import queue in while statment below to
 | |
|       //            keep the step logs from starving the query and import logs.
 | |
|       while (!stQueue.queue.empty() && qtQueue.queue.empty() && itQueue.queue.empty())
 | |
|       {
 | |
|         querytele::StepTele stdata = stQueue.queue.front();
 | |
|         stQueue.queue.pop();
 | |
|         stlk.unlock();
 | |
| 
 | |
|         try
 | |
|         {
 | |
|           fTransport->open();
 | |
| #ifdef QUERY_TELE_DEBUG
 | |
|           log_step(stdata);
 | |
| #endif
 | |
|           client.postStep(stdata);
 | |
|           fTransport->close();
 | |
|         }
 | |
|         catch (...)
 | |
|         {
 | |
|           try
 | |
|           {
 | |
|             fTransport->close();
 | |
|           }
 | |
|           catch (...)
 | |
|           {
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         didSomeWork = true;
 | |
|         stlk.lock();
 | |
|       }
 | |
| 
 | |
|       stlk.unlock();
 | |
| 
 | |
|       if (!didSomeWork)
 | |
|       {
 | |
|         usleep(50000);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     // we're probably shutting down, just let this thread die quietly...
 | |
|   }
 | |
| }
 | |
| 
 | |
| boost::thread* consThd;
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| namespace querytele
 | |
| {
 | |
| QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) : fServerParms(sp)
 | |
| {
 | |
|   if (fServerParms.host.empty() || fServerParms.port == 0)
 | |
|     return;
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(initMux);
 | |
| 
 | |
|   atomicops::atomicMb();
 | |
| 
 | |
|   if (isInited)
 | |
|     return;
 | |
| 
 | |
|   fSocket.reset(new att::TSocket(fServerParms.host, fServerParms.port));
 | |
|   fTransport.reset(new att::TBufferedTransport(fSocket));
 | |
|   fProtocol.reset(new atp::TBinaryProtocol(fTransport));
 | |
| 
 | |
|   consThd = new boost::thread(&TeleConsumer);
 | |
| 
 | |
|   atomicops::atomicMb();
 | |
|   isInited = true;
 | |
| }
 | |
| 
 | |
| int QueryTeleProtoImpl::enqStepTele(const StepTele& stdata)
 | |
| {
 | |
|   try
 | |
|   {
 | |
|     boost::mutex::scoped_lock lk(stQueue.queueMtx);
 | |
| 
 | |
|     // @bug6088 - Added conditions below to always log progress SUMMARY and START messages to avoid completed
 | |
|     // queries showing up with progress 0
 | |
|     //            and no steps.
 | |
|     if (stQueue.queue.size() >= MaxQueueElems && stdata.msg_type != querytele::STType::ST_SUMMARY &&
 | |
|         stdata.msg_type != querytele::STType::ST_START)
 | |
|     {
 | |
|       fQStats.stqueuedrops++;
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|     if (stdata.step_uuid != gLastStep.step_uuid || stdata.msg_type != gLastStep.msg_type ||
 | |
|         stdata.step_type != gLastStep.step_type ||
 | |
|         stdata.total_units_of_work != gLastStep.total_units_of_work ||
 | |
|         stdata.units_of_work_completed != gLastStep.units_of_work_completed)
 | |
|     {
 | |
|       stQueue.queue.push(stdata);
 | |
|       gLastStep = stdata;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       fQStats.stqueuedups++;
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     return -2;
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int QueryTeleProtoImpl::enqQueryTele(const QueryTele& qtdata)
 | |
| {
 | |
|   try
 | |
|   {
 | |
|     boost::mutex::scoped_lock lk(qtQueue.queueMtx);
 | |
| 
 | |
|     if (qtQueue.queue.size() >= MaxQueueElems)
 | |
|     {
 | |
|       fQStats.qtqueuedrops++;
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|     qtQueue.queue.push(qtdata);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     return -2;
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int QueryTeleProtoImpl::enqImportTele(const ImportTele& itdata)
 | |
| {
 | |
|   try
 | |
|   {
 | |
|     boost::mutex::scoped_lock lk(itQueue.queueMtx);
 | |
| 
 | |
|     if (itQueue.queue.size() >= MaxQueueElems)
 | |
|     {
 | |
|       fQStats.itqueuedrops++;
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|     itQueue.queue.push(itdata);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     return -2;
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int QueryTeleProtoImpl::waitForQueues()
 | |
| {
 | |
|   try
 | |
|   {
 | |
|     boost::mutex::scoped_lock lk(itQueue.queueMtx);
 | |
| 
 | |
|     while (!itQueue.queue.empty())
 | |
|     {
 | |
|       lk.unlock();
 | |
|       usleep(100000);
 | |
|       lk.lock();
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     return -1;
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| }  // namespace querytele
 |