1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-507 Further ExeMgr performance improvements

This does the following:

* Switch resource manager to a singleton which reduces the amount of
times the XML data is scanned and objects allocated.
* Make the I_S tables use the FE implementation of the system catalog
* Make the I_S.columnstore_columns table use the RID list cache
* Make the extentmap pre-allocate a vector instead of many small allocs
This commit is contained in:
Andrew Hutchings
2017-01-16 12:33:27 +00:00
parent 605f6c51e2
commit ffcfc41563
49 changed files with 197 additions and 178 deletions

View File

@ -197,9 +197,9 @@ TupleAggregateStep::TupleAggregateStep(
fIsMultiThread = (multiAgg || fAggregator->aggMapKeyLength() > 0);
// initialize multi-thread variables
fNumOfThreads = fRm.aggNumThreads();
fNumOfBuckets = fRm.aggNumBuckets();
fNumOfRowGroups = fRm.aggNumRowGroups();
fNumOfThreads = fRm->aggNumThreads();
fNumOfBuckets = fRm->aggNumBuckets();
fNumOfRowGroups = fRm->aggNumRowGroups();
fMemUsage.reset(new uint64_t[fNumOfThreads]);
memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t));
@ -211,7 +211,7 @@ TupleAggregateStep::TupleAggregateStep(
TupleAggregateStep::~TupleAggregateStep()
{
for (uint32_t i = 0; i < fNumOfThreads; i++)
fRm.returnMemory(fMemUsage[i], fSessionMemLimit);
fRm->returnMemory(fMemUsage[i], fSessionMemLimit);
for (uint32_t i = 0; i < fAgg_mutex.size(); i++)
delete fAgg_mutex[i];
}
@ -1311,7 +1311,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
posAgg.push_back(posAgg[i] + widthAgg[i]);
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, scaleAgg, precisionAgg,
jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit));
rowgroups.push_back(aggRG);
aggregators.push_back(rowAgg);
@ -2111,14 +2111,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
posAgg.push_back(posAgg[i] + widthAgg[i]);
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, scaleAgg, precisionAgg,
jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit));
posAggDist.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggDist.size(); i++)
posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist,
scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVec2, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit));
// mapping the group_concat columns, if any.
if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
@ -2133,7 +2133,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (jobInfo.distinctColVec.size() > 1)
{
RowAggregationMultiDistinct* multiDistinctAggregator =
new RowAggregationMultiDistinct(groupByNoDist, functionVec2, &jobInfo.rm, jobInfo.umMemLimit);
new RowAggregationMultiDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit);
rowAggDist.reset(multiDistinctAggregator);
rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());
@ -2244,7 +2244,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationSubDistinct(groupBySub, functionSub1, &jobInfo.rm, jobInfo.umMemLimit));
new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
// add to rowAggDist
@ -2298,7 +2298,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationUM(groupBySubNoDist, functionSub1, &jobInfo.rm, jobInfo.umMemLimit));
new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
// add to rowAggDist
@ -2926,7 +2926,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, scaleAggUm,
precisionAggUm, jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
rowgroups.push_back(aggRgUm);
aggregators.push_back(rowAggUm);
@ -3716,21 +3716,21 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, scaleAggUm,
precisionAggUm, jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionNoDistVec, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit));
posAggDist.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggDist.size(); i++)
posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist,
scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVecUm, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
// if distinct key word applied to more than one aggregate column, reset rowAggDist
vector<RowGroup> subRgVec;
if (jobInfo.distinctColVec.size() > 1)
{
RowAggregationMultiDistinct* multiDistinctAggregator =
new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, &jobInfo.rm, jobInfo.umMemLimit);
new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit);
rowAggDist.reset(multiDistinctAggregator);
// construct and add sub-aggregators to rowAggDist
@ -3840,7 +3840,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
}
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(new RowAggregationSubDistinct(groupBySub, functionSub1, &jobInfo.rm, jobInfo.umMemLimit));
SP_ROWAGG_UM_t subAgg(new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
// add to rowAggDist
multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);
@ -3892,7 +3892,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationUMP2(groupBySubNoDist, functionSub1, &jobInfo.rm, jobInfo.umMemLimit));
new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
// add to rowAggDist
multiDistinctAggregator->addSubAggregator(subAgg, aggRgUm, functionSub2);
@ -4217,7 +4217,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
if (!fRm.getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
{
rgDatas.clear(); // to short-cut the rest of processing
abort();
@ -4335,7 +4335,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
usleep(1000); // avoid using all CPU during busy wait
}
rgDatas.clear();
fRm.returnMemory(fMemUsage[threadID], fSessionMemLimit);
fRm->returnMemory(fMemUsage[threadID], fSessionMemLimit);
fMemUsage[threadID] = 0;
if (cancelled())