/* Copyright (C) 2014 InfiniDB, Inc. Copyright (c) 2016-2020 MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: subquerystep.cpp 6370 2010-03-18 02:58:09Z xlou $ #include //#define NDEBUG #include using namespace std; #include #include #include #include #include using namespace boost; #include "parsetree.h" #include "logicoperator.h" using namespace execplan; #include "rowgroup.h" using namespace rowgroup; #include "errorids.h" #include "exceptclasses.h" using namespace logging; #include "querytele.h" using namespace querytele; #include "funcexp.h" #include "jobstep.h" #include "jlf_common.h" #include "jlf_tuplejoblist.h" #include "expressionstep.h" #include "subquerystep.h" using namespace joblist; namespace joblist { SubQueryStep::SubQueryStep(const JobInfo& jobInfo) : JobStep(jobInfo), fRowsReturned(0) { fExtendedInfo = "SQS: "; fQtc.stepParms().stepType = StepTeleStats::T_SQS; } SubQueryStep::~SubQueryStep() { } void SubQueryStep::run() { fSubJobList->doQuery(); } void SubQueryStep::join() { if (fRunner) fRunner->join(); } void SubQueryStep::abort() { JobStep::abort(); fSubJobList->abort(); } const string SubQueryStep::toString() const { ostringstream oss; oss << "SubQueryStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; if (fOutputJobStepAssociation.outSize() > 0) { oss << " out:"; for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++) oss << fOutputJobStepAssociation.outAt(i); } return oss.str(); } /* void SubQueryStep::printCalTrace() { time_t t = time (0); char timeString[50]; ctime_r (&t, timeString); timeString[strlen (timeString )-1] = '\0'; ostringstream logStr; logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at "<< timeString << "; total rows returned-" << fRowsReturned << endl << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s;\n\tJob completion status " << status() << endl; logEnd(logStr.str().c_str()); fExtendedInfo += logStr.str(); formatMiniStats(); } void SubQueryStep::formatMiniStats() { ostringstream oss; oss << "SQS " << "UM " << "- " << "- " << "- " << "- " << "- " << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << fRowsReturned << " "; fMiniInfo += oss.str(); } */ SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo) : JobStep(jobInfo) , fTableOid(s->tableOid()) , fSubStep(s) , fRowsInput(0) , fRowsReturned(0) , fEndOfResult(false) , fInputIterator(0) , fOutputIterator(0) , fRunner(0) { fAlias = s->alias(); fView = s->view(); fInputJobStepAssociation = s->outputAssociation(); fRowGroupIn = dynamic_cast(s.get())->getOutputRowGroup(); setOutputRowGroup(fRowGroupIn); } SubAdapterStep::~SubAdapterStep() { } void SubAdapterStep::abort() { JobStep::abort(); if (fSubStep) fSubStep->abort(); } void SubAdapterStep::run() { if (fInputJobStepAssociation.outSize() == 0) throw logic_error("No input data list for subquery adapter step."); fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL(); if (fInputDL == NULL) throw logic_error("Input is not a RowGroup data list."); fInputIterator = fInputDL->getIterator(); if (fOutputJobStepAssociation.outSize() == 0) throw logic_error("No output data list for non-delivery subquery adapter step."); fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL(); if (fOutputDL == NULL) throw logic_error("Output is not a RowGroup data list."); if (fDelivery) fOutputIterator = fOutputDL->getIterator(); fRunner = jobstepThreadPool.invoke(Runner(this)); } void SubAdapterStep::join() { if (fRunner) jobstepThreadPool.join(fRunner); } uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs) { RGData rgDataOut; bool more = false; uint32_t rowCount = 0; try { bs.restart(); more = fOutputDL->next(fOutputIterator, &rgDataOut); if (!more || cancelled()) { //@bug4459. while (more) more = fOutputDL->next(fOutputIterator, &rgDataOut); fEndOfResult = true; } if (more && !fEndOfResult) { fRowGroupDeliver.setData(&rgDataOut); fRowGroupDeliver.serializeRGData(bs); rowCount = fRowGroupDeliver.getRowCount(); } } catch (...) { handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_ALWAYS_CRITICAL, "SubAdapterStep::nextBand()"); while (more) more = fOutputDL->next(fOutputIterator, &rgDataOut); fEndOfResult = true; } if (fEndOfResult) { // send an empty / error band RGData rgData(fRowGroupDeliver, 0); fRowGroupDeliver.setData(&rgData); fRowGroupDeliver.resetRowGroup(0); fRowGroupDeliver.setStatus(status()); fRowGroupDeliver.serializeRGData(bs); } return rowCount; } void SubAdapterStep::setFeRowGroup(const rowgroup::RowGroup& rg) { fRowGroupFe = rg; } void SubAdapterStep::setOutputRowGroup(const rowgroup::RowGroup& rg) { fRowGroupOut = fRowGroupDeliver = rg; if (fRowGroupFe.getColumnCount() == 0) fIndexMap = makeMapping(fRowGroupIn, fRowGroupOut); else fIndexMap = makeMapping(fRowGroupFe, fRowGroupOut); checkDupOutputColumns(); } void SubAdapterStep::checkDupOutputColumns() { map keymap; // map fDupColumns.clear(); const vector& keys = fRowGroupDeliver.getKeys(); for (uint32_t i = 0; i < keys.size(); i++) { map::iterator j = keymap.find(keys[i]); if (j == keymap.end()) keymap.insert(make_pair(keys[i], i)); // map key to col index else fDupColumns.push_back(make_pair(i, j->second)); // dest/src index pair } } void SubAdapterStep::dupOutputColumns(Row& row) { for (uint64_t i = 0; i < fDupColumns.size(); i++) row.copyField(fDupColumns[i].first, fDupColumns[i].second); } void SubAdapterStep::outputRow(Row& rowIn, Row& rowOut) { applyMapping(fIndexMap, rowIn, &rowOut); if (fDupColumns.size() > 0) dupOutputColumns(rowOut); fRowGroupOut.incRowCount(); rowOut.nextRow(); } void SubAdapterStep::deliverStringTableRowGroup(bool b) { fRowGroupOut.setUseStringTable(b); fRowGroupDeliver.setUseStringTable(b); } bool SubAdapterStep::deliverStringTableRowGroup() const { idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable()); return fRowGroupDeliver.usesStringTable(); } const string SubAdapterStep::toString() const { ostringstream oss; oss << "SubAdapterStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; if (fInputJobStepAssociation.outSize() > 0) oss << fInputJobStepAssociation.outAt(0); if (fOutputJobStepAssociation.outSize() > 0) oss << fOutputJobStepAssociation.outAt(0); return oss.str(); } void SubAdapterStep::execute() { RGData rgDataIn; RGData rgDataOut; Row rowIn; Row rowFe; Row rowOut; fRowGroupIn.initRow(&rowIn); fRowGroupOut.initRow(&rowOut); RGData rowFeData; StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; bool usesFE = false; if (fRowGroupFe.getColumnCount() > 0) { usesFE = true; fRowGroupFe.initRow(&rowFe, true); rowFeData = RGData(fRowGroupFe, 1); fRowGroupFe.setData(&rowFeData); fRowGroupFe.getRow(0, &rowFe); } bool more = false; try { sts.msg_type = StepTeleStats::ST_START; sts.total_units_of_work = 1; postStepStartTele(sts); fSubStep->run(); more = fInputDL->next(fInputIterator, &rgDataIn); if (traceOn()) dlTimes.setFirstReadTime(); while (more && !cancelled()) { fRowGroupIn.setData(&rgDataIn); rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount()); fRowGroupOut.setData(&rgDataOut); fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid()); fRowGroupIn.getRow(0, &rowIn); fRowGroupOut.getRow(0, &rowOut); fRowsInput += fRowGroupIn.getRowCount(); for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) { if (fExpression.get() == NULL) { outputRow(rowIn, rowOut); } else if (!usesFE) { if (fExpression->evaluate(&rowIn)) { outputRow(rowIn, rowOut); } } else { copyRow(rowIn, &rowFe, rowIn.getColumnCount()); // memcpy(rowFe.getData(), rowIn.getData(), rowIn.getSize()); if (fExpression->evaluate(&rowFe)) { outputRow(rowFe, rowOut); } } rowIn.nextRow(); } if (fRowGroupOut.getRowCount() > 0) { fRowsReturned += fRowGroupOut.getRowCount(); fOutputDL->insert(rgDataOut); } more = fInputDL->next(fInputIterator, &rgDataIn); } } catch (...) { handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL, "SubAdapterStep::execute()"); } if (cancelled()) while (more) more = fInputDL->next(fInputIterator, &rgDataIn); if (traceOn()) { dlTimes.setLastReadTime(); dlTimes.setEndOfInputTime(); printCalTrace(); } sts.msg_type = StepTeleStats::ST_SUMMARY; sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); // Bug 3136, let mini stats to be formatted if traceOn. fOutputDL->endOfInput(); } void SubAdapterStep::addExpression(const JobStepVector& exps, JobInfo& jobInfo) { // maps key to the index in the RG map keyToIndexMap; const vector& keys = fRowGroupIn.getKeys(); for (size_t i = 0; i < keys.size(); i++) keyToIndexMap[keys[i]] = i; // combine the expression to one parse tree ParseTree* filter = NULL; for (JobStepVector::const_iterator it = exps.begin(); it != exps.end(); it++) { ExpressionStep* e = dynamic_cast(it->get()); idbassert(e); e->updateInputIndex(keyToIndexMap, jobInfo); if (filter != NULL) { ParseTree* left = filter; ParseTree* right = new ParseTree(); right->copyTree(*(e->expressionFilter())); filter = new ParseTree(new LogicOperator("and")); filter->left(left); filter->right(right); } else { filter = new ParseTree(); filter->copyTree(*(e->expressionFilter())); } } // add to the expression wrapper if (fExpression.get() == NULL) fExpression.reset(new funcexp::FuncExpWrapper()); fExpression->addFilter(boost::shared_ptr(filter)); } void SubAdapterStep::addExpression(const vector& exps) { // add to the function wrapper if (fExpression.get() == NULL) fExpression.reset(new funcexp::FuncExpWrapper()); for (vector::const_iterator i = exps.begin(); i != exps.end(); i++) fExpression->addReturnedColumn(*i); } void SubAdapterStep::addFcnJoinExp(const vector& exps) { addExpression(exps); } void SubAdapterStep::printCalTrace() { time_t t = time(0); char timeString[50]; ctime_r(&t, timeString); timeString[strlen(timeString) - 1] = '\0'; ostringstream logStr; logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString << "; total rows input-" << fRowsInput << "; total rows returned-" << fRowsReturned << endl << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl << "\tJob completion status " << status() << endl; logEnd(logStr.str().c_str()); fExtendedInfo += logStr.str(); formatMiniStats(); } void SubAdapterStep::formatMiniStats() { /* ostringstream oss; oss << "SAS " << "UM " << "- " << "- " << "- " << "- " << "- " << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << fRowsReturned << " "; fMiniInfo += oss.str(); */ } } // namespace joblist