You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			458 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			458 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
#include <queue>
 | 
						|
#include <sstream>
 | 
						|
#include <fstream>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#define BOOST_DISABLE_ASSERTS
 | 
						|
#include <boost/thread.hpp>
 | 
						|
 | 
						|
#pragma GCC diagnostic push
 | 
						|
#pragma GCC diagnostic ignored "-Wsuggest-override"
 | 
						|
#include "thrift/transport/TSocket.h"
 | 
						|
#include "thrift/transport/TBufferTransports.h"
 | 
						|
namespace att = apache::thrift::transport;
 | 
						|
 | 
						|
#include "thrift/protocol/TBinaryProtocol.h"
 | 
						|
namespace atp = apache::thrift::protocol;
 | 
						|
#pragma GCC diagnostic pop
 | 
						|
 | 
						|
#include "atomicops.h"
 | 
						|
 | 
						|
#include "queryteleserverparms.h"
 | 
						|
#include "querytele_types.h"
 | 
						|
#include "QueryTeleService.h"
 | 
						|
 | 
						|
#include "queryteleprotoimpl.h"
 | 
						|
 | 
						|
namespace
 | 
						|
{
 | 
						|
// Capacity limit applied by the enqueue functions: once a queue holds this many
// elements, further messages are dropped (step SUMMARY/START messages are exempt).
constexpr size_t MaxQueueElems = 1000;
 | 
						|
 | 
						|
template <class T>
 | 
						|
struct TsTeleQueue
 | 
						|
{
 | 
						|
  typedef std::queue<T> TeleQueue;
 | 
						|
 | 
						|
  TeleQueue queue;
 | 
						|
  boost::mutex queueMtx;
 | 
						|
};
 | 
						|
 | 
						|
TsTeleQueue<querytele::StepTele> stQueue;
 | 
						|
TsTeleQueue<querytele::QueryTele> qtQueue;
 | 
						|
TsTeleQueue<querytele::ImportTele> itQueue;
 | 
						|
 | 
						|
volatile bool isInited = false;
 | 
						|
boost::mutex initMux;
 | 
						|
 | 
						|
std::shared_ptr<att::TSocket> fSocket;
 | 
						|
std::shared_ptr<att::TBufferedTransport> fTransport;
 | 
						|
std::shared_ptr<atp::TBinaryProtocol> fProtocol;
 | 
						|
 | 
						|
querytele::StepTele gLastStep;
 | 
						|
 | 
						|
// Counters maintained by the enqueue functions. Every counter starts at zero.
struct QStats
{
  int qtqueuedrops = 0;  // query messages rejected because the query queue was full
  int stqueuedrops = 0;  // step messages rejected because the step queue was full
  int stqueuedups = 0;   // step messages skipped as duplicates of the previous one
  int itqueuedrops = 0;  // import messages rejected because the import queue was full
};

// The single set of counters shared by all enqueue calls in this file.
QStats fQStats;
 | 
						|
 | 
						|
#ifdef QUERY_TELE_DEBUG
 | 
						|
// Builds the debug trace file name for the calling thread:
// "/tmp/qt-consumer-<pid>-<thread id>".
std::string get_trace_file()
{
  std::ostringstream path;
  path << "/tmp/qt-consumer-" << getpid() << "-" << pthread_self();
  return path.str();
}
 | 
						|
 | 
						|
// Appends one comma-separated line describing a query message to the trace file
// returned by get_trace_file(). The step-uuid and step-type columns are left
// empty so that query lines and step lines share the same column positions.
void log_query(const querytele::QueryTele& qtdata)
{
  // Message phase column; anything that is neither SUMMARY nor START is PROGRESS.
  const char* phase = "PROGRESS,";

  if (qtdata.msg_type == querytele::QTType::QT_SUMMARY)
    phase = "SUMMARY,";
  else if (qtdata.msg_type == querytele::QTType::QT_START)
    phase = "START,";

  std::ofstream out(get_trace_file().c_str(), std::ios::out | std::ios::app);

  out << "Query," << qtdata.query_uuid << "," << ",";  // empty step uuid column
  out << phase;
  out << ",";  // empty step type column

  out << qtdata.start_time << "," << qtdata.end_time << ",";
  out << qtdata.cache_io << "," << qtdata.msg_rcv_cnt << "," << qtdata.rows << ",";
  out << qtdata.max_mem_pct << ",";

  out << qtdata.query_type << "," << qtdata.schema_name << "," << qtdata.query << ",";
  out << qtdata.system_name;
  out << std::endl;
  // The stream is closed by its destructor on return.
}
 | 
						|
#endif
 | 
						|
 | 
						|
#ifdef QUERY_TELE_DEBUG
 | 
						|
const string st2str(enum querytele::StepType::type t)
 | 
						|
{
 | 
						|
  switch (t)
 | 
						|
  {
 | 
						|
    case querytele::StepType::T_HJS: return "HJS";
 | 
						|
 | 
						|
    case querytele::StepType::T_DSS: return "DSS";
 | 
						|
 | 
						|
    case querytele::StepType::T_CES: return "CES";
 | 
						|
 | 
						|
    case querytele::StepType::T_SQS: return "SQS";
 | 
						|
 | 
						|
    case querytele::StepType::T_TAS: return "TAS";
 | 
						|
 | 
						|
    case querytele::StepType::T_TNS: return "TNS";
 | 
						|
 | 
						|
    case querytele::StepType::T_BPS: return "BPS";
 | 
						|
 | 
						|
    case querytele::StepType::T_TCS: return "TCS";
 | 
						|
 | 
						|
    case querytele::StepType::T_HVS: return "HVS";
 | 
						|
 | 
						|
    case querytele::StepType::T_WFS: return "WFS";
 | 
						|
 | 
						|
    case querytele::StepType::T_SAS: return "SAS";
 | 
						|
 | 
						|
    case querytele::StepType::T_TUN: return "TUN";
 | 
						|
 | 
						|
    default: return "INV";
 | 
						|
  }
 | 
						|
 | 
						|
  return "INV";
 | 
						|
}
 | 
						|
#endif
 | 
						|
 | 
						|
#ifdef QUERY_TELE_DEBUG
 | 
						|
// Appends one comma-separated line describing a step message to the trace file
// returned by get_trace_file(). The line ends with the current step-queue drop
// count, duplicate count and queue length.
void log_step(const querytele::StepTele& stdata)
{
  // Message phase column; anything that is neither SUMMARY nor START is PROGRESS.
  const char* phase = "PROGRESS,";

  if (stdata.msg_type == querytele::STType::ST_SUMMARY)
    phase = "SUMMARY,";
  else if (stdata.msg_type == querytele::STType::ST_START)
    phase = "START,";

  std::ofstream out(get_trace_file().c_str(), std::ios::out | std::ios::app);

  out << "Step," << stdata.query_uuid << "," << stdata.step_uuid << ",";
  out << phase;
  out << st2str(stdata.step_type) << ",";

  out << stdata.start_time << "," << stdata.end_time << ",";
  out << stdata.cache_io << "," << stdata.msg_rcv_cnt << "," << stdata.rows << ",";

  // Percent complete, or -1 when the total amount of work is not positive.
  if (stdata.total_units_of_work > 0)
    out << stdata.units_of_work_completed * 100 / stdata.total_units_of_work << ",";
  else
    out << "-1,";

  out << ",,,";  // columns that log_query fills with query type, schema, etc.
  out << fQStats.stqueuedrops << "," << fQStats.stqueuedups << "," << stQueue.queue.size();
  out << std::endl;
  // The stream is closed by its destructor on return.
}
 | 
						|
#endif
 | 
						|
 | 
						|
void TeleConsumer()
 | 
						|
{
 | 
						|
  bool didSomeWork = false;
 | 
						|
  boost::mutex::scoped_lock itlk(itQueue.queueMtx, boost::defer_lock);
 | 
						|
  boost::mutex::scoped_lock qtlk(qtQueue.queueMtx, boost::defer_lock);
 | 
						|
  boost::mutex::scoped_lock stlk(stQueue.queueMtx, boost::defer_lock);
 | 
						|
  querytele::QueryTeleServiceClient client(fProtocol);
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    for (;;)
 | 
						|
    {
 | 
						|
      didSomeWork = false;
 | 
						|
 | 
						|
      itlk.lock();
 | 
						|
 | 
						|
      // Empty the import queue first...
 | 
						|
      while (!itQueue.queue.empty())
 | 
						|
      {
 | 
						|
        querytele::ImportTele itdata = itQueue.queue.front();
 | 
						|
        itQueue.queue.pop();
 | 
						|
        itlk.unlock();
 | 
						|
 | 
						|
        try
 | 
						|
        {
 | 
						|
          fTransport->open();
 | 
						|
          client.postImport(itdata);
 | 
						|
          fTransport->close();
 | 
						|
        }
 | 
						|
        catch (...)
 | 
						|
        {
 | 
						|
          try
 | 
						|
          {
 | 
						|
            fTransport->close();
 | 
						|
          }
 | 
						|
          catch (...)
 | 
						|
          {
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        didSomeWork = true;
 | 
						|
        itlk.lock();
 | 
						|
      }
 | 
						|
 | 
						|
      itlk.unlock();
 | 
						|
 | 
						|
      qtlk.lock();
 | 
						|
 | 
						|
      // Now empty the query queue...
 | 
						|
      while (!qtQueue.queue.empty())
 | 
						|
      {
 | 
						|
        querytele::QueryTele qtdata = qtQueue.queue.front();
 | 
						|
        qtQueue.queue.pop();
 | 
						|
        qtlk.unlock();
 | 
						|
 | 
						|
        try
 | 
						|
        {
 | 
						|
          fTransport->open();
 | 
						|
#ifdef QUERY_TELE_DEBUG
 | 
						|
          log_query(qtdata);
 | 
						|
#endif
 | 
						|
          client.postQuery(qtdata);
 | 
						|
          fTransport->close();
 | 
						|
        }
 | 
						|
        catch (...)
 | 
						|
        {
 | 
						|
          try
 | 
						|
          {
 | 
						|
            fTransport->close();
 | 
						|
          }
 | 
						|
          catch (...)
 | 
						|
          {
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        didSomeWork = true;
 | 
						|
        qtlk.lock();
 | 
						|
      }
 | 
						|
 | 
						|
      qtlk.unlock();
 | 
						|
 | 
						|
      stlk.lock();
 | 
						|
 | 
						|
      // Finally empty the step queue...
 | 
						|
      // @bug6088 - Added check for query queue and import queue in while statment below to
 | 
						|
      //            keep the step logs from starving the query and import logs.
 | 
						|
      while (!stQueue.queue.empty() && qtQueue.queue.empty() && itQueue.queue.empty())
 | 
						|
      {
 | 
						|
        querytele::StepTele stdata = stQueue.queue.front();
 | 
						|
        stQueue.queue.pop();
 | 
						|
        stlk.unlock();
 | 
						|
 | 
						|
        try
 | 
						|
        {
 | 
						|
          fTransport->open();
 | 
						|
#ifdef QUERY_TELE_DEBUG
 | 
						|
          log_step(stdata);
 | 
						|
#endif
 | 
						|
          client.postStep(stdata);
 | 
						|
          fTransport->close();
 | 
						|
        }
 | 
						|
        catch (...)
 | 
						|
        {
 | 
						|
          try
 | 
						|
          {
 | 
						|
            fTransport->close();
 | 
						|
          }
 | 
						|
          catch (...)
 | 
						|
          {
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        didSomeWork = true;
 | 
						|
        stlk.lock();
 | 
						|
      }
 | 
						|
 | 
						|
      stlk.unlock();
 | 
						|
 | 
						|
      if (!didSomeWork)
 | 
						|
      {
 | 
						|
        usleep(50000);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    // we're probably shutting down, just let this thread die quietly...
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// Handle to the thread running TeleConsumer; assigned in the QueryTeleProtoImpl
// constructor. Nothing in this file joins or deletes it.
boost::thread* consThd;
 | 
						|
 | 
						|
}  // namespace
 | 
						|
 | 
						|
namespace querytele
 | 
						|
{
 | 
						|
// Stores the server parameters and, the first time it is called with a non-empty
// host and a non-zero port, builds the Thrift socket/transport/protocol stack and
// starts the consumer thread. Later constructions leave that shared state alone.
QueryTeleProtoImpl::QueryTeleProtoImpl(const QueryTeleServerParms& sp) : fServerParms(sp)
{
  const bool haveServer = !fServerParms.host.empty() && fServerParms.port != 0;

  if (!haveServer)
    return;

  // One-time setup is serialized through initMux.
  boost::mutex::scoped_lock initGuard(initMux);

  atomicops::atomicMb();

  if (!isInited)
  {
    fSocket.reset(new att::TSocket(fServerParms.host, fServerParms.port));
    fTransport.reset(new att::TBufferedTransport(fSocket));
    fProtocol.reset(new atp::TBinaryProtocol(fTransport));

    // The consumer is started only after fProtocol exists, since it uses it immediately.
    consThd = new boost::thread(&TeleConsumer);

    atomicops::atomicMb();
    isInited = true;
  }
}
 | 
						|
 | 
						|
int QueryTeleProtoImpl::enqStepTele(const StepTele& stdata)
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock lk(stQueue.queueMtx);
 | 
						|
 | 
						|
    // @bug6088 - Added conditions below to always log progress SUMMARY and START messages to avoid completed
 | 
						|
    // queries showing up with progress 0
 | 
						|
    //            and no steps.
 | 
						|
    if (stQueue.queue.size() >= MaxQueueElems && stdata.msg_type != querytele::STType::ST_SUMMARY &&
 | 
						|
        stdata.msg_type != querytele::STType::ST_START)
 | 
						|
    {
 | 
						|
      fQStats.stqueuedrops++;
 | 
						|
      return -1;
 | 
						|
    }
 | 
						|
 | 
						|
    if (stdata.step_uuid != gLastStep.step_uuid || stdata.msg_type != gLastStep.msg_type ||
 | 
						|
        stdata.step_type != gLastStep.step_type ||
 | 
						|
        stdata.total_units_of_work != gLastStep.total_units_of_work ||
 | 
						|
        stdata.units_of_work_completed != gLastStep.units_of_work_completed)
 | 
						|
    {
 | 
						|
      stQueue.queue.push(stdata);
 | 
						|
      gLastStep = stdata;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      fQStats.stqueuedups++;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    return -2;
 | 
						|
  }
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int QueryTeleProtoImpl::enqQueryTele(const QueryTele& qtdata)
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock lk(qtQueue.queueMtx);
 | 
						|
 | 
						|
    if (qtQueue.queue.size() >= MaxQueueElems)
 | 
						|
    {
 | 
						|
      fQStats.qtqueuedrops++;
 | 
						|
      return -1;
 | 
						|
    }
 | 
						|
 | 
						|
    qtQueue.queue.push(qtdata);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    return -2;
 | 
						|
  }
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int QueryTeleProtoImpl::enqImportTele(const ImportTele& itdata)
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock lk(itQueue.queueMtx);
 | 
						|
 | 
						|
    if (itQueue.queue.size() >= MaxQueueElems)
 | 
						|
    {
 | 
						|
      fQStats.itqueuedrops++;
 | 
						|
      return -1;
 | 
						|
    }
 | 
						|
 | 
						|
    itQueue.queue.push(itdata);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    return -2;
 | 
						|
  }
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int QueryTeleProtoImpl::waitForQueues()
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock lk(itQueue.queueMtx);
 | 
						|
 | 
						|
    while (!itQueue.queue.empty())
 | 
						|
    {
 | 
						|
      lk.unlock();
 | 
						|
      usleep(100000);
 | 
						|
      lk.lock();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    return -1;
 | 
						|
  }
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace querytele
 |