1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-26 05:02:32 +03:00
Files
mariadb-columnstore-engine/primitives/primproc/sqlfrontsessionthread.cpp
David.Hall 53af74b027 MCOL-1170 Fix ANALYZE to not error (#2682)
Analyze needs to be completed differently than a normal query. In server, when an ANALYZE is seen, it calls init_scan() immediatly followed by end_scan(). This leaves the sqlfrontendsession (ExeMgr) in a state where it expects to return rows. This patch fixes end_scan to clean this up via reads and writes to get everything back in synch.

ANALYZE should display the number of rows to be displayed if the query were run normally. We have that information available, but no way to return it. A modification to server side to ask for that in the handler is required.

This patch also includes a beautification of sqlfrontsessionthread.cpp since it looked bad. The important change is at line 774
if (!swallowRows)
which short circuits the actual return of data
2023-01-09 13:59:26 -06:00

1002 lines
32 KiB
C++

/* Copyright (C) 2022 Mariadb Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "sqlfrontsessionthread.h"
#include "primproc.h"
#include "primitiveserverthreadpools.h"
namespace exemgr
{
uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId)
{
return globServiceExeMgr->getMaxMemPct(sessionId);
}
void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId)
{
return globServiceExeMgr->deleteMaxMemPct(sessionId);
}
void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId)
{
return globServiceExeMgr->incThreadCntPerSession(sessionId);
}
void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId)
{
return globServiceExeMgr->decThreadCntPerSession(sessionId);
}
void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId)
{
return globServiceExeMgr->initMaxMemPct(sessionId);
}
//...Get and log query stats to specified output stream
const std::string SQLFrontSessionThread::formatQueryStats(
joblist::SJLP& jl, // joblist associated with query
const std::string& label, // header label to print in front of log output
bool includeNewLine, // include line breaks in query stats std::string
bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned)
{
std::ostringstream os;
// Get stats if not already acquired for current query
if (!fStatsRetrieved)
{
if (wantExtendedStats)
{
// wait for the ei data to be written by another thread (brain-dead)
struct timespec req = {0, 250000}; // 250 usec
nanosleep(&req, 0);
}
// Get % memory usage during current query for sessionId
jl->querySummary(wantExtendedStats);
fStats = jl->queryStats();
fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID);
fStats.fRows = rowsReturned;
fStatsRetrieved = true;
}
std::string queryMode;
queryMode = (vtableModeOn ? "Distributed" : "Standard");
// Log stats to specified output stream
os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles
<< "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO
<< "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt;
if (includeNewLine)
os << std::endl << " "; // insert line break
else
os << "; "; // continue without line break
os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-"
<< roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-"
<< queryMode;
return os.str();
}
//... Round off to human readable format (KB, MB, or GB).
const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const
{
const char* units[] = {"B", "KB", "MB", "GB", "TB"};
uint64_t i = 0, up = 0;
uint64_t roundedValue = value;
while (roundedValue > 1024 && i < 4)
{
up = (roundedValue & 512);
roundedValue /= 1024;
i++;
}
if (up)
roundedValue++;
std::ostringstream oss;
oss << roundedValue << units[i];
return oss.str();
}
//...Round off to nearest (1024*1024) MB
uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const
{
uint64_t roundedValue = value >> 20;
if (value & 524288)
roundedValue++;
return roundedValue;
}
void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms)
{
for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); it != parms.end();
++it)
{
switch (it->id)
{
case execplan::PMSMALLSIDEMEMORY:
{
globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value);
break;
}
case execplan::UMSMALLSIDEMEMORY:
{
globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value);
break;
}
default:;
}
}
}
void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep,
boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
{
const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap();
std::string schemaName;
for (auto it = colMap.begin(); it != colMap.end(); ++it)
{
const auto sc = dynamic_cast<execplan::SimpleColumn*>((it->second).get());
if (sc)
{
schemaName = sc->schemaName();
// only the first time a schema is got will actually query
// system catalog. System catalog keeps a schema name std::map.
// if a schema exists, the call getSchemaInfo returns without
// doing anything.
if (!schemaName.empty())
csc->getSchemaInfo(schemaName);
}
}
execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt;
for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt)
{
buildSysCache(*(dynamic_cast<execplan::CalpontSelectExecutionPlan*>(subIt->get())), csc);
}
}
void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg)
{
messageqcpp::ByteStream emsgBs;
messageqcpp::ByteStream tbs;
tbs << code;
fIos.write(tbs);
emsgBs << emsg;
fIos.write(emsgBs);
}
void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl,
bool& stmtCounted)
{
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
messageqcpp::ByteStream::quadbyte qb;
execplan::MCSAnalyzeTableExecutionPlan caep;
bs = fIos.read();
caep.unserialize(bs);
statementsRunningCount->incr(stmtCounted);
PrimitiveServerThreadPools primitiveServerThreadPools;
jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true);
// Joblist is empty.
if (jl->status() == logging::statisticsJobListEmpty)
{
if (caep.traceOn())
std::cout << "JobList is empty " << std::endl;
jl.reset();
bs.restart();
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
statementsRunningCount->decr(stmtCounted);
return;
}
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{
std::cout << "fEc setup " << std::endl;
fEc->Setup();
}
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}
// Execute a joblist.
jl->doQuery();
FEMsgHandler msgHandler(jl, &fIos);
msgHandler.start();
// Seemls like this a legacy parameter, not really needed.
const uint32_t dummyTableOid = 100;
auto* statisticsManager = statistics::StatisticsManager::instance();
// Process rowGroup by rowGroup.
auto rowCount = jl->projectTable(dummyTableOid, bs);
while (rowCount)
{
auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();
statisticsManager->collectSample(outRG);
rowCount = jl->projectTable(dummyTableOid, bs);
}
msgHandler.stop();
// Analyze collected samples.
statisticsManager->analyzeSample(caep.traceOn());
statisticsManager->incEpoch();
statisticsManager->saveToFile();
// Distribute statistics across all ExeMgr clients if possible.
statistics::StatisticsDistributor::instance()->distributeStatistics();
// Send the signal back to front-end.
bs.restart();
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
statementsRunningCount->decr(stmtCounted);
}
void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs)
{
messageqcpp::ByteStream::quadbyte qb;
#ifdef DEBUG_STATISTICS
std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
uint64_t dataHashRec;
bs >> dataHashRec;
uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats();
// The stats are the same.
if (dataHash == dataHashRec)
{
#ifdef DEBUG_STATISTICS
std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
return;
}
bs.restart();
qb = ANALYZE_TABLE_NEED_STATS;
bs << qb;
fIos.write(bs);
bs.restart();
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
statistics::StatisticsManager::instance()->unserialize(bs);
statistics::StatisticsManager::instance()->saveToFile();
#ifdef DEBUG_STATISTICS
std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl;
#endif
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
}
void SQLFrontSessionThread::operator()()
{
messageqcpp::ByteStream bs, inbs;
execplan::CalpontSelectExecutionPlan csep;
csep.sessionID(0);
joblist::SJLP jl;
bool incSQLFrontSessionThreadCnt = true;
std::mutex jlMutex;
std::condition_variable jlCleanupDone;
int destructing = 0;
int gDebug = globServiceExeMgr->getDebugLevel();
logging::Logger& msgLog = globServiceExeMgr->getLogger();
bool selfJoin = false;
bool tryTuples = false;
bool usingTuples = false;
bool stmtCounted = false;
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
try
{
for (;;)
{
selfJoin = false;
tryTuples = false;
usingTuples = false;
if (jl)
{
// puts the real destruction in another thread to avoid
// making the whole session wait. It can take several seconds.
std::unique_lock<std::mutex> scoped(jlMutex);
destructing++;
std::thread bgdtor(
[jl, &jlMutex, &jlCleanupDone, &destructing]
{
std::unique_lock<std::mutex> scoped(jlMutex);
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
if (--destructing == 0)
jlCleanupDone.notify_one();
});
jl.reset(); // this runs first
bgdtor.detach();
}
bs = fIos.read();
if (bs.length() == 0)
{
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;
// connection closed by client
fIos.close();
break;
}
else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan
{
if (gDebug)
std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length "
<< bs.length() << std::endl;
fIos.close();
break;
}
else if (bs.length() == 4) // possible tuple flag
{
messageqcpp::ByteStream::quadbyte qb;
bs >> qb;
if (qb == 4) // UM wants new tuple i/f
{
if (gDebug)
std::cout << "### UM wants tuples" << std::endl;
tryTuples = true;
// now wait for the CSEP...
bs = fIos.read();
}
else if (qb == 5) // somebody wants stats
{
bs.restart();
qb = statementsRunningCount->cur();
bs << qb;
qb = statementsRunningCount->waiting();
bs << qb;
fIos.write(bs);
fIos.close();
break;
}
else if (qb == ANALYZE_TABLE_EXECUTE)
{
analyzeTableExecute(bs, jl, stmtCounted);
continue;
}
else if (qb == ANALYZE_TABLE_REC_STATS)
{
analyzeTableHandleStats(bs);
continue;
}
else if (qb == 0)
{
// 0 => Nothing left to do. Sent by rnd_end() just to be sure.
continue;
}
else
{
if (gDebug)
std::cout << "### Got a not-a-plan value " << qb << std::endl;
fIos.close();
break;
}
}
new_plan:
try
{
csep.unserialize(bs);
}
catch (logging::IDBExcept& ex)
{
// We can get here on illegal function parameter data type, e.g.
// SELECT blob_column|1 FROM t1;
statementsRunningCount->decr(stmtCounted);
writeCodeAndError(ex.errorCode(), std::string(ex.what()));
continue;
}
querytele::QueryTeleStats qts;
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
{
qts.query_uuid = csep.uuid();
qts.msg_type = querytele::QueryTeleStats::QT_START;
qts.start_time = querytele::QueryTeleClient::timeNowms();
qts.query = csep.data();
qts.session_id = csep.sessionID();
qts.query_type = csep.queryType();
qts.system_name = fOamCachePtr->getSystemName();
qts.module_name = fOamCachePtr->getModuleName();
qts.local_query = csep.localQuery();
qts.schema_name = csep.schemaName();
fTeleClient.postQueryTele(qts);
}
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
setRMParms(csep.rmParms());
// Re-establish lost PP connections.
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{
fEc->Setup();
}
// @bug 1021. try to get schema cache for a come in query.
// skip system catalog queries.
if (!csep.isInternal())
{
boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
buildSysCache(csep, csc);
}
// As soon as we have a session id for this thread, update the
// thread count per session; only do this once per thread.
// Mask 0x80000000 is for associate user query and csc query
if (incSQLFrontSessionThreadCnt)
{
// WIP
incThreadCntPerSession(csep.sessionID() | 0x80000000);
incSQLFrontSessionThreadCnt = false;
}
bool needDbProfEndStatementMsg = false;
logging::Message::Args args;
std::string sqlText = csep.data();
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
// Initialize stats for this query, including
// init sessionMemMap entry for this session to 0 memory %.
// We will need this later for traceOn() or if we receive a
// table request with qb=3 (see below). This is also recorded
// as query start time.
initStats(csep.sessionID(), sqlText);
fStats.fQueryType = csep.queryType();
// Log start and end statement if tracing is enabled. Keep in
// mind the trace flag won't be set for system catalog queries.
if (csep.traceOn())
{
args.reset();
args.add((int)csep.statementID());
args.add((int)csep.verID().currentScn);
args.add(sqlText);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li);
needDbProfEndStatementMsg = true;
}
// Don't log subsequent self joins after first.
if (selfJoin)
sqlText = "";
std::ostringstream oss;
oss << sqlText << "; |" << csep.schemaName() << "|";
logging::SQLLogger sqlLog(oss.str(), li);
statementsRunningCount->incr(stmtCounted);
PrimitiveServerThreadPools primitiveServerThreadPools(
ServicePrimProc::instance()->getPrimitiveServerThreadPool());
if (tryTuples)
{
try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
{
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true);
// assign query stats
jl->queryStats(fStats);
if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
{
usingTuples = true;
// Tell the FE that we're sending tuples back, not TableBands
writeCodeAndError(0, "NOERROR");
auto tjlp = dynamic_cast<joblist::TupleJobList*>(jl.get());
assert(tjlp);
messageqcpp::ByteStream tbs;
tbs << tjlp->getOutputRowGroup();
fIos.write(tbs);
}
else
{
const std::string emsg = jl->errMsg();
statementsRunningCount->decr(stmtCounted);
writeCodeAndError(jl->status(), emsg);
std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
continue;
}
}
catch (std::exception& ex)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: error writing makeJoblist "
"response; "
<< ex.what();
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing makeJoblist "
"response; ";
throw std::runtime_error(errMsg.str());
}
if (!usingTuples)
{
if (gDebug)
std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
}
else
{
if (gDebug)
std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
}
}
else
{
usingTuples = false;
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true);
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}
}
jl->doQuery();
execplan::CalpontSystemCatalog::OID tableOID;
bool swallowRows = false;
joblist::DeliveredTableMap tm;
uint64_t totalBytesSent = 0;
uint64_t totalRowCount = 0;
// Project each table as the FE asks for it
for (;;)
{
bs = fIos.read();
if (bs.length() == 0)
{
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;
break;
}
if (gDebug && bs.length() > 4)
std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length()
<< std::endl;
// TODO: Holy crud! Can this be right?
//@bug 1444 Yes, if there is a self-join
if (bs.length() > 4)
{
selfJoin = true;
statementsRunningCount->decr(stmtCounted);
goto new_plan;
}
assert(bs.length() == 4);
messageqcpp::ByteStream::quadbyte qb;
try // @bug2244: try/catch around fIos.write() calls responding to qb command
{
bs >> qb;
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb << std::endl;
if (qb == 0)
{
// No more tables, query is done
break;
}
else if (qb == 1)
{
// super-secret flag indicating that the UM is going to scarf down all the rows in the
// query.
swallowRows = true;
tm = jl->deliveredTables();
continue;
}
else if (qb == 2)
{
// UM just wants any table
assert(swallowRows);
auto iter = tm.begin();
if (iter == tm.end())
{
if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", returning end flag" << std::endl;
bs.restart();
bs << (messageqcpp::ByteStream::byte)1;
fIos.write(bs);
continue;
}
tableOID = iter->first;
}
else if (qb == 3) // special option-UM wants job stats std::string
{
std::string statsString;
// Log stats std::string to be sent back to front end
statsString = formatQueryStats(
jl, "Query Stats", false,
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount);
bs.restart();
bs << statsString;
if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
{
bs << jl->extendedInfo();
bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo());
}
else
{
std::string empty;
bs << empty;
bs << empty;
}
// send stats to connector for inserting to the querystats table
fStats.serialize(bs);
fIos.write(bs);
continue;
}
// for table mode handling
else if (qb == 4)
{
statementsRunningCount->decr(stmtCounted);
bs = fIos.read();
goto new_plan;
}
else // (qb > 3)
{
// Return table bands for the requested tableOID
tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
}
}
catch (std::exception& ex)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: error writing qb response "
"for qb cmd "
<< qb << "; " << ex.what();
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing qb response "
"for qb cmd "
<< qb;
throw std::runtime_error(errMsg.str());
}
if (swallowRows)
tm.erase(tableOID);
FEMsgHandler msgHandler(jl, &fIos);
if (tableOID == 100)
msgHandler.start();
//...Loop serializing table bands projected for the tableOID
for (;;)
{
uint32_t rowCount;
rowCount = jl->projectTable(tableOID, bs);
msgHandler.stop();
if (jl->status())
{
const auto errInfo = logging::IDBErrorInfo::instance();
if (jl->errMsg().length() != 0)
bs << jl->errMsg();
else
bs << errInfo->errorMsg(jl->status());
}
if (!swallowRows)
{
try // @bug2244: try/catch around fIos.write() calls projecting rows
{
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
{
// Skip the write to the front end until the last empty band. Used to time queries
// through without any front end waiting.
if (tableOID < 3000 || rowCount == 0)
fIos.write(bs);
}
else
{
fIos.write(bs);
}
}
catch (std::exception& ex)
{
msgHandler.stop();
std::ostringstream errMsg;
errMsg << "ExeMgr: error projecting rows "
"for tableOID: "
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; "
<< ex.what();
jl->abort();
while (rowCount)
rowCount = jl->projectTable(tableOID, bs);
if (tableOID == 100 && msgHandler.aborted())
{
/* TODO: modularize the cleanup code, as well as
* the rest of this fcn */
decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted);
fIos.close();
return;
}
// std::cout << "connection drop\n";
throw std::runtime_error(errMsg.str());
}
catch (...)
{
std::ostringstream errMsg;
msgHandler.stop();
errMsg << "ExeMgr: unknown error projecting rows "
"for tableOID: "
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount;
jl->abort();
while (rowCount)
rowCount = jl->projectTable(tableOID, bs);
throw std::runtime_error(errMsg.str());
}
}
totalRowCount += rowCount;
totalBytesSent += bs.length();
if (rowCount == 0)
{
msgHandler.stop();
// No more bands, table is done
bs.reset();
// @bug 2083 decr active statement count here for table mode.
if (!usingTuples)
statementsRunningCount->decr(stmtCounted);
break;
}
else
{
bs.restart();
}
} // End of loop to project and serialize table bands for a table
} // End of loop to process tables
// @bug 828
if (csep.traceOn())
jl->graph(csep.sessionID());
if (needDbProfEndStatementMsg)
{
std::string ss;
std::ostringstream prefix;
prefix << "ses:" << csep.sessionID() << " Query Totals";
// Log stats std::string to standard out
ss = formatQueryStats(jl, prefix.str(), true,
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
totalRowCount);
//@Bug 1306. Added timing info for real time tracking.
std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl;
// log query stats to debug log file
args.reset();
args.add((int)csep.statementID());
args.add(fStats.fMaxMemPct);
args.add(fStats.fNumFiles);
args.add(fStats.fFileBytes); // log raw byte count instead of MB
args.add(fStats.fPhyIO);
args.add(fStats.fCacheIO);
args.add(fStats.fMsgRcvCnt);
args.add(fStats.fMsgBytesIn);
args.add(fStats.fMsgBytesOut);
args.add(fStats.fCPBlocksSkipped);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li);
//@bug 1327
deleteMaxMemPct(csep.sessionID());
// Calling reset here, will cause joblist destructor to be
// called, which "joins" the threads. We need to do that
// here to make sure all syslogging from all the threads
// are complete; and that our logDbProfEndStatement will
// appear "last" in the syslog for this SQL statement.
// puts the real destruction in another thread to avoid
// making the whole session wait. It can take several seconds.
int stmtID = csep.statementID();
std::unique_lock<std::mutex> scoped(jlMutex);
destructing++;
std::thread bgdtor(
[jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog]
{
std::unique_lock<std::mutex> scoped(jlMutex);
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
logging::Message::Args args;
args.add(stmtID);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li);
if (--destructing == 0)
jlCleanupDone.notify_one();
});
jl.reset(); // this happens first
bgdtor.detach();
}
else
// delete sessionMemMap entry for this session's memory % use
deleteMaxMemPct(csep.sessionID());
std::string endtime(globServiceExeMgr->timeNow());
if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000))
{
std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at "
<< endtime << std::endl;
// @bug 663 - Implemented caltraceon(16) to replace the
// $FIFO_SINK compiler definition in pColStep.
// This option consumes rows in the project steps.
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
{
std::cout << std::endl;
std::cout << "**** No data returned to DM. Rows consumed "
"in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
" ****"
<< std::endl;
std::cout << std::endl;
}
else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
{
std::cout << std::endl;
std::cout << "**** No data returned to DM - caltrace(8) is "
"on (SWALLOW_ROWS_EXEMGR). ****"
<< std::endl;
std::cout << std::endl;
}
}
statementsRunningCount->decr(stmtCounted);
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
{
qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
qts.max_mem_pct = fStats.fMaxMemPct;
qts.num_files = fStats.fNumFiles;
qts.phy_io = fStats.fPhyIO;
qts.cache_io = fStats.fCacheIO;
qts.msg_rcv_cnt = fStats.fMsgRcvCnt;
qts.cp_blocks_skipped = fStats.fCPBlocksSkipped;
qts.msg_bytes_in = fStats.fMsgBytesIn;
qts.msg_bytes_out = fStats.fMsgBytesOut;
qts.rows = totalRowCount;
qts.end_time = querytele::QueryTeleClient::timeNowms();
qts.session_id = csep.sessionID();
qts.query_type = csep.queryType();
qts.query = csep.data();
qts.system_name = fOamCachePtr->getSystemName();
qts.module_name = fOamCachePtr->getModuleName();
qts.local_query = csep.localQuery();
fTeleClient.postQueryTele(qts);
}
}
// Release CSC object (for sessionID) that was added by makeJobList()
// Mask 0x80000000 is for associate user query and csc query.
// (actual joblist destruction happens at the top of this loop)
decThreadCntPerSession(csep.sessionID() | 0x80000000);
}
catch (std::exception& ex)
{
decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted);
std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
logging::Message::Args args;
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
args.add(ex.what());
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
fIos.close();
}
catch (...)
{
decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted);
std::cerr << "### Exception caught!" << std::endl;
logging::Message::Args args;
logging::LoggingID li(16, csep.sessionID(), csep.txnID());
args.add("ExeMgr caught unknown exception");
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
fIos.close();
}
// make sure we don't leave scope while joblists are being destroyed
std::unique_lock<std::mutex> scoped(jlMutex);
while (destructing > 0)
jlCleanupDone.wait(scoped);
}
}; // namespace exemgr