1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/subquerystep.cpp
2023-04-14 09:42:50 +00:00

536 lines
13 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2016-2020 MariaDB
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: subquerystep.cpp 6370 2010-03-18 02:58:09Z xlou $
#include <iostream>
//#define NDEBUG
#include <cassert>
using namespace std;
#include <boost/scoped_array.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/uuid/uuid_io.hpp>
using namespace boost;
#include "parsetree.h"
#include "logicoperator.h"
using namespace execplan;
#include "rowgroup.h"
using namespace rowgroup;
#include "errorids.h"
#include "exceptclasses.h"
using namespace logging;
#include "querytele.h"
using namespace querytele;
#include "funcexp.h"
#include "jobstep.h"
#include "jlf_common.h"
#include "jlf_tuplejoblist.h"
#include "expressionstep.h"
#include "subquerystep.h"
using namespace joblist;
namespace joblist
{
/**
 * Construct a sub-query step.
 *
 * The returned-row counter starts at zero. The step is tagged "SQS: " in its
 * extended info and reported as StepTeleStats::T_SQS in query telemetry.
 */
SubQueryStep::SubQueryStep(const JobInfo& jobInfo) : JobStep(jobInfo), fRowsReturned(0)
{
  fQtc.stepParms().stepType = StepTeleStats::T_SQS;
  fExtendedInfo = "SQS: ";
}
// No resources are released explicitly; members clean up after themselves.
SubQueryStep::~SubQueryStep() = default;
void SubQueryStep::run()
{
fSubJobList->doQuery();
}
/** Wait for fRunner to finish; does nothing when no runner is set. */
void SubQueryStep::join()
{
  if (!fRunner)
    return;

  fRunner->join();
}
/**
 * Abort this step and propagate the abort to the nested sub-query job list.
 *
 * The job list is checked before use, the same way SubAdapterStep::abort()
 * checks its sub-step. This avoids a null dereference if no sub-query job
 * list has been assigned when the abort arrives.
 */
void SubQueryStep::abort()
{
  JobStep::abort();

  if (fSubJobList)
    fSubJobList->abort();
}
/**
 * Describe the step for debugging.
 *
 * The output has the form "SubQueryStep ses:<id> txn:<id> st:<id>".
 * If the step has output data lists, " out:" is appended, followed by each
 * list in order.
 */
const string SubQueryStep::toString() const
{
  ostringstream desc;
  desc << "SubQueryStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;

  if (fOutputJobStepAssociation.outSize() == 0)
    return desc.str();

  desc << " out:";
  for (unsigned idx = 0; idx < fOutputJobStepAssociation.outSize(); ++idx)
    desc << fOutputJobStepAssociation.outAt(idx);

  return desc.str();
}
/*
void SubQueryStep::printCalTrace()
{
time_t t = time (0);
char timeString[50];
ctime_r (&t, timeString);
timeString[strlen (timeString )-1] = '\0';
ostringstream logStr;
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at "<< timeString
<< "; total rows returned-" << fRowsReturned << endl
<< "\t1st read " << dlTimes.FirstReadTimeString()
<< "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
<< "s;\n\tJob completion status " << status() << endl;
logEnd(logStr.str().c_str());
fExtendedInfo += logStr.str();
formatMiniStats();
}
void SubQueryStep::formatMiniStats()
{
ostringstream oss;
oss << "SQS "
<< "UM "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
<< fRowsReturned << " ";
fMiniInfo += oss.str();
}
*/
/**
 * Wrap a sub-query step so its output can be filtered and mapped into a new
 * row group.
 *
 * @param s        The wrapped step. Its output association becomes this
 *                 step's input, and its output row group becomes fRowGroupIn.
 *                 It must be a SubQueryStep.
 * @param jobInfo  Job-wide settings forwarded to JobStep.
 * @throws logic_error if s is not a SubQueryStep. The dynamic_cast result
 *         used to be dereferenced unchecked, which crashed on a null pointer.
 */
SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
 : JobStep(jobInfo)
 , fTableOid(s->tableOid())
 , fSubStep(s)
 , fRowsInput(0)
 , fRowsReturned(0)
 , fEndOfResult(false)
 , fInputIterator(0)
 , fOutputIterator(0)
 , fRunner(0)
{
  SubQueryStep* subQuery = dynamic_cast<SubQueryStep*>(s.get());
  if (subQuery == NULL)
    throw logic_error("Subquery adapter step requires a SubQueryStep input.");

  fAlias = s->alias();
  fView = s->view();
  fInputJobStepAssociation = s->outputAssociation();
  fRowGroupIn = subQuery->getOutputRowGroup();

  // Until a different output row group is set, pass the input through unchanged.
  setOutputRowGroup(fRowGroupIn);
}
// No resources are released explicitly; members clean up after themselves.
SubAdapterStep::~SubAdapterStep() = default;
/** Abort this step, and also the wrapped sub-step when one is set. */
void SubAdapterStep::abort()
{
  JobStep::abort();

  if (!fSubStep)
    return;

  fSubStep->abort();
}
/**
 * Resolve the input and output RowGroup data lists, then hand the work to
 * jobstepThreadPool. join() waits on the returned handle.
 *
 * @throws logic_error if either association is empty or its first list is
 *         not a RowGroup data list.
 */
void SubAdapterStep::run()
{
  // Input side: the first data list of the input association.
  if (fInputJobStepAssociation.outSize() == 0)
    throw logic_error("No input data list for subquery adapter step.");

  fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
  if (!fInputDL)
    throw logic_error("Input is not a RowGroup data list.");

  fInputIterator = fInputDL->getIterator();

  // Output side: the first data list of the output association.
  if (fOutputJobStepAssociation.outSize() == 0)
    throw logic_error("No output data list for non-delivery subquery adapter step.");

  fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
  if (!fOutputDL)
    throw logic_error("Output is not a RowGroup data list.");

  // A reader iterator on our own output is only needed when delivering
  // (nextBand() reads through fOutputIterator).
  if (fDelivery)
    fOutputIterator = fOutputDL->getIterator();

  fRunner = jobstepThreadPool.invoke(Runner(this));
}
/** Wait for the pooled job started by run(); a zero handle means nothing was started. */
void SubAdapterStep::join()
{
  if (!fRunner)
    return;

  jobstepThreadPool.join(fRunner);
}
/**
 * Serialize the next band of output rows into bs for delivery.
 *
 * One RGData is read from fOutputDL and serialized in fRowGroupDeliver's
 * layout. In three cases the remaining bands are drained and a zero-row
 * band carrying status() is serialized instead:
 *   - the list is exhausted,
 *   - the step is cancelled,
 *   - an exception is caught (it is reported via handleException()).
 *
 * @param bs  Output stream; it is restarted before anything is written.
 * @return    Number of rows serialized into bs; 0 for the terminal band.
 */
uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs)
{
  RGData rgDataOut;
  bool more = false;
  uint32_t rowCount = 0;

  try
  {
    bs.restart();
    more = fOutputDL->next(fOutputIterator, &rgDataOut);

    if (!more || cancelled())
    {
      //@bug4459. Consume whatever is left in the output list.
      while (more)
        more = fOutputDL->next(fOutputIterator, &rgDataOut);

      fEndOfResult = true;
    }

    if (more && !fEndOfResult)
    {
      fRowGroupDeliver.setData(&rgDataOut);
      fRowGroupDeliver.serializeRGData(bs);
      rowCount = fRowGroupDeliver.getRowCount();
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_ALWAYS_CRITICAL,
                    "SubAdapterStep::nextBand()");

    while (more)
      more = fOutputDL->next(fOutputIterator, &rgDataOut);

    fEndOfResult = true;
  }

  if (fEndOfResult)
  {
    // An exception thrown while serializing may have left a partial band in
    // bs. Clear it so the empty/error band is the only content. In the normal
    // end-of-result path bs is still empty, so this is a no-op.
    bs.restart();

    // send an empty / error band
    RGData rgData(fRowGroupDeliver, 0);
    fRowGroupDeliver.setData(&rgData);
    fRowGroupDeliver.resetRowGroup(0);
    fRowGroupDeliver.setStatus(status());
    fRowGroupDeliver.serializeRGData(bs);
  }

  return rowCount;
}
/**
 * Set the "FE" row group.
 *
 * When it has columns, execute() copies each input row into this layout
 * before evaluating fExpression. setOutputRowGroup() also builds fIndexMap
 * from it instead of fRowGroupIn, so set it before setOutputRowGroup().
 */
void SubAdapterStep::setFeRowGroup(const rowgroup::RowGroup& rg)
{
  this->fRowGroupFe = rg;
}
/**
 * Install rg as both the output row group and the delivery row group.
 *
 * Then rebuild the column mapping, whose source is fRowGroupFe if it has
 * columns and fRowGroupIn otherwise, and rebuild the duplicate-column list.
 */
void SubAdapterStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
{
  fRowGroupDeliver = rg;
  fRowGroupOut = fRowGroupDeliver;

  const RowGroup& mappingSource = (fRowGroupFe.getColumnCount() == 0) ? fRowGroupIn : fRowGroupFe;
  fIndexMap = makeMapping(mappingSource, fRowGroupOut);

  checkDupOutputColumns();
}
void SubAdapterStep::checkDupOutputColumns()
{
map<uint32_t, uint32_t> keymap; // map<unique col key, col index in the row group>
fDupColumns.clear();
const vector<uint32_t>& keys = fRowGroupDeliver.getKeys();
for (uint32_t i = 0; i < keys.size(); i++)
{
map<uint32_t, uint32_t>::iterator j = keymap.find(keys[i]);
if (j == keymap.end())
keymap.insert(make_pair(keys[i], i)); // map key to col index
else
fDupColumns.push_back(make_pair(i, j->second)); // dest/src index pair
}
}
/** Fill each duplicate output column of row from its source column (pairs are dest/src). */
void SubAdapterStep::dupOutputColumns(Row& row)
{
  for (const auto& destSrc : fDupColumns)
    row.copyField(destSrc.first, destSrc.second);
}
/**
 * Append one row to the current output band.
 *
 * rowIn is mapped through fIndexMap into rowOut and duplicate columns are
 * filled in. The band's row count is then incremented and rowOut advances
 * to the next slot.
 */
void SubAdapterStep::outputRow(Row& rowIn, Row& rowOut)
{
  applyMapping(fIndexMap, rowIn, &rowOut);

  if (!fDupColumns.empty())
    dupOutputColumns(rowOut);

  fRowGroupOut.incRowCount();
  rowOut.nextRow();
}
/** Turn string-table storage on or off for both the output and delivery row groups together. */
void SubAdapterStep::deliverStringTableRowGroup(bool b)
{
  fRowGroupDeliver.setUseStringTable(b);
  fRowGroupOut.setUseStringTable(b);
}
/**
 * Report whether the delivery row group uses a string table.
 * The output and delivery row groups are switched together by the setter
 * overload, so the assertion checks that they still agree.
 */
bool SubAdapterStep::deliverStringTableRowGroup() const
{
idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable());
return fRowGroupDeliver.usesStringTable();
}
/**
 * Describe the step for debugging.
 *
 * The output has the form "SubAdapterStep ses:<id> txn:<id> st:<id>",
 * followed by " in:<list>" and/or " out:<list>" for the first input and
 * output data lists when present. The labels keep the two lists from being
 * concatenated directly onto the step id. They also match the " out:" label
 * used by SubQueryStep::toString().
 */
const string SubAdapterStep::toString() const
{
  ostringstream oss;
  oss << "SubAdapterStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;

  if (fInputJobStepAssociation.outSize() > 0)
    oss << " in:" << fInputJobStepAssociation.outAt(0);

  if (fOutputJobStepAssociation.outSize() > 0)
    oss << " out:" << fOutputJobStepAssociation.outAt(0);

  return oss.str();
}
/**
 * Worker body of the adapter step.
 * NOTE(review): run() submits Runner(this) to jobstepThreadPool; presumably
 * Runner calls this method. Confirm in subquerystep.h.
 *
 * Steps:
 *   1. Start the wrapped sub-query step.
 *   2. For every RGData read from fInputDL, build an output band in
 *      fRowGroupOut's layout. Each input row passes through fExpression when
 *      one is set, and surviving rows are written with outputRow().
 *   3. Insert non-empty bands into fOutputDL.
 *   4. Report exceptions through handleException().
 *   5. If the step is cancelled, drain the remaining input.
 *   6. Post summary telemetry and always call fOutputDL->endOfInput().
 */
