diff --git a/dbcon/execplan/calpontselectexecutionplan.cpp b/dbcon/execplan/calpontselectexecutionplan.cpp index fd12b8179..186b64930 100644 --- a/dbcon/execplan/calpontselectexecutionplan.cpp +++ b/dbcon/execplan/calpontselectexecutionplan.cpp @@ -82,6 +82,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location): fQueryType(SELECT), fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL), fStringTableThreshold(20), + fOrderByThreads(1), fDJSSmallSideLimit(0), fDJSLargeSideLimit(0), fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join, @@ -125,6 +126,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan( fQueryType(SELECT), fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL), fStringTableThreshold(20), + fOrderByThreads(1), fDJSSmallSideLimit(0), fDJSLargeSideLimit(0), fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join @@ -151,6 +153,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan (string data) : fQueryType(SELECT), fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL), fStringTableThreshold(20), + fOrderByThreads(1), fDJSSmallSideLimit(0), fDJSLargeSideLimit(0), fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join @@ -479,6 +482,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const b << (uint64_t)fLimitNum; b << static_cast(fHasOrderBy); b << static_cast(fSpecHandlerProcessed); + b << reinterpret_cast(fOrderByThreads); b << static_cast(fSelectSubList.size()); @@ -648,6 +652,7 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b) b >> (uint64_t&)fLimitNum; b >> reinterpret_cast< ByteStream::byte&>(fHasOrderBy); b >> reinterpret_cast< ByteStream::byte&>(fSpecHandlerProcessed); + b >> reinterpret_cast(fOrderByThreads); // for SELECT subquery b >> size; diff --git a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h index e21cd96bb..cf31b0711 100644 --- 
a/dbcon/execplan/calpontselectexecutionplan.h +++ b/dbcon/execplan/calpontselectexecutionplan.h @@ -588,6 +588,16 @@ public: return fSpecHandlerProcessed; } + void orderByThreads(const uint32_t number) + { + fOrderByThreads = number; + } + const uint32_t orderByThreads() const + { + return fOrderByThreads; + } + + void selectSubList(const SelectList& selectSubList) { fSelectSubList = selectSubList; @@ -897,6 +907,7 @@ private: // for specific handlers processing, e.g. GROUP BY bool fSpecHandlerProcessed; + uint32_t fOrderByThreads; // Derived table involved in the query. For derived table optimization std::vector fSubSelectList; diff --git a/dbcon/joblist/CMakeLists.txt b/dbcon/joblist/CMakeLists.txt index 7d15511aa..c450b61af 100644 --- a/dbcon/joblist/CMakeLists.txt +++ b/dbcon/joblist/CMakeLists.txt @@ -57,12 +57,15 @@ set(joblist_LIB_SRCS virtualtable.cpp windowfunctionstep.cpp) -#libjoblist_la_CXXFLAGS = $(march_flags) $(AM_CXXFLAGS) - add_library(joblist SHARED ${joblist_LIB_SRCS}) set_target_properties(joblist PROPERTIES VERSION 1.0.0 SOVERSION 1) install(TARGETS joblist DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) +if (WITH_ORDERBY_UT) + add_executable(job_orderby_tests orderby-tests.cpp) + target_link_libraries(job_orderby_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) + install(TARGETS job_orderby_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform) +endif() diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index 6d8456a4b..5c5a14b8e 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -247,6 +247,7 @@ struct JobInfo std::vector > orderByColVec; uint64_t limitStart; uint64_t limitCount; + uint32_t orderByThreads; // tupleInfo boost::shared_ptr keyInfo; diff --git a/dbcon/joblist/jlf_subquery.cpp b/dbcon/joblist/jlf_subquery.cpp index 896b1909e..11179eee2 100644 --- a/dbcon/joblist/jlf_subquery.cpp +++ b/dbcon/joblist/jlf_subquery.cpp @@ -769,6 +769,7 @@ void 
addOrderByAndLimit(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { jobInfo.limitStart = csep->limitStart(); jobInfo.limitCount = csep->limitNum(); + jobInfo.orderByThreads = csep->orderByThreads(); CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols(); diff --git a/dbcon/joblist/jlf_tuplejoblist.cpp b/dbcon/joblist/jlf_tuplejoblist.cpp index b18584d6a..713703ae8 100644 --- a/dbcon/joblist/jlf_tuplejoblist.cpp +++ b/dbcon/joblist/jlf_tuplejoblist.cpp @@ -484,6 +484,8 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, deliverySteps[CNX_VTABLE_ID] = ws; } + // TODO MCOL-894 we don't need to run sorting|distinct + // every time // if ((jobInfo.limitCount != (uint64_t) - 1) || // (jobInfo.constantCol == CONST_COL_EXIST) || // (jobInfo.hasDistinct)) @@ -494,12 +496,13 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, TupleAnnexStep* tas = dynamic_cast(jobInfo.annexStep.get()); tas->setLimit(jobInfo.limitStart, jobInfo.limitCount); -// if (jobInfo.limitCount != (uint64_t) - 1) -// { if (jobInfo.orderByColVec.size() > 0) + { tas->addOrderBy(new LimitedOrderBy()); - -// } + if (jobInfo.orderByThreads > 1) + tas->setParallelOp(); + tas->setMaxThreads(jobInfo.orderByThreads); + } if (jobInfo.constantCol == CONST_COL_EXIST) tas->addConstant(new TupleConstantStep(jobInfo)); @@ -518,7 +521,11 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, if (jobInfo.trace) cout << "Output RowGroup 2: " << rg2.toString() << endl; AnyDataListSPtr spdlIn(new AnyDataList()); - RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize); + RowGroupDL* dlIn; + if (jobInfo.orderByColVec.size() > 0) + dlIn = new RowGroupDL(jobInfo.orderByThreads, jobInfo.fifoSize); + else + dlIn = new RowGroupDL(1, jobInfo.fifoSize); dlIn->OID(CNX_VTABLE_ID); spdlIn->rowGroupDL(dlIn); JobStepAssociation jsaIn; diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp 
index 7f01dccb7..b087c54ac 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -1727,14 +1727,12 @@ void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, { jobInfo.limitCount = (uint64_t) - 1; } - // support order by and limit in sub-query/union or // GROUP BY handler processed outer query order else if (csep->orderByCols().size() > 0) { addOrderByAndLimit(csep, jobInfo); } - // limit without order by in any query else { diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index 5a423682d..f4cf096b2 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -55,7 +55,7 @@ LimitedOrderBy::~LimitedOrderBy() } -void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo) +void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool invertRules, bool isMultiThreaded) { fRm = jobInfo.rm; fSessionMemLimit = jobInfo.umMemLimit; @@ -77,12 +77,22 @@ void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo) map::iterator j = keyToIndexMap.find(i->first); idbassert(j != keyToIndexMap.end()); - fOrderByCond.push_back(IdbSortSpec(j->second, i->second)); + fOrderByCond.push_back(IdbSortSpec(j->second, i->second ^ invertRules)); } // limit row count info - fStart = jobInfo.limitStart; - fCount = jobInfo.limitCount; + if (isMultiThreaded) + { + // CS can't apply offset at the first stage + // otherwise it loses records. + fStart = 0; + fCount = jobInfo.limitStart+jobInfo.limitCount; + } + else + { + fStart = jobInfo.limitStart; + fCount = jobInfo.limitCount; + } IdbOrderBy::initialize(rg); } @@ -169,6 +179,8 @@ void LimitedOrderBy::finalize() if (fOrderByQueue.size() > 0) { + // *DRRTUY Very memory intensive. CS needs to account active + // memory only and release memory if needed. 
uint64_t memSizeInc = fRowsPerRG * fRowGroup.getRowSize(); fMemSize += memSizeInc; @@ -207,6 +219,8 @@ void LimitedOrderBy::finalize() fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); fRowGroup.resetRowGroup(0); + // *DRRTUY This approach won't work with + // OFFSET > fRowsPerRG offset = offset != 0 ? offset - 1 : offset; fRowGroup.getRow(offset, &fRow0); @@ -219,7 +233,8 @@ void LimitedOrderBy::finalize() offset--; fRow0.prevRow(rSize); fOrderByQueue.pop(); - + + // if RG has fRowsPerRG rows if(offset == (uint64_t)-1) { tempRGDataList.push_front(fData); @@ -232,7 +247,7 @@ void LimitedOrderBy::finalize() throw IDBExcept(fErrorCode); } - fData.reinit(fRowGroup, fRowsPerRG); + fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); fRowGroup.resetRowGroup(0); // ? fRowGroup.getRow(preLastRowNumb, &fRow0); @@ -241,9 +256,9 @@ void LimitedOrderBy::finalize() } // Push the last/only group into the queue. if (fRowGroup.getRowCount() > 0) - tempRGDataList.push_front(fData); + tempRGDataList.push_front(fData); - for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++) + for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++) tempQueue.push(*tempListIter); fDataQueue = tempQueue; diff --git a/dbcon/joblist/limitedorderby.h b/dbcon/joblist/limitedorderby.h index 3ff266aa3..ec2d9d4f9 100644 --- a/dbcon/joblist/limitedorderby.h +++ b/dbcon/joblist/limitedorderby.h @@ -46,9 +46,16 @@ public: LimitedOrderBy(); virtual ~LimitedOrderBy(); - void initialize(const rowgroup::RowGroup&, const JobInfo&); + void initialize(const rowgroup::RowGroup&, + const JobInfo&, + bool invertRules = false, + bool isMultiThreded = false); void processRow(const rowgroup::Row&); uint64_t getKeyLength() const; + uint64_t getLimitCount() const + { + return fCount; + } const std::string toString() const; void finalize(); diff --git a/dbcon/joblist/orderby-tests.cpp b/dbcon/joblist/orderby-tests.cpp new 
file mode 100644 index 000000000..fa98f0772 --- /dev/null +++ b/dbcon/joblist/orderby-tests.cpp @@ -0,0 +1,390 @@ +/* Copyright (C) 2019 MariaDB Corporation. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $ + +#include +#include +#include +#include +#include +#include +#include + +#include "jobstep.h" +#include "funcexp.h" +#include "jlf_common.h" +#include "tupleannexstep.h" +#include "calpontsystemcatalog.h" +#include "resourcemanager.h" +#include +#include +#include "bytestream.h" +#include +#include +#include + +#define DEBUG +#define MEMORY_LIMIT 14983602176 + +using namespace std; +using namespace joblist; +using namespace messageqcpp; + +// Timer class used by this tdriver to output elapsed times, etc. 
+class Timer +{ +public: + void start(const string& message) + { + if (!fHeaderDisplayed) + { + header(); + fHeaderDisplayed = true; + } + + gettimeofday(&fTvStart, 0); + cout << timestr() << " Start " << message << endl; + } + + void stop(const string& message) + { + time_t now; + time(&now); + string secondsElapsed; + getTimeElapsed(secondsElapsed); + cout << timestr() << " " << secondsElapsed << " Stop " << message << endl; + } + + Timer() : fHeaderDisplayed(false) {} + +private: + + struct timeval fTvStart; + bool fHeaderDisplayed; + + double getTimeElapsed(string& seconds) + { + struct timeval tvStop; + gettimeofday(&tvStop, 0); + double secondsElapsed = + (tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) - + (fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0)); + ostringstream oss; + oss << secondsElapsed; + seconds = oss.str(); + seconds.resize(8, '0'); + return secondsElapsed; + } + + string timestr() + { + struct tm tm; + struct timeval tv; + + gettimeofday(&tv, 0); + localtime_r(&tv.tv_sec, &tm); + + ostringstream oss; + oss << setfill('0') + << setw(2) << tm.tm_hour << ':' + << setw(2) << tm.tm_min << ':' + << setw(2) << tm.tm_sec << '.' + << setw(6) << tv.tv_usec + ; + return oss.str(); + } + + void header() + { + cout << endl; + cout << "Time Seconds Activity" << endl; + } +}; + +class FilterDriver : public CppUnit::TestFixture +{ + + CPPUNIT_TEST_SUITE(FilterDriver); + + CPPUNIT_TEST(ORDERBY_TIME_TEST); + + CPPUNIT_TEST_SUITE_END(); + +private: + void orderByTest_nRGs(uint64_t numRows, uint64_t limit, + uint64_t maxThreads, + bool parallelExecution, + bool generateRandValues, + bool hasDistinct) + { + Timer timer; + // This test creates TAS for single bigint column and run sorting on it. 
+ + cout << endl; + cout << "------------------------------------------------------------" << endl; + timer.start("insert"); + + stringstream ss2; + ss2.flush(); + ss2 << "loading " << numRows << " rows into initial RowGroup."; + std::string message = ss2.str(); + timer.start(message); + + ResourceManager* rm = ResourceManager::instance(true); + + joblist::JobInfo jobInfo = joblist::JobInfo(rm); + // 1st column is the sorting key + uint8_t tupleKey = 1; + uint8_t offset = 0; + uint16_t rowsPerRG = 8192; // hardcoded max rows in RowGroup value + uint32_t numberOfRGs = numRows / rowsPerRG; + + // set sorting rules + // true - ascending order, false otherwise + jobInfo.orderByColVec.push_back(make_pair(tupleKey, false)); + jobInfo.limitStart = offset; + jobInfo.limitCount = limit; + jobInfo.hasDistinct = hasDistinct; + // JobInfo doesn't set these SP in ctor + jobInfo.umMemLimit.reset(new int64_t); + *(jobInfo.umMemLimit) = MEMORY_LIMIT; + SErrorInfo errorInfo(new ErrorInfo()); + jobInfo.errorInfo = errorInfo; + uint32_t oid =3001; + // populate JobInfo.nonConstDelCols with dummy shared_pointers + // to notify TupleAnnexStep::initialise() about number of non-constant columns + execplan::SRCP srcp1, srcp2; + jobInfo.nonConstDelCols.push_back(srcp1); + jobInfo.nonConstDelCols.push_back(srcp2); + + // create two columns RG. 
1st is the sorting key, second is the data column + std::vector offsets, roids, tkeys, cscale, cprecision; + std::vector types; + offsets.push_back(2); offsets.push_back(10); offsets.push_back(18); + roids.push_back(oid); roids.push_back(oid); + tkeys.push_back(1); tkeys.push_back(1); + types.push_back(execplan::CalpontSystemCatalog::UBIGINT); + types.push_back(execplan::CalpontSystemCatalog::UBIGINT); + cscale.push_back(0); cscale.push_back(0); + cprecision.push_back(20); cprecision.push_back(20); + rowgroup::RowGroup inRG(2, //column count + offsets, //oldOffset + roids, // column oids + tkeys, //keys + types, // types + cscale, //scale + cprecision, // precision + 20, // sTableThreshold + false //useStringTable + ); + rowgroup::RowGroup jobInfoRG(inRG); + joblist::TupleAnnexStep tns = joblist::TupleAnnexStep(jobInfo); + tns.addOrderBy(new joblist::LimitedOrderBy()); + tns.delivery(true); + // activate parallel sort + if(parallelExecution) + { + tns.setParallelOp(); + } + tns.setMaxThreads(maxThreads); + tns.initialize(jobInfoRG, jobInfo); + tns.setLimit(0, limit); + + // Create JobStepAssociation mediator class ins to connect DL and JS + joblist::AnyDataListSPtr spdlIn(new AnyDataList()); + //joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, jobInfo.fifoSize); + joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, 21001); + dlIn->OID(oid); + spdlIn->rowGroupDL(dlIn); + joblist::JobStepAssociation jsaIn; + jsaIn.outAdd(spdlIn); + tns.inputAssociation(jsaIn); + + uint64_t base = 42; + uint64_t maxInt = 0; + if(generateRandValues) + ::srand(base); + uint64_t nextValue; + for(uint32_t i = 1; i <= numberOfRGs; i++) + { + // create RGData with the RG structure and manually populate RG + // Use reinit(numberOfRGs) to preset an array + rowgroup::RGData rgD = rowgroup::RGData(inRG); + inRG.setData(&rgD); + rowgroup::Row r; + inRG.initRow(&r); + uint32_t rowSize = r.getSize(); + inRG.getRow(0, &r); + // populate RGData + //for(uint64_t i = rowsPerRG+1; i > 0; i--) 
+ for(uint64_t i = 0; i < rowsPerRG; i++) // Worst case scenario for PQ + { + // TBD Use set..._offset methods to avoid offset calculation instructions + if(generateRandValues) + { + nextValue = ::rand(); + } + else + { + nextValue = base + i; + } + + r.setUintField<8>(nextValue, 0); + r.setUintField<8>(nextValue, 1); + if (maxInt < nextValue) + { + maxInt = nextValue; + } + r.nextRow(rowSize); + } + base += 1; + inRG.setRowCount(rowsPerRG); + + // Insert RGData into input DL + dlIn->insert(rgD); + } + // end of input signal + std::cout << "orderByTest_nRGs input DataList totalSize " << dlIn->totalSize() << std::endl; + dlIn->endOfInput(); + timer.stop(message); + + joblist::AnyDataListSPtr spdlOut(new AnyDataList()); + // Set the ring buffer big enough to take RGData-s with results b/c + // there is nobody to read out of the buffer. + joblist::RowGroupDL* dlOut = new RowGroupDL(1, numberOfRGs); + dlOut->OID(oid); + spdlOut->rowGroupDL(dlOut); + joblist::JobStepAssociation jsaOut; + jsaOut.outAdd(spdlOut); + //uint64_t outputDLIter = dlOut->getIterator(); + tns.outputAssociation(jsaOut); + + // Run Annex Step + message = "Sorting"; + timer.start(message); + tns.run(); + tns.join(); + timer.stop(message); + + // serialize RGData into bs and later back + // to follow ExeMgr whilst getting TAS result RowGroup + messageqcpp::ByteStream bs; + uint32_t result = 0; + rowgroup::RowGroup outRG(inRG); + rowgroup::RGData outRGData(outRG); + result = tns.nextBand(bs); + /*bool more = false; + do + { + dlOut->next(outputDLIter, &outRGData); + } while (more);*/ + std::cout << "orderByTest_nRGs result " << result << std::endl; + //CPPUNIT_ASSERT( result == limit ); + outRGData.deserialize(bs); + outRG.setData(&outRGData); + + //std::cout << "orderByTest_nRGs output RG " << outRG.toString() << std::endl; + std::cout << "maxInt " << maxInt << std::endl; + { + rowgroup::Row r; + outRG.initRow(&r); + outRG.getRow(0, &r); + CPPUNIT_ASSERT(limit == outRG.getRowCount() || 
outRG.getRowCount() == 8192); + CPPUNIT_ASSERT_EQUAL(maxInt, r.getUintField(1)); + } + + cout << "------------------------------------------------------------" << endl; + } + + + void ORDERBY_TIME_TEST() + { + uint64_t numRows = 8192; + uint64_t maxThreads = 8; + // limit == 100000 is still twice as good to sort in parallel + // limit == 1000000 however is better to sort using single threaded sorting + uint64_t limit = 100000; + bool parallel = true; + bool woParallel = false; + bool generateRandValues = true; + bool hasDistinct = true; + bool noDistinct = false; + orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct); + } + void QUICK_TEST() + { + float f = 1.1; + double d = 1.2; + uint64_t i = 1; + uint64_t* i_ptr = &i; + double* d_ptr = &d; + uint64_t* i2_ptr = (uint64_t*) d_ptr; + float* f_ptr = &f; + i_ptr = (uint64_t*) f_ptr; + + cout << "*i_ptr=" << *i_ptr << endl; + cout << "*i2_ptr=" << *i2_ptr << endl; + f_ptr = (float*) i_ptr; + + cout << "*f_ptr=" << *f_ptr << endl; + + cout << endl; + + if (d > i) + cout << "1.2 is greater than 1." << endl; + + if (f > i) + cout << "1.1 is greater than 1." << endl; + + if (d > f) + cout << "1.2 is greater than 1.1" << endl; + + if (*i_ptr < *i2_ptr) + cout << "1.1 < 1.2 when represented as uint64_t." 
<< endl; + + cout << "sizeof(f) = " << sizeof(f) << endl; + cout << "sizeof(i) = " << sizeof(i) << endl; + cout << "sizeof(d) = " << sizeof(d) << endl; + + double dbl = 9.7; + double dbl2 = 1.3; + i_ptr = (uint64_t*) &dbl; + i2_ptr = (uint64_t*) &dbl2; + cout << endl; + cout << "9.7 as int is " << *i_ptr << endl; + cout << "9.7 as int is " << *i2_ptr << endl; + cout << "1.2 < 9.7 is " << (*i_ptr < *i2_ptr) << endl; + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver); + + +int main( int argc, char** argv) +{ + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest( registry.makeTest() ); + bool wasSuccessful = runner.run( "", false ); + return (wasSuccessful ? 0 : 1); +} + + diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index de46f85d0..e60d4db40 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -3153,12 +3153,6 @@ void TupleBPS::rgDataToDl(RGData& rgData, RowGroup& rg, RowGroupDL* dlp) if (dupColumns.size() > 0) dupOutputColumns(rgData, rg); - /* - if (!(fSessionId & 0x80000000)) { - rg.setData(&rgData); - cerr << "TBPS rowcount: " << rg.getRowCount() << endl; - } - */ dlp->insert(rgData); } diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index c1ef06a4e..3f9e8a7b4 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. 
+ Copyright (C) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -65,6 +66,8 @@ using namespace querytele; #include "tupleannexstep.h" +#define QUEUE_RESERVE_SIZE 100000 + namespace { struct TAHasher @@ -117,10 +120,12 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) : fLimitHit(false), fEndOfResult(false), fDistinct(false), + fParallelOp(false), fOrderBy(NULL), fConstant(NULL), fFeInstance(funcexp::FuncExp::instance()), - fJobList(jobInfo.jobListPtr) + fJobList(jobInfo.jobListPtr), + fFinishedThreads(0) { fExtendedInfo = "TNS: "; fQtc.stepParms().stepType = StepTeleStats::T_TNS; @@ -129,6 +134,22 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) : TupleAnnexStep::~TupleAnnexStep() { + if(fParallelOp) + { + if(fOrderByList.size() > 0) + { + for(uint64_t id = 0; id < fOrderByList.size(); id++) + { + delete fOrderByList[id]; + } + + fOrderByList.clear(); + } + + fInputIteratorsList.clear(); + fRunnersList.clear(); + } + if (fOrderBy) delete fOrderBy; @@ -149,25 +170,41 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg) void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo) { + // Initialize structures used by separate workers + uint64_t id = 1; fRowGroupIn = rgIn; fRowGroupIn.initRow(&fRowIn); - - if (fOrderBy) + if(fParallelOp && fOrderBy) { - fOrderBy->distinct(fDistinct); - fOrderBy->initialize(rgIn, jobInfo); + fOrderByList.resize(fMaxThreads+1); + for(id = 0; id <= fMaxThreads; id++) + { + // *DRRTUY use SP here? 
+ fOrderByList[id] = new LimitedOrderBy(); + fOrderByList[id]->distinct(fDistinct); + fOrderByList[id]->initialize(rgIn, jobInfo, false, true); + } + } + else + { + if (fOrderBy) + { + fOrderBy->distinct(fDistinct); + fOrderBy->initialize(rgIn, jobInfo); + } } if (fConstant == NULL) { - vector oids, oidsIn = fRowGroupIn.getOIDs(); - vector keys, keysIn = fRowGroupIn.getKeys(); - vector scale, scaleIn = fRowGroupIn.getScale(); - vector precision, precisionIn = fRowGroupIn.getPrecision(); - vector types, typesIn = fRowGroupIn.getColTypes(); - vector pos, posIn = fRowGroupIn.getOffsets(); - + vector oids, oidsIn = rgIn.getOIDs(); + vector keys, keysIn = rgIn.getKeys(); + vector scale, scaleIn = rgIn.getScale(); + vector precision, precisionIn = rgIn.getPrecision(); + vector types, typesIn = rgIn.getColTypes(); + vector pos, posIn = rgIn.getOffsets(); size_t n = jobInfo.nonConstDelCols.size(); + + // Add all columns into output RG as keys. Can we put only keys? oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n); keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n); scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n); @@ -198,8 +235,6 @@ void TupleAnnexStep::run() if (fInputDL == NULL) throw logic_error("Input is not a RowGroup data list."); - fInputIterator = fInputDL->getIterator(); - if (fOutputJobStepAssociation.outSize() == 0) throw logic_error("No output data list for annex step."); @@ -213,17 +248,58 @@ void TupleAnnexStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner = jobstepThreadPool.invoke(Runner(this)); -} + if(fParallelOp) + { + // Indexing begins with 1 + fRunnersList.resize(fMaxThreads); + fInputIteratorsList.resize(fMaxThreads+1); + // Activate stats collecting before CS spawns threads. 
+ if (traceOn()) dlTimes.setFirstReadTime(); + + // *DRRTUY Make this block conditional + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_START; + sts.total_units_of_work = 1; + postStepStartTele(sts); + + for(uint32_t id = 1; id <= fMaxThreads; id++) + { + fInputIteratorsList[id] = fInputDL->getIterator(); + fRunnersList[id-1] = jobstepThreadPool.invoke(Runner(this, id)); + } + } + else + { + fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL(); + + if (fInputDL == NULL) + throw logic_error("Input is not a RowGroup data list."); + + fInputIterator = fInputDL->getIterator(); + fRunner = jobstepThreadPool.invoke(Runner(this)); + } + +} void TupleAnnexStep::join() { - if (fRunner) - jobstepThreadPool.join(fRunner); + if(fParallelOp) + { + jobstepThreadPool.join(fRunnersList); + } + else + { + if (fRunner) + { + jobstepThreadPool.join(fRunner); + } + } + } - uint32_t TupleAnnexStep::nextBand(messageqcpp::ByteStream& bs) { RGData rgDataOut; @@ -312,6 +388,12 @@ void TupleAnnexStep::execute() } } +void TupleAnnexStep::execute(uint32_t id) +{ + if(fOrderByList[id]) + executeParallelOrderBy(id); + +} void TupleAnnexStep::executeNoOrderBy() { @@ -337,7 +419,6 @@ void TupleAnnexStep::executeNoOrderBy() { fRowGroupIn.setData(&rgDataIn); fRowGroupIn.getRow(0, &fRowIn); - // Get a new output rowgroup for each input rowgroup to preserve the rids rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount()); fRowGroupOut.setData(&rgDataOut); @@ -459,7 +540,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() fJobList->abortOnLimit((JobStep*) this); } - if (UNLIKELY(fRowGroupOut.getRowCount() >= 8192)) + if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize)) { dataVec.push_back(rgDataOut); rgDataOut.reinit(fRowGroupOut); @@ -596,6 +677,511 @@ void TupleAnnexStep::executeWithOrderBy() fOutputDL->endOfInput(); } +/* + The m() iterates over thread's LimitedOrderBy instances, + reverts the rules and 
then populates the final collection + used for final sorting. The method uses OrderByRow that + combination of Row::data and comparison rules. + When m() finishes with thread's LOBs it iterates over + final sorting collection, populates rgDataOut, then + sends it into outputDL. + Changing this method don't forget to make changes in + finalizeParallelOrderBy() that is a clone. + !!!The method doesn't set Row::baseRid +*/ +void TupleAnnexStep::finalizeParallelOrderByDistinct() +{ + utils::setThreadName("TASwParOrdDistM"); + uint64_t count = 0; + uint64_t offset = 0; + uint32_t rowSize = 0; + + rowgroup::RGData rgDataOut; + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + // Calculate offset here + fRowGroupOut.getRow(0, &fRowOut); + ordering::SortingPQ finalPQ; + scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); + fRowGroupIn.initRow(&row1); + fRowGroupIn.initRow(&row2); + + try + { + for(uint64_t id = 1; id <= fMaxThreads; id++) + { + if (cancelled()) + { + break; + } + // Revert the ordering rules before we + // add rows into the final PQ. + fOrderByList[id]->getRule().revertRules(); + ordering::SortingPQ &currentPQ = fOrderByList[id]->getQueue(); + finalPQ.reserve(finalPQ.size()+currentPQ.size()); + pair<DistinctMap_t::iterator, bool> inserted; + while (currentPQ.size()) + { + ordering::OrderByRow &topOBRow = + const_cast<ordering::OrderByRow&>(currentPQ.top()); + inserted = distinctMap->insert(topOBRow.fData); + if (inserted.second) + { + finalPQ.push(topOBRow); + } + currentPQ.pop(); + } + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) 
+ { + catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ + caught an unknown exception 1", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } + + // Calculate rowSize only once + if (finalPQ.size()) + { + ordering::OrderByRow& topOBRow = + const_cast<ordering::OrderByRow&>(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) + { + copyRow(fRowIn, &fRowOut); + } + else + { + fConstant->fillInConstants(fRowIn, fRowOut); + } + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; + } + + if (!fConstant) + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast<ordering::OrderByRow&>(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns stripped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast<ordering::OrderByRow&>(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + fConstant->fillInConstants(fRowIn, fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if 
(fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } // end of if-else + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + } + + fOutputDL->endOfInput(); + + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_SUMMARY; + sts.total_units_of_work = sts.units_of_work_completed = 1; + sts.rows = fRowsReturned; + postStepSummaryTele(sts); + + if (traceOn()) + { + if (dlTimes.FirstReadTime().tv_sec == 0) + dlTimes.setFirstReadTime(); + + dlTimes.setLastReadTime(); + dlTimes.setEndOfInputTime(); + printCalTrace(); + } +} + +/* + The m() iterates over thread's LimitedOrderBy instances, + reverts the rules and then populates the final collection + used for final sorting. The method uses OrderByRow that + combination of Row::data and comparison rules. + When m() finishes with thread's LOBs it iterates over + final sorting collection, populates rgDataOut, then + sends it into outputDL. + Changing this method don't forget to make changes in + finalizeParallelOrderByDistinct() that is a clone. 
+ !!!The method doesn't set Row::baseRid +*/ +void TupleAnnexStep::finalizeParallelOrderBy() +{ + utils::setThreadName("TASwParOrdMerge"); + uint64_t count = 0; + uint64_t offset = 0; + uint32_t rowSize = 0; + + rowgroup::RGData rgDataOut; + ordering::SortingPQ finalPQ; + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + // Calculate offset here + fRowGroupOut.getRow(0, &fRowOut); + + try + { + for(uint64_t id = 1; id <= fMaxThreads; id++) + { + if (cancelled()) + { + break; + } + // Revert the ordering rules before we + // add rows into the final PQ. + fOrderByList[id]->getRule().revertRules(); + ordering::SortingPQ &currentPQ = fOrderByList[id]->getQueue(); + finalPQ.reserve(finalPQ.size()+currentPQ.size()); + while (currentPQ.size()) + { + ordering::OrderByRow &topOBRow = + const_cast<ordering::OrderByRow&>(currentPQ.top()); + finalPQ.push(topOBRow); + currentPQ.pop(); + } + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) 
+ { + catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ + caught an unknown exception 1", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } + + // Calculate rowSize only once + if (finalPQ.size()) + { + ordering::OrderByRow& topOBRow = + const_cast<ordering::OrderByRow&>(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) + { + copyRow(fRowIn, &fRowOut); + } + else + { + fConstant->fillInConstants(fRowIn, fRowOut); + } + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; + } + + if (!fConstant) + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast<ordering::OrderByRow&>(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns stripped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast<ordering::OrderByRow&>(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + fConstant->fillInConstants(fRowIn, fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if 
(fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } // end of if-else + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + } + + fOutputDL->endOfInput(); + + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_SUMMARY; + sts.total_units_of_work = sts.units_of_work_completed = 1; + sts.rows = fRowsReturned; + postStepSummaryTele(sts); + + if (traceOn()) + { + if (dlTimes.FirstReadTime().tv_sec == 0) + dlTimes.setFirstReadTime(); + + dlTimes.setLastReadTime(); + dlTimes.setEndOfInputTime(); + printCalTrace(); + } +} + +void TupleAnnexStep::executeParallelOrderBy(uint64_t id) +{ + utils::setThreadName("TASwParOrd"); + RGData rgDataIn; + RGData rgDataOut; + bool more = false; + uint64_t dlOffset = 0; + uint32_t rowSize = 0; + + uint64_t rowCount = 0; + uint64_t doubleRGSize = 2*rowgroup::rgCommonSize; + rowgroup::Row r = fRowIn; + rowgroup::RowGroup rg = fRowGroupIn; + rg.initRow(&r); + LimitedOrderBy *limOrderBy = fOrderByList[id]; + ordering::SortingPQ &currentPQ = limOrderBy->getQueue(); + if (limOrderBy->getLimitCount() < QUEUE_RESERVE_SIZE) + { + currentPQ.reserve(limOrderBy->getLimitCount()); + } + else + { + currentPQ.reserve(QUEUE_RESERVE_SIZE); + } + + try + { + more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + if (more) dlOffset++; + + while (more && !cancelled()) + { + if (dlOffset%fMaxThreads == id-1) + { + if (cancelled()) + break; + + if (currentPQ.capacity()-currentPQ.size() < doubleRGSize) + { + currentPQ.reserve(QUEUE_RESERVE_SIZE); + } + + rg.setData(&rgDataIn); + rg.getRow(0, &r); + if (!rowSize) + { + rowSize = 
r.getSize(); + } + rowCount = rg.getRowCount(); + + for (uint64_t i = 0; i < rowCount; ++i) + { + limOrderBy->processRow(r); + r.nextRow(rowSize); + } + } + + // *DRRTUY Implement a method to skip elements in FIFO + more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + if(more) dlOffset++; + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(), ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::executeParallelOrderBy execute caught an unknown exception", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + // read out the input DL + while (more) + more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + + // Count finished sorting threads under mutex and run final + // sort step when the last thread converges + fParallelFinalizeMutex.lock(); + fFinishedThreads++; + if (fFinishedThreads == fMaxThreads) + { + fParallelFinalizeMutex.unlock(); + if(fDistinct) + { + finalizeParallelOrderByDistinct(); + } + else + { + finalizeParallelOrderBy(); + } + } + else + { + fParallelFinalizeMutex.unlock(); + } +} const RowGroup& TupleAnnexStep::getOutputRowGroup() const { @@ -626,7 +1212,8 @@ bool TupleAnnexStep::deliverStringTableRowGroup() const const string TupleAnnexStep::toString() const { ostringstream oss; - oss << "AnnexStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; + oss << "AnnexStep "; + oss << " ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; oss << " in:"; @@ -673,8 +1260,8 @@ void TupleAnnexStep::printCalTrace() void TupleAnnexStep::formatMiniStats() { ostringstream oss; - oss << "TNS " - << "UM " + oss << "TNS "; + oss << "UM " << "- " << "- " << "- " diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 367e7ea4f..f9efd459b 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporaton. 
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -21,15 +22,11 @@ #ifndef JOBLIST_TUPLEANNEXSTEP_H #define JOBLIST_TUPLEANNEXSTEP_H +#include +#include + #include "jobstep.h" - - -// forward reference -namespace fucexp -{ -class FuncExp; -} - +#include "limitedorderby.h" namespace joblist { @@ -49,6 +46,8 @@ public: /** @brief TupleAnnexStep constructor */ TupleAnnexStep(const JobInfo& jobInfo); + // Copy ctor to have a class mutex + TupleAnnexStep(const TupleAnnexStep ©); /** @brief TupleAnnexStep destructor */ @@ -90,6 +89,14 @@ public: fLimitStart = s; fLimitCount = c; } + void setParallelOp() + { + fParallelOp = true; + } + void setMaxThreads(uint32_t number) + { + fMaxThreads = number; + } virtual bool stringTableFriendly() { @@ -100,11 +107,15 @@ public: protected: void execute(); + void execute(uint32_t); void executeNoOrderBy(); void executeWithOrderBy(); + void executeParallelOrderBy(uint64_t id); void executeNoOrderByWithDistinct(); void formatMiniStats(); void printCalTrace(); + void finalizeParallelOrderBy(); + void finalizeParallelOrderByDistinct(); // input/output rowgroup and row rowgroup::RowGroup fRowGroupIn; @@ -117,18 +128,26 @@ protected: RowGroupDL* fInputDL; RowGroupDL* fOutputDL; uint64_t fInputIterator; + std::vector fInputIteratorsList; uint64_t fOutputIterator; class Runner { public: - Runner(TupleAnnexStep* step) : fStep(step) { } + Runner(TupleAnnexStep* step) : + fStep(step), id(0) { } + Runner(TupleAnnexStep* step, uint32_t id) : + fStep(step), id(id) { } void operator()() { - fStep->execute(); + if(id) + fStep->execute(id); + else + fStep->execute(); } TupleAnnexStep* fStep; + uint16_t id; }; uint64_t fRunner; // thread pool handle @@ -136,9 +155,11 @@ protected: uint64_t fRowsReturned; uint64_t fLimitStart; uint64_t fLimitCount; + uint64_t fMaxThreads; bool fLimitHit; bool fEndOfResult; bool fDistinct; + bool fParallelOp; LimitedOrderBy* fOrderBy; 
TupleConstantStep* fConstant; @@ -146,8 +167,21 @@ protected: funcexp::FuncExp* fFeInstance; JobList* fJobList; + std::vector fOrderByList; + std::vector fRunnersList; + uint16_t fFinishedThreads; + boost::mutex fParallelFinalizeMutex; }; +template +class reservablePQ: private std::priority_queue +{ +public: + typedef typename std::priority_queue::size_type size_type; + reservablePQ(size_type capacity = 0) { reserve(capacity); }; + void reserve(size_type capacity) { this->c.reserve(capacity); } + size_type capacity() const { return this->c.capacity(); } +}; } // namespace diff --git a/dbcon/joblist/tupleconstantstep.cpp b/dbcon/joblist/tupleconstantstep.cpp index 4ffb811e7..3e2b61e39 100644 --- a/dbcon/joblist/tupleconstantstep.cpp +++ b/dbcon/joblist/tupleconstantstep.cpp @@ -472,7 +472,7 @@ void TupleConstantStep::execute() fOutputDL->endOfInput(); } - +// *DRRTUY Copy row at once not one field at a time void TupleConstantStep::fillInConstants() { fRowGroupIn.getRow(0, &fRowIn); @@ -495,7 +495,6 @@ void TupleConstantStep::fillInConstants() } else // only first column is constant { - //size_t n = fRowOut.getOffset(fRowOut.getColumnCount()) - fRowOut.getOffset(1); for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) { fRowOut.setRid(fRowIn.getRelRid()); diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 31199331a..28b2d8065 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -7807,6 +7807,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, // To activate LimitedOrderBy if(isPushdownHand) { + csep->orderByThreads(get_orderby_threads(gwi.thd)); csep->specHandlerProcessed(true); } } @@ -7844,6 +7845,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, { Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit; csep->limitNum(select->val_int()); + // MCOL-894 Activate parallel ORDER BY + csep->orderByThreads(get_orderby_threads(gwi.thd)); 
} } } @@ -9840,6 +9843,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro { csep->hasOrderBy(true); csep->specHandlerProcessed(true); + csep->orderByThreads(get_orderby_threads(gwi.thd)); } } diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index a0759d88c..af4016139 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -107,6 +107,18 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_UINT( + orderby_threads, + PLUGIN_VAR_RQCMDARG, + "Number of parallel threads used by ORDER BY. (default to 16)", + NULL, + NULL, + 16, + 0, + 2048, + 1 +); + // legacy system variables static MYSQL_THDVAR_ULONG( decimal_scale, @@ -294,6 +306,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(derived_handler), MYSQL_SYSVAR(processing_handlers_fallback), MYSQL_SYSVAR(group_by_handler), + MYSQL_SYSVAR(orderby_threads), MYSQL_SYSVAR(decimal_scale), MYSQL_SYSVAR(use_decimal_scale), MYSQL_SYSVAR(ordered_only), @@ -396,6 +409,16 @@ mcs_compression_type_t get_compression_type(THD* thd) { return (mcs_compression_type_t) THDVAR(thd, compression_type); } +uint get_orderby_threads(THD* thd) +{ + return ( thd == NULL ) ? 0 : THDVAR(thd, orderby_threads); +} +void set_orderby_threads(THD* thd, uint value) +{ + THDVAR(thd, orderby_threads) = value; +} + + bool get_use_decimal_scale(THD* thd) { return ( thd == NULL ) ? 
false : THDVAR(thd, use_decimal_scale); diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index e1c0d4731..d9ec7f4d0 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -58,6 +58,9 @@ void set_group_by_handler(THD* thd, bool value); bool get_fallback_knob(THD* thd); void set_fallback_knob(THD* thd, bool value); +uint get_orderby_threads(THD* thd); +void set_orderby_threads(THD* thd, uint value); + bool get_use_decimal_scale(THD* thd); void set_use_decimal_scale(THD* thd, bool value); diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 27d530343..cefae07f4 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -979,117 +979,6 @@ bool Row::isNullValue(uint32_t colIndex) const uint64_t Row::getNullValue(uint32_t colIndex) const { return utils::getNullValue(types[colIndex], getColumnWidth(colIndex)); -#if 0 - - switch (types[colIndex]) - { - case CalpontSystemCatalog::TINYINT: - return joblist::TINYINTNULL; - - case CalpontSystemCatalog::SMALLINT: - return joblist::SMALLINTNULL; - - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - return joblist::INTNULL; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - return joblist::FLOATNULL; - - case CalpontSystemCatalog::DATE: - return joblist::DATENULL; - - case CalpontSystemCatalog::BIGINT: - return joblist::BIGINTNULL; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - return joblist::DOUBLENULL; - - case CalpontSystemCatalog::DATETIME: - return joblist::DATETIMENULL; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: - case CalpontSystemCatalog::STRINT: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1: - return joblist::CHAR1NULL; - - case 2: - return joblist::CHAR2NULL; - - case 3: - case 4: - return joblist::CHAR4NULL; - - case 5: - case 6: - case 7: - case 8: - return joblist::CHAR8NULL; - - 
default: - throw logic_error("Row::getNullValue() Can't return the NULL string"); - } - - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1 : - return joblist::TINYINTNULL; - - case 2 : - return joblist::SMALLINTNULL; - - case 4 : - return joblist::INTNULL; - - default: - return joblist::BIGINTNULL; - } - - break; - } - - case CalpontSystemCatalog::UTINYINT: - return joblist::UTINYINTNULL; - - case CalpontSystemCatalog::USMALLINT: - return joblist::USMALLINTNULL; - - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - return joblist::UINTNULL; - - case CalpontSystemCatalog::UBIGINT: - return joblist::UBIGINTNULL; - - case CalpontSystemCatalog::LONGDOUBLE: - return -1; // no NULL value for long double yet, this is a nan. - - case CalpontSystemCatalog::VARBINARY: - default: - ostringstream os; - os << "Row::getNullValue(): got bad column type (" << types[colIndex] << - "). Width=" << getColumnWidth(colIndex) << endl; - os << toString() << endl; - throw logic_error(os.str()); - } - -#endif } /* This fcn might produce overflow warnings from the compiler, but that's OK. 
@@ -1098,117 +987,6 @@ uint64_t Row::getNullValue(uint32_t colIndex) const int64_t Row::getSignedNullValue(uint32_t colIndex) const { return utils::getSignedNullValue(types[colIndex], getColumnWidth(colIndex)); -#if 0 - - switch (types[colIndex]) - { - case CalpontSystemCatalog::TINYINT: - return (int64_t) ((int8_t) joblist::TINYINTNULL); - - case CalpontSystemCatalog::SMALLINT: - return (int64_t) ((int16_t) joblist::SMALLINTNULL); - - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - return (int64_t) ((int32_t) joblist::INTNULL); - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - return (int64_t) ((int32_t) joblist::FLOATNULL); - - case CalpontSystemCatalog::DATE: - return (int64_t) ((int32_t) joblist::DATENULL); - - case CalpontSystemCatalog::BIGINT: - return joblist::BIGINTNULL; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - return joblist::DOUBLENULL; - - case CalpontSystemCatalog::DATETIME: - return joblist::DATETIMENULL; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: - case CalpontSystemCatalog::STRINT: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1: - return (int64_t) ((int8_t) joblist::CHAR1NULL); - - case 2: - return (int64_t) ((int16_t) joblist::CHAR2NULL); - - case 3: - case 4: - return (int64_t) ((int32_t) joblist::CHAR4NULL); - - case 5: - case 6: - case 7: - case 8: - return joblist::CHAR8NULL; - - default: - throw logic_error("Row::getSignedNullValue() Can't return the NULL string"); - } - - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1 : - return (int64_t) ((int8_t) joblist::TINYINTNULL); - - case 2 : - return (int64_t) ((int16_t) joblist::SMALLINTNULL); - - case 4 : - return (int64_t) ((int32_t) joblist::INTNULL); - - default: - return joblist::BIGINTNULL; - } - - break; - } - - case 
CalpontSystemCatalog::UTINYINT: - return (int64_t) ((int8_t) joblist::UTINYINTNULL); - - case CalpontSystemCatalog::USMALLINT: - return (int64_t) ((int16_t) joblist::USMALLINTNULL); - - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - return (int64_t) ((int32_t) joblist::UINTNULL); - - case CalpontSystemCatalog::UBIGINT: - return (int64_t)joblist::UBIGINTNULL; - - case CalpontSystemCatalog::LONGDOUBLE: - return -1; // no NULL value for long double yet, this is a nan. - - case CalpontSystemCatalog::VARBINARY: - default: - ostringstream os; - os << "Row::getSignedNullValue(): got bad column type (" << types[colIndex] << - "). Width=" << getColumnWidth(colIndex) << endl; - os << toString() << endl; - throw logic_error(os.str()); - } - -#endif } RowGroup::RowGroup() : columnCount(0), data(NULL), rgData(NULL), strings(NULL), diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index c6a9ea6cb..504dda86e 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -63,6 +63,8 @@ namespace rowgroup { +const int16_t rgCommonSize = 8192; + /* The format of the data RowGroup points to is currently ... 
diff --git a/utils/windowfunction/CMakeLists.txt b/utils/windowfunction/CMakeLists.txt index 1a221511f..77e4e86d3 100755 --- a/utils/windowfunction/CMakeLists.txt +++ b/utils/windowfunction/CMakeLists.txt @@ -30,3 +30,8 @@ set_target_properties(windowfunction PROPERTIES VERSION 1.0.0 SOVERSION 1) install(TARGETS windowfunction DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) +if (WITH_SORTING_COMPARATORS_UT) + add_executable(comparators_tests comparators-tests.cpp) + target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) + install(TARGETS comparators_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform) +endif() diff --git a/utils/windowfunction/comparators-tests.cpp b/utils/windowfunction/comparators-tests.cpp new file mode 100644 index 000000000..d2d5df316 --- /dev/null +++ b/utils/windowfunction/comparators-tests.cpp @@ -0,0 +1,425 @@ +/* Copyright (C) 2019 MariaDB Corporaton. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "jobstep.h" +#include "funcexp.h" +#include "jlf_common.h" +#include "tupleannexstep.h" +#include "calpontsystemcatalog.h" +#include +#include +#include "bytestream.h" +#include "idborderby.h" + +#define DEBUG +#define MEMORY_LIMIT 14983602176 + +using namespace std; +using namespace joblist; +using namespace messageqcpp; + +// Timer class used by this tdriver to output elapsed times, etc. +class Timer +{ +public: + void start(const string& message) + { + if (!fHeaderDisplayed) + { + header(); + fHeaderDisplayed = true; + } + + gettimeofday(&fTvStart, 0); + cout << timestr() << " Start " << message << endl; + } + + void stop(const string& message) + { + time_t now; + time(&now); + string secondsElapsed; + getTimeElapsed(secondsElapsed); + cout << timestr() << " " << secondsElapsed << " Stop " << message << endl; + } + + Timer() : fHeaderDisplayed(false) {} + +private: + + struct timeval fTvStart; + bool fHeaderDisplayed; + + double getTimeElapsed(string& seconds) + { + struct timeval tvStop; + gettimeofday(&tvStop, 0); + double secondsElapsed = + (tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) - + (fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0)); + ostringstream oss; + oss << secondsElapsed; + seconds = oss.str(); + seconds.resize(8, '0'); + return secondsElapsed; + } + + string timestr() + { + struct tm tm; + struct timeval tv; + + gettimeofday(&tv, 0); + localtime_r(&tv.tv_sec, &tm); + + ostringstream oss; + oss << setfill('0') + << setw(2) << tm.tm_hour << ':' + << setw(2) << tm.tm_min << ':' + << setw(2) << tm.tm_sec << '.' 
+ << setw(6) << tv.tv_usec + ; + return oss.str(); + } + + void header() + { + cout << endl; + cout << "Time Seconds Activity" << endl; + } +}; + +class FilterDriver : public CppUnit::TestFixture +{ + + CPPUNIT_TEST_SUITE(FilterDriver); + + CPPUNIT_TEST(INT_TEST); + CPPUNIT_TEST(FLOAT_TEST); + + CPPUNIT_TEST_SUITE_END(); + +private: + // The tests creates an RG with 1 column of the cscDt type + // then initialize RGData. After that it adds two numeric values (v1 < v2)and two NULL. + // Then creates comparator structores and run a number of tests. v1 < v2 + void testComparatorWithDT(execplan::CalpontSystemCatalog::ColDataType cscDt, + uint32_t width, + bool generateRandValues) + { + std::cout << std::endl << "------------------------------------------------------------" << std::endl; + uint32_t oid =3001; + std::vector offsets, roids, tkeys, cscale, cprecision; + std::vector types; + offsets.push_back(2); offsets.push_back(2+width); + roids.push_back(oid); + tkeys.push_back(1); + types.push_back(cscDt); + cscale.push_back(0); + cprecision.push_back(20); + rowgroup::RowGroup inRG(1, //column count + offsets, //oldOffset + roids, // column oids + tkeys, //keys + types, // types + cscale, //scale + cprecision, // precision + 20, // sTableThreshold + false //useStringTable + ); + + rowgroup::RGData rgD = rowgroup::RGData(inRG); + inRG.setData(&rgD); + rowgroup::Row r, r1, r2, r3; + inRG.initRow(&r); + uint32_t rowSize = r.getSize(); + inRG.getRow(0, &r); + + // Sorting spec describes sorting direction and NULL comparision + // preferences + ordering::IdbSortSpec spec = ordering::IdbSortSpec(0, // column index + true, // ascending + true); // NULLs first + std::vector specVect; + specVect.push_back(spec); + + switch(cscDt) + { + case execplan::CalpontSystemCatalog::UTINYINT: + { + std::cout << "UTINYINT " << std::endl; + r.setUintField<1>(42, 0); + r.nextRow(rowSize); + r.setUintField<1>(43, 0); + r.nextRow(rowSize); + r.setUintField<1>(joblist::UTINYINTNULL, 0); + 
break; + } + case execplan::CalpontSystemCatalog::USMALLINT: + { + std::cout << "USMALLINT " << std::endl; + r.setUintField<2>(42, 0); + r.nextRow(rowSize); + r.setUintField<2>(43, 0); + r.nextRow(rowSize); + r.setUintField<2>(joblist::USMALLINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + { + std::cout << "UINT " << std::endl; + r.setUintField<4>(42, 0); + r.nextRow(rowSize); + r.setUintField<4>(43, 0); + r.nextRow(rowSize); + r.setUintField<4>(joblist::UINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::DATETIME: + case execplan::CalpontSystemCatalog::UBIGINT: + { + std::cout << "UBIGINT " << std::endl; + r.setUintField<8>(42, 0); + r.nextRow(rowSize); + r.setUintField<8>(43, 0); + r.nextRow(rowSize); + r.setUintField<8>(joblist::UBIGINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::TINYINT: + { + std::cout << "TINYINT " << std::endl; + r.setIntField<1>(42, 0); + r.nextRow(rowSize); + r.setIntField<1>(43, 0); + r.nextRow(rowSize); + r.setIntField<1>(joblist::TINYINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::SMALLINT: + { + std::cout << "SMALLINT " << std::endl; + r.setIntField<2>(42, 0); + r.nextRow(rowSize); + r.setIntField<2>(43, 0); + r.nextRow(rowSize); + r.setIntField<2>(joblist::SMALLINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + case execplan::CalpontSystemCatalog::DATE: + { + if (cscDt == execplan::CalpontSystemCatalog::DATE) + std::cout << "DATE" << std::endl; + else + std::cout << "INT " << std::endl; + r.setIntField<4>(42, 0); + r.nextRow(rowSize); + r.setIntField<4>(43, 0); + r.nextRow(rowSize); + if (cscDt == execplan::CalpontSystemCatalog::DATE) + r.setIntField<4>(joblist::DATENULL, 0); + else + r.setIntField<4>(joblist::INTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::BIGINT: + { + std::cout << "BIGINT " << std::endl; + r.setIntField<8>(42, 0); 
+ r.nextRow(rowSize); + r.setIntField<8>(43, 0); + r.nextRow(rowSize); + r.setIntField<8>(joblist::BIGINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::DECIMAL: + case execplan::CalpontSystemCatalog::UDECIMAL: + { + std::cout << "DECIMAL " << std::endl; + switch (width) + { + case 1 : + { + r.setUintField<1>(42, 0); + r.nextRow(rowSize); + r.setUintField<1>(43, 0); + r.nextRow(rowSize); + r.setUintField<1>(joblist::TINYINTNULL, 0); + break; + } + case 2 : + { + r.setUintField<2>(42, 0); + r.nextRow(rowSize); + r.setUintField<2>(43, 0); + r.nextRow(rowSize); + r.setUintField<2>(joblist::SMALLINTNULL, 0); + break; + } + case 4 : + { + r.setUintField<4>(42, 0); + r.nextRow(rowSize); + r.setUintField<4>(43, 0); + r.nextRow(rowSize); + r.setUintField<4>(joblist::INTNULL, 0); + break; + } + default: + { + r.setUintField<8>(42, 0); + r.nextRow(rowSize); + r.setUintField<8>(43, 0); + r.nextRow(rowSize); + r.setUintField<8>(joblist::BIGINTNULL, 0); + break; + } + } + break; + } + case execplan::CalpontSystemCatalog::FLOAT: + case execplan::CalpontSystemCatalog::UFLOAT: + { + std::cout << "FLOAT " << std::endl; + r.setFloatField(42.1, 0); + r.nextRow(rowSize); + r.setFloatField(43.1, 0); + r.nextRow(rowSize); + r.setUintField(joblist::FLOATNULL, 0); + break; + + } + case execplan::CalpontSystemCatalog::DOUBLE: + case execplan::CalpontSystemCatalog::UDOUBLE: + { + std::cout << "DOUBLE " << std::endl; + r.setDoubleField(42.1, 0); + r.nextRow(rowSize); + r.setDoubleField(43.1, 0); + r.nextRow(rowSize); + r.setUintField(joblist::DOUBLENULL, 0); + break; + } + case execplan::CalpontSystemCatalog::LONGDOUBLE: + { + r.setLongDoubleField(42.1, 0); + r.nextRow(rowSize); + r.setLongDoubleField(43.1, 0); + r.nextRow(rowSize); + r.setLongDoubleField(joblist::LONGDOUBLENULL, 0); + break; + } + default: + { + break; + } + } + + inRG.setRowCount(3); + inRG.initRow(&r1); + inRG.initRow(&r2); + inRG.initRow(&r3); + inRG.getRow(0, &r1); + inRG.getRow(1, &r2); + inRG.getRow(2, 
&r3); + + std::cout<< "r1 " << r1.toString() << " r2 " << r2.toString() + << " r3 " << r3.toString() << std::endl; + + ordering::IdbCompare idbCompare; + idbCompare.initialize(inRG); + ordering::OrderByData odbData = ordering::OrderByData(specVect, inRG); + bool result = odbData(r1.getPointer(), r2.getPointer()); + std::cout << r1.toString() << " < " << r2.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == true); + result = odbData(r2.getPointer(), r1.getPointer()); + std::cout << r2.toString() << " < " << r1.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == false); + result = odbData(r2.getPointer(), r2.getPointer()); + std::cout << r2.toString() << " < " << r2.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == false); + // Compare value with NULL. if spec.fNf then NULLs goes first + result = odbData(r3.getPointer(), r1.getPointer()); + std::cout << r3.toString() << " < " << r1.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == true); + // Compare NULL with NULL + result = odbData(r3.getPointer(), r1.getPointer()); + std::cout << r3.toString() << " < " << r3.toString() << " is " + << ((result) ? 
"true" : "false") << std::endl; + CPPUNIT_ASSERT(result == true); + } + + void INT_TEST() + { + //bool generateValues = true; + bool fixedValues = false; + testComparatorWithDT(execplan::CalpontSystemCatalog::UTINYINT, 1, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::USMALLINT, 2, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::UMEDINT, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::UBIGINT, 8, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DATETIME, 8, fixedValues); + + testComparatorWithDT(execplan::CalpontSystemCatalog::TINYINT, 1, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::SMALLINT, 2, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::MEDINT, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DATE, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::BIGINT, 8, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DECIMAL, 8, fixedValues); + + testComparatorWithDT(execplan::CalpontSystemCatalog::FLOAT, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DOUBLE, 8, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::LONGDOUBLE, 8, fixedValues); + } + + void FLOAT_TEST() + { + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver); + + +int main( int argc, char** argv) +{ + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest( registry.makeTest() ); + bool wasSuccessful = runner.run( "", false ); + return (wasSuccessful ? 
0 : 1); +} + + diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index f6e88bd7e..b2e70a6c3 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -43,32 +43,85 @@ using namespace rowgroup; #include "idborderby.h" +#include "joblisttypes.h" namespace ordering { +int TinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + int8_t v1 = l->row1().getIntField(fSpec.fIndex); + int8_t v2 = l->row2().getIntField(fSpec.fIndex); + int8_t nullValue = static_cast(joblist::TINYINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int SmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + int16_t v1 = l->row1().getIntField(fSpec.fIndex); + int16_t v2 = l->row2().getIntField(fSpec.fIndex); + int16_t nullValue = static_cast(joblist::SMALLINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - int ret = 0; + int32_t v1 = l->row1().getIntField(fSpec.fIndex); + int32_t v2 = l->row2().getIntField(fSpec.fIndex); + int32_t nullValue = static_cast(joblist::INTNULL); - if (b1 == true || b2 == true) + if (v1 == nullValue || v2 == 
nullValue) { - if (b1 == false && b2 == true) + if (v1 != nullValue && v2 == nullValue) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == nullValue && v2 != nullValue) ret = -fSpec.fNf; } else { - int64_t v1 = l->row1().getIntField(fSpec.fIndex); - int64_t v2 = l->row2().getIntField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -78,29 +131,25 @@ int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) return ret; } - -int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +int BigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - int ret = 0; + int64_t v1 = l->row1().getIntField(fSpec.fIndex); + int64_t v2 = l->row2().getIntField(fSpec.fIndex); + int64_t nullValue = static_cast(joblist::BIGINTNULL); - if (b1 == true || b2 == true) + if (v1 == nullValue || v2 == nullValue) { - if (b1 == false && b2 == true) + if (v1 != nullValue && v2 == nullValue) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == nullValue && v2 != nullValue) ret = -fSpec.fNf; } else { - uint64_t v1 = l->row1().getUintField(fSpec.fIndex); - uint64_t v2 = l->row2().getUintField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -110,6 +159,116 @@ int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) return ret; } +int UTinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint8_t v1 = l->row1().getUintField(fSpec.fIndex); + uint8_t v2 = l->row2().getUintField(fSpec.fIndex); + uint8_t nullValue = static_cast(joblist::UTINYINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = 
-fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int USmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint16_t v1 = l->row1().getUintField(fSpec.fIndex); + uint16_t v2 = l->row2().getUintField(fSpec.fIndex); + uint16_t nullValue = static_cast(joblist::USMALLINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int UIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint32_t v1 = l->row1().getUintField(fSpec.fIndex); + uint32_t v2 = l->row2().getUintField(fSpec.fIndex); + uint32_t nullValue = static_cast(joblist::UINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int UBigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint64_t v1 = l->row1().getUintField(fSpec.fIndex); + uint64_t v2 = l->row2().getUintField(fSpec.fIndex); + + if (v1 == joblist::UBIGINTNULL || v2 == joblist::UBIGINTNULL) + { + if (v1 != joblist::UBIGINTNULL && v2 == joblist::UBIGINTNULL) + ret = fSpec.fNf; + else if (v1 == joblist::UBIGINTNULL && v2 != joblist::UBIGINTNULL) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} int 
StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { @@ -130,41 +289,45 @@ int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) } else { - string v1 = l->row1().getStringField(fSpec.fIndex); - string v2 = l->row2().getStringField(fSpec.fIndex); - - if (v1 > v2) - ret = fSpec.fAsc; - else if (v1 < v2) - ret = -fSpec.fAsc; + int len1 = l->row1().getStringLength(fSpec.fIndex); + int len2 = l->row2().getStringLength(fSpec.fIndex); + const char* s1 = (const char*)l->row1().getStringPointer(fSpec.fIndex); + const char* s2 = (const char*)l->row2().getStringPointer(fSpec.fIndex); + // For Japanese, coll.compare() may not be as correct as strncmp + if (JPcodePoint) + { + ret = fSpec.fAsc * strncmp(s1, s2, max(len1,len2)); + } + else + { + const std::collate& coll = std::use_facet >(loc); + ret = fSpec.fAsc * coll.compare(s1, s1+len1, s2, s2+len2); + } } return ret; } - int DoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - + uint64_t uiv1 = l->row1().getUintField(fSpec.fIndex); + uint64_t uiv2 = l->row2().getUintField(fSpec.fIndex); int ret = 0; - if (b1 == true || b2 == true) + if (uiv1 == joblist::DOUBLENULL || uiv2 == joblist::DOUBLENULL) { - if (b1 == false && b2 == true) + if (uiv1 != joblist::DOUBLENULL && uiv2 == joblist::DOUBLENULL) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (uiv1 == joblist::DOUBLENULL && uiv2 != joblist::DOUBLENULL) ret = -fSpec.fNf; } else { double v1 = l->row1().getDoubleField(fSpec.fIndex); double v2 = l->row2().getDoubleField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -179,23 +342,22 @@ int FloatCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = 
l->row2().isNullValue(fSpec.fIndex); - + int32_t iv1 = l->row1().getIntField(fSpec.fIndex); + int32_t iv2 = l->row2().getIntField(fSpec.fIndex); + int32_t nullValue = static_cast(joblist::FLOATNULL); int ret = 0; - if (b1 == true || b2 == true) + if (iv1 == nullValue || iv2 == nullValue) { - if (b1 == false && b2 == true) + if (iv1 != nullValue && iv2 == nullValue) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (iv1 == nullValue && iv2 != nullValue) ret = -fSpec.fNf; } else { float v1 = l->row1().getFloatField(fSpec.fIndex); float v2 = l->row2().getFloatField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -210,23 +372,20 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); + long double v1 = l->row1().getLongDoubleField(fSpec.fIndex); + long double v2 = l->row2().getLongDoubleField(fSpec.fIndex); int ret = 0; - if (b1 == true || b2 == true) + if (v1 == joblist::LONGDOUBLENULL || v2 == joblist::LONGDOUBLENULL) { - if (b1 == false && b2 == true) + if (v1 != joblist::LONGDOUBLENULL && v2 == joblist::LONGDOUBLENULL) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == joblist::LONGDOUBLENULL && v2 != joblist::LONGDOUBLENULL) ret = -fSpec.fNf; } else { - long double v1 = l->row1().getLongDoubleField(fSpec.fIndex); - long double v2 = l->row2().getLongDoubleField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -236,29 +395,79 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r return ret; } +int DateCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint32_t v1 = l->row1().getUintField(fSpec.fIndex); + uint32_t v2 = l->row2().getUintField(fSpec.fIndex); + uint32_t nullValue = 
static_cast(joblist::DATENULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int DatetimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint64_t v1 = l->row1().getUintField(fSpec.fIndex); + uint64_t v2 = l->row2().getUintField(fSpec.fIndex); + + if (v1 == joblist::DATETIMENULL || v2 == joblist::DATETIMENULL) + { + if (v1 != joblist::DATETIMENULL && v2 == joblist::DATETIMENULL) + ret = fSpec.fNf; + else if (v1 == joblist::DATETIMENULL && v2 != joblist::DATETIMENULL) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - int ret = 0; + uint64_t v1 = l->row1().getUintField(fSpec.fIndex); + uint64_t v2 = l->row2().getUintField(fSpec.fIndex); - if (b1 == true || b2 == true) + if (v1 == joblist::TIMENULL || v2 == joblist::TIMENULL) { - if (b1 == false && b2 == true) + if (v1 != joblist::TIMENULL && v2 == joblist::TIMENULL) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == joblist::TIMENULL && v2 != joblist::TIMENULL) ret = -fSpec.fNf; } else { - int64_t v1 = l->row1().getIntField(fSpec.fIndex); - int64_t v2 = l->row2().getIntField(fSpec.fIndex); - // ((int64_t) -00:00:26) > ((int64_t) -00:00:25) // i.e. 
For 2 negative TIME values, we invert the order of // comparison operations to force "-00:00:26" to appear before @@ -301,6 +510,16 @@ bool CompareRule::less(Row::Pointer r1, Row::Pointer r2) return false; } +void CompareRule::revertRules() +{ + std::vector::iterator fCompareIter = fCompares.begin(); + for(; fCompareIter!=fCompares.end(); fCompareIter++) + { + (*fCompareIter)->revertSortSpec(); + } +} + + void CompareRule::compileRules(const std::vector& spec, const rowgroup::RowGroup& rg) { @@ -311,31 +530,80 @@ void CompareRule::compileRules(const std::vector& spec, const rowgr switch (types[i->fIndex]) { case CalpontSystemCatalog::TINYINT: + { + Compare* c = new TinyIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::SMALLINT: + { + Compare* c = new SmallIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: { Compare* c = new IntCompare(*i); fCompares.push_back(c); break; } + case CalpontSystemCatalog::BIGINT: + { + Compare* c = new BigIntCompare(*i); + fCompares.push_back(c); + break; + } + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + Compare* c; + uint32_t len = rg.getColumnWidth(i->fIndex); + switch (len) + { + case 1 : + c = new TinyIntCompare(*i); break; + case 2 : + c = new SmallIntCompare(*i); break; + case 4 : + c = new IntCompare(*i); break; + default: + c = new BigIntCompare(*i); + } + + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::UTINYINT: + { + Compare* c = new UTinyIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::USMALLINT: + { + Compare* c = new USmallIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: + { + Compare* c = new UIntCompare(*i); + fCompares.push_back(c); + break; + } 
case CalpontSystemCatalog::UBIGINT: { - Compare* c = new UintCompare(*i); + Compare* c = new UBigIntCompare(*i); fCompares.push_back(c); break; } case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::TEXT: { Compare* c = new StringCompare(*i); fCompares.push_back(c); @@ -366,14 +634,18 @@ void CompareRule::compileRules(const std::vector& spec, const rowgr } case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - case CalpontSystemCatalog::TIMESTAMP: { - Compare* c = new UintCompare(*i); + Compare* c = new DateCompare(*i); + fCompares.push_back(c); + break; + } + case CalpontSystemCatalog::DATETIME: + case CalpontSystemCatalog::TIMESTAMP: + { + Compare* c = new DatetimeCompare(*i); fCompares.push_back(c); break; } - case CalpontSystemCatalog::TIME: { Compare* c = new TimeCompare(*i); @@ -417,9 +689,10 @@ OrderByData::OrderByData(const std::vector& spec, const rowgroup::R // IdbOrderBy class implementation IdbOrderBy::IdbOrderBy() : - fDistinct(false), fMemSize(0), fRowsPerRG(8192), fErrorCode(0), fRm(NULL) -{ -} + fDistinct(false), fMemSize(0), fRowsPerRG(rowgroup::rgCommonSize), + fErrorCode(0), + fRm(NULL) +{ } IdbOrderBy::~IdbOrderBy() diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index b11b9c44e..e26ba1aa3 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -41,7 +41,6 @@ #include "hasher.h" #include "stlpoolallocator.h" - // forward reference namespace joblist { @@ -52,9 +51,27 @@ class ResourceManager; namespace ordering { +template, + typename _Compare = std::less > +class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare> +{ +public: + typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type; + reservablePQ(size_type capacity = 0) { reserve(capacity); }; + void reserve(size_type capacity) { this->c.reserve(capacity); } + size_type capacity() const { return this->c.capacity(); } + 
using std::priority_queue<_Tp, _Sequence, _Compare>::size; + using std::priority_queue<_Tp, _Sequence, _Compare>::top; + using std::priority_queue<_Tp, _Sequence, _Compare>::pop; + using std::priority_queue<_Tp, _Sequence, _Compare>::push; + using std::priority_queue<_Tp, _Sequence, _Compare>::empty; +}; // forward reference class IdbCompare; +class OrderByRow; + +typedef reservablePQ SortingPQ; // order by specification struct IdbSortSpec @@ -63,6 +80,7 @@ struct IdbSortSpec // TODO There are three ordering specs since 10.2 int fAsc; // ::= ASC | DESC int fNf; // ::= NULLS FIRST | NULLS LAST + std::string fLocale; IdbSortSpec() : fIndex(-1), fAsc(1), fNf(1) {} IdbSortSpec(int i, bool b) : fIndex(i), fAsc(b ? 1 : -1), fNf(fAsc) {} @@ -75,13 +93,72 @@ struct IdbSortSpec class Compare { public: - Compare(const IdbSortSpec& spec) : fSpec(spec) {} + Compare(const IdbSortSpec& spec) : fSpec(spec) + { + // Save off the current Locale in case something goes wrong. + std::string curLocale = setlocale(LC_COLLATE, NULL); + if (spec.fLocale.length() > 0) + { + fLocale = spec.fLocale; + } + else + { + fLocale = curLocale; + } + + try + { + std::locale localloc(fLocale.c_str()); + loc = localloc; + } + catch(...) 
+ { + fLocale = curLocale; + std::locale localloc(fLocale.c_str()); + loc = localloc; + } + if (fLocale.find("ja_JP") != std::string::npos) + { + JPcodePoint = true; + } + else + { + JPcodePoint = false; + } + } virtual ~Compare() {} virtual int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer) = 0; + void revertSortSpec() + { + fSpec.fAsc = -fSpec.fAsc; + fSpec.fNf = -fSpec.fNf; + } protected: IdbSortSpec fSpec; + std::string fLocale; + std::locale loc; + bool JPcodePoint; // code point ordering (Japanese UTF) flag +}; + +// Comparators for signed types + +class TinyIntCompare : public Compare +{ +public: + TinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class SmallIntCompare : public Compare +{ +public: + SmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; @@ -94,24 +171,56 @@ public: }; -class UintCompare : public Compare +class BigIntCompare : public Compare { public: - UintCompare(const IdbSortSpec& spec) : Compare(spec) {} + BigIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// End of comparators for signed types +// Comparators for unsigned types + +class UTinyIntCompare : public Compare +{ +public: + UTinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {} int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; -class StringCompare : public Compare +class USmallIntCompare : public Compare { public: - StringCompare(const IdbSortSpec& spec) : Compare(spec) {} + USmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {} int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; +class UIntCompare : public Compare +{ +public: + UIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int 
operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class UBigIntCompare : public Compare +{ +public: + UBigIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// end of comparators for unsigned types + +// Comparators for float types + class DoubleCompare : public Compare { public: @@ -120,6 +229,7 @@ public: int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; + class LongDoubleCompare : public Compare { public: @@ -137,6 +247,26 @@ public: int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; +// End of comparators for float types +// Comparators for temporal types + +class DateCompare : public Compare +{ +public: + DateCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class DatetimeCompare : public Compare +{ +public: + DatetimeCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + class TimeCompare : public Compare { @@ -146,6 +276,19 @@ public: int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; +// End of comparators for temporal types + +// Comparators for non-fixed size types + +class StringCompare : public Compare +{ +public: + StringCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// End of comparators for variable sized types class CompareRule { @@ -156,6 +299,7 @@ public: bool less(rowgroup::Row::Pointer r1, rowgroup::Row::Pointer r2); void compileRules(const std::vector&, const rowgroup::RowGroup&); + void revertRules(); std::vector fCompares; IdbCompare* fIdbCompare; @@ -197,7 +341,7 @@ public: return fRule->less(fData, rhs.fData); } - rowgroup::Row::Pointer fData; + rowgroup::Row::Pointer 
fData; CompareRule* fRule; }; @@ -216,7 +360,6 @@ public: bool operator()(rowgroup::Row::Pointer, rowgroup::Row::Pointer); -//protected: std::vector fIndex; }; @@ -263,10 +406,18 @@ public: { return fDistinct; } + SortingPQ& getQueue() + { + return fOrderByQueue; + } + CompareRule &getRule() + { + return fRule; + } + SortingPQ fOrderByQueue; protected: std::vector fOrderByCond; - std::priority_queue fOrderByQueue; rowgroup::Row fRow0; CompareRule fRule;