1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Sergey Zefirov 3bc8bd8cc6
fix(group by, having): MCOL-5776: GROUP BY/HAVING closer to server's (#3257)
This patch introduces an internal aggregate operator SELECT_SOME that
is automatically added to columns that are not in GROUP BY. It
"computes" some plausible value of the column (actually, last one
passed).

Along the way it fixes incorrect handling of HAVING being transferred
into WHERE, window function handling and a few other inconsistencies.
2024-12-20 19:12:32 +00:00

415 lines
10 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tuplehavingstep.cpp 9709 2013-07-20 06:08:46Z xlou $
//#define NDEBUG
#include <cassert>
#include <sstream>
#include <iomanip>
using namespace std;
#include <boost/shared_ptr.hpp>
#include <boost/uuid/uuid_io.hpp>
using namespace boost;
#include "messagequeue.h"
using namespace messageqcpp;
#include "loggingid.h"
#include "errorcodes.h"
using namespace logging;
#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "constantcolumn.h"
#include "simplecolumn.h"
using namespace execplan;
#include "jobstep.h"
#include "rowgroup.h"
using namespace rowgroup;
#include "querytele.h"
using namespace querytele;
#include "funcexp.h"
#include "jlf_common.h"
#include "tuplehavingstep.h"
namespace joblist
{
/** @brief Construct a HAVING step for the given job.
 *
 *  The data-list pointers, the input iterator and the runner handle start out
 *  unset; run() assigns them. The row counter starts at zero and the
 *  end-of-result flag starts cleared.
 *
 *  @param jobInfo  job context forwarded to the ExpressionStep base class
 */
TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo)
 : ExpressionStep(jobInfo)
 , fInputDL(nullptr)
 , fOutputDL(nullptr)
 , fInputIterator(0)
 , fRunner(0)
 , fRowsReturned(0)
 , fEndOfResult(false)
 , fFeInstance(funcexp::FuncExp::instance())
{
  // Tag the step's telemetry parameters and the extended-info prefix as "HVS".
  fQtc.stepParms().stepType = StepTeleStats::T_HVS;
  fExtendedInfo = "HVS: ";
}
/** @brief Destructor; this step has no cleanup of its own to perform. */
TupleHavingStep::~TupleHavingStep() = default;
/** @brief Unsupported for this step; the output layout is derived in initialize().
 *
 *  @param rg  ignored
 *  @throws std::runtime_error always
 */
void TupleHavingStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
{
  throw std::runtime_error("Disabled, use initialize() to set output RowGroup.");
}
void TupleHavingStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
{
fRowGroupIn = rgIn;
fRowGroupIn.initRow(&fRowIn);
map<uint32_t, uint32_t> keyToIndexMap;
for (uint64_t i = 0; i < fRowGroupIn.getKeys().size(); ++i)
if (keyToIndexMap.find(fRowGroupIn.getKeys()[i]) == keyToIndexMap.end())
{
keyToIndexMap.insert(make_pair(fRowGroupIn.getKeys()[i], i));
}
updateInputIndex(keyToIndexMap, jobInfo);
vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
vector<uint32_t> csNums, csNumsIn = fRowGroupIn.getCharsetNumbers();
vector<uint32_t> pos, posIn = fRowGroupIn.getOffsets();
size_t n = 0;
RetColsVector::const_iterator i = jobInfo.deliveredCols.begin();
while (i != jobInfo.deliveredCols.end())
if (NULL == dynamic_cast<const ConstantColumn*>(i++->get()))
n++;
oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n);
keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n);
scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n);
precision.insert(precision.end(), precisionIn.begin(), precisionIn.begin() + n);
types.insert(types.end(), typesIn.begin(), typesIn.begin() + n);
csNums.insert(csNums.end(), csNumsIn.begin(), csNumsIn.begin() + n);
pos.insert(pos.end(), posIn.begin(), posIn.begin() + n + 1);
fRowGroupOut =
RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
fRowGroupOut.initRow(&fRowOut);
}
void TupleHavingStep::expressionFilter(const ParseTree* filter, JobInfo& jobInfo)
{
// let base class handle the simple columns
ExpressionStep::expressionFilter(filter, jobInfo);
// extract simple columns from parse tree
#if 01
vector<AggregateColumn*> acv;
fExpressionFilter->walk(getAggCols, &acv);
fColumns.insert(fColumns.end(), acv.begin(), acv.end());
#endif
}
/** @brief Resolve the step's data lists and, unless it is a delivery step, start its worker.
 *
 *  @throws std::logic_error if the input association is empty or its first entry
 *          is not a RowGroup data list; for a non-delivery step, likewise for
 *          the output association.
 */
void TupleHavingStep::run()
{
  if (fInputJobStepAssociation.outSize() == 0)
    throw logic_error("No input data list for having step.");

  fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();

  if (fInputDL == nullptr)
    throw logic_error("Input is not a RowGroup data list.");

  fInputIterator = fInputDL->getIterator();

  // A delivery step needs neither an output data list nor a pool thread.
  if (fDelivery)
    return;

  if (fOutputJobStepAssociation.outSize() == 0)
    throw logic_error("No output data list for non-delivery having step.");

  fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();

  if (fOutputDL == nullptr)
    throw logic_error("Output is not a RowGroup data list.");

  fRunner = jobstepThreadPool.invoke(Runner(this));
}
/** @brief Wait for the worker started by run(), if one was started. */
void TupleHavingStep::join()
{
  // fRunner keeps its initial zero value when run() did not invoke a worker.
  if (!fRunner)
    return;

  jobstepThreadPool.join(fRunner);
}
/** @brief Serialize the next non-empty filtered row group into bs.
 *
 *  Input row groups are read and filtered until one of them keeps at least one
 *  row; that row group is serialized and its row count returned. When the input
 *  is exhausted, the query is cancelled, or an exception is caught, an empty
 *  row group carrying status() is serialized instead.
 *
 *  @param bs  byte stream; restarted first, then filled with the serialized band
 *  @return number of rows in the serialized filtered row group, or 0 when no
 *          such row group was produced by this call
 */
uint32_t TupleHavingStep::nextBand(messageqcpp::ByteStream& bs)
{
RGData rgDataIn;
RGData rgDataOut;
bool more = false;
uint32_t rowCount = 0;
try
{
bs.restart();
more = fInputDL->next(fInputIterator, &rgDataIn);
// Record the first-read time only once (while its seconds field is still 0).
if (dlTimes.FirstReadTime().tv_sec == 0)
dlTimes.setFirstReadTime();
if (!more || cancelled())
{
fEndOfResult = true;
}
// Keep reading while the filtered output is empty, so that a band with zero
// rows is only ever sent as the final band.
bool emptyRowGroup = true;
while (more && !fEndOfResult && emptyRowGroup)
{
if (cancelled())
{
// Consume the remaining input before leaving the loop.
while (more)
more = fInputDL->next(fInputIterator, &rgDataIn);
break;
}
fRowGroupIn.setData(&rgDataIn);
// Size the output buffer for the case where every input row passes.
rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
fRowGroupOut.setData(&rgDataOut);
doHavingFilters();
if (fRowGroupOut.getRowCount() > 0)
{
emptyRowGroup = false;
fRowGroupOut.serializeRGData(bs);
rowCount = fRowGroupOut.getRowCount();
}
else
{
more = fInputDL->next(fInputIterator, &rgDataIn);
}
}
if (!more)
{
fEndOfResult = true;
}
}
catch (...)
{
handleException(std::current_exception(), logging::tupleHavingStepErr, logging::ERR_ALWAYS_CRITICAL,
"TupleHavingStep::nextBand()");
// Consume whatever input is left, then finish with the end band below.
while (more)
more = fInputDL->next(fInputIterator, &rgDataIn);
fEndOfResult = true;
}
if (fEndOfResult)
{
// send an empty / error band
rgDataOut.reinit(fRowGroupOut, 0);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status());
fRowGroupOut.serializeRGData(bs);
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
if (traceOn())
printCalTrace();
}
return rowCount;
}
/** @brief Worker body of a non-delivery step: filter every input row group into fOutputDL.
 *
 *  Each input row group is read from fInputDL, filtered with doHavingFilters()
 *  and inserted into fOutputDL; a row group is inserted even when no row passed
 *  the filter. On cancellation or exception the remaining input is consumed.
 *  The output list is always closed with endOfInput(), and start and summary
 *  telemetry are posted.
 */
void TupleHavingStep::execute()
{
  RGData rgDataIn;
  RGData rgDataOut;
  bool more = false;

  StepTeleStats sts;
  sts.query_uuid = fQueryUuid;
  sts.step_uuid = fStepUuid;

  try
  {
    more = fInputDL->next(fInputIterator, &rgDataIn);
    dlTimes.setFirstReadTime();

    sts.msg_type = StepTeleStats::ST_START;
    sts.total_units_of_work = 1;
    postStepStartTele(sts);

    // Same start-up test as nextBand(). The previous "&&" form could only fire
    // when there was no input at all, where it had no effect, so a query that
    // was already cancelled still filtered one row group before noticing.
    if (!more || cancelled())
    {
      fEndOfResult = true;
    }

    while (more && !fEndOfResult)
    {
      fRowGroupIn.setData(&rgDataIn);
      // Size the output buffer for the case where every input row passes.
      rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
      fRowGroupOut.setData(&rgDataOut);

      doHavingFilters();

      // Fetch the next input before deciding whether to publish this result.
      more = fInputDL->next(fInputIterator, &rgDataIn);

      if (cancelled())
      {
        fEndOfResult = true;
      }
      else
      {
        fOutputDL->insert(rgDataOut);
      }
    }
  }
  catch (...)
  {
    // Report this function as the failure site (it previously named nextBand()).
    handleException(std::current_exception(), logging::tupleHavingStepErr, logging::ERR_ALWAYS_CRITICAL,
                    "TupleHavingStep::execute()");
  }

  // Consume any input left over after a cancel or an exception.
  while (more)
    more = fInputDL->next(fInputIterator, &rgDataIn);

  fEndOfResult = true;
  fOutputDL->endOfInput();

  sts.msg_type = StepTeleStats::ST_SUMMARY;
  sts.total_units_of_work = sts.units_of_work_completed = 1;
  sts.rows = fRowsReturned;
  postStepSummaryTele(sts);

  dlTimes.setLastReadTime();
  dlTimes.setEndOfInputTime();

  if (traceOn())
    printCalTrace();
}
/** @brief Copy every input row that satisfies the HAVING expression into the output row group.
 *
 *  The output row group is reset to the input's base RID, receives the passing
 *  rows in input order, and its final row count is added to fRowsReturned.
 *  Callers set the data buffers of both row groups beforehand.
 */
void TupleHavingStep::doHavingFilters()
{
  fRowGroupIn.getRow(0, &fRowIn);
  fRowGroupOut.getRow(0, &fRowOut);
  fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());

  const uint64_t inputRows = fRowGroupIn.getRowCount();

  for (uint64_t r = 0; r < inputRows; ++r, fRowIn.nextRow())
  {
    // Rows that fail the filter are skipped.
    if (!fFeInstance->evaluate(fRowIn, fExpressionFilter))
      continue;

    copyRow(fRowIn, &fRowOut);
    fRowGroupOut.incRowCount();
    fRowOut.nextRow();
  }

  fRowsReturned += fRowGroupOut.getRowCount();
}
/** @brief Return the output row group layout built by initialize(). */
const RowGroup& TupleHavingStep::getOutputRowGroup() const
{
return fRowGroupOut;
}
/** @brief Return the delivered row group layout; this is the same object as the output row group. */
const RowGroup& TupleHavingStep::getDeliveredRowGroup() const
{
return fRowGroupOut;
}
/** @brief Set the use-string-table flag of the output row group.
 *
 *  @param b  value passed to RowGroup::setUseStringTable()
 */
void TupleHavingStep::deliverStringTableRowGroup(bool b)
{
fRowGroupOut.setUseStringTable(b);
}
/** @brief Report whether the output row group uses a string table. */
bool TupleHavingStep::deliverStringTableRowGroup() const
{
return fRowGroupOut.usesStringTable();
}
/** @brief Describe the step on one line: session, transaction, step id, and its input and output associations.
 *
 *  @return the description, terminated by a newline
 */
const string TupleHavingStep::toString() const
{
  ostringstream text;
  text << "HavingStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId << " in:";

  const unsigned inputs = fInputJobStepAssociation.outSize();

  for (unsigned idx = 0; idx < inputs; ++idx)
    text << fInputJobStepAssociation.outAt(idx);

  text << " out:";

  const unsigned outputs = fOutputJobStepAssociation.outSize();

  for (unsigned idx = 0; idx < outputs; ++idx)
    text << fOutputJobStepAssociation.outAt(idx);

  text << '\n';
  return text.str();
}
/** @brief Log the end-of-step trace and append it to the extended info.
 *
 *  The trace reports the session and step ids, the current wall-clock time,
 *  total rows returned, first-read and end-of-input times, the run time, the
 *  step UUID and the job status. formatMiniStats() is called afterwards.
 */
void TupleHavingStep::printCalTrace()
{
  time_t t = time(0);
  char timeString[50] = "";

  // ctime_r() returns a null pointer on failure and then leaves the buffer
  // contents unspecified; fall back to an empty timestamp in that case.
  if (ctime_r(&t, timeString) == nullptr)
    timeString[0] = '\0';

  // Drop the trailing newline that ctime_r() appends, without ever indexing
  // before the start of an empty string.
  const size_t len = strlen(timeString);

  if (len > 0 && timeString[len - 1] == '\n')
    timeString[len - 1] = '\0';

  ostringstream logStr;
  logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
         << "; total rows returned-" << fRowsReturned << endl
         << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
         << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
         << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
         << "\tJob completion status " << status() << endl;

  logEnd(logStr.str().c_str());
  fExtendedInfo += logStr.str();
  formatMiniStats();
}
/** @brief Append this step's mini-stats record to fMiniInfo.
 *
 *  The record is the tag "THS", the marker "UM", six "-" placeholder fields,
 *  the time between first read and end of input, and a final "-" placeholder,
 *  each followed by a single space.
 */
void TupleHavingStep::formatMiniStats()
{
  // Step tag, "UM", and the six placeholder fields.
  fMiniInfo += "THS UM - - - - - - ";
  fMiniInfo += JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime());
  // Separator after the run time, then the trailing placeholder field.
  fMiniInfo += " - ";
}
} // namespace joblist