void SubAdapterStep::execute()
{
RGData rgDataIn;
RGData rgDataOut;
Row rowIn;
Row rowFe;
Row rowOut;
fRowGroupIn.initRow(&rowIn);
fRowGroupOut.initRow(&rowOut);
RGData rowFeData;
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
// With an FE row group configured, each input row is copied into a single-row
// buffer in that layout. The filter is evaluated on the copy, and the copy is
// what gets output (fIndexMap is built from fRowGroupFe in that case).
bool usesFE = false;
if (fRowGroupFe.getColumnCount() > 0)
{
usesFE = true;
fRowGroupFe.initRow(&rowFe, true);
rowFeData = RGData(fRowGroupFe, 1);
fRowGroupFe.setData(&rowFeData);
fRowGroupFe.getRow(0, &rowFe);
}
bool more = false;
try
{
sts.msg_type = StepTeleStats::ST_START;
sts.total_units_of_work = 1;
postStepStartTele(sts);
// Start the wrapped sub-query, then consume its output from fInputDL.
fSubStep->run();
more = fInputDL->next(fInputIterator, &rgDataIn);
if (traceOn())
dlTimes.setFirstReadTime();
while (more && !cancelled())
{
fRowGroupIn.setData(&rgDataIn);
// Size the output band for the worst case: at most one output row per input row.
rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
fRowGroupOut.setData(&rgDataOut);
// Reset the output band (presumably to zero rows) and carry over the input band's base rid.
fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
fRowGroupIn.getRow(0, &rowIn);
fRowGroupOut.getRow(0, &rowOut);
fRowsInput += fRowGroupIn.getRowCount();
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
{
if (fExpression.get() == NULL)
{
// No filter: every row is output.
outputRow(rowIn, rowOut);
}
else if (!usesFE)
{
// Filter evaluated directly on the input row.
if (fExpression->evaluate(&rowIn))
{
outputRow(rowIn, rowOut);
}
}
else
{
// Filter evaluated on the FE copy of the row; the FE copy is output.
copyRow(rowIn, &rowFe, rowIn.getColumnCount());
// memcpy(rowFe.getData(), rowIn.getData(), rowIn.getSize());
if (fExpression->evaluate(&rowFe))
{
outputRow(rowFe, rowOut);
}
}
rowIn.nextRow();
}
// Skip bands in which every row was filtered out.
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
}
more = fInputDL->next(fInputIterator, &rgDataIn);
}
}
catch (...)
{
handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
"SubAdapterStep::execute()");
}
// On cancellation, consume the rest of the input list. NOTE(review):
// presumably cancelled() is also true after handleException() records an
// error, so this drain covers the exception path too. Confirm in JobStep.
if (cancelled())
while (more)
more = fInputDL->next(fInputIterator, &rgDataIn);
if (traceOn())
{
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
printCalTrace();
}
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
// Bug 3136, let mini stats to be formatted if traceOn.
// The output list is marked complete unconditionally, even after an error or cancellation.
fOutputDL->endOfInput();
}
void SubAdapterStep::addExpression(const JobStepVector& exps, JobInfo& jobInfo)
{
// maps key to the index in the RG
map<uint32_t, uint32_t> keyToIndexMap;
const vector<uint32_t>& keys = fRowGroupIn.getKeys();
for (size_t i = 0; i < keys.size(); i++)
keyToIndexMap[keys[i]] = i;
// combine the expression to one parse tree
ParseTree* filter = NULL;
for (JobStepVector::const_iterator it = exps.begin(); it != exps.end(); it++)
{
ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
idbassert(e);
e->updateInputIndex(keyToIndexMap, jobInfo);
if (filter != NULL)
{
ParseTree* left = filter;
ParseTree* right = new ParseTree();
right->copyTree(*(e->expressionFilter()));
filter = new ParseTree(new LogicOperator("and"));
filter->left(left);
filter->right(right);
}
else
{
filter = new ParseTree();
filter->copyTree(*(e->expressionFilter()));
}
}
// add to the expression wrapper
if (fExpression.get() == NULL)
fExpression.reset(new funcexp::FuncExpWrapper());
fExpression->addFilter(boost::shared_ptr<execplan::ParseTree>(filter));
}
void SubAdapterStep::addExpression(const vector<SRCP>& exps)
{
// add to the function wrapper
if (fExpression.get() == NULL)
fExpression.reset(new funcexp::FuncExpWrapper());
for (vector<SRCP>::const_iterator i = exps.begin(); i != exps.end(); i++)
fExpression->addReturnedColumn(*i);
}
void SubAdapterStep::addFcnJoinExp(const vector<SRCP>& exps)
{
addExpression(exps);
}
void SubAdapterStep::printCalTrace()
{
time_t t = time(0);
char timeString[50];
ctime_r(&t, timeString);
timeString[strlen(timeString) - 1] = '\0';
ostringstream logStr;
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
<< "; total rows input-" << fRowsInput << "; total rows returned-" << fRowsReturned << endl
<< "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
<< "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
<< "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
<< "\tJob completion status " << status() << endl;
logEnd(logStr.str().c_str());
fExtendedInfo += logStr.str();
formatMiniStats();
}
/**
 * Mini-stats formatting for this step is currently disabled: the method is a
 * no-op, and nothing is appended to fMiniInfo. printCalTrace() still calls it.
 * The former implementation is kept below, commented out, for reference.
 */
void SubAdapterStep::formatMiniStats()
{
/*
ostringstream oss;
oss << "SAS "
<< "UM "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
<< fRowsReturned << " ";
fMiniInfo += oss.str();
*/
}
} // namespace joblist