/* Copyright (C) 2022 Mariadb Corporation.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
#include "sqlfrontsessionthread.h"
 | 
						|
#include "primproc.h"
 | 
						|
#include "primitiveserverthreadpools.h"
 | 
						|
 | 
						|
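// SQLFrontSessionThread services a single SQL front-end connection to ExeMgr:
// operator() loops reading serialized execution plans and small control
// messages from fIos, builds joblists for them, and streams result bands or
// tuples back to the front end while collecting per-query statistics. The
// helpers that follow delegate per-session bookkeeping (memory %, thread
// counts) to the global ServiceExeMgr instance.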
namespace exemgr
{
uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId)
{
  return globServiceExeMgr->getMaxMemPct(sessionId);
}
void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId)
{
  return globServiceExeMgr->deleteMaxMemPct(sessionId);
}
void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId)
{
  return globServiceExeMgr->incThreadCntPerSession(sessionId);
}
void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId)
{
  return globServiceExeMgr->decThreadCntPerSession(sessionId);
}

void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId)
{
  return globServiceExeMgr->initMaxMemPct(sessionId);
}
//...Get and format query stats for the specified output stream
const std::string SQLFrontSessionThread::formatQueryStats(
    joblist::SJLP& jl,         // joblist associated with query
    const std::string& label,  // header label to print in front of log output
    bool includeNewLine,       // include line breaks in query stats string
    bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned)
{
  std::ostringstream os;

  // Get stats if not already acquired for current query
  if (!fStatsRetrieved)
  {
    if (wantExtendedStats)
    {
      // wait for the ei data to be written by another thread (brain-dead)
      struct timespec req = {0, 250000};  // 250 usec
      nanosleep(&req, 0);
    }

    // Get % memory usage during current query for sessionId
    jl->querySummary(wantExtendedStats);
    fStats = jl->queryStats();
    fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID);
    fStats.fRows = rowsReturned;
    fStatsRetrieved = true;
  }

  std::string queryMode;
  queryMode = (vtableModeOn ? "Distributed" : "Standard");

  // Log stats to specified output stream
  os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles
     << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO
     << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt;

  if (includeNewLine)
    os << std::endl << "      ";  // insert line break
  else
    os << "; ";  // continue without line break

  os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-"
     << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-"
     << queryMode;

  return os.str();
}
//... Round off to a human readable format (B, KB, MB, GB, or TB).
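// For example, with the bit-512 rounding below: 1536 bytes comes back as
// "2KB", 1000000 bytes as "977KB", while exactly 1024 bytes stays "1024B"
// because only values strictly greater than 1024 are scaled.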
const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const
{
  const char* units[] = {"B", "KB", "MB", "GB", "TB"};
  uint64_t i = 0, up = 0;
  uint64_t roundedValue = value;

  while (roundedValue > 1024 && i < 4)
  {
    up = (roundedValue & 512);
    roundedValue /= 1024;
    i++;
  }

  if (up)
    roundedValue++;

  std::ostringstream oss;
  oss << roundedValue << units[i];
  return oss.str();
}
//...Convert a byte count to MB (1024*1024 bytes), rounding to the nearest MB.
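// Bit 524288 (0x80000, half a MB) picks the rounding direction: e.g.
// 1572864 bytes (1.5 MB) is reported as 2, and 1048575 bytes (just under
// 1 MB) as 1.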
uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const
{
  uint64_t roundedValue = value >> 20;

  if (value & 524288)
    roundedValue++;

  return roundedValue;
}
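// Apply per-session resource overrides carried in the execution plan:
// PMSMALLSIDEMEMORY and UMSMALLSIDEMEMORY set the hash-join small-side
// memory limits on the PM and UM side, respectively, for that session.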
void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms)
{
  for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); it != parms.end();
       ++it)
  {
    switch (it->id)
    {
      case execplan::PMSMALLSIDEMEMORY:
      {
        globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value);
        break;
      }

      case execplan::UMSMALLSIDEMEMORY:
      {
        globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value);
        break;
      }

      default:;
    }
  }
}
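// Pre-warm the system catalog's schema cache for every schema referenced in
// the plan's column map, then recurse into any derived-table sub-plans so
// their schemas are cached as well.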
void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep,
                                          boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
{
  const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap();
  std::string schemaName;

  for (auto it = colMap.begin(); it != colMap.end(); ++it)
  {
    const auto sc = dynamic_cast<execplan::SimpleColumn*>((it->second).get());

    if (sc)
    {
      schemaName = sc->schemaName();

      // Only the first time a schema is seen does this actually query the
      // system catalog. The system catalog keeps a map of schema names;
      // if the schema is already cached, getSchemaInfo() returns without
      // doing anything.
      if (!schemaName.empty())
        csc->getSchemaInfo(schemaName);
    }
  }

  execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt;

  for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt)
  {
    buildSysCache(*(dynamic_cast<execplan::CalpontSelectExecutionPlan*>(subIt->get())), csc);
  }
}
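// Reply to the front end with a status code followed by a message, written
// as two separate ByteStreams.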
void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg)
{
  messageqcpp::ByteStream emsgBs;
  messageqcpp::ByteStream tbs;
  tbs << code;
  fIos.write(tbs);
  emsgBs << emsg;
  fIos.write(emsgBs);
}
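// Handle an ANALYZE TABLE request: unserialize the MCSAnalyzeTableExecutionPlan,
// build and run a joblist for it, feed every projected row group into the
// StatisticsManager, persist and distribute the resulting statistics, and
// acknowledge the front end with ANALYZE_TABLE_SUCCESS.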
void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl,
                                                bool& stmtCounted)
{
  auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
  messageqcpp::ByteStream::quadbyte qb;
  execplan::MCSAnalyzeTableExecutionPlan caep;

  bs = fIos.read();
  caep.unserialize(bs);

  statementsRunningCount->incr(stmtCounted);
  PrimitiveServerThreadPools primitiveServerThreadPools;
  jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true);

  // Joblist is empty.
  if (jl->status() == logging::statisticsJobListEmpty)
  {
    if (caep.traceOn())
      std::cout << "JobList is empty " << std::endl;

    jl.reset();
    bs.restart();
    qb = ANALYZE_TABLE_SUCCESS;
    bs << qb;
    fIos.write(bs);
    bs.reset();
    statementsRunningCount->decr(stmtCounted);
    return;
  }

  if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
  {
    std::cout << "fEc setup " << std::endl;
    fEc->Setup();
  }

  if (jl->status() == 0)
  {
    std::string emsg;

    if (jl->putEngineComm(fEc) != 0)
      throw std::runtime_error(jl->errMsg());
  }
  else
  {
    throw std::runtime_error("ExeMgr: could not build a JobList!");
  }

  // Execute a joblist.
  jl->doQuery();

  FEMsgHandler msgHandler(jl, &fIos);
  msgHandler.start();

  // Seems to be a legacy parameter; not really needed.
  const uint32_t dummyTableOid = 100;
  auto* statisticsManager = statistics::StatisticsManager::instance();
  // Process rowGroup by rowGroup.
  auto rowCount = jl->projectTable(dummyTableOid, bs);
  while (rowCount)
  {
    auto const& outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();
    statisticsManager->collectSample(outRG);
    rowCount = jl->projectTable(dummyTableOid, bs);
  }
  msgHandler.stop();

  // Analyze collected samples.
  statisticsManager->analyzeSample(caep.traceOn());
  statisticsManager->incEpoch();
  statisticsManager->saveToFile();

  // Distribute statistics across all ExeMgr clients if possible.
  statistics::StatisticsDistributor::instance()->distributeStatistics();

  // Send the signal back to front-end.
  bs.restart();
  qb = ANALYZE_TABLE_SUCCESS;
  bs << qb;
  fIos.write(bs);
  bs.reset();
  statementsRunningCount->decr(stmtCounted);
}
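// Statistics-distribution handshake on the receiving ExeMgr: read the peer's
// statistics hash, answer ANALYZE_TABLE_SUCCESS if the local statistics
// already match, otherwise ask for the full statistics with
// ANALYZE_TABLE_NEED_STATS, unserialize and persist them, then acknowledge.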
void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs)
{
  messageqcpp::ByteStream::quadbyte qb;
#ifdef DEBUG_STATISTICS
  std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
  bs = fIos.read();
#ifdef DEBUG_STATISTICS
  std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
  uint64_t dataHashRec;
  bs >> dataHashRec;

  uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats();
  // The stats are the same.
  if (dataHash == dataHashRec)
  {
#ifdef DEBUG_STATISTICS
    std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
    qb = ANALYZE_TABLE_SUCCESS;
    bs << qb;
    fIos.write(bs);
    bs.reset();
    return;
  }

  bs.restart();
  qb = ANALYZE_TABLE_NEED_STATS;
  bs << qb;
  fIos.write(bs);

  bs.restart();
  bs = fIos.read();
#ifdef DEBUG_STATISTICS
  std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
  statistics::StatisticsManager::instance()->unserialize(bs);
  statistics::StatisticsManager::instance()->saveToFile();

#ifdef DEBUG_STATISTICS
  std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl;
#endif
  qb = ANALYZE_TABLE_SUCCESS;
  bs << qb;
  fIos.write(bs);
  bs.reset();
}
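// Thread body for one front-end session. The first 4-byte message of each
// iteration selects the mode: 4 = the UM wants the tuple interface (a CSEP
// follows), 5 = a statistics probe (current/waiting statement counts are
// returned and the connection closes), ANALYZE_TABLE_EXECUTE and
// ANALYZE_TABLE_REC_STATS dispatch to the handlers above, and 0 is a no-op
// sent by rnd_end(). A zero-length read means the client closed the
// connection; anything longer than 4 bytes is treated as a serialized
// CalpontSelectExecutionPlan.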
void SQLFrontSessionThread::operator()()
{
  messageqcpp::ByteStream bs, inbs;
  execplan::CalpontSelectExecutionPlan csep;
  csep.sessionID(0);
  joblist::SJLP jl;
  bool incSQLFrontSessionThreadCnt = true;
  std::mutex jlMutex;
  std::condition_variable jlCleanupDone;
  int destructing = 0;
  int gDebug = globServiceExeMgr->getDebugLevel();
  logging::Logger& msgLog = globServiceExeMgr->getLogger();

  bool selfJoin = false;
  bool tryTuples = false;
  bool usingTuples = false;
  bool stmtCounted = false;
  auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();

  try
  {
    for (;;)
    {
      selfJoin = false;
      tryTuples = false;
      usingTuples = false;

      if (jl)
      {
        // puts the real destruction in another thread to avoid
        // making the whole session wait.  It can take several seconds.
        std::unique_lock<std::mutex> scoped(jlMutex);
        destructing++;
        std::thread bgdtor(
            [jl, &jlMutex, &jlCleanupDone, &destructing]
            {
              std::unique_lock<std::mutex> scoped(jlMutex);
              const_cast<joblist::SJLP&>(jl).reset();  // this happens second; does real destruction
              if (--destructing == 0)
                jlCleanupDone.notify_one();
            });
        jl.reset();  // this runs first
        bgdtor.detach();
      }

      bs = fIos.read();
      if (bs.length() == 0)
      {
        if (gDebug > 1 || (gDebug && !csep.isInternal()))
          std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;

        // connection closed by client
        fIos.close();
        break;
      }
      else if (bs.length() < 4)  // Not a CalpontSelectExecutionPlan
      {
        if (gDebug)
          std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length "
                    << bs.length() << std::endl;

        fIos.close();
        break;
      }
      else if (bs.length() == 4)  // possible tuple flag
      {
        messageqcpp::ByteStream::quadbyte qb;
        bs >> qb;

        if (qb == 4)  // UM wants new tuple i/f
        {
          if (gDebug)
            std::cout << "### UM wants tuples" << std::endl;

          tryTuples = true;
          // now wait for the CSEP...
          bs = fIos.read();
        }
        else if (qb == 5)  // somebody wants stats
        {
          bs.restart();
          qb = statementsRunningCount->cur();
          bs << qb;
          qb = statementsRunningCount->waiting();
          bs << qb;
          fIos.write(bs);
          fIos.close();
          break;
        }
        else if (qb == ANALYZE_TABLE_EXECUTE)
        {
          analyzeTableExecute(bs, jl, stmtCounted);
          continue;
        }
        else if (qb == ANALYZE_TABLE_REC_STATS)
        {
          analyzeTableHandleStats(bs);
          continue;
        }
        else if (qb == 0)
        {
          // 0 => Nothing left to do. Sent by rnd_end() just to be sure.
          continue;
        }
        else
        {
          if (gDebug)
            std::cout << "### Got a not-a-plan value " << qb << std::endl;

          fIos.close();
          break;
        }
      }
    new_plan:
      try
      {
        csep.unserialize(bs);
      }
      catch (logging::IDBExcept& ex)
      {
        // We can get here on illegal function parameter data type, e.g.
        //   SELECT blob_column|1 FROM t1;
        statementsRunningCount->decr(stmtCounted);
        writeCodeAndError(ex.errorCode(), std::string(ex.what()));
        continue;
      }

      querytele::QueryTeleStats qts;

      if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
      {
        qts.query_uuid = csep.uuid();
        qts.msg_type = querytele::QueryTeleStats::QT_START;
        qts.start_time = querytele::QueryTeleClient::timeNowms();
        qts.query = csep.data();
        qts.session_id = csep.sessionID();
        qts.query_type = csep.queryType();
        qts.system_name = fOamCachePtr->getSystemName();
        qts.module_name = fOamCachePtr->getModuleName();
        qts.local_query = csep.localQuery();
        qts.schema_name = csep.schemaName();
        fTeleClient.postQueryTele(qts);
      }

      if (gDebug > 1 || (gDebug && !csep.isInternal()))
        std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;

      setRMParms(csep.rmParms());
      // Re-establish lost PP connections.
      if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
      {
        fEc->Setup();
      }
      // @bug 1021. Try to get the schema cache for an incoming query.
      // Skip system catalog queries.
      if (!csep.isInternal())
      {
        boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
            execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
        buildSysCache(csep, csc);
      }

      // As soon as we have a session id for this thread, update the
      // thread count per session; only do this once per thread.
      // Mask 0x80000000 is used to associate the user query and the csc query.
      if (incSQLFrontSessionThreadCnt)
      {
        // WIP
        incThreadCntPerSession(csep.sessionID() | 0x80000000);
        incSQLFrontSessionThreadCnt = false;
      }
      bool needDbProfEndStatementMsg = false;
      logging::Message::Args args;
      std::string sqlText = csep.data();
      logging::LoggingID li(16, csep.sessionID(), csep.txnID());

      // Initialize stats for this query, including
      // init sessionMemMap entry for this session to 0 memory %.
      // We will need this later for traceOn() or if we receive a
      // table request with qb=3 (see below). This is also recorded
      // as query start time.
      initStats(csep.sessionID(), sqlText);
      fStats.fQueryType = csep.queryType();

      // Log start and end statement if tracing is enabled.  Keep in
      // mind the trace flag won't be set for system catalog queries.
      if (csep.traceOn())
      {
        args.reset();
        args.add((int)csep.statementID());
        args.add((int)csep.verID().currentScn);
        args.add(sqlText);
        msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li);
        needDbProfEndStatementMsg = true;
      }

      // Don't log subsequent self joins after first.
      if (selfJoin)
        sqlText = "";

      std::ostringstream oss;
      oss << sqlText << "; |" << csep.schemaName() << "|";
      logging::SQLLogger sqlLog(oss.str(), li);

      statementsRunningCount->incr(stmtCounted);

      PrimitiveServerThreadPools primitiveServerThreadPools(
          ServicePrimProc::instance()->getPrimitiveServerThreadPool());
      if (tryTuples)
      {
        try  // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
        {
          jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true);
          // assign query stats
          jl->queryStats(fStats);

          if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
          {
            usingTuples = true;

            // Tell the FE that we're sending tuples back, not TableBands
            writeCodeAndError(0, "NOERROR");
            auto tjlp = dynamic_cast<joblist::TupleJobList*>(jl.get());
            assert(tjlp);
            messageqcpp::ByteStream tbs;
            tbs << tjlp->getOutputRowGroup();
            fIos.write(tbs);
          }
          else
          {
            const std::string emsg = jl->errMsg();
            statementsRunningCount->decr(stmtCounted);
            writeCodeAndError(jl->status(), emsg);
            std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
            continue;
          }
        }
        catch (std::exception& ex)
        {
          std::ostringstream errMsg;
          errMsg << "ExeMgr: error writing makeJoblist "
                    "response; "
                 << ex.what();
          throw std::runtime_error(errMsg.str());
        }
        catch (...)
        {
          std::ostringstream errMsg;
          errMsg << "ExeMgr: unknown error writing makeJoblist "
                    "response; ";
          throw std::runtime_error(errMsg.str());
        }

        if (!usingTuples)
        {
          if (gDebug)
            std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
        }
        else
        {
          if (gDebug)
            std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
        }
      }
      else
      {
        usingTuples = false;
        jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true);

        if (jl->status() == 0)
        {
          std::string emsg;

          if (jl->putEngineComm(fEc) != 0)
            throw std::runtime_error(jl->errMsg());
        }
        else
        {
          throw std::runtime_error("ExeMgr: could not build a JobList!");
        }
      }

      jl->doQuery();

      execplan::CalpontSystemCatalog::OID tableOID;
      bool swallowRows = false;
      joblist::DeliveredTableMap tm;
      uint64_t totalBytesSent = 0;
      uint64_t totalRowCount = 0;
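      // Each iteration reads one 4-byte command from the front end: 0 = query
      // is done, 1 = swallow all rows, 2 = project whichever delivered table
      // comes first, 3 = return the formatted query-stats string, 4 = a new
      // plan follows (table mode), and any larger value is the tableOID to
      // project. A message longer than 4 bytes is a new plan from a self-join
      // and jumps back to new_plan.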
      // Project each table as the FE asks for it
      for (;;)
      {
        bs = fIos.read();

        if (bs.length() == 0)
        {
          if (gDebug > 1 || (gDebug && !csep.isInternal()))
            std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;

          break;
        }

        if (gDebug && bs.length() > 4)
          std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length()
                    << std::endl;

        // TODO: Holy crud! Can this be right?
        //@bug 1444 Yes, if there is a self-join
        if (bs.length() > 4)
        {
          selfJoin = true;
          statementsRunningCount->decr(stmtCounted);
          goto new_plan;
        }

        assert(bs.length() == 4);

        messageqcpp::ByteStream::quadbyte qb;
        try  // @bug2244: try/catch around fIos.write() calls responding to qb command
        {
          bs >> qb;

          if (gDebug > 1 || (gDebug && !csep.isInternal()))
            std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb << std::endl;

          if (qb == 0)
          {
            // No more tables, query is done
            break;
          }
          else if (qb == 1)
          {
            // super-secret flag indicating that the UM is going to scarf down all the rows in the
            //   query.
            swallowRows = true;
            tm = jl->deliveredTables();
            continue;
          }
          else if (qb == 2)
          {
            // UM just wants any table
            assert(swallowRows);
            auto iter = tm.begin();

            if (iter == tm.end())
            {
              if (gDebug > 1 || (gDebug && !csep.isInternal()))
                std::cout << "### For session id " << csep.sessionID() << ", returning end flag" << std::endl;

              bs.restart();
              bs << (messageqcpp::ByteStream::byte)1;
              fIos.write(bs);
              continue;
            }

            tableOID = iter->first;
          }
          else if (qb == 3)  // special option - UM wants the job stats string
          {
            std::string statsString;

            // Log stats string to be sent back to front end
            statsString = formatQueryStats(
                jl, "Query Stats", false,
                !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
                (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount);

            bs.restart();
            bs << statsString;

            if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
            {
              bs << jl->extendedInfo();
              bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo());
            }
            else
            {
              std::string empty;
              bs << empty;
              bs << empty;
            }

            // send stats to connector for inserting to the querystats table
            fStats.serialize(bs);
            fIos.write(bs);
            continue;
          }
          // for table mode handling
          else if (qb == 4)
          {
            statementsRunningCount->decr(stmtCounted);
            bs = fIos.read();
            goto new_plan;
          }
          else  // (qb > 3)
          {
            // Return table bands for the requested tableOID
            tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
          }
        }
        catch (std::exception& ex)
        {
          std::ostringstream errMsg;
          errMsg << "ExeMgr: error writing qb response "
                    "for qb cmd "
                 << qb << "; " << ex.what();
          throw std::runtime_error(errMsg.str());
        }
        catch (...)
        {
          std::ostringstream errMsg;
          errMsg << "ExeMgr: unknown error writing qb response "
                    "for qb cmd "
                 << qb;
          throw std::runtime_error(errMsg.str());
        }
        if (swallowRows)
          tm.erase(tableOID);

        FEMsgHandler msgHandler(jl, &fIos);

        if (tableOID == 100)
          msgHandler.start();

        //...Loop serializing table bands projected for the tableOID
        for (;;)
        {
          uint32_t rowCount;

          rowCount = jl->projectTable(tableOID, bs);

          msgHandler.stop();

          if (jl->status())
          {
            const auto errInfo = logging::IDBErrorInfo::instance();

            if (jl->errMsg().length() != 0)
              bs << jl->errMsg();
            else
              bs << errInfo->errorMsg(jl->status());
          }

          if (!swallowRows)
          {
            try  // @bug2244: try/catch around fIos.write() calls projecting rows
            {
              if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
              {
                // Skip the write to the front end until the last empty band.  Used to time queries
                // through without any front end waiting.
                if (tableOID < 3000 || rowCount == 0)
                  fIos.write(bs);
              }
              else
              {
                fIos.write(bs);
              }
            }
            catch (std::exception& ex)
            {
              msgHandler.stop();
              std::ostringstream errMsg;
              errMsg << "ExeMgr: error projecting rows "
                        "for tableOID: "
                     << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; "
                     << ex.what();
              jl->abort();

              while (rowCount)
                rowCount = jl->projectTable(tableOID, bs);

              if (tableOID == 100 && msgHandler.aborted())
              {
                /* TODO: modularize the cleanup code, as well as
                 * the rest of this fcn */

                decThreadCntPerSession(csep.sessionID() | 0x80000000);
                statementsRunningCount->decr(stmtCounted);
                fIos.close();
                return;
              }

              // std::cout << "connection drop\n";
              throw std::runtime_error(errMsg.str());
            }
            catch (...)
            {
              std::ostringstream errMsg;
              msgHandler.stop();
              errMsg << "ExeMgr: unknown error projecting rows "
                        "for tableOID: "
                     << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount;
              jl->abort();

              while (rowCount)
                rowCount = jl->projectTable(tableOID, bs);

              throw std::runtime_error(errMsg.str());
            }
          }
          totalRowCount += rowCount;
          totalBytesSent += bs.length();

          if (rowCount == 0)
          {
            msgHandler.stop();
            // No more bands, table is done
            bs.reset();

            // @bug 2083 decr active statement count here for table mode.
            if (!usingTuples)
              statementsRunningCount->decr(stmtCounted);

            break;
          }
          else
          {
            bs.restart();
          }
        }  // End of loop to project and serialize table bands for a table
      }    // End of loop to process tables
      // @bug 828
      if (csep.traceOn())
        jl->graph(csep.sessionID());

      if (needDbProfEndStatementMsg)
      {
        std::string ss;
        std::ostringstream prefix;
        prefix << "ses:" << csep.sessionID() << " Query Totals";

        // Log stats string to standard out
        ss = formatQueryStats(jl, prefix.str(), true,
                              !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
                              (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
                              totalRowCount);
        //@Bug 1306. Added timing info for real time tracking.
        std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl;

        // log query stats to debug log file
        args.reset();
        args.add((int)csep.statementID());
        args.add(fStats.fMaxMemPct);
        args.add(fStats.fNumFiles);
        args.add(fStats.fFileBytes);  // log raw byte count instead of MB
        args.add(fStats.fPhyIO);
        args.add(fStats.fCacheIO);
        args.add(fStats.fMsgRcvCnt);
        args.add(fStats.fMsgBytesIn);
        args.add(fStats.fMsgBytesOut);
        args.add(fStats.fCPBlocksSkipped);
        msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li);
        //@bug 1327
        deleteMaxMemPct(csep.sessionID());
        // Calling reset here will cause the joblist destructor to be
        // called, which "joins" the threads.  We need to do that
        // here to make sure all syslogging from all the threads
        // is complete, and that our logDbProfEndStatement will
        // appear "last" in the syslog for this SQL statement.
        // puts the real destruction in another thread to avoid
        // making the whole session wait.  It can take several seconds.
        int stmtID = csep.statementID();
        std::unique_lock<std::mutex> scoped(jlMutex);
        destructing++;
        std::thread bgdtor(
            [jl, &jlMutex, &jlCleanupDone, stmtID, li, &destructing, &msgLog]
            {
              std::unique_lock<std::mutex> scoped(jlMutex);
              const_cast<joblist::SJLP&>(jl).reset();  // this happens second; does real destruction
              logging::Message::Args args;
              args.add(stmtID);
              msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li);
              if (--destructing == 0)
                jlCleanupDone.notify_one();
            });
        jl.reset();  // this happens first
        bgdtor.detach();
      }
      else
        // delete sessionMemMap entry for this session's memory % use
        deleteMaxMemPct(csep.sessionID());

      std::string endtime(globServiceExeMgr->timeNow());

      if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000))
      {
        std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at "
                  << endtime << std::endl;

        // @bug 663 - Implemented caltraceon(16) to replace the
        // $FIFO_SINK compiler definition in pColStep.
        // This option consumes rows in the project steps.
        if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
        {
          std::cout << std::endl;
          std::cout << "**** No data returned to DM.  Rows consumed "
                       "in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
                       " ****"
                    << std::endl;
          std::cout << std::endl;
        }
        else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
        {
          std::cout << std::endl;
          std::cout << "**** No data returned to DM - caltrace(8) is "
                       "on (SWALLOW_ROWS_EXEMGR). ****"
                    << std::endl;
          std::cout << std::endl;
        }
      }

      statementsRunningCount->decr(stmtCounted);
      if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
      {
        qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
        qts.max_mem_pct = fStats.fMaxMemPct;
        qts.num_files = fStats.fNumFiles;
        qts.phy_io = fStats.fPhyIO;
        qts.cache_io = fStats.fCacheIO;
        qts.msg_rcv_cnt = fStats.fMsgRcvCnt;
        qts.cp_blocks_skipped = fStats.fCPBlocksSkipped;
        qts.msg_bytes_in = fStats.fMsgBytesIn;
        qts.msg_bytes_out = fStats.fMsgBytesOut;
        qts.rows = totalRowCount;
        qts.end_time = querytele::QueryTeleClient::timeNowms();
        qts.session_id = csep.sessionID();
        qts.query_type = csep.queryType();
        qts.query = csep.data();
        qts.system_name = fOamCachePtr->getSystemName();
        qts.module_name = fOamCachePtr->getModuleName();
        qts.local_query = csep.localQuery();
        fTeleClient.postQueryTele(qts);
      }
    }

    // Release CSC object (for sessionID) that was added by makeJobList().
    // Mask 0x80000000 is used to associate the user query and the csc query.
    // (actual joblist destruction happens at the top of this loop)
    decThreadCntPerSession(csep.sessionID() | 0x80000000);
  }
  catch (std::exception& ex)
  {
    decThreadCntPerSession(csep.sessionID() | 0x80000000);
    statementsRunningCount->decr(stmtCounted);
    std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
    logging::Message::Args args;
    logging::LoggingID li(16, csep.sessionID(), csep.txnID());
    args.add(ex.what());
    msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
    fIos.close();
  }
  catch (...)
  {
    decThreadCntPerSession(csep.sessionID() | 0x80000000);
    statementsRunningCount->decr(stmtCounted);
    std::cerr << "### Exception caught!" << std::endl;
    logging::Message::Args args;
    logging::LoggingID li(16, csep.sessionID(), csep.txnID());
    args.add("ExeMgr caught unknown exception");
    msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
    fIos.close();
  }

  // make sure we don't leave scope while joblists are being destroyed
  std::unique_lock<std::mutex> scoped(jlMutex);
  while (destructing > 0)
    jlCleanupDone.wait(scoped);
}
};  // namespace exemgr