diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index b3ad0c209..3f06ed1c5 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -1478,7 +1478,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector& functionVec[i]->fAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAgg.push_back(oidsProj[j]); keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); @@ -1488,7 +1488,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector& widthAgg.push_back(sizeof(long double)); ++lastCol; - // sum(x**2) + // sum(x_i - mean)^2 oidsAgg.push_back(oidsProj[j]); keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); @@ -1910,7 +1910,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = ++colAgg; - // sum(x) + // mean(x) oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); @@ -1920,7 +1920,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAggDist.push_back(oidsAgg[j]); keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); @@ -2591,7 +2591,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector widthAggPm.push_back(sizeof(double)); funct->fAuxColumnIndex = ++colAggPm; - // sum(x) + // mean(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); @@ -3253,7 +3253,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector widthAggPm.push_back(sizeof(long double)); ++colAggPm; - // sum(x**2) + // sum(x_i - mean)^2 oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); @@ -3701,7 +3701,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector functionVecUm[i]->fAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAggUm.push_back(oidsAggPm[j]); 
keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); @@ -3711,7 +3711,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector widthAggUm.push_back(sizeof(long double)); ++lastCol; - // sum(x**2) + // sum(x_i - mean)^2 oidsAggUm.push_back(oidsAggPm[j]); keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); @@ -4152,7 +4152,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = ++colAggPm; - // sum(x) + // mean(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); @@ -4162,7 +4162,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAggDist.push_back(oidsAggPm[j]); keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); @@ -4818,7 +4818,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector 1 { - long double sum1 = fRow.getLongDoubleField(colAux); - long double sum2 = fRow.getLongDoubleField(colAux + 1); + long double M2 = fRow.getLongDoubleField(colAux + 1); uint32_t scale = fRow.getScale(colOut); auto factor = datatypes::scaleDivisor(scale); if (scale != 0) // adjust the scale if necessary { - sum1 /= factor; - sum2 /= factor * factor; + M2 /= factor * factor; } - long double stat = sum1 * sum1 / cnt; - stat = sum2 - stat; - if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_POP) - stat = sqrt(stat / cnt); + M2 = sqrt(M2 / cnt); else if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_SAMP) - stat = sqrt(stat / (cnt - 1)); + M2 = sqrt(M2 / (cnt - 1)); else if (fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_POP) - stat = stat / cnt; + M2 = M2 / cnt; else if (fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_SAMP) - stat = stat / (cnt - 1); + M2 = M2 / (cnt - 1); - fRow.setDoubleField(stat, colOut); + fRow.setDoubleField(M2, colOut); } } } @@ -4281,18 +4284,39 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t 
colIn, int64_t colOut, // Update the sum and count fields for stattistics if input is not null. // rowIn(in) - Row to be included in aggregation. // colIn(in) - column in the input row group stores the count/logical block -// colIn + 1 - column in the input row group stores the sum(x)/logical block -// colIn + 2 - column in the input row group stores the sum(x**2)/logical block +// colIn + 1 - column in the input row group stores the mean(x)/logical block +// colIn + 2 - column in the input row group stores the sum(x_i - mean)^2/logical block // colOut(in) - column in the output row group stores the count -// colAux(in) - column in the output row group stores the sum(x) -// colAux + 1 - column in the output row group stores the sum(x**2) +// colAux(in) - column in the output row group stores the mean(x) +// colAux + 1 - column in the output row group stores the sum(x_i - mean)^2 //------------------------------------------------------------------------------ void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux) { - fRow.setDoubleField(fRow.getDoubleField(colOut) + rowIn.getDoubleField(colIn), colOut); - fRow.setLongDoubleField(fRow.getLongDoubleField(colAux) + rowIn.getLongDoubleField(colIn + 1), colAux); - fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + rowIn.getLongDoubleField(colIn + 2), - colAux + 1); + double count = fRow.getDoubleField(colOut); + long double mean = fRow.getLongDoubleField(colAux); + long double M2 = fRow.getLongDoubleField(colAux + 1); /* running aggregate held in fRow: count at colOut, mean at colAux, M2 at colAux + 1 */ + + double block_count = rowIn.getDoubleField(colIn); + long double block_mean = rowIn.getLongDoubleField(colIn + 1); + long double block_M2 = rowIn.getLongDoubleField(colIn + 2); /* partial aggregate carried by rowIn: count at colIn, mean at colIn + 1, M2 at colIn + 2 */ + + double next_count = count + block_count; /* merged count; it is the divisor in the else-branch below */ + long double next_mean; + long double next_M2; + if (next_count == 0) /* merged count is zero: write 0 for mean and M2 instead of dividing by next_count */ + { + next_mean = 0; + next_M2 = 0; + } + else + { + volatile long double delta = mean - block_mean; /* NOTE(review): why delta is declared volatile is not evident from this hunk - confirm before removing */ + next_mean = (mean * count + block_mean * block_count) 
/ next_count; /* count-weighted average of the running mean and the incoming mean */ + next_M2 = M2 + block_M2 + delta * delta * (count * block_count / next_count); /* both partial M2 values plus a correction for the gap between the two means (delta squared, weighted by count times block_count over next_count); the weight is computed in double because all three counts are double */ + } + fRow.setDoubleField(next_count, colOut); + fRow.setLongDoubleField(next_mean, colAux); + fRow.setLongDoubleField(next_M2, colAux + 1); /* merged count, mean and M2 are stored back into the same fRow columns they were read from */ } //------------------------------------------------------------------------------