1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/jobstep.cpp
Denis Khalikov 896e8dd769 MCOL-5522 Properly process pm join result count. (#2909)
This patch:
1. Properly processes situation when pm join result count is exceeded.
2. Adds session variable 'columnstore_max_pm_join_result_count` to control the limit.
2023-08-04 16:55:45 +03:00

248 lines
8.6 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2016-2020 MariaDB
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: jobstep.cpp 9414 2013-04-22 22:18:30Z xlou $
#include <iostream>
#include <string>
using namespace std;
#include <stdlib.h>
#include <boost/thread.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
namespace bu = boost::uuids;
#include "configcpp.h"
using namespace config;
#include "calpontsystemcatalog.h"
#include "calpontselectexecutionplan.h"
#include "messagelog.h"
#include "messageids.h"
#include "timestamp.h"
#include "oamcache.h"
#include "jobstep.h"
#include "jlf_common.h"
using namespace logging;
#include "querytele.h"
using namespace querytele;
namespace
{
int toInt(const string& val)
{
if (val.length() == 0)
return -1;
return static_cast<int>(config::Config::fromText(val));
}
} // namespace
namespace joblist
{
boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
threadpool::ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0);
ostream& operator<<(ostream& os, const JobStep* rhs)
{
os << rhs->toString();
return os;
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
JobStep::JobStep(const JobInfo& j)
: fSessionId(j.sessionId)
, fTxnId(j.txnId)
, fVerId(j.verId)
, fStatementId(j.statementId)
, fStepId(0)
, fTupleId(-1)
, fTraceFlags(0)
, fCardinality(0)
, fDelayedRunFlag(false)
, fDelivery(false)
, fOnClauseFilter(false)
, fDie(false)
, fWaitToRunStepCnt(0)
, fPriority(1)
, fErrorInfo(j.errorInfo)
, fLogger(j.logger)
, fLocalQuery(j.localQuery)
, fQueryUuid(j.uuid)
, fProgress(0)
, fStartTime(-1)
, fTimeZone(j.timeZone)
, fMaxPmJoinResultCount(j.maxPmJoinResultCount)
{
QueryTeleServerParms tsp;
string teleServerHost(Config::makeConfig()->getConfig("QueryTele", "Host"));
if (!teleServerHost.empty())
{
int teleServerPort = toInt(Config::makeConfig()->getConfig("QueryTele", "Port"));
if (teleServerPort > 0)
{
tsp.host = teleServerHost;
tsp.port = teleServerPort;
}
}
fQtc.serverParms(tsp);
// fStepUuid = bu::random_generator()();
fStepUuid = QueryTeleClient::genUUID();
}
//------------------------------------------------------------------------------
// Log a syslog msg for the start of this specified job step
//------------------------------------------------------------------------------
void JobStep::syslogStartStep(uint32_t subSystem, const string& stepName) const
{
LoggingID logId(subSystem, sessionId(), txnId());
MessageLog msgLog(logId);
Message msgStartStep(M0030);
Message::Args args;
args.add((uint64_t)statementId()); // statement id for this job step
args.add((int)stepId()); // step id for this job step
args.add(stepName); // step name for this job step
msgStartStep.format(args);
msgLog.logDebugMessage(msgStartStep);
}
//------------------------------------------------------------------------------
// Log a syslog message for the end of this specified job step
//------------------------------------------------------------------------------
void JobStep::syslogEndStep(uint32_t subSystem, uint64_t blockedDLInput, uint64_t blockedDLOutput,
uint64_t msgBytesInput, uint64_t msgBytesOutput) const
{
LoggingID logId(subSystem, sessionId(), txnId());
MessageLog msgLog(logId);
Message msgEndStep(M0031);
Message::Args args;
args.add((uint64_t)statementId()); // statement id for this job step
args.add((int)stepId()); // step id for this job step
args.add(blockedDLInput); // blocked datalist input (ex: fifo)
args.add(blockedDLOutput); // blocked datalist output(ex: fifo)
args.add(msgBytesInput); // incoming msg byte count
args.add(msgBytesOutput); // outgoing msg byte count
msgEndStep.format(args);
msgLog.logDebugMessage(msgEndStep);
}
//------------------------------------------------------------------------------
// Log a syslog message for the physical vs cache block I/O counts
//------------------------------------------------------------------------------
void JobStep::syslogReadBlockCounts(uint32_t subSystem, uint64_t physicalReadCount, uint64_t cacheReadCount,
uint64_t casualPartBlocks) const
{
LoggingID logId(subSystem, sessionId(), txnId());
MessageLog msgLog(logId);
Message msgEndStep(M0032);
Message::Args args;
args.add((uint64_t)statementId()); // statement id for this job step
args.add((int)stepId()); // step id for this job step
args.add((int)oid()); // step id for this job step
args.add(physicalReadCount); // blocked datalist input (ex: fifo)
args.add(cacheReadCount); // blocked datalist output(ex: fifo)
args.add(casualPartBlocks); // casual partition block hits
msgEndStep.format(args);
msgLog.logDebugMessage(msgEndStep);
}
//------------------------------------------------------------------------------
// Log a syslog msg for the effective start/end times for this step
// (lastWriteTime denotes when the EndOfInput marker was written out).
//------------------------------------------------------------------------------
void JobStep::syslogProcessingTimes(uint32_t subSystem, const struct timeval& firstReadTime,
const struct timeval& lastReadTime, const struct timeval& firstWriteTime,
const struct timeval& lastWriteTime) const
{
LoggingID logId(subSystem, sessionId(), txnId());
MessageLog msgLog(logId);
Message msgStartStep(M0046);
Message::Args args;
args.add((uint64_t)statementId()); // statement id for this job step
args.add((int)stepId()); // step id for this job step
args.add(JSTimeStamp::format(firstReadTime)); // when first DL input element read
args.add(JSTimeStamp::format(lastReadTime)); // when last DL input element read
args.add(JSTimeStamp::format(firstWriteTime)); // when first DL output elem written
args.add(JSTimeStamp::format(lastWriteTime)); // when EndOfInput written to DL out
msgStartStep.format(args);
msgLog.logDebugMessage(msgStartStep);
}
bool JobStep::traceOn() const
{
return fTraceFlags & execplan::CalpontSelectExecutionPlan::TRACE_LOG;
}
//////////////////////////////////////////////////////////////////////
// DESCRIPTION:
// The m() rethrows a query runtime exception and handles it across
// all steps in a uniform
// way.
// PARAMETERS:
// e ptr to the exception raised
// errorCode error code to log
// critErrorCode is this a crit IDBExcept or not
// methodName method name to log
//////////////////////////////////////////////////////////////////////
void JobStep::handleException(std::exception_ptr e, const int errorCode, const unsigned infoErrorCode,
const std::string& methodName)
{
try
{
std::rethrow_exception(e);
}
catch (const IDBExcept& iex)
{
std::cerr << methodName << " caught a internal exception. " << std::endl;
catchHandler(methodName + " " + iex.what(), iex.errorCode(), fErrorInfo, fSessionId,
(iex.errorCode() == infoErrorCode ? LOG_TYPE_INFO : LOG_TYPE_CRITICAL));
}
catch (boost::exception& e)
{
std::cerr << methodName << " caught a boost::exception. " << std::endl;
catchHandler(methodName + " caught " + boost::diagnostic_information(e), errorCode, fErrorInfo,
fSessionId);
}
catch (const std::exception& ex)
{
std::cerr << methodName << " caught an exception. " << std::endl;
catchHandler(methodName + " caught " + ex.what(), errorCode, fErrorInfo, fSessionId);
}
catch (...)
{
std::ostringstream oss;
std::cerr << methodName << " caught an unknown exception." << std::endl;
catchHandler(methodName + " caught an unknown exception ", errorCode, fErrorInfo, fSessionId);
}
}
} // namespace joblist