1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

MCOL-894 Upmerged post review changes.

Raised the default for orderby threads from 4 to 16.
    Removed if LIMIT check block in makeVtableModeSteps().
    Removed a duplicate of TimeCompare class and methods.

MCOL-3536 Upmerged the change.

MCOL-894 Post review changes.
    Uncomment if LIMIT check block in makeVtableModeSteps().

    TupleAnnexStep dtor now uses vector::size() as a boundary.

    Removed useless try-catch blocks.

    executeParallelOrderBy() now calculates rowSize only once.

    Removed forward declaration in the unexisting namespace.

    Trim UTs a bit.
This commit is contained in:
Roman Nozdrin
2019-10-30 17:00:51 +03:00
parent 7b5e5f0eb6
commit 0696696cf6
6 changed files with 212 additions and 333 deletions

View File

@ -315,7 +315,7 @@ private:
void ORDERBY_TIME_TEST()
{
uint64_t numRows = 8192;
uint64_t maxThreads = 16;
uint64_t maxThreads = 8;
// limit == 100000 is still twice as good to sort in parallel
// limit == 1000000 however is better to sort using single threaded sorting
uint64_t limit = 100000;
@ -324,9 +324,10 @@ private:
bool generateRandValues = true;
bool hasDistinct = true;
bool noDistinct = false;
//orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct);
//orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct);
}
void QUICK_TEST()
{

View File

@ -138,7 +138,7 @@ TupleAnnexStep::~TupleAnnexStep()
{
if(fOrderByList.size() > 0)
{
for(uint64_t id = 0; id <= fMaxThreads; id++)
for(uint64_t id = 0; id < fOrderByList.size(); id++)
{
delete fOrderByList[id];
}
@ -704,7 +704,8 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
fRowGroupOut.getRow(0, &fRowOut);
ordering::SortingPQ finalPQ;
scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
fRowGroupIn.initRow(&row1);
fRowGroupIn.initRow(&row2);
try
{
@ -744,156 +745,120 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct()
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
// OFFSET processing
while (finalPQ.size() && offset < fLimitStart)
{
offset++;
finalPQ.pop();
}
try
// Calculate rowSize only once
if (finalPQ.size())
{
// OFFSET processing
while (finalPQ.size() && offset < fLimitStart)
ordering::OrderByRow& topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
if (!fConstant)
{
offset++;
finalPQ.pop();
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\
caught an unknown exception 2",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
copyRow(fRowIn, &fRowOut);
}
else
{
fConstant->fillInConstants(fRowIn, fRowOut);
}
rowSize = fRowOut.getSize();
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
try
if (!fConstant)
{
// Calculate rowSize only once
if (finalPQ.size())
while(finalPQ.size())
{
ordering::OrderByRow& topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
if (!fConstant)
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
copyRow(fRowIn, &fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
}
else // Add ConstantColumns striped earlier
{
while(finalPQ.size())
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
fConstant->fillInConstants(fRowIn, fRowOut);
rowSize = fRowOut.getSize();
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\
caught an unknown exception 3",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
try
{
if (!fConstant)
{
while(finalPQ.size())
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
copyRow(fRowIn, &fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
}
else // Add ConstantColumns striped earlier
{
while(finalPQ.size())
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
fConstant->fillInConstants(fRowIn, fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
} // end of if-else
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\
caught an unknown exception 4",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
}
} // end of limit bound while loop
} // end of if-else
if (fRowGroupOut.getRowCount() > 0)
{
@ -982,152 +947,116 @@ void TupleAnnexStep::finalizeParallelOrderBy()
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
// OFFSET processing
while (finalPQ.size() && offset < fLimitStart)
{
offset++;
finalPQ.pop();
}
try
// Calculate rowSize only once
if (finalPQ.size())
{
// OFFSET processing
while (finalPQ.size() && offset < fLimitStart)
ordering::OrderByRow& topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
if (!fConstant)
{
offset++;
finalPQ.pop();
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\
caught an unknown exception 2",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
copyRow(fRowIn, &fRowOut);
}
else
{
fConstant->fillInConstants(fRowIn, fRowOut);
}
rowSize = fRowOut.getSize();
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
try
if (!fConstant)
{
// Calculate rowSize only once
if (finalPQ.size())
while(finalPQ.size())
{
ordering::OrderByRow& topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
if (!fConstant)
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
copyRow(fRowIn, &fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
}
else // Add ConstantColumns striped earlier
{
while(finalPQ.size())
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
fConstant->fillInConstants(fRowIn, fRowOut);
rowSize = fRowOut.getSize();
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\
caught an unknown exception 3",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
try
{
if (!fConstant)
{
while(finalPQ.size())
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
copyRow(fRowIn, &fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
}
else // Add ConstantColumns striped earlier
{
while(finalPQ.size())
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
fConstant->fillInConstants(fRowIn, fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
} // end of if-else
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\
caught an unknown exception 4",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
break;
}
} // end of limit bound while loop
} // end of if-else
if (fRowGroupOut.getRowCount() > 0)
{
@ -1200,7 +1129,10 @@ void TupleAnnexStep::executeParallelOrderBy(uint64_t id)
rg.setData(&rgDataIn);
rg.getRow(0, &r);
rowSize = r.getSize();
if (!rowSize)
{
rowSize = r.getSize();
}
rowCount = rg.getRowCount();
for (uint64_t i = 0; i < rowCount; ++i)

View File

@ -28,13 +28,6 @@
#include "jobstep.h"
#include "limitedorderby.h"
// forward reference
namespace fucexp
{
class FuncExp;
}
namespace joblist
{
class TupleConstantStep;

View File

@ -472,7 +472,7 @@ void TupleConstantStep::execute()
fOutputDL->endOfInput();
}
// *DRRTUY Copy row at once not one field at a time
void TupleConstantStep::fillInConstants()
{
fRowGroupIn.getRow(0, &fRowIn);
@ -495,7 +495,6 @@ void TupleConstantStep::fillInConstants()
}
else // only first column is constant
{
//size_t n = fRowOut.getOffset(fRowOut.getColumnCount()) - fRowOut.getOffset(1);
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
{
fRowOut.setRid(fRowIn.getRelRid());

View File

@ -468,37 +468,6 @@ int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int ret = 0;
if (b1 == true || b2 == true)
{
if (b1 == false && b2 == true)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
ret = -fSpec.fNf;
}
else
{
int64_t v1 = l->row1().getIntField(fSpec.fIndex);
int64_t v2 = l->row2().getIntField(fSpec.fIndex);
// ((int64_t) -00:00:26) > ((int64_t) -00:00:25)
// i.e. For 2 negative TIME values, we invert the order of
// comparison operations to force "-00:00:26" to appear before
@ -684,13 +653,6 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
break;
}
case CalpontSystemCatalog::TIME:
{
Compare* c = new TimeCompare(*i);
fCompares.push_back(c);
break;
}
default:
{
break;

View File

@ -51,8 +51,8 @@ class ResourceManager;
namespace ordering
{
template<typename _Tp, typename _Sequence = vector<_Tp>,
typename _Compare = less<typename _Sequence::value_type> >
template<typename _Tp, typename _Sequence = std::vector<_Tp>,
typename _Compare = std::less<typename _Sequence::value_type> >
class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare>
{
public:
@ -132,6 +132,7 @@ public:
void revertSortSpec()
{
fSpec.fAsc = -fSpec.fAsc;
fSpec.fNf = -fSpec.fNf;
}
protected:
@ -289,15 +290,6 @@ public:
// End of comparators for variable sized types
class TimeCompare : public Compare
{
public:
TimeCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class CompareRule
{
public: