You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Merge branch 'develop' into MCOL-265
This commit is contained in:
@ -29,6 +29,8 @@
|
||||
#include <stdexcept>
|
||||
#include <limits>
|
||||
#include <typeinfo>
|
||||
#include <cassert>
|
||||
|
||||
#include "joblisttypes.h"
|
||||
#include "resourcemanager.h"
|
||||
#include "groupconcat.h"
|
||||
@ -47,16 +49,18 @@
|
||||
#include "funcexp.h"
|
||||
#include "rowaggregation.h"
|
||||
#include "calpontsystemcatalog.h"
|
||||
#include "utils_utf8.h"
|
||||
//#include "utils_utf8.h"
|
||||
|
||||
//..comment out NDEBUG to enable assertions, uncomment NDEBUG to disable
|
||||
//#define NDEBUG
|
||||
#include <cassert>
|
||||
#include "funcexp/utils_utf8.h"
|
||||
|
||||
|
||||
using namespace std;
|
||||
using namespace boost;
|
||||
using namespace dataconvert;
|
||||
|
||||
|
||||
// inlines of RowAggregation that are used only in this file
|
||||
namespace
|
||||
{
|
||||
@ -223,7 +227,6 @@ inline string getStringNullValue()
|
||||
|
||||
namespace rowgroup
|
||||
{
|
||||
|
||||
const std::string typeStr("");
|
||||
const static_any::any& RowAggregation::charTypeId((char)1);
|
||||
const static_any::any& RowAggregation::scharTypeId((signed char)1);
|
||||
@ -385,6 +388,7 @@ inline void RowAggregation::updateFloatMinMax(float val1, float val2, int64_t co
|
||||
}
|
||||
|
||||
|
||||
|
||||
#define STRCOLL_ENH__
|
||||
|
||||
void RowAggregation::updateStringMinMax(string val1, string val2, int64_t col, int func)
|
||||
@ -601,7 +605,8 @@ inline bool RowAggregation::isNull(const RowGroup* pRowGroup, const Row& row, in
|
||||
RowAggregation::RowAggregation() :
|
||||
fAggMapPtr(NULL), fRowGroupOut(NULL),
|
||||
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
|
||||
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
|
||||
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
|
||||
fOrigFunctionCols(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
@ -610,7 +615,8 @@ RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCol
|
||||
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols) :
|
||||
fAggMapPtr(NULL), fRowGroupOut(NULL),
|
||||
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
|
||||
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
|
||||
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
|
||||
fOrigFunctionCols(NULL)
|
||||
{
|
||||
fGroupByCols.assign(rowAggGroupByCols.begin(), rowAggGroupByCols.end());
|
||||
fFunctionCols.assign(rowAggFunctionCols.begin(), rowAggFunctionCols.end());
|
||||
@ -621,7 +627,7 @@ RowAggregation::RowAggregation(const RowAggregation& rhs):
|
||||
fAggMapPtr(NULL), fRowGroupOut(NULL),
|
||||
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
|
||||
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
|
||||
fRGContext(rhs.fRGContext)
|
||||
fRGContext(rhs.fRGContext), fOrigFunctionCols(NULL)
|
||||
{
|
||||
//fGroupByCols.clear();
|
||||
//fFunctionCols.clear();
|
||||
@ -725,11 +731,8 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup*
|
||||
// threads on the PM and by multiple threads on the UM. It must remain
|
||||
// thread safe.
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregation::resetUDAF(uint64_t funcColID)
|
||||
void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF)
|
||||
{
|
||||
// Get the UDAF class pointer and store in the row definition object.
|
||||
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColID].get());
|
||||
|
||||
// RowAggregation and its functions need to be re-entrant, which means
|
||||
// each instance (thread) needs its own copy of the context object.
|
||||
// Note: operator=() doesn't copy userData.
|
||||
@ -797,7 +800,7 @@ void RowAggregation::initialize()
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(i);
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -849,7 +852,7 @@ void RowAggregation::aggReset()
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(i);
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -896,14 +899,28 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
|
||||
inserted.first->second = RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
|
||||
|
||||
// If there's UDAF involved, reset the user data.
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
if (fOrigFunctionCols)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
// This is a multi-distinct query and fFunctionCols may not
|
||||
// contain all the UDAF we need to reset
|
||||
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
|
||||
{
|
||||
resetUDAF(i);
|
||||
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// replace the key value with an equivalent copy, yes this is OK
|
||||
const_cast<RowPosition&>((inserted.first->first)) = pos;
|
||||
}
|
||||
@ -957,14 +974,28 @@ void RowAggregation::aggregateRow(Row& row)
|
||||
RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
|
||||
|
||||
// If there's UDAF involved, reset the user data.
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
if (fOrigFunctionCols)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
// This is a multi-distinct query and fFunctionCols may not
|
||||
// contain all the UDAF we need to reset
|
||||
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
|
||||
{
|
||||
resetUDAF(i);
|
||||
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1931,7 +1962,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
// The vector of parameters to be sent to the UDAF
|
||||
mcsv1sdk::ColumnDatum valsIn[paramCount];
|
||||
uint32_t dataFlags[paramCount];
|
||||
ConstantColumn* cc;
|
||||
execplan::ConstantColumn* cc;
|
||||
bool bIsNull = false;
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType;
|
||||
|
||||
@ -1947,10 +1978,10 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
|
||||
if (fFunctionCols[funcColsIdx]->fpConstCol)
|
||||
{
|
||||
cc = dynamic_cast<ConstantColumn*>(fFunctionCols[funcColsIdx]->fpConstCol.get());
|
||||
cc = dynamic_cast<execplan::ConstantColumn*>(fFunctionCols[funcColsIdx]->fpConstCol.get());
|
||||
}
|
||||
|
||||
if ((cc && cc->type() == ConstantColumn::NULLDATA)
|
||||
if ((cc && cc->type() == execplan::ConstantColumn::NULLDATA)
|
||||
|| (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
|
||||
{
|
||||
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
@ -3735,7 +3766,7 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
|
||||
|
||||
// Create a datum item for sending to UDAF
|
||||
mcsv1sdk::ColumnDatum& datum = valsIn[0];
|
||||
datum.dataType = (CalpontSystemCatalog::ColDataType)colDataType;
|
||||
datum.dataType = (execplan::CalpontSystemCatalog::ColDataType)colDataType;
|
||||
|
||||
switch (colDataType)
|
||||
{
|
||||
@ -4760,7 +4791,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
|
||||
{
|
||||
// backup the function column vector for finalize().
|
||||
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
|
||||
|
||||
fOrigFunctionCols = &origFunctionCols;
|
||||
// aggregate data from each sub-aggregator to distinct aggregator
|
||||
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
|
||||
{
|
||||
@ -4788,6 +4819,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
|
||||
|
||||
// restore the function column vector
|
||||
fFunctionCols = origFunctionCols;
|
||||
fOrigFunctionCols = NULL;
|
||||
}
|
||||
|
||||
|
||||
@ -4795,7 +4827,8 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
|
||||
{
|
||||
// backup the function column vector for finalize().
|
||||
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
|
||||
|
||||
fOrigFunctionCols = &origFunctionCols;
|
||||
|
||||
// aggregate data from each sub-aggregator to distinct aggregator
|
||||
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
|
||||
{
|
||||
@ -4812,9 +4845,9 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
|
||||
|
||||
inRows[i].clear();
|
||||
}
|
||||
|
||||
// restore the function column vector
|
||||
fFunctionCols = origFunctionCols;
|
||||
fOrigFunctionCols = NULL;
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user