From 3b01d5ea22464f74ebb32229d411701d410cc621 Mon Sep 17 00:00:00 2001 From: David Hall Date: Tue, 8 Oct 2019 13:02:52 -0500 Subject: [PATCH 1/4] MCOL-3536 use Locale from columnstore.xml when doing ORDER BY --- utils/windowfunction/idborderby.cpp | 13 ++++++------ utils/windowfunction/idborderby.h | 32 ++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index f6e88bd7e..703008183 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -130,13 +130,12 @@ int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) } else { - string v1 = l->row1().getStringField(fSpec.fIndex); - string v2 = l->row2().getStringField(fSpec.fIndex); - - if (v1 > v2) - ret = fSpec.fAsc; - else if (v1 < v2) - ret = -fSpec.fAsc; + int len1 = l->row1().getStringLength(fSpec.fIndex); + int len2 = l->row2().getStringLength(fSpec.fIndex); + const char* s1 = (const char*)l->row1().getStringPointer(fSpec.fIndex); + const char* s2 = (const char*)l->row2().getStringPointer(fSpec.fIndex); + const std::collate& coll = std::use_facet >(loc); + ret = fSpec.fAsc * coll.compare(s1, s1+len1, s2, s2+len2); } return ret; diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index b11b9c44e..6d0f8574b 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -63,6 +63,7 @@ struct IdbSortSpec // TODO There are three ordering specs since 10.2 int fAsc; // ::= ASC | DESC int fNf; // ::= NULLS FIRST | NULLS LAST + std::string fLocale; IdbSortSpec() : fIndex(-1), fAsc(1), fNf(1) {} IdbSortSpec(int i, bool b) : fIndex(i), fAsc(b ? 1 : -1), fNf(fAsc) {} @@ -75,13 +76,42 @@ struct IdbSortSpec class Compare { public: - Compare(const IdbSortSpec& spec) : fSpec(spec) {} + Compare(const IdbSortSpec& spec) : fSpec(spec) + { + // Save off the current Locale in case something goes wrong. + std::string curLocale = setlocale(LC_COLLATE, NULL); + if (spec.fLocale.length() > 0) + { + fLocale = spec.fLocale; + } + else + { + fLocale = curLocale; + } + + try + { + std::locale localloc(fLocale.c_str()); + loc = localloc; + } + catch(...) + { + fLocale = curLocale; + std::locale localloc(fLocale.c_str()); + loc = localloc; + } + if (fLocale.find("ja_JP") != std::string::npos) + JPcodePoint = true; + } virtual ~Compare() {} virtual int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer) = 0; protected: IdbSortSpec fSpec; + std::string fLocale; + std::locale loc; + bool JPcodePoint; // code point ordering (Japanese UTF) flag }; From 0d2b0e007068b89e2511558c3620658a3ec1ba76 Mon Sep 17 00:00:00 2001 From: David Hall Date: Mon, 14 Oct 2019 12:49:08 -0500 Subject: [PATCH 2/4] MCOL-3536 check for Japanese and use byte compare if found --- utils/windowfunction/idborderby.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index 703008183..04f89f1a9 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -134,8 +134,16 @@ int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) int len2 = l->row2().getStringLength(fSpec.fIndex); const char* s1 = (const char*)l->row1().getStringPointer(fSpec.fIndex); const char* s2 = (const char*)l->row2().getStringPointer(fSpec.fIndex); - const std::collate& coll = std::use_facet >(loc); - ret = fSpec.fAsc * coll.compare(s1, s1+len1, s2, s2+len2); + // For Japanese, coll.compare() may not be as correct as strncmp + if (JPcodePoint) + { + ret = fSpec.fAsc * strncmp(s1, s2, max(len1,len2)); + } + else + { + const std::collate& coll = std::use_facet >(loc); + ret = fSpec.fAsc * coll.compare(s1, s1+len1, s2, s2+len2); + } } return ret; From 7b5e5f0eb671bf2e35bdeca2873ba8380ae27161 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Mon, 27 May 2019 16:27:47 +0300 Subject: [PATCH 3/4] MCOL-894 Upmerged the fist part of the patch into develop. MCOL-894 Add default values in Compare and CSEP ctors to activate UTF-8 sorting properly. MCOL-894 Unit tests to build a framework for a new parallel sorting. MCOL-894 Finished with parallel workers invocation. The implementation lacks final aggregation step. MCOL-894 TupleAnnexStep's init and destructor are now parallel execution aware. Implemented final merging step for parallel execution finalizeParallelOrderBy(). Templated unit test to use it with arbitrary number of rows, threads. Reuse LimitedOrderBy in the final step MCOL-894 Cleaned up finalizeParallelOrderBy. MCOL-894 Add and propagate thread variable that controls a number of threads. Optimized comparators used for sorting and add corresponding UTs. Refactored TupleAnnexStep::finalizeParallelOrderByDistinct. Parallel sorting methods now preallocates memory in batches. MCOL-894 Fixed comparator for StringCompare. --- dbcon/execplan/calpontselectexecutionplan.cpp | 5 + dbcon/execplan/calpontselectexecutionplan.h | 11 + dbcon/joblist/CMakeLists.txt | 7 +- dbcon/joblist/jlf_common.h | 1 + dbcon/joblist/jlf_subquery.cpp | 1 + dbcon/joblist/jlf_tuplejoblist.cpp | 17 +- dbcon/joblist/joblistfactory.cpp | 2 - dbcon/joblist/limitedorderby.cpp | 31 +- dbcon/joblist/limitedorderby.h | 9 +- dbcon/joblist/orderby-tests.cpp | 389 ++++++++++ dbcon/joblist/tuple-bps.cpp | 6 - dbcon/joblist/tupleannexstep.cpp | 703 +++++++++++++++++- dbcon/joblist/tupleannexstep.h | 47 +- dbcon/mysql/ha_mcs_execplan.cpp | 4 + dbcon/mysql/ha_mcs_sysvars.cpp | 23 + dbcon/mysql/ha_mcs_sysvars.h | 3 + utils/rowgroup/rowgroup.cpp | 222 ------ utils/rowgroup/rowgroup.h | 2 + utils/windowfunction/CMakeLists.txt | 5 + utils/windowfunction/comparators-tests.cpp | 425 +++++++++++ utils/windowfunction/idborderby.cpp | 406 ++++++++-- utils/windowfunction/idborderby.h | 145 +++- 22 files changed, 2132 insertions(+), 332 deletions(-) create mode 100644 dbcon/joblist/orderby-tests.cpp create mode 100644 utils/windowfunction/comparators-tests.cpp diff --git a/dbcon/execplan/calpontselectexecutionplan.cpp b/dbcon/execplan/calpontselectexecutionplan.cpp index fd12b8179..186b64930 100644 --- a/dbcon/execplan/calpontselectexecutionplan.cpp +++ b/dbcon/execplan/calpontselectexecutionplan.cpp @@ -82,6 +82,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location): fQueryType(SELECT), fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL), fStringTableThreshold(20), + fOrderByThreads(1), fDJSSmallSideLimit(0), fDJSLargeSideLimit(0), fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join, @@ -125,6 +126,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan( fQueryType(SELECT), fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL), fStringTableThreshold(20), + fOrderByThreads(1), fDJSSmallSideLimit(0), fDJSLargeSideLimit(0), fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join @@ -151,6 +153,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan (string data) : fQueryType(SELECT), fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL), fStringTableThreshold(20), + fOrderByThreads(1), fDJSSmallSideLimit(0), fDJSLargeSideLimit(0), fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join @@ -479,6 +482,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const b << (uint64_t)fLimitNum; b << static_cast(fHasOrderBy); b << static_cast(fSpecHandlerProcessed); + b << reinterpret_cast(fOrderByThreads); b << static_cast(fSelectSubList.size()); @@ -648,6 +652,7 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b) b >> (uint64_t&)fLimitNum; b >> reinterpret_cast< ByteStream::byte&>(fHasOrderBy); b >> reinterpret_cast< ByteStream::byte&>(fSpecHandlerProcessed); + b >> reinterpret_cast(fOrderByThreads); // for SELECT subquery b >> size; diff --git a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h index e21cd96bb..cf31b0711 100644 --- a/dbcon/execplan/calpontselectexecutionplan.h +++ b/dbcon/execplan/calpontselectexecutionplan.h @@ -588,6 +588,16 @@ public: return fSpecHandlerProcessed; } + void orderByThreads(const uint32_t number) + { + fOrderByThreads = number; + } + const uint32_t orderByThreads() const + { + return fOrderByThreads; + } + + void selectSubList(const SelectList& selectSubList) { fSelectSubList = selectSubList; @@ -897,6 +907,7 @@ private: // for specific handlers processing, e.g. GROUP BY bool fSpecHandlerProcessed; + uint32_t fOrderByThreads; // Derived table involved in the query. For derived table optimization std::vector fSubSelectList; diff --git a/dbcon/joblist/CMakeLists.txt b/dbcon/joblist/CMakeLists.txt index 7d15511aa..c450b61af 100644 --- a/dbcon/joblist/CMakeLists.txt +++ b/dbcon/joblist/CMakeLists.txt @@ -57,12 +57,15 @@ set(joblist_LIB_SRCS virtualtable.cpp windowfunctionstep.cpp) -#libjoblist_la_CXXFLAGS = $(march_flags) $(AM_CXXFLAGS) - add_library(joblist SHARED ${joblist_LIB_SRCS}) set_target_properties(joblist PROPERTIES VERSION 1.0.0 SOVERSION 1) install(TARGETS joblist DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) +if (WITH_ORDERBY_UT) + add_executable(job_orderby_tests orderby-tests.cpp) + target_link_libraries(job_orderby_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) + install(TARGETS job_orderby_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform) +endif() diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index 6d8456a4b..5c5a14b8e 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -247,6 +247,7 @@ struct JobInfo std::vector > orderByColVec; uint64_t limitStart; uint64_t limitCount; + uint32_t orderByThreads; // tupleInfo boost::shared_ptr keyInfo; diff --git a/dbcon/joblist/jlf_subquery.cpp b/dbcon/joblist/jlf_subquery.cpp index 896b1909e..11179eee2 100644 --- a/dbcon/joblist/jlf_subquery.cpp +++ b/dbcon/joblist/jlf_subquery.cpp @@ -769,6 +769,7 @@ void addOrderByAndLimit(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { jobInfo.limitStart = csep->limitStart(); jobInfo.limitCount = csep->limitNum(); + jobInfo.orderByThreads = csep->orderByThreads(); CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols(); diff --git a/dbcon/joblist/jlf_tuplejoblist.cpp b/dbcon/joblist/jlf_tuplejoblist.cpp index b18584d6a..713703ae8 100644 --- a/dbcon/joblist/jlf_tuplejoblist.cpp +++ b/dbcon/joblist/jlf_tuplejoblist.cpp @@ -484,6 +484,8 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, deliverySteps[CNX_VTABLE_ID] = ws; } + // TODO MCOL-894 we don't need to run sorting|distinct + // every time // if ((jobInfo.limitCount != (uint64_t) - 1) || // (jobInfo.constantCol == CONST_COL_EXIST) || // (jobInfo.hasDistinct)) @@ -494,12 +496,13 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, TupleAnnexStep* tas = dynamic_cast(jobInfo.annexStep.get()); tas->setLimit(jobInfo.limitStart, jobInfo.limitCount); -// if (jobInfo.limitCount != (uint64_t) - 1) -// { if (jobInfo.orderByColVec.size() > 0) + { tas->addOrderBy(new LimitedOrderBy()); - -// } + if (jobInfo.orderByThreads > 1) + tas->setParallelOp(); + tas->setMaxThreads(jobInfo.orderByThreads); + } if (jobInfo.constantCol == CONST_COL_EXIST) tas->addConstant(new TupleConstantStep(jobInfo)); @@ -518,7 +521,11 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, if (jobInfo.trace) cout << "Output RowGroup 2: " << rg2.toString() << endl; AnyDataListSPtr spdlIn(new AnyDataList()); - RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize); + RowGroupDL* dlIn; + if (jobInfo.orderByColVec.size() > 0) + dlIn = new RowGroupDL(jobInfo.orderByThreads, jobInfo.fifoSize); + else + dlIn = new RowGroupDL(1, jobInfo.fifoSize); dlIn->OID(CNX_VTABLE_ID); spdlIn->rowGroupDL(dlIn); JobStepAssociation jsaIn; diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 7f01dccb7..b087c54ac 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -1727,14 +1727,12 @@ void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, { jobInfo.limitCount = (uint64_t) - 1; } - // support order by and limit in sub-query/union or // GROUP BY handler processed outer query order else if (csep->orderByCols().size() > 0) { addOrderByAndLimit(csep, jobInfo); } - // limit without order by in any query else { diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index 5a423682d..f4cf096b2 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -55,7 +55,7 @@ LimitedOrderBy::~LimitedOrderBy() } -void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo) +void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool invertRules, bool isMultiThreaded) { fRm = jobInfo.rm; fSessionMemLimit = jobInfo.umMemLimit; @@ -77,12 +77,22 @@ void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo) map::iterator j = keyToIndexMap.find(i->first); idbassert(j != keyToIndexMap.end()); - fOrderByCond.push_back(IdbSortSpec(j->second, i->second)); + fOrderByCond.push_back(IdbSortSpec(j->second, i->second ^ invertRules)); } // limit row count info - fStart = jobInfo.limitStart; - fCount = jobInfo.limitCount; + if (isMultiThreaded) + { + // CS can't apply offset at the first stage + // otherwise it looses records. + fStart = 0; + fCount = jobInfo.limitStart+jobInfo.limitCount; + } + else + { + fStart = jobInfo.limitStart; + fCount = jobInfo.limitCount; + } IdbOrderBy::initialize(rg); } @@ -169,6 +179,8 @@ void LimitedOrderBy::finalize() if (fOrderByQueue.size() > 0) { + // *DRRTUY Very memory intensive. CS needs to account active + // memory only and release memory if needed. uint64_t memSizeInc = fRowsPerRG * fRowGroup.getRowSize(); fMemSize += memSizeInc; @@ -207,6 +219,8 @@ void LimitedOrderBy::finalize() fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); fRowGroup.resetRowGroup(0); + // *DRRTUY This approach won't work with + // OFSET > fRowsPerRG offset = offset != 0 ? offset - 1 : offset; fRowGroup.getRow(offset, &fRow0); @@ -219,7 +233,8 @@ void LimitedOrderBy::finalize() offset--; fRow0.prevRow(rSize); fOrderByQueue.pop(); - + + // if RG has fRowsPerRG rows if(offset == (uint64_t)-1) { tempRGDataList.push_front(fData); @@ -232,7 +247,7 @@ void LimitedOrderBy::finalize() throw IDBExcept(fErrorCode); } - fData.reinit(fRowGroup, fRowsPerRG); + fData.reinit(fRowGroup, fRowsPerRG); fRowGroup.setData(&fData); fRowGroup.resetRowGroup(0); // ? fRowGroup.getRow(preLastRowNumb, &fRow0); @@ -241,9 +256,9 @@ void LimitedOrderBy::finalize() } // Push the last/only group into the queue. if (fRowGroup.getRowCount() > 0) - tempRGDataList.push_front(fData); + tempRGDataList.push_front(fData); - for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++) + for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++) tempQueue.push(*tempListIter); fDataQueue = tempQueue; diff --git a/dbcon/joblist/limitedorderby.h b/dbcon/joblist/limitedorderby.h index 3ff266aa3..ec2d9d4f9 100644 --- a/dbcon/joblist/limitedorderby.h +++ b/dbcon/joblist/limitedorderby.h @@ -46,9 +46,16 @@ public: LimitedOrderBy(); virtual ~LimitedOrderBy(); - void initialize(const rowgroup::RowGroup&, const JobInfo&); + void initialize(const rowgroup::RowGroup&, + const JobInfo&, + bool invertRules = false, + bool isMultiThreded = false); void processRow(const rowgroup::Row&); uint64_t getKeyLength() const; + uint64_t getLimitCount() const + { + return fCount; + } const std::string toString() const; void finalize(); diff --git a/dbcon/joblist/orderby-tests.cpp b/dbcon/joblist/orderby-tests.cpp new file mode 100644 index 000000000..e9e552881 --- /dev/null +++ b/dbcon/joblist/orderby-tests.cpp @@ -0,0 +1,389 @@ +/* Copyright (C) 2019 MariaDB Corporaton. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $ + +#include +#include +#include +#include +#include +#include +#include + +#include "jobstep.h" +#include "funcexp.h" +#include "jlf_common.h" +#include "tupleannexstep.h" +#include "calpontsystemcatalog.h" +#include "resourcemanager.h" +#include +#include +#include "bytestream.h" +#include +#include +#include + +#define DEBUG +#define MEMORY_LIMIT 14983602176 + +using namespace std; +using namespace joblist; +using namespace messageqcpp; + +// Timer class used by this tdriver to output elapsed times, etc. +class Timer +{ +public: + void start(const string& message) + { + if (!fHeaderDisplayed) + { + header(); + fHeaderDisplayed = true; + } + + gettimeofday(&fTvStart, 0); + cout << timestr() << " Start " << message << endl; + } + + void stop(const string& message) + { + time_t now; + time(&now); + string secondsElapsed; + getTimeElapsed(secondsElapsed); + cout << timestr() << " " << secondsElapsed << " Stop " << message << endl; + } + + Timer() : fHeaderDisplayed(false) {} + +private: + + struct timeval fTvStart; + bool fHeaderDisplayed; + + double getTimeElapsed(string& seconds) + { + struct timeval tvStop; + gettimeofday(&tvStop, 0); + double secondsElapsed = + (tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) - + (fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0)); + ostringstream oss; + oss << secondsElapsed; + seconds = oss.str(); + seconds.resize(8, '0'); + return secondsElapsed; + } + + string timestr() + { + struct tm tm; + struct timeval tv; + + gettimeofday(&tv, 0); + localtime_r(&tv.tv_sec, &tm); + + ostringstream oss; + oss << setfill('0') + << setw(2) << tm.tm_hour << ':' + << setw(2) << tm.tm_min << ':' + << setw(2) << tm.tm_sec << '.' + << setw(6) << tv.tv_usec + ; + return oss.str(); + } + + void header() + { + cout << endl; + cout << "Time Seconds Activity" << endl; + } +}; + +class FilterDriver : public CppUnit::TestFixture +{ + + CPPUNIT_TEST_SUITE(FilterDriver); + + CPPUNIT_TEST(ORDERBY_TIME_TEST); + + CPPUNIT_TEST_SUITE_END(); + +private: + void orderByTest_nRGs(uint64_t numRows, uint64_t limit, + uint64_t maxThreads, + bool parallelExecution, + bool generateRandValues, + bool hasDistinct) + { + Timer timer; + // This test creates TAS for single bigint column and run sorting on it. + + cout << endl; + cout << "------------------------------------------------------------" << endl; + timer.start("insert"); + + stringstream ss2; + ss2.flush(); + ss2 << "loading " << numRows << " rows into initial RowGroup."; + std::string message = ss2.str(); + timer.start(message); + + ResourceManager* rm = ResourceManager::instance(true); + + joblist::JobInfo jobInfo = joblist::JobInfo(rm); + // 1st column is the sorting key + uint8_t tupleKey = 1; + uint8_t offset = 0; + uint16_t rowsPerRG = 8192; // hardcoded max rows in RowGroup value + uint32_t numberOfRGs = numRows / rowsPerRG; + + // set sorting rules + // true - ascending order, false otherwise + jobInfo.orderByColVec.push_back(make_pair(tupleKey, false)); + jobInfo.limitStart = offset; + jobInfo.limitCount = limit; + jobInfo.hasDistinct = hasDistinct; + // JobInfo doesn't set these SP in ctor + jobInfo.umMemLimit.reset(new int64_t); + *(jobInfo.umMemLimit) = MEMORY_LIMIT; + SErrorInfo errorInfo(new ErrorInfo()); + jobInfo.errorInfo = errorInfo; + uint32_t oid =3001; + // populate JobInfo.nonConstDelCols with dummy shared_pointers + // to notify TupleAnnexStep::initialise() about number of non-constant columns + execplan::SRCP srcp1, srcp2; + jobInfo.nonConstDelCols.push_back(srcp1); + jobInfo.nonConstDelCols.push_back(srcp2); + + // create two columns RG. 1st is the sorting key, second is the data column + std::vector offsets, roids, tkeys, cscale, cprecision; + std::vector types; + offsets.push_back(2); offsets.push_back(10); offsets.push_back(18); + roids.push_back(oid); roids.push_back(oid); + tkeys.push_back(1); tkeys.push_back(1); + types.push_back(execplan::CalpontSystemCatalog::UBIGINT); + types.push_back(execplan::CalpontSystemCatalog::UBIGINT); + cscale.push_back(0); cscale.push_back(0); + cprecision.push_back(20); cprecision.push_back(20); + rowgroup::RowGroup inRG(2, //column count + offsets, //oldOffset + roids, // column oids + tkeys, //keys + types, // types + cscale, //scale + cprecision, // precision + 20, // sTableThreshold + false //useStringTable + ); + rowgroup::RowGroup jobInfoRG(inRG); + joblist::TupleAnnexStep tns = joblist::TupleAnnexStep(jobInfo); + tns.addOrderBy(new joblist::LimitedOrderBy()); + tns.delivery(true); + // activate parallel sort + if(parallelExecution) + { + tns.setParallelOp(); + } + tns.setMaxThreads(maxThreads); + tns.initialize(jobInfoRG, jobInfo); + tns.setLimit(0, limit); + + // Create JobStepAssociation mediator class ins to connect DL and JS + joblist::AnyDataListSPtr spdlIn(new AnyDataList()); + //joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, jobInfo.fifoSize); + joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, 21001); + dlIn->OID(oid); + spdlIn->rowGroupDL(dlIn); + joblist::JobStepAssociation jsaIn; + jsaIn.outAdd(spdlIn); + tns.inputAssociation(jsaIn); + + uint64_t base = 42; + uint64_t maxInt = 0; + if(generateRandValues) + ::srand(base); + uint64_t nextValue; + for(uint32_t i = 1; i <= numberOfRGs; i++) + { + // create RGData with the RG structure and manually populate RG + // Use reinint(numberOfRGs) to preset an array + rowgroup::RGData rgD = rowgroup::RGData(inRG); + inRG.setData(&rgD); + rowgroup::Row r; + inRG.initRow(&r); + uint32_t rowSize = r.getSize(); + inRG.getRow(0, &r); + // populate RGData + //for(uint64_t i = rowsPerRG+1; i > 0; i--) + for(uint64_t i = 0; i < rowsPerRG; i++) // Worst case scenario for PQ + { + // TBD Use set..._offset methods to avoid offset calculation instructions + if(generateRandValues) + { + nextValue = ::rand(); + } + else + { + nextValue = base + i; + } + + r.setUintField<8>(nextValue, 0); + r.setUintField<8>(nextValue, 1); + if (maxInt < nextValue) + { + maxInt = nextValue; + } + r.nextRow(rowSize); + } + base += 1; + inRG.setRowCount(rowsPerRG); + + // Insert RGData into input DL + dlIn->insert(rgD); + } + // end of input signal + std::cout << "orderByTest_nRGs input DataList totalSize " << dlIn->totalSize() << std::endl; + dlIn->endOfInput(); + timer.stop(message); + + joblist::AnyDataListSPtr spdlOut(new AnyDataList()); + // Set the ring buffer big enough to take RGData-s with results b/c + // there is nobody to read out of the buffer. + joblist::RowGroupDL* dlOut = new RowGroupDL(1, numberOfRGs); + dlOut->OID(oid); + spdlOut->rowGroupDL(dlOut); + joblist::JobStepAssociation jsaOut; + jsaOut.outAdd(spdlOut); + //uint64_t outputDLIter = dlOut->getIterator(); + tns.outputAssociation(jsaOut); + + // Run Annex Step + message = "Sorting"; + timer.start(message); + tns.run(); + tns.join(); + timer.stop(message); + + // serialize RGData into bs and later back + // to follow ExeMgr whilst getting TAS result RowGroup + messageqcpp::ByteStream bs; + uint32_t result = 0; + rowgroup::RowGroup outRG(inRG); + rowgroup::RGData outRGData(outRG); + result = tns.nextBand(bs); + /*bool more = false; + do + { + dlOut->next(outputDLIter, &outRGData); + } while (more);*/ + std::cout << "orderByTest_nRGs result " << result << std::endl; + //CPPUNIT_ASSERT( result == limit ); + outRGData.deserialize(bs); + outRG.setData(&outRGData); + + //std::cout << "orderByTest_nRGs output RG " << outRG.toString() << std::endl; + std::cout << "maxInt " << maxInt << std::endl; + { + rowgroup::Row r; + outRG.initRow(&r); + outRG.getRow(0, &r); + CPPUNIT_ASSERT(limit == outRG.getRowCount() || outRG.getRowCount() == 8192); + CPPUNIT_ASSERT_EQUAL(maxInt, r.getUintField(1)); + } + + cout << "------------------------------------------------------------" << endl; + } + + + void ORDERBY_TIME_TEST() + { + uint64_t numRows = 8192; + uint64_t maxThreads = 16; + // limit == 100000 is still twice as good to sort in parallel + // limit == 1000000 however is better to sort using single threaded sorting + uint64_t limit = 100000; + bool parallel = true; + bool woParallel = false; + bool generateRandValues = true; + bool hasDistinct = true; + bool noDistinct = false; + //orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct); + //orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct); + } + void QUICK_TEST() + { + float f = 1.1; + double d = 1.2; + uint64_t i = 1; + uint64_t* i_ptr = &i; + double* d_ptr = &d; + uint64_t* i2_ptr = (uint64_t*) d_ptr; + float* f_ptr = &f; + i_ptr = (uint64_t*) f_ptr; + + cout << "*i_ptr=" << *i_ptr << endl; + cout << "*i2_ptr=" << *i2_ptr << endl; + f_ptr = (float*) i_ptr; + + cout << "*f_ptr=" << *f_ptr << endl; + + cout << endl; + + if (d > i) + cout << "1.2 is greater than 1." << endl; + + if (f > i) + cout << "1.1 is greater than 1." << endl; + + if (d > f) + cout << "1.2 is greater than 1.1" << endl; + + if (*i_ptr < *i2_ptr) + cout << "1.1 < 1.2 when represented as uint64_t." << endl; + + cout << "sizeof(f) = " << sizeof(f) << endl; + cout << "sizeof(i) = " << sizeof(i) << endl; + cout << "sizeof(d) = " << sizeof(d) << endl; + + double dbl = 9.7; + double dbl2 = 1.3; + i_ptr = (uint64_t*) &dbl; + i2_ptr = (uint64_t*) &dbl2; + cout << endl; + cout << "9.7 as int is " << *i_ptr << endl; + cout << "9.7 as int is " << *i2_ptr << endl; + cout << "1.2 < 9.7 is " << (*i_ptr < *i2_ptr) << endl; + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver); + + +int main( int argc, char** argv) +{ + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest( registry.makeTest() ); + bool wasSuccessful = runner.run( "", false ); + return (wasSuccessful ? 0 : 1); +} + + diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index de46f85d0..e60d4db40 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -3153,12 +3153,6 @@ void TupleBPS::rgDataToDl(RGData& rgData, RowGroup& rg, RowGroupDL* dlp) if (dupColumns.size() > 0) dupOutputColumns(rgData, rg); - /* - if (!(fSessionId & 0x80000000)) { - rg.setData(&rgData); - cerr << "TBPS rowcount: " << rg.getRowCount() << endl; - } - */ dlp->insert(rgData); } diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index c1ef06a4e..4581ec119 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -65,6 +66,8 @@ using namespace querytele; #include "tupleannexstep.h" +#define QUEUE_RESERVE_SIZE 100000 + namespace { struct TAHasher @@ -117,10 +120,12 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) : fLimitHit(false), fEndOfResult(false), fDistinct(false), + fParallelOp(false), fOrderBy(NULL), fConstant(NULL), fFeInstance(funcexp::FuncExp::instance()), - fJobList(jobInfo.jobListPtr) + fJobList(jobInfo.jobListPtr), + fFinishedThreads(0) { fExtendedInfo = "TNS: "; fQtc.stepParms().stepType = StepTeleStats::T_TNS; @@ -129,6 +134,22 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) : TupleAnnexStep::~TupleAnnexStep() { + if(fParallelOp) + { + if(fOrderByList.size() > 0) + { + for(uint64_t id = 0; id <= fMaxThreads; id++) + { + delete fOrderByList[id]; + } + + fOrderByList.clear(); + } + + fInputIteratorsList.clear(); + fRunnersList.clear(); + } + if (fOrderBy) delete fOrderBy; @@ -149,25 +170,41 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg) void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo) { + // Initialize structures used by separate workers + uint64_t id = 1; fRowGroupIn = rgIn; fRowGroupIn.initRow(&fRowIn); - - if (fOrderBy) + if(fParallelOp && fOrderBy) { - fOrderBy->distinct(fDistinct); - fOrderBy->initialize(rgIn, jobInfo); + fOrderByList.resize(fMaxThreads+1); + for(id = 0; id <= fMaxThreads; id++) + { + // *DRRTUY use SP here? + fOrderByList[id] = new LimitedOrderBy(); + fOrderByList[id]->distinct(fDistinct); + fOrderByList[id]->initialize(rgIn, jobInfo, false, true); + } + } + else + { + if (fOrderBy) + { + fOrderBy->distinct(fDistinct); + fOrderBy->initialize(rgIn, jobInfo); + } } if (fConstant == NULL) { - vector oids, oidsIn = fRowGroupIn.getOIDs(); - vector keys, keysIn = fRowGroupIn.getKeys(); - vector scale, scaleIn = fRowGroupIn.getScale(); - vector precision, precisionIn = fRowGroupIn.getPrecision(); - vector types, typesIn = fRowGroupIn.getColTypes(); - vector pos, posIn = fRowGroupIn.getOffsets(); - + vector oids, oidsIn = rgIn.getOIDs(); + vector keys, keysIn = rgIn.getKeys(); + vector scale, scaleIn = rgIn.getScale(); + vector precision, precisionIn = rgIn.getPrecision(); + vector types, typesIn = rgIn.getColTypes(); + vector pos, posIn = rgIn.getOffsets(); size_t n = jobInfo.nonConstDelCols.size(); + + // Add all columns into output RG as keys. Can we put only keys? oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n); keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n); scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n); @@ -198,8 +235,6 @@ void TupleAnnexStep::run() if (fInputDL == NULL) throw logic_error("Input is not a RowGroup data list."); - fInputIterator = fInputDL->getIterator(); - if (fOutputJobStepAssociation.outSize() == 0) throw logic_error("No output data list for annex step."); @@ -213,17 +248,58 @@ void TupleAnnexStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner = jobstepThreadPool.invoke(Runner(this)); -} + if(fParallelOp) + { + // Indexing begins with 1 + fRunnersList.resize(fMaxThreads); + fInputIteratorsList.resize(fMaxThreads+1); + // Activate stats collecting before CS spawns threads. + if (traceOn()) dlTimes.setFirstReadTime(); + + // *DRRTUY Make this block conditional + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_START; + sts.total_units_of_work = 1; + postStepStartTele(sts); + + for(uint32_t id = 1; id <= fMaxThreads; id++) + { + fInputIteratorsList[id] = fInputDL->getIterator(); + fRunnersList[id-1] = jobstepThreadPool.invoke(Runner(this, id)); + } + } + else + { + fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL(); + + if (fInputDL == NULL) + throw logic_error("Input is not a RowGroup data list."); + + fInputIterator = fInputDL->getIterator(); + fRunner = jobstepThreadPool.invoke(Runner(this)); + } + +} void TupleAnnexStep::join() { - if (fRunner) - jobstepThreadPool.join(fRunner); + if(fParallelOp) + { + jobstepThreadPool.join(fRunnersList); + } + else + { + if (fRunner) + { + jobstepThreadPool.join(fRunner); + } + } + } - uint32_t TupleAnnexStep::nextBand(messageqcpp::ByteStream& bs) { RGData rgDataOut; @@ -312,6 +388,12 @@ void TupleAnnexStep::execute() } } +void TupleAnnexStep::execute(uint32_t id) +{ + if(fOrderByList[id]) + executeParallelOrderBy(id); + +} void TupleAnnexStep::executeNoOrderBy() { @@ -337,7 +419,6 @@ void TupleAnnexStep::executeNoOrderBy() { fRowGroupIn.setData(&rgDataIn); fRowGroupIn.getRow(0, &fRowIn); - // Get a new output rowgroup for each input rowgroup to preserve the rids rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount()); fRowGroupOut.setData(&rgDataOut); @@ -459,7 +540,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() fJobList->abortOnLimit((JobStep*) this); } - if (UNLIKELY(fRowGroupOut.getRowCount() >= 8192)) + if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize)) { dataVec.push_back(rgDataOut); rgDataOut.reinit(fRowGroupOut); @@ -596,6 +677,579 @@ void TupleAnnexStep::executeWithOrderBy() fOutputDL->endOfInput(); } +/* + The m() iterates over thread's LimitedOrderBy instances, + reverts the rules and then populates the final collection + used for final sorting. The method uses OrderByRow that + combination of Row::data and comparison rules. + When m() finishes with thread's LOBs it iterates over + final sorting collection, populates rgDataOut, then + sends it into outputDL. + Changing this method don't forget to make changes in + finalizeParallelOrderBy() that is a clone. + !!!The method doesn't set Row::baseRid +*/ +void TupleAnnexStep::finalizeParallelOrderByDistinct() +{ + utils::setThreadName("TASwParOrdDistM"); + uint64_t count = 0; + uint64_t offset = 0; + uint32_t rowSize = 0; + + rowgroup::RGData rgDataOut; + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + // Calculate offset here + fRowGroupOut.getRow(0, &fRowOut); + ordering::SortingPQ finalPQ; + scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); + + + try + { + for(uint64_t id = 1; id <= fMaxThreads; id++) + { + if (cancelled()) + { + break; + } + // Revert the ordering rules before we + // add rows into the final PQ. + fOrderByList[id]->getRule().revertRules(); + ordering::SortingPQ ¤tPQ = fOrderByList[id]->getQueue(); + finalPQ.reserve(finalPQ.size()+currentPQ.size()); + pair inserted; + while (currentPQ.size()) + { + ordering::OrderByRow &topOBRow = + const_cast(currentPQ.top()); + inserted = distinctMap->insert(topOBRow.fData); + if (inserted.second) + { + finalPQ.push(topOBRow); + } + currentPQ.pop(); + } + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ + caught an unknown exception 1", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + + try + { + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ + caught an unknown exception 2", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + try + { + // Calculate rowSize only once + if (finalPQ.size()) + { + ordering::OrderByRow& topOBRow = + const_cast(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) + copyRow(fRowIn, &fRowOut); + else + fConstant->fillInConstants(fRowIn, fRowOut); + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ + caught an unknown exception 3", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + try + { + if (!fConstant) + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns striped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + fConstant->fillInConstants(fRowIn, fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } // end of if-else + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ + caught an unknown exception 4", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + } + + fOutputDL->endOfInput(); + + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_SUMMARY; + sts.total_units_of_work = sts.units_of_work_completed = 1; + sts.rows = fRowsReturned; + postStepSummaryTele(sts); + + if (traceOn()) + { + if (dlTimes.FirstReadTime().tv_sec == 0) + dlTimes.setFirstReadTime(); + + dlTimes.setLastReadTime(); + dlTimes.setEndOfInputTime(); + printCalTrace(); + } +} + +/* + The m() iterates over thread's LimitedOrderBy instances, + reverts the rules and then populates the final collection + used for final sorting. The method uses OrderByRow that + combination of Row::data and comparison rules. + When m() finishes with thread's LOBs it iterates over + final sorting collection, populates rgDataOut, then + sends it into outputDL. + Changing this method don't forget to make changes in + finalizeParallelOrderByDistinct() that is a clone. + !!!The method doesn't set Row::baseRid +*/ +void TupleAnnexStep::finalizeParallelOrderBy() +{ + utils::setThreadName("TASwParOrdMerge"); + uint64_t count = 0; + uint64_t offset = 0; + uint32_t rowSize = 0; + + rowgroup::RGData rgDataOut; + ordering::SortingPQ finalPQ; + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + // Calculate offset here + fRowGroupOut.getRow(0, &fRowOut); + + try + { + for(uint64_t id = 1; id <= fMaxThreads; id++) + { + if (cancelled()) + { + break; + } + // Revert the ordering rules before we + // add rows into the final PQ. + fOrderByList[id]->getRule().revertRules(); + ordering::SortingPQ ¤tPQ = fOrderByList[id]->getQueue(); + finalPQ.reserve(currentPQ.size()); + while (currentPQ.size()) + { + ordering::OrderByRow &topOBRow = + const_cast(currentPQ.top()); + finalPQ.push(topOBRow); + currentPQ.pop(); + } + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ + caught an unknown exception 1", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + + try + { + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ + caught an unknown exception 2", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + try + { + // Calculate rowSize only once + if (finalPQ.size()) + { + ordering::OrderByRow& topOBRow = + const_cast(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) + copyRow(fRowIn, &fRowOut); + else + fConstant->fillInConstants(fRowIn, fRowOut); + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ + caught an unknown exception 3", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + try + { + if (!fConstant) + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns striped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); + fConstant->fillInConstants(fRowIn, fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else + { + break; + } + } // end of limit bound while loop + } // end of if-else + } + catch (const std::exception& ex) + { + catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ + caught an unknown exception 4", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + } + + fOutputDL->endOfInput(); + + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_SUMMARY; + sts.total_units_of_work = sts.units_of_work_completed = 1; + sts.rows = fRowsReturned; + postStepSummaryTele(sts); + + if (traceOn()) + { + if (dlTimes.FirstReadTime().tv_sec == 0) + dlTimes.setFirstReadTime(); + + dlTimes.setLastReadTime(); + dlTimes.setEndOfInputTime(); + printCalTrace(); + } +} + +void TupleAnnexStep::executeParallelOrderBy(uint64_t id) +{ + utils::setThreadName("TASwParOrd"); + RGData rgDataIn; + RGData rgDataOut; + bool more = false; + uint64_t dlOffset = 0; + uint32_t rowSize = 0; + + uint64_t rowCount = 0; + uint64_t doubleRGSize = 2*rowgroup::rgCommonSize; + rowgroup::Row r = fRowIn; + rowgroup::RowGroup rg = fRowGroupIn; + rg.initRow(&r); + LimitedOrderBy *limOrderBy = fOrderByList[id]; + ordering::SortingPQ ¤tPQ = limOrderBy->getQueue(); + if (limOrderBy->getLimitCount() < QUEUE_RESERVE_SIZE) + { + currentPQ.reserve(limOrderBy->getLimitCount()); + } + else + { + currentPQ.reserve(QUEUE_RESERVE_SIZE); + } + + try + { + more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + if (more) dlOffset++; + + while (more && !cancelled()) + { + if (dlOffset%fMaxThreads == id-1) + { + if (cancelled()) + break; + + if (currentPQ.capacity()-currentPQ.size() < doubleRGSize) + { + currentPQ.reserve(QUEUE_RESERVE_SIZE); + } + + rg.setData(&rgDataIn); + rg.getRow(0, &r); + rowSize = r.getSize(); + rowCount = rg.getRowCount(); + + for (uint64_t i = 0; i < rowCount; ++i) + { + limOrderBy->processRow(r); + r.nextRow(rowSize); + } + } + + // *DRRTUY Implement a method to skip elements in FIFO + more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + if(more) dlOffset++; + } + } + catch (const std::exception& ex) + { + catchHandler(ex.what(), ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + catch (...) + { + catchHandler("TupleAnnexStep::executeParallelOrderBy execute caught an unknown exception", + ERR_IN_PROCESS, fErrorInfo, fSessionId); + } + + // read out the input DL + while (more) + more = fInputDL->next(fInputIteratorsList[id], &rgDataIn); + + // Count finished sorting threads under mutex and run final + // sort step when the last thread converges + fParallelFinalizeMutex.lock(); + fFinishedThreads++; + if (fFinishedThreads == fMaxThreads) + { + fParallelFinalizeMutex.unlock(); + if(fDistinct) + { + finalizeParallelOrderByDistinct(); + } + else + { + finalizeParallelOrderBy(); + } + } + else + { + fParallelFinalizeMutex.unlock(); + } +} const RowGroup& TupleAnnexStep::getOutputRowGroup() const { @@ -626,7 +1280,8 @@ bool TupleAnnexStep::deliverStringTableRowGroup() const const string TupleAnnexStep::toString() const { ostringstream oss; - oss << "AnnexStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; + oss << "AnnexStep "; + oss << " ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; oss << " in:"; @@ -673,8 +1328,8 @@ void TupleAnnexStep::printCalTrace() void TupleAnnexStep::formatMiniStats() { ostringstream oss; - oss << "TNS " - << "UM " + oss << "TNS "; + oss << "UM " << "- " << "- " << "- " diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 367e7ea4f..76cac326d 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -1,4 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2019 MariaDB Corporaton. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -21,8 +22,11 @@ #ifndef JOBLIST_TUPLEANNEXSTEP_H #define JOBLIST_TUPLEANNEXSTEP_H -#include "jobstep.h" +#include +#include +#include "jobstep.h" +#include "limitedorderby.h" // forward reference namespace fucexp @@ -49,6 +53,8 @@ public: /** @brief TupleAnnexStep constructor */ TupleAnnexStep(const JobInfo& jobInfo); + // Copy ctor to have a class mutex + TupleAnnexStep(const TupleAnnexStep ©); /** @brief TupleAnnexStep destructor */ @@ -90,6 +96,14 @@ public: fLimitStart = s; fLimitCount = c; } + void setParallelOp() + { + fParallelOp = true; + } + void setMaxThreads(uint32_t number) + { + fMaxThreads = number; + } virtual bool stringTableFriendly() { @@ -100,11 +114,15 @@ public: protected: void execute(); + void execute(uint32_t); void executeNoOrderBy(); void executeWithOrderBy(); + void executeParallelOrderBy(uint64_t id); void executeNoOrderByWithDistinct(); void formatMiniStats(); void printCalTrace(); + void finalizeParallelOrderBy(); + void finalizeParallelOrderByDistinct(); // input/output rowgroup and row rowgroup::RowGroup fRowGroupIn; @@ -117,18 +135,26 @@ protected: RowGroupDL* fInputDL; RowGroupDL* fOutputDL; uint64_t fInputIterator; + std::vector fInputIteratorsList; uint64_t fOutputIterator; class Runner { public: - Runner(TupleAnnexStep* step) : fStep(step) { } + Runner(TupleAnnexStep* step) : + fStep(step), id(0) { } + Runner(TupleAnnexStep* step, uint32_t id) : + fStep(step), id(id) { } void operator()() { - fStep->execute(); + if(id) + fStep->execute(id); + else + fStep->execute(); } TupleAnnexStep* fStep; + uint16_t id; }; uint64_t fRunner; // thread pool handle @@ -136,9 +162,11 @@ protected: uint64_t fRowsReturned; uint64_t fLimitStart; uint64_t fLimitCount; + uint64_t fMaxThreads; bool fLimitHit; bool fEndOfResult; bool fDistinct; + bool fParallelOp; LimitedOrderBy* fOrderBy; TupleConstantStep* fConstant; @@ -146,8 +174,21 @@ protected: funcexp::FuncExp* fFeInstance; JobList* fJobList; + std::vector fOrderByList; + std::vector fRunnersList; + uint16_t fFinishedThreads; + boost::mutex fParallelFinalizeMutex; }; +template +class reservablePQ: private std::priority_queue +{ +public: + typedef typename std::priority_queue::size_type size_type; + reservablePQ(size_type capacity = 0) { reserve(capacity); }; + void reserve(size_type capacity) { this->c.reserve(capacity); } + size_type capacity() const { return this->c.capacity(); } +}; } // namespace diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 31199331a..28b2d8065 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -7807,6 +7807,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, // To activate LimitedOrderBy if(isPushdownHand) { + csep->orderByThreads(get_orderby_threads(gwi.thd)); csep->specHandlerProcessed(true); } } @@ -7844,6 +7845,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, { Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit; csep->limitNum(select->val_int()); + // MCOL-894 Activate parallel ORDER BY + csep->orderByThreads(get_orderby_threads(gwi.thd)); } } } @@ -9840,6 +9843,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro { csep->hasOrderBy(true); csep->specHandlerProcessed(true); + csep->orderByThreads(get_orderby_threads(gwi.thd)); } } diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index a0759d88c..af4016139 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -107,6 +107,18 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_UINT( + orderby_threads, + PLUGIN_VAR_RQCMDARG, + "Number of parallel threads used by ORDER BY. (default to 16)", + NULL, + NULL, + 16, + 0, + 2048, + 1 +); + // legacy system variables static MYSQL_THDVAR_ULONG( decimal_scale, @@ -294,6 +306,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(derived_handler), MYSQL_SYSVAR(processing_handlers_fallback), MYSQL_SYSVAR(group_by_handler), + MYSQL_SYSVAR(orderby_threads), MYSQL_SYSVAR(decimal_scale), MYSQL_SYSVAR(use_decimal_scale), MYSQL_SYSVAR(ordered_only), @@ -396,6 +409,16 @@ mcs_compression_type_t get_compression_type(THD* thd) { return (mcs_compression_type_t) THDVAR(thd, compression_type); } +uint get_orderby_threads(THD* thd) +{ + return ( thd == NULL ) ? 0 : THDVAR(thd, orderby_threads); +} +void set_orderby_threads(THD* thd, uint value) +{ + THDVAR(thd, orderby_threads) = value; +} + + bool get_use_decimal_scale(THD* thd) { return ( thd == NULL ) ? false : THDVAR(thd, use_decimal_scale); diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index e1c0d4731..d9ec7f4d0 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -58,6 +58,9 @@ void set_group_by_handler(THD* thd, bool value); bool get_fallback_knob(THD* thd); void set_fallback_knob(THD* thd, bool value); +uint get_orderby_threads(THD* thd); +void set_orderby_threads(THD* thd, uint value); + bool get_use_decimal_scale(THD* thd); void set_use_decimal_scale(THD* thd, bool value); diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 27d530343..cefae07f4 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -979,117 +979,6 @@ bool Row::isNullValue(uint32_t colIndex) const uint64_t Row::getNullValue(uint32_t colIndex) const { return utils::getNullValue(types[colIndex], getColumnWidth(colIndex)); -#if 0 - - switch (types[colIndex]) - { - case CalpontSystemCatalog::TINYINT: - return joblist::TINYINTNULL; - - case CalpontSystemCatalog::SMALLINT: - return joblist::SMALLINTNULL; - - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - return joblist::INTNULL; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - return joblist::FLOATNULL; - - case CalpontSystemCatalog::DATE: - return joblist::DATENULL; - - case CalpontSystemCatalog::BIGINT: - return joblist::BIGINTNULL; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - return joblist::DOUBLENULL; - - case CalpontSystemCatalog::DATETIME: - return joblist::DATETIMENULL; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: - case CalpontSystemCatalog::STRINT: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1: - return joblist::CHAR1NULL; - - case 2: - return joblist::CHAR2NULL; - - case 3: - case 4: - return joblist::CHAR4NULL; - - case 5: - case 6: - case 7: - case 8: - return joblist::CHAR8NULL; - - default: - throw logic_error("Row::getNullValue() Can't return the NULL string"); - } - - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1 : - return joblist::TINYINTNULL; - - case 2 : - return joblist::SMALLINTNULL; - - case 4 : - return joblist::INTNULL; - - default: - return joblist::BIGINTNULL; - } - - break; - } - - case CalpontSystemCatalog::UTINYINT: - return joblist::UTINYINTNULL; - - case CalpontSystemCatalog::USMALLINT: - return joblist::USMALLINTNULL; - - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - return joblist::UINTNULL; - - case CalpontSystemCatalog::UBIGINT: - return joblist::UBIGINTNULL; - - case CalpontSystemCatalog::LONGDOUBLE: - return -1; // no NULL value for long double yet, this is a nan. - - case CalpontSystemCatalog::VARBINARY: - default: - ostringstream os; - os << "Row::getNullValue(): got bad column type (" << types[colIndex] << - "). Width=" << getColumnWidth(colIndex) << endl; - os << toString() << endl; - throw logic_error(os.str()); - } - -#endif } /* This fcn might produce overflow warnings from the compiler, but that's OK. @@ -1098,117 +987,6 @@ uint64_t Row::getNullValue(uint32_t colIndex) const int64_t Row::getSignedNullValue(uint32_t colIndex) const { return utils::getSignedNullValue(types[colIndex], getColumnWidth(colIndex)); -#if 0 - - switch (types[colIndex]) - { - case CalpontSystemCatalog::TINYINT: - return (int64_t) ((int8_t) joblist::TINYINTNULL); - - case CalpontSystemCatalog::SMALLINT: - return (int64_t) ((int16_t) joblist::SMALLINTNULL); - - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - return (int64_t) ((int32_t) joblist::INTNULL); - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - return (int64_t) ((int32_t) joblist::FLOATNULL); - - case CalpontSystemCatalog::DATE: - return (int64_t) ((int32_t) joblist::DATENULL); - - case CalpontSystemCatalog::BIGINT: - return joblist::BIGINTNULL; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - return joblist::DOUBLENULL; - - case CalpontSystemCatalog::DATETIME: - return joblist::DATETIMENULL; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: - case CalpontSystemCatalog::STRINT: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1: - return (int64_t) ((int8_t) joblist::CHAR1NULL); - - case 2: - return (int64_t) ((int16_t) joblist::CHAR2NULL); - - case 3: - case 4: - return (int64_t) ((int32_t) joblist::CHAR4NULL); - - case 5: - case 6: - case 7: - case 8: - return joblist::CHAR8NULL; - - default: - throw logic_error("Row::getSignedNullValue() Can't return the NULL string"); - } - - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - uint32_t len = getColumnWidth(colIndex); - - switch (len) - { - case 1 : - return (int64_t) ((int8_t) joblist::TINYINTNULL); - - case 2 : - return (int64_t) ((int16_t) joblist::SMALLINTNULL); - - case 4 : - return (int64_t) ((int32_t) joblist::INTNULL); - - default: - return joblist::BIGINTNULL; - } - - break; - } - - case CalpontSystemCatalog::UTINYINT: - return (int64_t) ((int8_t) joblist::UTINYINTNULL); - - case CalpontSystemCatalog::USMALLINT: - return (int64_t) ((int16_t) joblist::USMALLINTNULL); - - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - return (int64_t) ((int32_t) joblist::UINTNULL); - - case CalpontSystemCatalog::UBIGINT: - return (int64_t)joblist::UBIGINTNULL; - - case CalpontSystemCatalog::LONGDOUBLE: - return -1; // no NULL value for long double yet, this is a nan. - - case CalpontSystemCatalog::VARBINARY: - default: - ostringstream os; - os << "Row::getSignedNullValue(): got bad column type (" << types[colIndex] << - "). Width=" << getColumnWidth(colIndex) << endl; - os << toString() << endl; - throw logic_error(os.str()); - } - -#endif } RowGroup::RowGroup() : columnCount(0), data(NULL), rgData(NULL), strings(NULL), diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 373269e13..bd18e1726 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -63,6 +63,8 @@ namespace rowgroup { +const int16_t rgCommonSize = 8192; + /* The format of the data RowGroup points to is currently ... diff --git a/utils/windowfunction/CMakeLists.txt b/utils/windowfunction/CMakeLists.txt index 1a221511f..77e4e86d3 100755 --- a/utils/windowfunction/CMakeLists.txt +++ b/utils/windowfunction/CMakeLists.txt @@ -30,3 +30,8 @@ set_target_properties(windowfunction PROPERTIES VERSION 1.0.0 SOVERSION 1) install(TARGETS windowfunction DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) +if (WITH_SORTING_COMPARATORS_UT) + add_executable(comparators_tests comparators-tests.cpp) + target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) + install(TARGETS comparators_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform) +endif() diff --git a/utils/windowfunction/comparators-tests.cpp b/utils/windowfunction/comparators-tests.cpp new file mode 100644 index 000000000..d2d5df316 --- /dev/null +++ b/utils/windowfunction/comparators-tests.cpp @@ -0,0 +1,425 @@ +/* Copyright (C) 2019 MariaDB Corporaton. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "jobstep.h" +#include "funcexp.h" +#include "jlf_common.h" +#include "tupleannexstep.h" +#include "calpontsystemcatalog.h" +#include +#include +#include "bytestream.h" +#include "idborderby.h" + +#define DEBUG +#define MEMORY_LIMIT 14983602176 + +using namespace std; +using namespace joblist; +using namespace messageqcpp; + +// Timer class used by this tdriver to output elapsed times, etc. +class Timer +{ +public: + void start(const string& message) + { + if (!fHeaderDisplayed) + { + header(); + fHeaderDisplayed = true; + } + + gettimeofday(&fTvStart, 0); + cout << timestr() << " Start " << message << endl; + } + + void stop(const string& message) + { + time_t now; + time(&now); + string secondsElapsed; + getTimeElapsed(secondsElapsed); + cout << timestr() << " " << secondsElapsed << " Stop " << message << endl; + } + + Timer() : fHeaderDisplayed(false) {} + +private: + + struct timeval fTvStart; + bool fHeaderDisplayed; + + double getTimeElapsed(string& seconds) + { + struct timeval tvStop; + gettimeofday(&tvStop, 0); + double secondsElapsed = + (tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) - + (fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0)); + ostringstream oss; + oss << secondsElapsed; + seconds = oss.str(); + seconds.resize(8, '0'); + return secondsElapsed; + } + + string timestr() + { + struct tm tm; + struct timeval tv; + + gettimeofday(&tv, 0); + localtime_r(&tv.tv_sec, &tm); + + ostringstream oss; + oss << setfill('0') + << setw(2) << tm.tm_hour << ':' + << setw(2) << tm.tm_min << ':' + << setw(2) << tm.tm_sec << '.' + << setw(6) << tv.tv_usec + ; + return oss.str(); + } + + void header() + { + cout << endl; + cout << "Time Seconds Activity" << endl; + } +}; + +class FilterDriver : public CppUnit::TestFixture +{ + + CPPUNIT_TEST_SUITE(FilterDriver); + + CPPUNIT_TEST(INT_TEST); + CPPUNIT_TEST(FLOAT_TEST); + + CPPUNIT_TEST_SUITE_END(); + +private: + // The tests creates an RG with 1 column of the cscDt type + // then initialize RGData. After that it adds two numeric values (v1 < v2)and two NULL. + // Then creates comparator structores and run a number of tests. v1 < v2 + void testComparatorWithDT(execplan::CalpontSystemCatalog::ColDataType cscDt, + uint32_t width, + bool generateRandValues) + { + std::cout << std::endl << "------------------------------------------------------------" << std::endl; + uint32_t oid =3001; + std::vector offsets, roids, tkeys, cscale, cprecision; + std::vector types; + offsets.push_back(2); offsets.push_back(2+width); + roids.push_back(oid); + tkeys.push_back(1); + types.push_back(cscDt); + cscale.push_back(0); + cprecision.push_back(20); + rowgroup::RowGroup inRG(1, //column count + offsets, //oldOffset + roids, // column oids + tkeys, //keys + types, // types + cscale, //scale + cprecision, // precision + 20, // sTableThreshold + false //useStringTable + ); + + rowgroup::RGData rgD = rowgroup::RGData(inRG); + inRG.setData(&rgD); + rowgroup::Row r, r1, r2, r3; + inRG.initRow(&r); + uint32_t rowSize = r.getSize(); + inRG.getRow(0, &r); + + // Sorting spec describes sorting direction and NULL comparision + // preferences + ordering::IdbSortSpec spec = ordering::IdbSortSpec(0, // column index + true, // ascending + true); // NULLs first + std::vector specVect; + specVect.push_back(spec); + + switch(cscDt) + { + case execplan::CalpontSystemCatalog::UTINYINT: + { + std::cout << "UTINYINT " << std::endl; + r.setUintField<1>(42, 0); + r.nextRow(rowSize); + r.setUintField<1>(43, 0); + r.nextRow(rowSize); + r.setUintField<1>(joblist::UTINYINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::USMALLINT: + { + std::cout << "USMALLINT " << std::endl; + r.setUintField<2>(42, 0); + r.nextRow(rowSize); + r.setUintField<2>(43, 0); + r.nextRow(rowSize); + r.setUintField<2>(joblist::USMALLINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + { + std::cout << "UINT " << std::endl; + r.setUintField<4>(42, 0); + r.nextRow(rowSize); + r.setUintField<4>(43, 0); + r.nextRow(rowSize); + r.setUintField<4>(joblist::UINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::DATETIME: + case execplan::CalpontSystemCatalog::UBIGINT: + { + std::cout << "UBIGINT " << std::endl; + r.setUintField<8>(42, 0); + r.nextRow(rowSize); + r.setUintField<8>(43, 0); + r.nextRow(rowSize); + r.setUintField<8>(joblist::UBIGINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::TINYINT: + { + std::cout << "TINYINT " << std::endl; + r.setIntField<1>(42, 0); + r.nextRow(rowSize); + r.setIntField<1>(43, 0); + r.nextRow(rowSize); + r.setIntField<1>(joblist::TINYINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::SMALLINT: + { + std::cout << "SMALLINT " << std::endl; + r.setIntField<2>(42, 0); + r.nextRow(rowSize); + r.setIntField<2>(43, 0); + r.nextRow(rowSize); + r.setIntField<2>(joblist::SMALLINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + case execplan::CalpontSystemCatalog::DATE: + { + if (cscDt == execplan::CalpontSystemCatalog::DATE) + std::cout << "DATE" << std::endl; + else + std::cout << "INT " << std::endl; + r.setIntField<4>(42, 0); + r.nextRow(rowSize); + r.setIntField<4>(43, 0); + r.nextRow(rowSize); + if (cscDt == execplan::CalpontSystemCatalog::DATE) + r.setIntField<4>(joblist::DATENULL, 0); + else + r.setIntField<4>(joblist::INTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::BIGINT: + { + std::cout << "BIGINT " << std::endl; + r.setIntField<8>(42, 0); + r.nextRow(rowSize); + r.setIntField<8>(43, 0); + r.nextRow(rowSize); + r.setIntField<8>(joblist::BIGINTNULL, 0); + break; + } + case execplan::CalpontSystemCatalog::DECIMAL: + case execplan::CalpontSystemCatalog::UDECIMAL: + { + std::cout << "DECIMAL " << std::endl; + switch (width) + { + case 1 : + { + r.setUintField<1>(42, 0); + r.nextRow(rowSize); + r.setUintField<1>(43, 0); + r.nextRow(rowSize); + r.setUintField<1>(joblist::TINYINTNULL, 0); + break; + } + case 2 : + { + r.setUintField<2>(42, 0); + r.nextRow(rowSize); + r.setUintField<2>(43, 0); + r.nextRow(rowSize); + r.setUintField<2>(joblist::SMALLINTNULL, 0); + break; + } + case 4 : + { + r.setUintField<4>(42, 0); + r.nextRow(rowSize); + r.setUintField<4>(43, 0); + r.nextRow(rowSize); + r.setUintField<4>(joblist::INTNULL, 0); + break; + } + default: + { + r.setUintField<8>(42, 0); + r.nextRow(rowSize); + r.setUintField<8>(43, 0); + r.nextRow(rowSize); + r.setUintField<8>(joblist::BIGINTNULL, 0); + break; + } + } + break; + } + case execplan::CalpontSystemCatalog::FLOAT: + case execplan::CalpontSystemCatalog::UFLOAT: + { + std::cout << "FLOAT " << std::endl; + r.setFloatField(42.1, 0); + r.nextRow(rowSize); + r.setFloatField(43.1, 0); + r.nextRow(rowSize); + r.setUintField(joblist::FLOATNULL, 0); + break; + + } + case execplan::CalpontSystemCatalog::DOUBLE: + case execplan::CalpontSystemCatalog::UDOUBLE: + { + std::cout << "DOUBLE " << std::endl; + r.setDoubleField(42.1, 0); + r.nextRow(rowSize); + r.setDoubleField(43.1, 0); + r.nextRow(rowSize); + r.setUintField(joblist::DOUBLENULL, 0); + break; + } + case execplan::CalpontSystemCatalog::LONGDOUBLE: + { + r.setLongDoubleField(42.1, 0); + r.nextRow(rowSize); + r.setLongDoubleField(43.1, 0); + r.nextRow(rowSize); + r.setLongDoubleField(joblist::LONGDOUBLENULL, 0); + break; + } + default: + { + break; + } + } + + inRG.setRowCount(3); + inRG.initRow(&r1); + inRG.initRow(&r2); + inRG.initRow(&r3); + inRG.getRow(0, &r1); + inRG.getRow(1, &r2); + inRG.getRow(2, &r3); + + std::cout<< "r1 " << r1.toString() << " r2 " << r2.toString() + << " r3 " << r3.toString() << std::endl; + + ordering::IdbCompare idbCompare; + idbCompare.initialize(inRG); + ordering::OrderByData odbData = ordering::OrderByData(specVect, inRG); + bool result = odbData(r1.getPointer(), r2.getPointer()); + std::cout << r1.toString() << " < " << r2.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == true); + result = odbData(r2.getPointer(), r1.getPointer()); + std::cout << r2.toString() << " < " << r1.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == false); + result = odbData(r2.getPointer(), r2.getPointer()); + std::cout << r2.toString() << " < " << r2.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == false); + // Compare value with NULL. if spec.fNf then NULLs goes first + result = odbData(r3.getPointer(), r1.getPointer()); + std::cout << r3.toString() << " < " << r1.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == true); + // Compare NULL with NULL + result = odbData(r3.getPointer(), r1.getPointer()); + std::cout << r3.toString() << " < " << r3.toString() << " is " + << ((result) ? "true" : "false") << std::endl; + CPPUNIT_ASSERT(result == true); + } + + void INT_TEST() + { + //bool generateValues = true; + bool fixedValues = false; + testComparatorWithDT(execplan::CalpontSystemCatalog::UTINYINT, 1, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::USMALLINT, 2, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::UMEDINT, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::UBIGINT, 8, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DATETIME, 8, fixedValues); + + testComparatorWithDT(execplan::CalpontSystemCatalog::TINYINT, 1, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::SMALLINT, 2, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::MEDINT, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DATE, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::BIGINT, 8, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DECIMAL, 8, fixedValues); + + testComparatorWithDT(execplan::CalpontSystemCatalog::FLOAT, 4, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::DOUBLE, 8, fixedValues); + testComparatorWithDT(execplan::CalpontSystemCatalog::LONGDOUBLE, 8, fixedValues); + } + + void FLOAT_TEST() + { + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver); + + +int main( int argc, char** argv) +{ + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest( registry.makeTest() ); + bool wasSuccessful = runner.run( "", false ); + return (wasSuccessful ? 0 : 1); +} + + diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index 04f89f1a9..d1fdd2801 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -43,32 +43,85 @@ using namespace rowgroup; #include "idborderby.h" +#include "joblisttypes.h" namespace ordering { +int TinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + int8_t v1 = l->row1().getIntField(fSpec.fIndex); + int8_t v2 = l->row2().getIntField(fSpec.fIndex); + int8_t nullValue = static_cast(joblist::TINYINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int SmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + int16_t v1 = l->row1().getIntField(fSpec.fIndex); + int16_t v2 = l->row2().getIntField(fSpec.fIndex); + int16_t nullValue = static_cast(joblist::SMALLINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - int ret = 0; + int32_t v1 = l->row1().getIntField(fSpec.fIndex); + int32_t v2 = l->row2().getIntField(fSpec.fIndex); + int32_t nullValue = static_cast(joblist::INTNULL); - if (b1 == true || b2 == true) + if (v1 == nullValue || v2 == nullValue) { - if (b1 == false && b2 == true) + if (v1 != nullValue && v2 == nullValue) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == nullValue && v2 != nullValue) ret = -fSpec.fNf; } else { - int64_t v1 = l->row1().getIntField(fSpec.fIndex); - int64_t v2 = l->row2().getIntField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -78,29 +131,25 @@ int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) return ret; } - -int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +int BigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - int ret = 0; + int64_t v1 = l->row1().getIntField(fSpec.fIndex); + int64_t v2 = l->row2().getIntField(fSpec.fIndex); + int64_t nullValue = static_cast(joblist::BIGINTNULL); - if (b1 == true || b2 == true) + if (v1 == nullValue || v2 == nullValue) { - if (b1 == false && b2 == true) + if (v1 != nullValue && v2 == nullValue) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == nullValue && v2 != nullValue) ret = -fSpec.fNf; } else { - uint64_t v1 = l->row1().getUintField(fSpec.fIndex); - uint64_t v2 = l->row2().getUintField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -110,6 +159,116 @@ int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) return ret; } +int UTinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint8_t v1 = l->row1().getUintField(fSpec.fIndex); + uint8_t v2 = l->row2().getUintField(fSpec.fIndex); + uint8_t nullValue = static_cast(joblist::UTINYINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int USmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint16_t v1 = l->row1().getUintField(fSpec.fIndex); + uint16_t v2 = l->row2().getUintField(fSpec.fIndex); + uint16_t nullValue = static_cast(joblist::USMALLINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int UIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint32_t v1 = l->row1().getUintField(fSpec.fIndex); + uint32_t v2 = l->row2().getUintField(fSpec.fIndex); + uint32_t nullValue = static_cast(joblist::UINTNULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int UBigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint64_t v1 = l->row1().getUintField(fSpec.fIndex); + uint64_t v2 = l->row2().getUintField(fSpec.fIndex); + + if (v1 == joblist::UBIGINTNULL || v2 == joblist::UBIGINTNULL) + { + if (v1 != joblist::UBIGINTNULL && v2 == joblist::UBIGINTNULL) + ret = fSpec.fNf; + else if (v1 == joblist::UBIGINTNULL && v2 != joblist::UBIGINTNULL) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { @@ -149,29 +308,26 @@ int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) return ret; } - int DoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - + uint64_t uiv1 = l->row1().getUintField(fSpec.fIndex); + uint64_t uiv2 = l->row2().getUintField(fSpec.fIndex); int ret = 0; - if (b1 == true || b2 == true) + if (uiv1 == joblist::DOUBLENULL || uiv2 == joblist::DOUBLENULL) { - if (b1 == false && b2 == true) + if (uiv1 != joblist::DOUBLENULL && uiv2 == joblist::DOUBLENULL) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (uiv1 == joblist::DOUBLENULL && uiv2 != joblist::DOUBLENULL) ret = -fSpec.fNf; } else { double v1 = l->row1().getDoubleField(fSpec.fIndex); double v2 = l->row2().getDoubleField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -186,23 +342,22 @@ int FloatCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - + int32_t iv1 = l->row1().getIntField(fSpec.fIndex); + int32_t iv2 = l->row2().getIntField(fSpec.fIndex); + int32_t nullValue = static_cast(joblist::FLOATNULL); int ret = 0; - if (b1 == true || b2 == true) + if (iv1 == nullValue || iv2 == nullValue) { - if (b1 == false && b2 == true) + if (iv1 != nullValue && iv2 == nullValue) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (iv1 == nullValue && iv2 != nullValue) ret = -fSpec.fNf; } else { float v1 = l->row1().getFloatField(fSpec.fIndex); float v2 = l->row2().getFloatField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -217,23 +372,20 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r l->row1().setData(r1); l->row2().setData(r2); - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); + long double v1 = l->row1().getLongDoubleField(fSpec.fIndex); + long double v2 = l->row2().getLongDoubleField(fSpec.fIndex); int ret = 0; - if (b1 == true || b2 == true) + if (v1 == joblist::LONGDOUBLENULL || v2 == joblist::LONGDOUBLENULL) { - if (b1 == false && b2 == true) + if (v1 != joblist::LONGDOUBLENULL && v2 == joblist::LONGDOUBLENULL) ret = fSpec.fNf; - else if (b1 == true && b2 == false) + else if (v1 == joblist::LONGDOUBLENULL && v2 != joblist::LONGDOUBLENULL) ret = -fSpec.fNf; } else { - long double v1 = l->row1().getLongDoubleField(fSpec.fIndex); - long double v2 = l->row2().getLongDoubleField(fSpec.fIndex); - if (v1 > v2) ret = fSpec.fAsc; else if (v1 < v2) @@ -243,6 +395,87 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r return ret; } +int DateCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint32_t v1 = l->row1().getUintField(fSpec.fIndex); + uint32_t v2 = l->row2().getUintField(fSpec.fIndex); + uint32_t nullValue = static_cast(joblist::DATENULL); + + if (v1 == nullValue || v2 == nullValue) + { + if (v1 != nullValue && v2 == nullValue) + ret = fSpec.fNf; + else if (v1 == nullValue && v2 != nullValue) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int DatetimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint64_t v1 = l->row1().getUintField(fSpec.fIndex); + uint64_t v2 = l->row2().getUintField(fSpec.fIndex); + + if (v1 == joblist::DATETIMENULL || v2 == joblist::DATETIMENULL) + { + if (v1 != joblist::DATETIMENULL && v2 == joblist::DATETIMENULL) + ret = fSpec.fNf; + else if (v1 == joblist::DATETIMENULL && v2 != joblist::DATETIMENULL) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} + +int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) +{ + l->row1().setData(r1); + l->row2().setData(r2); + + int ret = 0; + uint64_t v1 = l->row1().getUintField(fSpec.fIndex); + uint64_t v2 = l->row2().getUintField(fSpec.fIndex); + + if (v1 == joblist::TIMENULL || v2 == joblist::TIMENULL) + { + if (v1 != joblist::TIMENULL && v2 == joblist::TIMENULL) + ret = fSpec.fNf; + else if (v1 == joblist::TIMENULL && v2 != joblist::TIMENULL) + ret = -fSpec.fNf; + } + else + { + if (v1 > v2) + ret = fSpec.fAsc; + else if (v1 < v2) + ret = -fSpec.fAsc; + } + + return ret; +} int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) { @@ -308,6 +541,16 @@ bool CompareRule::less(Row::Pointer r1, Row::Pointer r2) return false; } +void CompareRule::revertRules() +{ + std::vector::iterator fCompareIter = fCompares.begin(); + for(; fCompareIter!=fCompares.end(); fCompareIter++) + { + (*fCompareIter)->revertSortSpec(); + } +} + + void CompareRule::compileRules(const std::vector& spec, const rowgroup::RowGroup& rg) { @@ -318,31 +561,80 @@ void CompareRule::compileRules(const std::vector& spec, const rowgr switch (types[i->fIndex]) { case CalpontSystemCatalog::TINYINT: + { + Compare* c = new TinyIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::SMALLINT: + { + Compare* c = new SmallIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: { Compare* c = new IntCompare(*i); fCompares.push_back(c); break; } + case CalpontSystemCatalog::BIGINT: + { + Compare* c = new BigIntCompare(*i); + fCompares.push_back(c); + break; + } + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + Compare* c; + uint32_t len = rg.getColumnWidth(i->fIndex); + switch (len) + { + case 1 : + c = new TinyIntCompare(*i); break; + case 2 : + c = new SmallIntCompare(*i); break; + case 4 : + c = new IntCompare(*i); break; + default: + c = new BigIntCompare(*i); + } + + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::UTINYINT: + { + Compare* c = new UTinyIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::USMALLINT: + { + Compare* c = new USmallIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: + { + Compare* c = new UIntCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::UBIGINT: { - Compare* c = new UintCompare(*i); + Compare* c = new UBigIntCompare(*i); fCompares.push_back(c); break; } case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::TEXT: { Compare* c = new StringCompare(*i); fCompares.push_back(c); @@ -373,10 +665,21 @@ void CompareRule::compileRules(const std::vector& spec, const rowgr } case CalpontSystemCatalog::DATE: + { + Compare* c = new DateCompare(*i); + fCompares.push_back(c); + break; + } case CalpontSystemCatalog::DATETIME: case CalpontSystemCatalog::TIMESTAMP: { - Compare* c = new UintCompare(*i); + Compare* c = new DatetimeCompare(*i); + fCompares.push_back(c); + break; + } + case CalpontSystemCatalog::TIME: + { + Compare* c = new TimeCompare(*i); fCompares.push_back(c); break; } @@ -424,9 +727,10 @@ OrderByData::OrderByData(const std::vector& spec, const rowgroup::R // IdbOrderBy class implementation IdbOrderBy::IdbOrderBy() : - fDistinct(false), fMemSize(0), fRowsPerRG(8192), fErrorCode(0), fRm(NULL) -{ -} + fDistinct(false), fMemSize(0), fRowsPerRG(rowgroup::rgCommonSize), + fErrorCode(0), + fRm(NULL) +{ } IdbOrderBy::~IdbOrderBy() diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index 6d0f8574b..b4fd7c441 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -41,7 +41,6 @@ #include "hasher.h" #include "stlpoolallocator.h" - // forward reference namespace joblist { @@ -52,9 +51,27 @@ class ResourceManager; namespace ordering { +template, + typename _Compare = less > +class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare> +{ +public: + typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type; + reservablePQ(size_type capacity = 0) { reserve(capacity); }; + void reserve(size_type capacity) { this->c.reserve(capacity); } + size_type capacity() const { return this->c.capacity(); } + using std::priority_queue<_Tp, _Sequence, _Compare>::size; + using std::priority_queue<_Tp, _Sequence, _Compare>::top; + using std::priority_queue<_Tp, _Sequence, _Compare>::pop; + using std::priority_queue<_Tp, _Sequence, _Compare>::push; + using std::priority_queue<_Tp, _Sequence, _Compare>::empty; +}; // forward reference class IdbCompare; +class OrderByRow; + +typedef reservablePQ SortingPQ; // order by specification struct IdbSortSpec @@ -101,11 +118,21 @@ public: loc = localloc; } if (fLocale.find("ja_JP") != std::string::npos) + { JPcodePoint = true; + } + else + { + JPcodePoint = false; + } } virtual ~Compare() {} virtual int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer) = 0; + void revertSortSpec() + { + fSpec.fAsc = -fSpec.fAsc; + } protected: IdbSortSpec fSpec; @@ -114,6 +141,25 @@ protected: bool JPcodePoint; // code point ordering (Japanese UTF) flag }; +// Comparators for signed types + +class TinyIntCompare : public Compare +{ +public: + TinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class SmallIntCompare : public Compare +{ +public: + SmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + class IntCompare : public Compare { @@ -124,24 +170,56 @@ public: }; -class UintCompare : public Compare +class BigIntCompare : public Compare { public: - UintCompare(const IdbSortSpec& spec) : Compare(spec) {} + BigIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// End of comparators for signed types +// Comparators for unsigned types + +class UTinyIntCompare : public Compare +{ +public: + UTinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {} int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; -class StringCompare : public Compare +class USmallIntCompare : public Compare { public: - StringCompare(const IdbSortSpec& spec) : Compare(spec) {} + USmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {} int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; +class UIntCompare : public Compare +{ +public: + UIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class UBigIntCompare : public Compare +{ +public: + UBigIntCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// end of comparators for unsigned types + +// Comparators for float types + class DoubleCompare : public Compare { public: @@ -150,6 +228,7 @@ public: int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; + class LongDoubleCompare : public Compare { public: @@ -167,6 +246,48 @@ public: int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); }; +// End of comparators for float types +// Comparators for temporal types + +class DateCompare : public Compare +{ +public: + DateCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class DatetimeCompare : public Compare +{ +public: + DatetimeCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + + +class TimeCompare : public Compare +{ +public: + TimeCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// End of comparators for temporal types + +// Comparators for non-fixed size types + +class StringCompare : public Compare +{ +public: + StringCompare(const IdbSortSpec& spec) : Compare(spec) {} + + int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); +}; + +// End of comparators for variable sized types class TimeCompare : public Compare { @@ -186,6 +307,7 @@ public: bool less(rowgroup::Row::Pointer r1, rowgroup::Row::Pointer r2); void compileRules(const std::vector&, const rowgroup::RowGroup&); + void revertRules(); std::vector fCompares; IdbCompare* fIdbCompare; @@ -227,7 +349,7 @@ public: return fRule->less(fData, rhs.fData); } - rowgroup::Row::Pointer fData; + rowgroup::Row::Pointer fData; CompareRule* fRule; }; @@ -246,7 +368,6 @@ public: bool operator()(rowgroup::Row::Pointer, rowgroup::Row::Pointer); -//protected: std::vector fIndex; }; @@ -293,10 +414,18 @@ public: { return fDistinct; } + SortingPQ& getQueue() + { + return fOrderByQueue; + } + CompareRule &getRule() + { + return fRule; + } + SortingPQ fOrderByQueue; protected: std::vector fOrderByCond; - std::priority_queue fOrderByQueue; rowgroup::Row fRow0; CompareRule fRule; From 0696696cf6c36aebda42b5da53fd4a02f9ce2a04 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Wed, 30 Oct 2019 17:00:51 +0300 Subject: [PATCH 4/4] MCOL-894 Upmerged post review changes. Raised the default for orderby threads from 4 to 16. Removed if LIMIT check block in makeVtableModeSteps(). Removed a duplicate of TimeCompare class and methods. MCOL-3536 Upmerged the change. MCOL-894 Post review changes. Uncomment if LIMIT check block in makeVtableModeSteps(). TupleAnnexStep dtor now uses vector::size() as a boundary. Removed useless try-catch blocks. executeParallelOrderBy() now calculates rowSize only once. Removed forward declaration in the unexisting namespace. Trim UTs a bit. --- dbcon/joblist/orderby-tests.cpp | 7 +- dbcon/joblist/tupleannexstep.cpp | 476 ++++++++++++---------------- dbcon/joblist/tupleannexstep.h | 7 - dbcon/joblist/tupleconstantstep.cpp | 3 +- utils/windowfunction/idborderby.cpp | 38 --- utils/windowfunction/idborderby.h | 14 +- 6 files changed, 212 insertions(+), 333 deletions(-) diff --git a/dbcon/joblist/orderby-tests.cpp b/dbcon/joblist/orderby-tests.cpp index e9e552881..fa98f0772 100644 --- a/dbcon/joblist/orderby-tests.cpp +++ b/dbcon/joblist/orderby-tests.cpp @@ -315,7 +315,7 @@ private: void ORDERBY_TIME_TEST() { uint64_t numRows = 8192; - uint64_t maxThreads = 16; + uint64_t maxThreads = 8; // limit == 100000 is still twice as good to sort in parallel // limit == 1000000 however is better to sort using single threaded sorting uint64_t limit = 100000; @@ -324,9 +324,10 @@ private: bool generateRandValues = true; bool hasDistinct = true; bool noDistinct = false; - //orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct); - //orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct); orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct); + orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct); } void QUICK_TEST() { diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 4581ec119..3f9e8a7b4 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -138,7 +138,7 @@ TupleAnnexStep::~TupleAnnexStep() { if(fOrderByList.size() > 0) { - for(uint64_t id = 0; id <= fMaxThreads; id++) + for(uint64_t id = 0; id < fOrderByList.size(); id++) { delete fOrderByList[id]; } @@ -704,7 +704,8 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() fRowGroupOut.getRow(0, &fRowOut); ordering::SortingPQ finalPQ; scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); - + fRowGroupIn.initRow(&row1); + fRowGroupIn.initRow(&row2); try { @@ -744,156 +745,120 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() ERR_IN_PROCESS, fErrorInfo, fSessionId); } + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } - try + // Calculate rowSize only once + if (finalPQ.size()) { - // OFFSET processing - while (finalPQ.size() && offset < fLimitStart) + ordering::OrderByRow& topOBRow = + const_cast(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) { - offset++; - finalPQ.pop(); - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ - caught an unknown exception 2", - ERR_IN_PROCESS, fErrorInfo, fSessionId); + copyRow(fRowIn, &fRowOut); + } + else + { + fConstant->fillInConstants(fRowIn, fRowOut); + } + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; } - try + if (!fConstant) { - // Calculate rowSize only once - if (finalPQ.size()) + while(finalPQ.size()) { - ordering::OrderByRow& topOBRow = - const_cast(finalPQ.top()); - fRowIn.setData(topOBRow.fData); - if (!fConstant) + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns striped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); fConstant->fillInConstants(fRowIn, fRowOut); - rowSize = fRowOut.getSize(); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - finalPQ.pop(); - count++; - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ - caught an unknown exception 3", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); - try - { - if (!fConstant) - { - while(finalPQ.size()) + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) { - if (cancelled()) - { - break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - copyRow(fRowIn, &fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) - { - break; - } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } - else // Add ConstantColumns striped earlier - { - while(finalPQ.size()) + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else { - if (cancelled()) - { break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - fConstant->fillInConstants(fRowIn, fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) - { - break; - } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } // end of if-else - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\ - caught an unknown exception 4", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + } + } // end of limit bound while loop + } // end of if-else if (fRowGroupOut.getRowCount() > 0) { @@ -982,152 +947,116 @@ void TupleAnnexStep::finalizeParallelOrderBy() ERR_IN_PROCESS, fErrorInfo, fSessionId); } + // OFFSET processing + while (finalPQ.size() && offset < fLimitStart) + { + offset++; + finalPQ.pop(); + } - try + // Calculate rowSize only once + if (finalPQ.size()) { - // OFFSET processing - while (finalPQ.size() && offset < fLimitStart) + ordering::OrderByRow& topOBRow = + const_cast(finalPQ.top()); + fRowIn.setData(topOBRow.fData); + if (!fConstant) { - offset++; - finalPQ.pop(); - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ - caught an unknown exception 2", - ERR_IN_PROCESS, fErrorInfo, fSessionId); + copyRow(fRowIn, &fRowOut); + } + else + { + fConstant->fillInConstants(fRowIn, fRowOut); + } + rowSize = fRowOut.getSize(); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + finalPQ.pop(); + count++; } - try + if (!fConstant) { - // Calculate rowSize only once - if (finalPQ.size()) + while(finalPQ.size()) { - ordering::OrderByRow& topOBRow = - const_cast(finalPQ.top()); - fRowIn.setData(topOBRow.fData); - if (!fConstant) + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); copyRow(fRowIn, &fRowOut); + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); + + finalPQ.pop(); + count++; + } + + if (fRowGroupOut.getRowCount() > 0) + { + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } else + { + break; + } + } // end of limit bound while loop + } + else // Add ConstantColumns striped earlier + { + while(finalPQ.size()) + { + if (cancelled()) + { + break; + } + + while (count < fLimitCount && finalPQ.size() + && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) + { + ordering::OrderByRow &topOBRow = + const_cast(finalPQ.top()); + + fRowIn.setData(topOBRow.fData); fConstant->fillInConstants(fRowIn, fRowOut); - rowSize = fRowOut.getSize(); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - finalPQ.pop(); - count++; - } - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ - caught an unknown exception 3", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + fRowGroupOut.incRowCount(); + fRowOut.nextRow(rowSize); - try - { - if (!fConstant) - { - while(finalPQ.size()) + finalPQ.pop(); + count++; + if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) + { + break; + } + } + + if (fRowGroupOut.getRowCount() > 0) { - if (cancelled()) - { - break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - copyRow(fRowIn, &fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } - else // Add ConstantColumns striped earlier - { - while(finalPQ.size()) + fRowsReturned += fRowGroupOut.getRowCount(); + fOutputDL->insert(rgDataOut); + rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); + fRowGroupOut.setData(&rgDataOut); + fRowGroupOut.resetRowGroup(0); + fRowGroupOut.getRow(0, &fRowOut); + } + else { - if (cancelled()) - { - break; - } - - while (count < fLimitCount && finalPQ.size() - && fRowGroupOut.getRowCount() < rowgroup::rgCommonSize) - { - ordering::OrderByRow &topOBRow = - const_cast(finalPQ.top()); - - fRowIn.setData(topOBRow.fData); - fConstant->fillInConstants(fRowIn, fRowOut); - fRowGroupOut.incRowCount(); - fRowOut.nextRow(rowSize); - - finalPQ.pop(); - count++; - if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize) - { - break; - } - } - - if (fRowGroupOut.getRowCount() > 0) - { - fRowsReturned += fRowGroupOut.getRowCount(); - fOutputDL->insert(rgDataOut); - rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); - fRowGroupOut.setData(&rgDataOut); - fRowGroupOut.resetRowGroup(0); - fRowGroupOut.getRow(0, &fRowOut); - } - else - { - break; - } - } // end of limit bound while loop - } // end of if-else - } - catch (const std::exception& ex) - { - catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId); - } - catch (...) - { - catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\ - caught an unknown exception 4", - ERR_IN_PROCESS, fErrorInfo, fSessionId); - } + break; + } + } // end of limit bound while loop + } // end of if-else if (fRowGroupOut.getRowCount() > 0) { @@ -1200,7 +1129,10 @@ void TupleAnnexStep::executeParallelOrderBy(uint64_t id) rg.setData(&rgDataIn); rg.getRow(0, &r); - rowSize = r.getSize(); + if (!rowSize) + { + rowSize = r.getSize(); + } rowCount = rg.getRowCount(); for (uint64_t i = 0; i < rowCount; ++i) diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 76cac326d..f9efd459b 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -28,13 +28,6 @@ #include "jobstep.h" #include "limitedorderby.h" -// forward reference -namespace fucexp -{ -class FuncExp; -} - - namespace joblist { class TupleConstantStep; diff --git a/dbcon/joblist/tupleconstantstep.cpp b/dbcon/joblist/tupleconstantstep.cpp index 4ffb811e7..3e2b61e39 100644 --- a/dbcon/joblist/tupleconstantstep.cpp +++ b/dbcon/joblist/tupleconstantstep.cpp @@ -472,7 +472,7 @@ void TupleConstantStep::execute() fOutputDL->endOfInput(); } - +// *DRRTUY Copy row at once not one field at a time void TupleConstantStep::fillInConstants() { fRowGroupIn.getRow(0, &fRowIn); @@ -495,7 +495,6 @@ void TupleConstantStep::fillInConstants() } else // only first column is constant { - //size_t n = fRowOut.getOffset(fRowOut.getColumnCount()) - fRowOut.getOffset(1); for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) { fRowOut.setRid(fRowIn.getRelRid()); diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index d1fdd2801..b2e70a6c3 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -468,37 +468,6 @@ int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) } else { - if (v1 > v2) - ret = fSpec.fAsc; - else if (v1 < v2) - ret = -fSpec.fAsc; - } - - return ret; -} - -int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2) -{ - l->row1().setData(r1); - l->row2().setData(r2); - - bool b1 = l->row1().isNullValue(fSpec.fIndex); - bool b2 = l->row2().isNullValue(fSpec.fIndex); - - int ret = 0; - - if (b1 == true || b2 == true) - { - if (b1 == false && b2 == true) - ret = fSpec.fNf; - else if (b1 == true && b2 == false) - ret = -fSpec.fNf; - } - else - { - int64_t v1 = l->row1().getIntField(fSpec.fIndex); - int64_t v2 = l->row2().getIntField(fSpec.fIndex); - // ((int64_t) -00:00:26) > ((int64_t) -00:00:25) // i.e. For 2 negative TIME values, we invert the order of // comparison operations to force "-00:00:26" to appear before @@ -684,13 +653,6 @@ void CompareRule::compileRules(const std::vector& spec, const rowgr break; } - case CalpontSystemCatalog::TIME: - { - Compare* c = new TimeCompare(*i); - fCompares.push_back(c); - break; - } - default: { break; diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index b4fd7c441..e26ba1aa3 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -51,8 +51,8 @@ class ResourceManager; namespace ordering { -template, - typename _Compare = less > +template, + typename _Compare = std::less > class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare> { public: @@ -132,6 +132,7 @@ public: void revertSortSpec() { fSpec.fAsc = -fSpec.fAsc; + fSpec.fNf = -fSpec.fNf; } protected: @@ -289,15 +290,6 @@ public: // End of comparators for variable sized types -class TimeCompare : public Compare -{ -public: - TimeCompare(const IdbSortSpec& spec) : Compare(spec) {} - - int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer); -}; - - class CompareRule { public: