diff --git a/dbcon/joblist/orderby-tests.cpp b/dbcon/joblist/orderby-tests.cpp index e9e552881..fa98f0772 100644 --- a/dbcon/joblist/orderby-tests.cpp +++ b/dbcon/joblist/orderby-tests.cpp @@ -315,7 +315,7 @@ private: void ORDERBY_TIME_TEST() { uint64_t numRows = 8192; - uint64_t maxThreads = 16; + uint64_t maxThreads = 8; // limit == 100000 is still twice as good to sort in parallel // limit == 1000000 however is better to sort using single threaded sorting uint64_t limit = 100000; @@ -324,9 +324,10 @@ private: bool generateRandValues = true; bool hasDistinct = true; bool noDistinct = false; - //orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct); - //orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct); orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct); } void QUICK_TEST() { diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 4581ec119..3f9e8a7b4 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -138,7 +138,7 @@ TupleAnnexStep::~TupleAnnexStep() { if(fOrderByList.size() > 0) { - for(uint64_t id = 0; id <= fMaxThreads; id++) + for(uint64_t id = 0; id < fOrderByList.size(); id++) { delete fOrderByList[id]; } @@ -704,7 +704,8 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() fRowGroupOut.getRow(0, &fRowOut); ordering::SortingPQ finalPQ; scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); - + fRowGroupIn.initRow(&row1); + fRowGroupIn.initRow(&row2); try { @@ -744,156 +745,120 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() ERR_IN_PROCESS, fErrorInfo, fSessionId); } + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } - try + // Calculate rowSize only once + if (finalPQ.size()) { - // OFFSET processing - while (finalPQ.size() && offset < fLimitStart) + ordering::OrderByRow& topOBRow = + const_cast(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) { - offset++; - finalPQ.pop(); - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ - caught an unknown exception 2", - ERR_IN_PROCESS, fErrorInfo, fSessionId); + copyRow(fRowIn, &fRowOut); + } + else + { + fConstant->fillInConstants(fRowIn, fRowOut); + } + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; } - try + if (!fConstant) { - // Calculate rowSize only once - if (finalPQ.size()) + while(finalPQ.size()) { - ordering::OrderByRow& topOBRow = - const_cast(finalPQ.top()); - fRowIn.setData(topOBRow.fData); - if (!fConstant) + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns striped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); fConstant->fillInConstants(fRowIn, fRowOut); - rowSize = fRowOut.getSize(); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - finalPQ.pop(); - count++; - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ - caught an unknown exception 3", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); - try - { - if (!fConstant) - { - while(finalPQ.size()) + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) { - if (cancelled()) - { - break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - copyRow(fRowIn, &fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) - { - break; - } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } - else // Add ConstantColumns striped earlier - { - while(finalPQ.size()) + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else { - if (cancelled()) - { break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - fConstant->fillInConstants(fRowIn, fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) - { - break; - } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } // end of if-else - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ - caught an unknown exception 4", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + } + } // end of limit bound while loop + } // end of if-else if (fRowGroupOut.getRowCount() > 0) { @@ -982,152 +947,116 @@ void TupleAnnexStep::finalizeParallelOrderBy() ERR_IN_PROCESS, fErrorInfo, fSessionId); } + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } - try + // Calculate rowSize only once + if (finalPQ.size()) { - // OFFSET processing - while (finalPQ.size() && offset < fLimitStart) + ordering::OrderByRow& topOBRow = + const_cast(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) { - offset++; - finalPQ.pop(); - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ - caught an unknown exception 2", - ERR_IN_PROCESS, fErrorInfo, fSessionId); + copyRow(fRowIn, &fRowOut); + } + else + { + fConstant->fillInConstants(fRowIn, fRowOut); + } + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; } - try + if (!fConstant) { - // Calculate rowSize only once - if (finalPQ.size()) + while(finalPQ.size()) { - ordering::OrderByRow& topOBRow = - const_cast(finalPQ.top()); - fRowIn.setData(topOBRow.fData); - if (!fConstant) + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns striped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); fConstant->fillInConstants(fRowIn, fRowOut); - rowSize = fRowOut.getSize(); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - finalPQ.pop(); - count++; - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ - caught an unknown exception 3", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); - try - { - if (!fConstant) - { - while(finalPQ.size()) + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) { - if (cancelled()) - { - break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - copyRow(fRowIn, &fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } - else // Add ConstantColumns striped earlier - { - while(finalPQ.size()) + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else { - if (cancelled()) - { - break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - fConstant->fillInConstants(fRowIn, fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) - { - break; - } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } // end of if-else - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ - caught an unknown exception 4", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + break; + } + } // end of limit bound while loop + } // end of if-else if (fRowGroupOut.getRowCount() > 0) { @@ -1200,7 +1129,10 @@ void TupleAnnexStep::executeParallelOrderBy(uint64_t id) rg.setData(&rgDataIn); rg.getRow(0, &r); - rowSize = r.getSize(); + if (!rowSize) + { + rowSize = r.getSize(); + } rowCount = rg.getRowCount(); for (uint64_t i = 0; i < rowCount; ++i) diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 76cac326d..f9efd459b 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -28,13 +28,6 @@ #include "jobstep.h" #include "limitedorderby.h" -// forward reference -namespace fucexp -{ -class FuncExp; -} - - namespace joblist { class TupleConstantStep; diff --git a/dbcon/joblist/tupleconstantstep.cpp b/dbcon/joblist/tupleconstantstep.cpp index 4ffb811e7..3e2b61e39 100644 --- a/dbcon/joblist/tupleconstantstep.cpp +++ b/dbcon/joblist/tupleconstantstep.cpp @@ -472,7 +472,7 @@ void TupleConstantStep::execute() fOutputDL->endOfInput(); } - +// *DRRTUY Copy row at once not one field at a time void TupleConstantStep::fillInConstants() { fRowGroupIn.getRow(0, &fRowIn); @@ -495,7 +495,6 @@ void TupleConstantStep::fillInConstants() } else // only first column is constant { - //size_t n = fRowOut.getOffset(fRowOut.getColumnCount()) - fRowOut.getOffset(1); for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) { fRowOut.setRid(fRowIn.getRelRid()); diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index d1fdd2801..b2e70a6c3 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -468,37 +468,6 @@ int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) } else { - if (v1 > v2) - ret = fSpec.fAsc; - else if (v1 < v2) - ret = -fSpec.fAsc; - } - - return ret; -} - -int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) -{ - l->row1().setData(r1); - l->row2().setData(r2); - - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - - int ret = 0; - - if (b1 == true || b2 == true) - { - if (b1 == false && b2 == true) - ret = fSpec.fNf; - else if (b1 == true && b2 == false) - ret = -fSpec.fNf; - } - else - { - int64_t v1 = l->row1().getIntField(fSpec.fIndex); - int64_t v2 = l->row2().getIntField(fSpec.fIndex); - // ((int64_t) -00:00:26) > ((int64_t) -00:00:25) // i.e. For 2 negative TIME values, we invert the order of // comparison operations to force "-00:00:26" to appear before @@ -684,13 +653,6 @@ void CompareRule::compileRules(const std::vector& spec, const rowgr break; } - case CalpontSystemCatalog::TIME: - { - Compare* c = new TimeCompare(*i); - fCompares.push_back(c); - break; - } - default: { break; diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index b4fd7c441..e26ba1aa3 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -51,8 +51,8 @@ class ResourceManager; namespace ordering { -template, - typename _Compare = less > +template, + typename _Compare = std::less > class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare> { public: @@ -132,6 +132,7 @@ public: void revertSortSpec() { fSpec.fAsc = -fSpec.fAsc; + fSpec.fNf = -fSpec.fNf; } protected: @@ -289,15 +290,6 @@ public: // End of comparators for variable sized types -class TimeCompare : public Compare -{ -public: - TimeCompare(const IdbSortSpec& spec) : Compare(spec) {} - - int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); -}; - - class CompareRule { public: