1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge branch 'develop-1.2' into develop-merge-up-20190425

This commit is contained in:
Andrew Hutchings
2019-04-25 10:27:59 +01:00
37 changed files with 508 additions and 320 deletions

View File

@ -218,7 +218,6 @@ inline string getStringNullValue()
namespace rowgroup
{
const std::string typeStr("");
const static_any::any& RowAggregation::charTypeId((char)1);
const static_any::any& RowAggregation::scharTypeId((signed char)1);
@ -590,7 +589,8 @@ inline bool RowAggregation::isNull(const RowGroup* pRowGroup, const Row& row, in
RowAggregation::RowAggregation() :
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fOrigFunctionCols(NULL)
{
}
@ -599,7 +599,8 @@ RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCol
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols) :
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fOrigFunctionCols(NULL)
{
fGroupByCols.assign(rowAggGroupByCols.begin(), rowAggGroupByCols.end());
fFunctionCols.assign(rowAggFunctionCols.begin(), rowAggFunctionCols.end());
@ -610,7 +611,7 @@ RowAggregation::RowAggregation(const RowAggregation& rhs):
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fRGContext(rhs.fRGContext)
fRGContext(rhs.fRGContext), fOrigFunctionCols(NULL)
{
//fGroupByCols.clear();
//fFunctionCols.clear();
@ -714,11 +715,8 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup*
// threads on the PM and by multple threads on the UM. It must remain
// thread safe.
//------------------------------------------------------------------------------
void RowAggregation::resetUDAF(uint64_t funcColID)
void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF)
{
// Get the UDAF class pointer and store in the row definition object.
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColID].get());
// RowAggregation and it's functions need to be re-entrant which means
// each instance (thread) needs its own copy of the context object.
// Note: operator=() doesn't copy userData.
@ -786,7 +784,7 @@ void RowAggregation::initialize()
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
@ -838,7 +836,7 @@ void RowAggregation::aggReset()
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
@ -885,14 +883,28 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
inserted.first->second = RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
if (fOrigFunctionCols)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
// This is a multi-distinct query and fFunctionCols may not
// contain all the UDAF we need to reset
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
{
resetUDAF(i);
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
}
}
}
else
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
// replace the key value with an equivalent copy, yes this is OK
const_cast<RowPosition&>((inserted.first->first)) = pos;
}
@ -946,14 +958,28 @@ void RowAggregation::aggregateRow(Row& row)
RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
if (fOrigFunctionCols)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
// This is a multi-distinct query and fFunctionCols may not
// contain all the UDAF we need to reset
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
{
resetUDAF(i);
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
}
}
}
else
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
}
else
{
@ -4699,7 +4725,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
{
// backup the function column vector for finalize().
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
fOrigFunctionCols = &origFunctionCols;
// aggregate data from each sub-aggregator to distinct aggregator
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
{
@ -4727,6 +4753,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
// restore the function column vector
fFunctionCols = origFunctionCols;
fOrigFunctionCols = NULL;
}
@ -4734,7 +4761,8 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
{
// backup the function column vector for finalize().
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
fOrigFunctionCols = &origFunctionCols;
// aggregate data from each sub-aggregator to distinct aggregator
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
{
@ -4751,9 +4779,9 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
inRows[i].clear();
}
// restore the function column vector
fFunctionCols = origFunctionCols;
fOrigFunctionCols = NULL;
}