1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge pull request #925 from drrtuy/MCOL-894_develop_2

MCOL-894 parallel sort + MCOL-3536 UTF-8 aware sorting upmerge from 1.2
This commit is contained in:
Andrew Hutchings
2019-11-07 17:26:19 +00:00
committed by GitHub
23 changed files with 2077 additions and 361 deletions

View File

@ -82,6 +82,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location):
fQueryType(SELECT),
fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL),
fStringTableThreshold(20),
fOrderByThreads(1),
fDJSSmallSideLimit(0),
fDJSLargeSideLimit(0),
fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join,
@ -125,6 +126,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(
fQueryType(SELECT),
fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL),
fStringTableThreshold(20),
fOrderByThreads(1),
fDJSSmallSideLimit(0),
fDJSLargeSideLimit(0),
fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join
@ -151,6 +153,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan (string data) :
fQueryType(SELECT),
fPriority(querystats::DEFAULT_USER_PRIORITY_LEVEL),
fStringTableThreshold(20),
fOrderByThreads(1),
fDJSSmallSideLimit(0),
fDJSLargeSideLimit(0),
fDJSPartitionSize(100 * 1024 * 1024), // 100MB mem usage for disk based join
@ -479,6 +482,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
b << (uint64_t)fLimitNum;
b << static_cast<const ByteStream::byte>(fHasOrderBy);
b << static_cast<const ByteStream::byte>(fSpecHandlerProcessed);
b << reinterpret_cast<uint32_t>(fOrderByThreads);
b << static_cast<uint32_t>(fSelectSubList.size());
@ -648,6 +652,7 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
b >> (uint64_t&)fLimitNum;
b >> reinterpret_cast< ByteStream::byte&>(fHasOrderBy);
b >> reinterpret_cast< ByteStream::byte&>(fSpecHandlerProcessed);
b >> reinterpret_cast<uint32_t&>(fOrderByThreads);
// for SELECT subquery
b >> size;

View File

@ -588,6 +588,16 @@ public:
return fSpecHandlerProcessed;
}
void orderByThreads(const uint32_t number)
{
fOrderByThreads = number;
}
const uint32_t orderByThreads() const
{
return fOrderByThreads;
}
void selectSubList(const SelectList& selectSubList)
{
fSelectSubList = selectSubList;
@ -897,6 +907,7 @@ private:
// for specific handlers processing, e.g. GROUP BY
bool fSpecHandlerProcessed;
uint32_t fOrderByThreads;
// Derived table involved in the query. For derived table optimization
std::vector<SCSEP> fSubSelectList;

View File

@ -57,12 +57,15 @@ set(joblist_LIB_SRCS
virtualtable.cpp
windowfunctionstep.cpp)
#libjoblist_la_CXXFLAGS = $(march_flags) $(AM_CXXFLAGS)
add_library(joblist SHARED ${joblist_LIB_SRCS})
set_target_properties(joblist PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS joblist DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
if (WITH_ORDERBY_UT)
add_executable(job_orderby_tests orderby-tests.cpp)
target_link_libraries(job_orderby_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
install(TARGETS job_orderby_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform)
endif()

View File

@ -247,6 +247,7 @@ struct JobInfo
std::vector<std::pair<uint32_t, bool> > orderByColVec;
uint64_t limitStart;
uint64_t limitCount;
uint32_t orderByThreads;
// tupleInfo
boost::shared_ptr<TupleKeyInfo> keyInfo;

View File

@ -769,6 +769,7 @@ void addOrderByAndLimit(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
{
jobInfo.limitStart = csep->limitStart();
jobInfo.limitCount = csep->limitNum();
jobInfo.orderByThreads = csep->orderByThreads();
CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols();

View File

@ -484,6 +484,8 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
deliverySteps[CNX_VTABLE_ID] = ws;
}
// TODO MCOL-894 we don't need to run sorting|distinct
// every time
// if ((jobInfo.limitCount != (uint64_t) - 1) ||
// (jobInfo.constantCol == CONST_COL_EXIST) ||
// (jobInfo.hasDistinct))
@ -494,12 +496,13 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
TupleAnnexStep* tas = dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get());
tas->setLimit(jobInfo.limitStart, jobInfo.limitCount);
// if (jobInfo.limitCount != (uint64_t) - 1)
// {
if (jobInfo.orderByColVec.size() > 0)
{
tas->addOrderBy(new LimitedOrderBy());
// }
if (jobInfo.orderByThreads > 1)
tas->setParallelOp();
tas->setMaxThreads(jobInfo.orderByThreads);
}
if (jobInfo.constantCol == CONST_COL_EXIST)
tas->addConstant(new TupleConstantStep(jobInfo));
@ -518,7 +521,11 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
if (jobInfo.trace) cout << "Output RowGroup 2: " << rg2.toString() << endl;
AnyDataListSPtr spdlIn(new AnyDataList());
RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize);
RowGroupDL* dlIn;
if (jobInfo.orderByColVec.size() > 0)
dlIn = new RowGroupDL(jobInfo.orderByThreads, jobInfo.fifoSize);
else
dlIn = new RowGroupDL(1, jobInfo.fifoSize);
dlIn->OID(CNX_VTABLE_ID);
spdlIn->rowGroupDL(dlIn);
JobStepAssociation jsaIn;

View File

@ -1727,14 +1727,12 @@ void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo,
{
jobInfo.limitCount = (uint64_t) - 1;
}
// support order by and limit in sub-query/union or
// GROUP BY handler processed outer query order
else if (csep->orderByCols().size() > 0)
{
addOrderByAndLimit(csep, jobInfo);
}
// limit without order by in any query
else
{

View File

@ -55,7 +55,7 @@ LimitedOrderBy::~LimitedOrderBy()
}
void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo)
void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool invertRules, bool isMultiThreaded)
{
fRm = jobInfo.rm;
fSessionMemLimit = jobInfo.umMemLimit;
@ -77,12 +77,22 @@ void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo)
map<uint32_t, uint32_t>::iterator j = keyToIndexMap.find(i->first);
idbassert(j != keyToIndexMap.end());
fOrderByCond.push_back(IdbSortSpec(j->second, i->second));
fOrderByCond.push_back(IdbSortSpec(j->second, i->second ^ invertRules));
}
// limit row count info
if (isMultiThreaded)
{
// CS can't apply offset at the first stage
// otherwise it looses records.
fStart = 0;
fCount = jobInfo.limitStart+jobInfo.limitCount;
}
else
{
fStart = jobInfo.limitStart;
fCount = jobInfo.limitCount;
}
IdbOrderBy::initialize(rg);
}
@ -169,6 +179,8 @@ void LimitedOrderBy::finalize()
if (fOrderByQueue.size() > 0)
{
// *DRRTUY Very memory intensive. CS needs to account active
// memory only and release memory if needed.
uint64_t memSizeInc = fRowsPerRG * fRowGroup.getRowSize();
fMemSize += memSizeInc;
@ -207,6 +219,8 @@ void LimitedOrderBy::finalize()
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
fRowGroup.resetRowGroup(0);
// *DRRTUY This approach won't work with
// OFSET > fRowsPerRG
offset = offset != 0 ? offset - 1 : offset;
fRowGroup.getRow(offset, &fRow0);
@ -220,6 +234,7 @@ void LimitedOrderBy::finalize()
fRow0.prevRow(rSize);
fOrderByQueue.pop();
// if RG has fRowsPerRG rows
if(offset == (uint64_t)-1)
{
tempRGDataList.push_front(fData);

View File

@ -46,9 +46,16 @@ public:
LimitedOrderBy();
virtual ~LimitedOrderBy();
void initialize(const rowgroup::RowGroup&, const JobInfo&);
void initialize(const rowgroup::RowGroup&,
const JobInfo&,
bool invertRules = false,
bool isMultiThreded = false);
void processRow(const rowgroup::Row&);
uint64_t getKeyLength() const;
uint64_t getLimitCount() const
{
return fCount;
}
const std::string toString() const;
void finalize();

View File

@ -0,0 +1,390 @@
/* Copyright (C) 2019 MariaDB Corporaton.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $
#include <list>
#include <sstream>
#include <pthread.h>
#include <iomanip>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
#include "jobstep.h"
#include "funcexp.h"
#include "jlf_common.h"
#include "tupleannexstep.h"
#include "calpontsystemcatalog.h"
#include "resourcemanager.h"
#include <boost/any.hpp>
#include <boost/function.hpp>
#include "bytestream.h"
#include <time.h>
#include <sys/time.h>
#include <limits.h>
#define DEBUG
#define MEMORY_LIMIT 14983602176
using namespace std;
using namespace joblist;
using namespace messageqcpp;
// Timer class used by this tdriver to output elapsed times, etc.
class Timer
{
public:
void start(const string& message)
{
if (!fHeaderDisplayed)
{
header();
fHeaderDisplayed = true;
}
gettimeofday(&fTvStart, 0);
cout << timestr() << " Start " << message << endl;
}
void stop(const string& message)
{
time_t now;
time(&now);
string secondsElapsed;
getTimeElapsed(secondsElapsed);
cout << timestr() << " " << secondsElapsed << " Stop " << message << endl;
}
Timer() : fHeaderDisplayed(false) {}
private:
struct timeval fTvStart;
bool fHeaderDisplayed;
double getTimeElapsed(string& seconds)
{
struct timeval tvStop;
gettimeofday(&tvStop, 0);
double secondsElapsed =
(tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) -
(fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0));
ostringstream oss;
oss << secondsElapsed;
seconds = oss.str();
seconds.resize(8, '0');
return secondsElapsed;
}
string timestr()
{
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
ostringstream oss;
oss << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec << '.'
<< setw(6) << tv.tv_usec
;
return oss.str();
}
void header()
{
cout << endl;
cout << "Time Seconds Activity" << endl;
}
};
class FilterDriver : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(FilterDriver);
CPPUNIT_TEST(ORDERBY_TIME_TEST);
CPPUNIT_TEST_SUITE_END();
private:
void orderByTest_nRGs(uint64_t numRows, uint64_t limit,
uint64_t maxThreads,
bool parallelExecution,
bool generateRandValues,
bool hasDistinct)
{
Timer timer;
// This test creates TAS for single bigint column and run sorting on it.
cout << endl;
cout << "------------------------------------------------------------" << endl;
timer.start("insert");
stringstream ss2;
ss2.flush();
ss2 << "loading " << numRows << " rows into initial RowGroup.";
std::string message = ss2.str();
timer.start(message);
ResourceManager* rm = ResourceManager::instance(true);
joblist::JobInfo jobInfo = joblist::JobInfo(rm);
// 1st column is the sorting key
uint8_t tupleKey = 1;
uint8_t offset = 0;
uint16_t rowsPerRG = 8192; // hardcoded max rows in RowGroup value
uint32_t numberOfRGs = numRows / rowsPerRG;
// set sorting rules
// true - ascending order, false otherwise
jobInfo.orderByColVec.push_back(make_pair(tupleKey, false));
jobInfo.limitStart = offset;
jobInfo.limitCount = limit;
jobInfo.hasDistinct = hasDistinct;
// JobInfo doesn't set these SP in ctor
jobInfo.umMemLimit.reset(new int64_t);
*(jobInfo.umMemLimit) = MEMORY_LIMIT;
SErrorInfo errorInfo(new ErrorInfo());
jobInfo.errorInfo = errorInfo;
uint32_t oid =3001;
// populate JobInfo.nonConstDelCols with dummy shared_pointers
// to notify TupleAnnexStep::initialise() about number of non-constant columns
execplan::SRCP srcp1, srcp2;
jobInfo.nonConstDelCols.push_back(srcp1);
jobInfo.nonConstDelCols.push_back(srcp2);
// create two columns RG. 1st is the sorting key, second is the data column
std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
offsets.push_back(2); offsets.push_back(10); offsets.push_back(18);
roids.push_back(oid); roids.push_back(oid);
tkeys.push_back(1); tkeys.push_back(1);
types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
cscale.push_back(0); cscale.push_back(0);
cprecision.push_back(20); cprecision.push_back(20);
rowgroup::RowGroup inRG(2, //column count
offsets, //oldOffset
roids, // column oids
tkeys, //keys
types, // types
cscale, //scale
cprecision, // precision
20, // sTableThreshold
false //useStringTable
);
rowgroup::RowGroup jobInfoRG(inRG);
joblist::TupleAnnexStep tns = joblist::TupleAnnexStep(jobInfo);
tns.addOrderBy(new joblist::LimitedOrderBy());
tns.delivery(true);
// activate parallel sort
if(parallelExecution)
{
tns.setParallelOp();
}
tns.setMaxThreads(maxThreads);
tns.initialize(jobInfoRG, jobInfo);
tns.setLimit(0, limit);
// Create JobStepAssociation mediator class ins to connect DL and JS
joblist::AnyDataListSPtr spdlIn(new AnyDataList());
//joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, jobInfo.fifoSize);
joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, 21001);
dlIn->OID(oid);
spdlIn->rowGroupDL(dlIn);
joblist::JobStepAssociation jsaIn;
jsaIn.outAdd(spdlIn);
tns.inputAssociation(jsaIn);
uint64_t base = 42;
uint64_t maxInt = 0;
if(generateRandValues)
::srand(base);
uint64_t nextValue;
for(uint32_t i = 1; i <= numberOfRGs; i++)
{
// create RGData with the RG structure and manually populate RG
// Use reinint(numberOfRGs) to preset an array
rowgroup::RGData rgD = rowgroup::RGData(inRG);
inRG.setData(&rgD);
rowgroup::Row r;
inRG.initRow(&r);
uint32_t rowSize = r.getSize();
inRG.getRow(0, &r);
// populate RGData
//for(uint64_t i = rowsPerRG+1; i > 0; i--)
for(uint64_t i = 0; i < rowsPerRG; i++) // Worst case scenario for PQ
{
// TBD Use set..._offset methods to avoid offset calculation instructions
if(generateRandValues)
{
nextValue = ::rand();
}
else
{
nextValue = base + i;
}
r.setUintField<8>(nextValue, 0);
r.setUintField<8>(nextValue, 1);
if (maxInt < nextValue)
{
maxInt = nextValue;
}
r.nextRow(rowSize);
}
base += 1;
inRG.setRowCount(rowsPerRG);
// Insert RGData into input DL
dlIn->insert(rgD);
}
// end of input signal
std::cout << "orderByTest_nRGs input DataList totalSize " << dlIn->totalSize() << std::endl;
dlIn->endOfInput();
timer.stop(message);
joblist::AnyDataListSPtr spdlOut(new AnyDataList());
// Set the ring buffer big enough to take RGData-s with results b/c
// there is nobody to read out of the buffer.
joblist::RowGroupDL* dlOut = new RowGroupDL(1, numberOfRGs);
dlOut->OID(oid);
spdlOut->rowGroupDL(dlOut);
joblist::JobStepAssociation jsaOut;
jsaOut.outAdd(spdlOut);
//uint64_t outputDLIter = dlOut->getIterator();
tns.outputAssociation(jsaOut);
// Run Annex Step
message = "Sorting";
timer.start(message);
tns.run();
tns.join();
timer.stop(message);
// serialize RGData into bs and later back
// to follow ExeMgr whilst getting TAS result RowGroup
messageqcpp::ByteStream bs;
uint32_t result = 0;
rowgroup::RowGroup outRG(inRG);
rowgroup::RGData outRGData(outRG);
result = tns.nextBand(bs);
/*bool more = false;
do
{
dlOut->next(outputDLIter, &outRGData);
} while (more);*/
std::cout << "orderByTest_nRGs result " << result << std::endl;
//CPPUNIT_ASSERT( result == limit );
outRGData.deserialize(bs);
outRG.setData(&outRGData);
//std::cout << "orderByTest_nRGs output RG " << outRG.toString() << std::endl;
std::cout << "maxInt " << maxInt << std::endl;
{
rowgroup::Row r;
outRG.initRow(&r);
outRG.getRow(0, &r);
CPPUNIT_ASSERT(limit == outRG.getRowCount() || outRG.getRowCount() == 8192);
CPPUNIT_ASSERT_EQUAL(maxInt, r.getUintField(1));
}
cout << "------------------------------------------------------------" << endl;
}
void ORDERBY_TIME_TEST()
{
uint64_t numRows = 8192;
uint64_t maxThreads = 8;
// limit == 100000 is still twice as good to sort in parallel
// limit == 1000000 however is better to sort using single threaded sorting
uint64_t limit = 100000;
bool parallel = true;
bool woParallel = false;
bool generateRandValues = true;
bool hasDistinct = true;
bool noDistinct = false;
orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct);
orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct);
}
void QUICK_TEST()
{
float f = 1.1;
double d = 1.2;
uint64_t i = 1;
uint64_t* i_ptr = &i;
double* d_ptr = &d;
uint64_t* i2_ptr = (uint64_t*) d_ptr;
float* f_ptr = &f;
i_ptr = (uint64_t*) f_ptr;
cout << "*i_ptr=" << *i_ptr << endl;
cout << "*i2_ptr=" << *i2_ptr << endl;
f_ptr = (float*) i_ptr;
cout << "*f_ptr=" << *f_ptr << endl;
cout << endl;
if (d > i)
cout << "1.2 is greater than 1." << endl;
if (f > i)
cout << "1.1 is greater than 1." << endl;
if (d > f)
cout << "1.2 is greater than 1.1" << endl;
if (*i_ptr < *i2_ptr)
cout << "1.1 < 1.2 when represented as uint64_t." << endl;
cout << "sizeof(f) = " << sizeof(f) << endl;
cout << "sizeof(i) = " << sizeof(i) << endl;
cout << "sizeof(d) = " << sizeof(d) << endl;
double dbl = 9.7;
double dbl2 = 1.3;
i_ptr = (uint64_t*) &dbl;
i2_ptr = (uint64_t*) &dbl2;
cout << endl;
cout << "9.7 as int is " << *i_ptr << endl;
cout << "9.7 as int is " << *i2_ptr << endl;
cout << "1.2 < 9.7 is " << (*i_ptr < *i2_ptr) << endl;
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver);
int main( int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest( registry.makeTest() );
bool wasSuccessful = runner.run( "", false );
return (wasSuccessful ? 0 : 1);
}

View File

@ -3153,12 +3153,6 @@ void TupleBPS::rgDataToDl(RGData& rgData, RowGroup& rg, RowGroupDL* dlp)
if (dupColumns.size() > 0)
dupOutputColumns(rgData, rg);
/*
if (!(fSessionId & 0x80000000)) {
rg.setData(&rgData);
cerr << "TBPS rowcount: " << rg.getRowCount() << endl;
}
*/
dlp->insert(rgData);
}

View File

@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -65,6 +66,8 @@ using namespace querytele;
#include "tupleannexstep.h"
#define QUEUE_RESERVE_SIZE 100000
namespace
{
struct TAHasher
@ -117,10 +120,12 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) :
fLimitHit(false),
fEndOfResult(false),
fDistinct(false),
fParallelOp(false),
fOrderBy(NULL),
fConstant(NULL),
fFeInstance(funcexp::FuncExp::instance()),
fJobList(jobInfo.jobListPtr)
fJobList(jobInfo.jobListPtr),
fFinishedThreads(0)
{
fExtendedInfo = "TNS: ";
fQtc.stepParms().stepType = StepTeleStats::T_TNS;
@ -129,6 +134,22 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) :
TupleAnnexStep::~TupleAnnexStep()
{
if(fParallelOp)
{
if(fOrderByList.size() > 0)
{
for(uint64_t id = 0; id < fOrderByList.size(); id++)
{
delete fOrderByList[id];
}
fOrderByList.clear();
}
fInputIteratorsList.clear();
fRunnersList.clear();
}
if (fOrderBy)
delete fOrderBy;
@ -149,25 +170,41 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
{
// Initialize structures used by separate workers
uint64_t id = 1;
fRowGroupIn = rgIn;
fRowGroupIn.initRow(&fRowIn);
if(fParallelOp && fOrderBy)
{
fOrderByList.resize(fMaxThreads+1);
for(id = 0; id <= fMaxThreads; id++)
{
// *DRRTUY use SP here?
fOrderByList[id] = new LimitedOrderBy();
fOrderByList[id]->distinct(fDistinct);
fOrderByList[id]->initialize(rgIn, jobInfo, false, true);
}
}
else
{
if (fOrderBy)
{
fOrderBy->distinct(fDistinct);
fOrderBy->initialize(rgIn, jobInfo);
}
}
if (fConstant == NULL)
{
vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
vector<uint32_t> pos, posIn = fRowGroupIn.getOffsets();
vector<uint32_t> oids, oidsIn = rgIn.getOIDs();
vector<uint32_t> keys, keysIn = rgIn.getKeys();
vector<uint32_t> scale, scaleIn = rgIn.getScale();
vector<uint32_t> precision, precisionIn = rgIn.getPrecision();
vector<CalpontSystemCatalog::ColDataType> types, typesIn = rgIn.getColTypes();
vector<uint32_t> pos, posIn = rgIn.getOffsets();
size_t n = jobInfo.nonConstDelCols.size();
// Add all columns into output RG as keys. Can we put only keys?
oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + n);
keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + n);
scale.insert(scale.end(), scaleIn.begin(), scaleIn.begin() + n);
@ -198,8 +235,6 @@ void TupleAnnexStep::run()
if (fInputDL == NULL)
throw logic_error("Input is not a RowGroup data list.");
fInputIterator = fInputDL->getIterator();
if (fOutputJobStepAssociation.outSize() == 0)
throw logic_error("No output data list for annex step.");
@ -213,16 +248,57 @@ void TupleAnnexStep::run()
fOutputIterator = fOutputDL->getIterator();
}
fRunner = jobstepThreadPool.invoke(Runner(this));
}
if(fParallelOp)
{
// Indexing begins with 1
fRunnersList.resize(fMaxThreads);
fInputIteratorsList.resize(fMaxThreads+1);
// Activate stats collecting before CS spawns threads.
if (traceOn()) dlTimes.setFirstReadTime();
// *DRRTUY Make this block conditional
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_START;
sts.total_units_of_work = 1;
postStepStartTele(sts);
for(uint32_t id = 1; id <= fMaxThreads; id++)
{
fInputIteratorsList[id] = fInputDL->getIterator();
fRunnersList[id-1] = jobstepThreadPool.invoke(Runner(this, id));
}
}
else
{
fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
if (fInputDL == NULL)
throw logic_error("Input is not a RowGroup data list.");
fInputIterator = fInputDL->getIterator();
fRunner = jobstepThreadPool.invoke(Runner(this));
}
}
void TupleAnnexStep::join()
{
if(fParallelOp)
{
jobstepThreadPool.join(fRunnersList);
}
else
{
if (fRunner)
{
jobstepThreadPool.join(fRunner);
}
}
}
}
uint32_t TupleAnnexStep::nextBand(messageqcpp::ByteStream& bs)
{
@ -312,6 +388,12 @@ void TupleAnnexStep::execute()
}
}
void TupleAnnexStep::execute(uint32_t id)
{
if(fOrderByList[id])
executeParallelOrderBy(id);
}
void TupleAnnexStep::executeNoOrderBy()
{
@ -337,7 +419,6 @@ void TupleAnnexStep::executeNoOrderBy()
{
fRowGroupIn.setData(&rgDataIn);
fRowGroupIn.getRow(0, &fRowIn);
// Get a new output rowgroup for each input rowgroup to preserve the rids
rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
fRowGroupOut.setData(&rgDataOut);
@ -459,7 +540,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct()
fJobList->abortOnLimit((JobStep*) this);
}
if (UNLIKELY(fRowGroupOut.getRowCount() >= 8192))
if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize))
{
dataVec.push_back(rgDataOut);
rgDataOut.reinit(fRowGroupOut);
@ -596,6 +677,511 @@ void TupleAnnexStep::executeWithOrderBy()
fOutputDL->endOfInput();
}
/*
The m() iterates over thread's LimitedOrderBy instances,
reverts the rules and then populates the final collection
used for final sorting. The method uses OrderByRow that
combination of Row::data and comparison rules.
When m() finishes with thread's LOBs it iterates over
final sorting collection, populates rgDataOut, then
sends it into outputDL.
Changing this method don't forget to make changes in
finalizeParallelOrderBy() that is a clone.
!!!The method doesn't set Row::baseRid
*/
void TupleAnnexStep::finalizeParallelOrderByDistinct()
{
utils::setThreadName("TASwParOrdDistM");
uint64_t count = 0;
uint64_t offset = 0;
uint32_t rowSize = 0;
rowgroup::RGData rgDataOut;
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
// Calculate offset here
fRowGroupOut.getRow(0, &fRowOut);
ordering::SortingPQ finalPQ;
scoped_ptr<DistinctMap_t> distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this)));
fRowGroupIn.initRow(&row1);
fRowGroupIn.initRow(&row2);
try
{
for(uint64_t id = 1; id <= fMaxThreads; id++)
{
if (cancelled())
{
break;
}
// Revert the ordering rules before we
// add rows into the final PQ.
fOrderByList[id]->getRule().revertRules();
ordering::SortingPQ &currentPQ = fOrderByList[id]->getQueue();
finalPQ.reserve(finalPQ.size()+currentPQ.size());
pair<DistinctMap_t::iterator, bool> inserted;
while (currentPQ.size())
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(currentPQ.top());
inserted = distinctMap->insert(topOBRow.fData);
if (inserted.second)
{
finalPQ.push(topOBRow);
}
currentPQ.pop();
}
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderByDistinct execute\
caught an unknown exception 1",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
// OFFSET processing
while (finalPQ.size() && offset < fLimitStart)
{
offset++;
finalPQ.pop();
}
// Calculate rowSize only once
if (finalPQ.size())
{
ordering::OrderByRow& topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
if (!fConstant)
{
copyRow(fRowIn, &fRowOut);
}
else
{
fConstant->fillInConstants(fRowIn, fRowOut);
}
rowSize = fRowOut.getSize();
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
if (!fConstant)
{
while(finalPQ.size())
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
copyRow(fRowIn, &fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
}
else // Add ConstantColumns striped earlier
{
while(finalPQ.size())
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
fConstant->fillInConstants(fRowIn, fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
} // end of if-else
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
}
fOutputDL->endOfInput();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
if (dlTimes.FirstReadTime().tv_sec == 0)
dlTimes.setFirstReadTime();
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
printCalTrace();
}
}
/*
The m() iterates over thread's LimitedOrderBy instances,
reverts the rules and then populates the final collection
used for final sorting. The method uses OrderByRow that
combination of Row::data and comparison rules.
When m() finishes with thread's LOBs it iterates over
final sorting collection, populates rgDataOut, then
sends it into outputDL.
Changing this method don't forget to make changes in
finalizeParallelOrderByDistinct() that is a clone.
!!!The method doesn't set Row::baseRid
*/
void TupleAnnexStep::finalizeParallelOrderBy()
{
utils::setThreadName("TASwParOrdMerge");
uint64_t count = 0;
uint64_t offset = 0;
uint32_t rowSize = 0;
rowgroup::RGData rgDataOut;
ordering::SortingPQ finalPQ;
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
// Calculate offset here
fRowGroupOut.getRow(0, &fRowOut);
try
{
for(uint64_t id = 1; id <= fMaxThreads; id++)
{
if (cancelled())
{
break;
}
// Revert the ordering rules before we
// add rows into the final PQ.
fOrderByList[id]->getRule().revertRules();
ordering::SortingPQ &currentPQ = fOrderByList[id]->getQueue();
finalPQ.reserve(currentPQ.size());
while (currentPQ.size())
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(currentPQ.top());
finalPQ.push(topOBRow);
currentPQ.pop();
}
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(),ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::finalizeParallelOrderBy execute\
caught an unknown exception 1",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
// OFFSET processing
while (finalPQ.size() && offset < fLimitStart)
{
offset++;
finalPQ.pop();
}
// Calculate rowSize only once
if (finalPQ.size())
{
ordering::OrderByRow& topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
if (!fConstant)
{
copyRow(fRowIn, &fRowOut);
}
else
{
fConstant->fillInConstants(fRowIn, fRowOut);
}
rowSize = fRowOut.getSize();
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
if (!fConstant)
{
while(finalPQ.size())
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
copyRow(fRowIn, &fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupIn, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
}
else // Add ConstantColumns striped earlier
{
while(finalPQ.size())
{
if (cancelled())
{
break;
}
while (count < fLimitCount && finalPQ.size()
&& fRowGroupOut.getRowCount() < rowgroup::rgCommonSize)
{
ordering::OrderByRow &topOBRow =
const_cast<ordering::OrderByRow&>(finalPQ.top());
fRowIn.setData(topOBRow.fData);
fConstant->fillInConstants(fRowIn, fRowOut);
fRowGroupOut.incRowCount();
fRowOut.nextRow(rowSize);
finalPQ.pop();
count++;
if (fRowGroupOut.getRowCount() == rowgroup::rgCommonSize)
{
break;
}
}
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize);
fRowGroupOut.setData(&rgDataOut);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.getRow(0, &fRowOut);
}
else
{
break;
}
} // end of limit bound while loop
} // end of if-else
if (fRowGroupOut.getRowCount() > 0)
{
fRowsReturned += fRowGroupOut.getRowCount();
fOutputDL->insert(rgDataOut);
}
fOutputDL->endOfInput();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
if (dlTimes.FirstReadTime().tv_sec == 0)
dlTimes.setFirstReadTime();
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
printCalTrace();
}
}
void TupleAnnexStep::executeParallelOrderBy(uint64_t id)
{
utils::setThreadName("TASwParOrd");
RGData rgDataIn;
RGData rgDataOut;
bool more = false;
uint64_t dlOffset = 0;
uint32_t rowSize = 0;
uint64_t rowCount = 0;
uint64_t doubleRGSize = 2*rowgroup::rgCommonSize;
rowgroup::Row r = fRowIn;
rowgroup::RowGroup rg = fRowGroupIn;
rg.initRow(&r);
LimitedOrderBy *limOrderBy = fOrderByList[id];
ordering::SortingPQ &currentPQ = limOrderBy->getQueue();
if (limOrderBy->getLimitCount() < QUEUE_RESERVE_SIZE)
{
currentPQ.reserve(limOrderBy->getLimitCount());
}
else
{
currentPQ.reserve(QUEUE_RESERVE_SIZE);
}
try
{
more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
if (more) dlOffset++;
while (more && !cancelled())
{
if (dlOffset%fMaxThreads == id-1)
{
if (cancelled())
break;
if (currentPQ.capacity()-currentPQ.size() < doubleRGSize)
{
currentPQ.reserve(QUEUE_RESERVE_SIZE);
}
rg.setData(&rgDataIn);
rg.getRow(0, &r);
if (!rowSize)
{
rowSize = r.getSize();
}
rowCount = rg.getRowCount();
for (uint64_t i = 0; i < rowCount; ++i)
{
limOrderBy->processRow(r);
r.nextRow(rowSize);
}
}
// *DRRTUY Implement a method to skip elements in FIFO
more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
if(more) dlOffset++;
}
}
catch (const std::exception& ex)
{
catchHandler(ex.what(), ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
catch (...)
{
catchHandler("TupleAnnexStep::executeParallelOrderBy execute caught an unknown exception",
ERR_IN_PROCESS, fErrorInfo, fSessionId);
}
// read out the input DL
while (more)
more = fInputDL->next(fInputIteratorsList[id], &rgDataIn);
// Count finished sorting threads under mutex and run final
// sort step when the last thread converges
fParallelFinalizeMutex.lock();
fFinishedThreads++;
if (fFinishedThreads == fMaxThreads)
{
fParallelFinalizeMutex.unlock();
if(fDistinct)
{
finalizeParallelOrderByDistinct();
}
else
{
finalizeParallelOrderBy();
}
}
else
{
fParallelFinalizeMutex.unlock();
}
}
const RowGroup& TupleAnnexStep::getOutputRowGroup() const
{
@ -626,7 +1212,8 @@ bool TupleAnnexStep::deliverStringTableRowGroup() const
const string TupleAnnexStep::toString() const
{
ostringstream oss;
oss << "AnnexStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
oss << "AnnexStep ";
oss << " ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
oss << " in:";
@ -673,8 +1260,8 @@ void TupleAnnexStep::printCalTrace()
void TupleAnnexStep::formatMiniStats()
{
ostringstream oss;
oss << "TNS "
<< "UM "
oss << "TNS ";
oss << "UM "
<< "- "
<< "- "
<< "- "

View File

@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019 MariaDB Corporaton.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -21,15 +22,11 @@
#ifndef JOBLIST_TUPLEANNEXSTEP_H
#define JOBLIST_TUPLEANNEXSTEP_H
#include <queue>
#include <boost/thread/thread.hpp>
#include "jobstep.h"
// forward reference
namespace fucexp
{
class FuncExp;
}
#include "limitedorderby.h"
namespace joblist
{
@ -49,6 +46,8 @@ public:
/** @brief TupleAnnexStep constructor
*/
TupleAnnexStep(const JobInfo& jobInfo);
// Copy ctor to have a class mutex
TupleAnnexStep(const TupleAnnexStep &copy);
/** @brief TupleAnnexStep destructor
*/
@ -90,6 +89,14 @@ public:
fLimitStart = s;
fLimitCount = c;
}
void setParallelOp()
{
fParallelOp = true;
}
void setMaxThreads(uint32_t number)
{
fMaxThreads = number;
}
virtual bool stringTableFriendly()
{
@ -100,11 +107,15 @@ public:
protected:
void execute();
void execute(uint32_t);
void executeNoOrderBy();
void executeWithOrderBy();
void executeParallelOrderBy(uint64_t id);
void executeNoOrderByWithDistinct();
void formatMiniStats();
void printCalTrace();
void finalizeParallelOrderBy();
void finalizeParallelOrderByDistinct();
// input/output rowgroup and row
rowgroup::RowGroup fRowGroupIn;
@ -117,18 +128,26 @@ protected:
RowGroupDL* fInputDL;
RowGroupDL* fOutputDL;
uint64_t fInputIterator;
std::vector<uint64_t> fInputIteratorsList;
uint64_t fOutputIterator;
class Runner
{
public:
Runner(TupleAnnexStep* step) : fStep(step) { }
Runner(TupleAnnexStep* step) :
fStep(step), id(0) { }
Runner(TupleAnnexStep* step, uint32_t id) :
fStep(step), id(id) { }
void operator()()
{
if(id)
fStep->execute(id);
else
fStep->execute();
}
TupleAnnexStep* fStep;
uint16_t id;
};
uint64_t fRunner; // thread pool handle
@ -136,9 +155,11 @@ protected:
uint64_t fRowsReturned;
uint64_t fLimitStart;
uint64_t fLimitCount;
uint64_t fMaxThreads;
bool fLimitHit;
bool fEndOfResult;
bool fDistinct;
bool fParallelOp;
LimitedOrderBy* fOrderBy;
TupleConstantStep* fConstant;
@ -146,8 +167,21 @@ protected:
funcexp::FuncExp* fFeInstance;
JobList* fJobList;
std::vector<LimitedOrderBy*> fOrderByList;
std::vector<uint64_t> fRunnersList;
uint16_t fFinishedThreads;
boost::mutex fParallelFinalizeMutex;
};
template <class T>
class reservablePQ: private std::priority_queue<T>
{
public:
typedef typename std::priority_queue<T>::size_type size_type;
reservablePQ(size_type capacity = 0) { reserve(capacity); };
void reserve(size_type capacity) { this->c.reserve(capacity); }
size_type capacity() const { return this->c.capacity(); }
};
} // namespace

View File

@ -472,7 +472,7 @@ void TupleConstantStep::execute()
fOutputDL->endOfInput();
}
// *DRRTUY Copy row at once not one field at a time
void TupleConstantStep::fillInConstants()
{
fRowGroupIn.getRow(0, &fRowIn);
@ -495,7 +495,6 @@ void TupleConstantStep::fillInConstants()
}
else // only first column is constant
{
//size_t n = fRowOut.getOffset(fRowOut.getColumnCount()) - fRowOut.getOffset(1);
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
{
fRowOut.setRid(fRowIn.getRelRid());

View File

@ -7807,6 +7807,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
// To activate LimitedOrderBy
if(isPushdownHand)
{
csep->orderByThreads(get_orderby_threads(gwi.thd));
csep->specHandlerProcessed(true);
}
}
@ -7844,6 +7845,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
{
Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit;
csep->limitNum(select->val_int());
// MCOL-894 Activate parallel ORDER BY
csep->orderByThreads(get_orderby_threads(gwi.thd));
}
}
}
@ -9840,6 +9843,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
{
csep->hasOrderBy(true);
csep->specHandlerProcessed(true);
csep->orderByThreads(get_orderby_threads(gwi.thd));
}
}

View File

@ -107,6 +107,18 @@ static MYSQL_THDVAR_BOOL(
0
);
static MYSQL_THDVAR_UINT(
orderby_threads,
PLUGIN_VAR_RQCMDARG,
"Number of parallel threads used by ORDER BY. (default to 16)",
NULL,
NULL,
16,
0,
2048,
1
);
// legacy system variables
static MYSQL_THDVAR_ULONG(
decimal_scale,
@ -294,6 +306,7 @@ st_mysql_sys_var* mcs_system_variables[] =
MYSQL_SYSVAR(derived_handler),
MYSQL_SYSVAR(processing_handlers_fallback),
MYSQL_SYSVAR(group_by_handler),
MYSQL_SYSVAR(orderby_threads),
MYSQL_SYSVAR(decimal_scale),
MYSQL_SYSVAR(use_decimal_scale),
MYSQL_SYSVAR(ordered_only),
@ -396,6 +409,16 @@ mcs_compression_type_t get_compression_type(THD* thd) {
return (mcs_compression_type_t) THDVAR(thd, compression_type);
}
uint get_orderby_threads(THD* thd)
{
return ( thd == NULL ) ? 0 : THDVAR(thd, orderby_threads);
}
void set_orderby_threads(THD* thd, uint value)
{
THDVAR(thd, orderby_threads) = value;
}
bool get_use_decimal_scale(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, use_decimal_scale);

View File

@ -58,6 +58,9 @@ void set_group_by_handler(THD* thd, bool value);
bool get_fallback_knob(THD* thd);
void set_fallback_knob(THD* thd, bool value);
uint get_orderby_threads(THD* thd);
void set_orderby_threads(THD* thd, uint value);
bool get_use_decimal_scale(THD* thd);
void set_use_decimal_scale(THD* thd, bool value);

View File

@ -979,117 +979,6 @@ bool Row::isNullValue(uint32_t colIndex) const
uint64_t Row::getNullValue(uint32_t colIndex) const
{
return utils::getNullValue(types[colIndex], getColumnWidth(colIndex));
#if 0
switch (types[colIndex])
{
case CalpontSystemCatalog::TINYINT:
return joblist::TINYINTNULL;
case CalpontSystemCatalog::SMALLINT:
return joblist::SMALLINTNULL;
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
return joblist::INTNULL;
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
return joblist::FLOATNULL;
case CalpontSystemCatalog::DATE:
return joblist::DATENULL;
case CalpontSystemCatalog::BIGINT:
return joblist::BIGINTNULL;
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
return joblist::DOUBLENULL;
case CalpontSystemCatalog::DATETIME:
return joblist::DATETIMENULL;
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::STRINT:
{
uint32_t len = getColumnWidth(colIndex);
switch (len)
{
case 1:
return joblist::CHAR1NULL;
case 2:
return joblist::CHAR2NULL;
case 3:
case 4:
return joblist::CHAR4NULL;
case 5:
case 6:
case 7:
case 8:
return joblist::CHAR8NULL;
default:
throw logic_error("Row::getNullValue() Can't return the NULL string");
}
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
uint32_t len = getColumnWidth(colIndex);
switch (len)
{
case 1 :
return joblist::TINYINTNULL;
case 2 :
return joblist::SMALLINTNULL;
case 4 :
return joblist::INTNULL;
default:
return joblist::BIGINTNULL;
}
break;
}
case CalpontSystemCatalog::UTINYINT:
return joblist::UTINYINTNULL;
case CalpontSystemCatalog::USMALLINT:
return joblist::USMALLINTNULL;
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
return joblist::UINTNULL;
case CalpontSystemCatalog::UBIGINT:
return joblist::UBIGINTNULL;
case CalpontSystemCatalog::LONGDOUBLE:
return -1; // no NULL value for long double yet, this is a nan.
case CalpontSystemCatalog::VARBINARY:
default:
ostringstream os;
os << "Row::getNullValue(): got bad column type (" << types[colIndex] <<
"). Width=" << getColumnWidth(colIndex) << endl;
os << toString() << endl;
throw logic_error(os.str());
}
#endif
}
/* This fcn might produce overflow warnings from the compiler, but that's OK.
@ -1098,117 +987,6 @@ uint64_t Row::getNullValue(uint32_t colIndex) const
int64_t Row::getSignedNullValue(uint32_t colIndex) const
{
return utils::getSignedNullValue(types[colIndex], getColumnWidth(colIndex));
#if 0
switch (types[colIndex])
{
case CalpontSystemCatalog::TINYINT:
return (int64_t) ((int8_t) joblist::TINYINTNULL);
case CalpontSystemCatalog::SMALLINT:
return (int64_t) ((int16_t) joblist::SMALLINTNULL);
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
return (int64_t) ((int32_t) joblist::INTNULL);
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
return (int64_t) ((int32_t) joblist::FLOATNULL);
case CalpontSystemCatalog::DATE:
return (int64_t) ((int32_t) joblist::DATENULL);
case CalpontSystemCatalog::BIGINT:
return joblist::BIGINTNULL;
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
return joblist::DOUBLENULL;
case CalpontSystemCatalog::DATETIME:
return joblist::DATETIMENULL;
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::STRINT:
{
uint32_t len = getColumnWidth(colIndex);
switch (len)
{
case 1:
return (int64_t) ((int8_t) joblist::CHAR1NULL);
case 2:
return (int64_t) ((int16_t) joblist::CHAR2NULL);
case 3:
case 4:
return (int64_t) ((int32_t) joblist::CHAR4NULL);
case 5:
case 6:
case 7:
case 8:
return joblist::CHAR8NULL;
default:
throw logic_error("Row::getSignedNullValue() Can't return the NULL string");
}
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
uint32_t len = getColumnWidth(colIndex);
switch (len)
{
case 1 :
return (int64_t) ((int8_t) joblist::TINYINTNULL);
case 2 :
return (int64_t) ((int16_t) joblist::SMALLINTNULL);
case 4 :
return (int64_t) ((int32_t) joblist::INTNULL);
default:
return joblist::BIGINTNULL;
}
break;
}
case CalpontSystemCatalog::UTINYINT:
return (int64_t) ((int8_t) joblist::UTINYINTNULL);
case CalpontSystemCatalog::USMALLINT:
return (int64_t) ((int16_t) joblist::USMALLINTNULL);
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
return (int64_t) ((int32_t) joblist::UINTNULL);
case CalpontSystemCatalog::UBIGINT:
return (int64_t)joblist::UBIGINTNULL;
case CalpontSystemCatalog::LONGDOUBLE:
return -1; // no NULL value for long double yet, this is a nan.
case CalpontSystemCatalog::VARBINARY:
default:
ostringstream os;
os << "Row::getSignedNullValue(): got bad column type (" << types[colIndex] <<
"). Width=" << getColumnWidth(colIndex) << endl;
os << toString() << endl;
throw logic_error(os.str());
}
#endif
}
RowGroup::RowGroup() : columnCount(0), data(NULL), rgData(NULL), strings(NULL),

View File

@ -63,6 +63,8 @@
namespace rowgroup
{
const int16_t rgCommonSize = 8192;
/*
The format of the data RowGroup points to is currently ...

View File

@ -30,3 +30,8 @@ set_target_properties(windowfunction PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS windowfunction DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
if (WITH_SORTING_COMPARATORS_UT)
add_executable(comparators_tests comparators-tests.cpp)
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
install(TARGETS comparators_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform)
endif()

View File

@ -0,0 +1,425 @@
/* Copyright (C) 2019 MariaDB Corporaton.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $
#include <list>
#include <sstream>
#include <pthread.h>
#include <iomanip>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
#include <time.h>
#include <sys/time.h>
#include <limits.h>
#include "jobstep.h"
#include "funcexp.h"
#include "jlf_common.h"
#include "tupleannexstep.h"
#include "calpontsystemcatalog.h"
#include <boost/any.hpp>
#include <boost/function.hpp>
#include "bytestream.h"
#include "idborderby.h"
#define DEBUG
#define MEMORY_LIMIT 14983602176
using namespace std;
using namespace joblist;
using namespace messageqcpp;
// Timer class used by this tdriver to output elapsed times, etc.
class Timer
{
public:
void start(const string& message)
{
if (!fHeaderDisplayed)
{
header();
fHeaderDisplayed = true;
}
gettimeofday(&fTvStart, 0);
cout << timestr() << " Start " << message << endl;
}
void stop(const string& message)
{
time_t now;
time(&now);
string secondsElapsed;
getTimeElapsed(secondsElapsed);
cout << timestr() << " " << secondsElapsed << " Stop " << message << endl;
}
Timer() : fHeaderDisplayed(false) {}
private:
struct timeval fTvStart;
bool fHeaderDisplayed;
double getTimeElapsed(string& seconds)
{
struct timeval tvStop;
gettimeofday(&tvStop, 0);
double secondsElapsed =
(tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) -
(fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0));
ostringstream oss;
oss << secondsElapsed;
seconds = oss.str();
seconds.resize(8, '0');
return secondsElapsed;
}
string timestr()
{
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
ostringstream oss;
oss << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec << '.'
<< setw(6) << tv.tv_usec
;
return oss.str();
}
void header()
{
cout << endl;
cout << "Time Seconds Activity" << endl;
}
};
class FilterDriver : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(FilterDriver);
CPPUNIT_TEST(INT_TEST);
CPPUNIT_TEST(FLOAT_TEST);
CPPUNIT_TEST_SUITE_END();
private:
// The tests creates an RG with 1 column of the cscDt type
// then initialize RGData. After that it adds two numeric values (v1 < v2)and two NULL.
// Then creates comparator structores and run a number of tests. v1 < v2
void testComparatorWithDT(execplan::CalpontSystemCatalog::ColDataType cscDt,
uint32_t width,
bool generateRandValues)
{
std::cout << std::endl << "------------------------------------------------------------" << std::endl;
uint32_t oid =3001;
std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
offsets.push_back(2); offsets.push_back(2+width);
roids.push_back(oid);
tkeys.push_back(1);
types.push_back(cscDt);
cscale.push_back(0);
cprecision.push_back(20);
rowgroup::RowGroup inRG(1, //column count
offsets, //oldOffset
roids, // column oids
tkeys, //keys
types, // types
cscale, //scale
cprecision, // precision
20, // sTableThreshold
false //useStringTable
);
rowgroup::RGData rgD = rowgroup::RGData(inRG);
inRG.setData(&rgD);
rowgroup::Row r, r1, r2, r3;
inRG.initRow(&r);
uint32_t rowSize = r.getSize();
inRG.getRow(0, &r);
// Sorting spec describes sorting direction and NULL comparision
// preferences
ordering::IdbSortSpec spec = ordering::IdbSortSpec(0, // column index
true, // ascending
true); // NULLs first
std::vector<ordering::IdbSortSpec> specVect;
specVect.push_back(spec);
switch(cscDt)
{
case execplan::CalpontSystemCatalog::UTINYINT:
{
std::cout << "UTINYINT " << std::endl;
r.setUintField<1>(42, 0);
r.nextRow(rowSize);
r.setUintField<1>(43, 0);
r.nextRow(rowSize);
r.setUintField<1>(joblist::UTINYINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::USMALLINT:
{
std::cout << "USMALLINT " << std::endl;
r.setUintField<2>(42, 0);
r.nextRow(rowSize);
r.setUintField<2>(43, 0);
r.nextRow(rowSize);
r.setUintField<2>(joblist::USMALLINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
{
std::cout << "UINT " << std::endl;
r.setUintField<4>(42, 0);
r.nextRow(rowSize);
r.setUintField<4>(43, 0);
r.nextRow(rowSize);
r.setUintField<4>(joblist::UINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::DATETIME:
case execplan::CalpontSystemCatalog::UBIGINT:
{
std::cout << "UBIGINT " << std::endl;
r.setUintField<8>(42, 0);
r.nextRow(rowSize);
r.setUintField<8>(43, 0);
r.nextRow(rowSize);
r.setUintField<8>(joblist::UBIGINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::TINYINT:
{
std::cout << "TINYINT " << std::endl;
r.setIntField<1>(42, 0);
r.nextRow(rowSize);
r.setIntField<1>(43, 0);
r.nextRow(rowSize);
r.setIntField<1>(joblist::TINYINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::SMALLINT:
{
std::cout << "SMALLINT " << std::endl;
r.setIntField<2>(42, 0);
r.nextRow(rowSize);
r.setIntField<2>(43, 0);
r.nextRow(rowSize);
r.setIntField<2>(joblist::SMALLINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::DATE:
{
if (cscDt == execplan::CalpontSystemCatalog::DATE)
std::cout << "DATE" << std::endl;
else
std::cout << "INT " << std::endl;
r.setIntField<4>(42, 0);
r.nextRow(rowSize);
r.setIntField<4>(43, 0);
r.nextRow(rowSize);
if (cscDt == execplan::CalpontSystemCatalog::DATE)
r.setIntField<4>(joblist::DATENULL, 0);
else
r.setIntField<4>(joblist::INTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::BIGINT:
{
std::cout << "BIGINT " << std::endl;
r.setIntField<8>(42, 0);
r.nextRow(rowSize);
r.setIntField<8>(43, 0);
r.nextRow(rowSize);
r.setIntField<8>(joblist::BIGINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
std::cout << "DECIMAL " << std::endl;
switch (width)
{
case 1 :
{
r.setUintField<1>(42, 0);
r.nextRow(rowSize);
r.setUintField<1>(43, 0);
r.nextRow(rowSize);
r.setUintField<1>(joblist::TINYINTNULL, 0);
break;
}
case 2 :
{
r.setUintField<2>(42, 0);
r.nextRow(rowSize);
r.setUintField<2>(43, 0);
r.nextRow(rowSize);
r.setUintField<2>(joblist::SMALLINTNULL, 0);
break;
}
case 4 :
{
r.setUintField<4>(42, 0);
r.nextRow(rowSize);
r.setUintField<4>(43, 0);
r.nextRow(rowSize);
r.setUintField<4>(joblist::INTNULL, 0);
break;
}
default:
{
r.setUintField<8>(42, 0);
r.nextRow(rowSize);
r.setUintField<8>(43, 0);
r.nextRow(rowSize);
r.setUintField<8>(joblist::BIGINTNULL, 0);
break;
}
}
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
std::cout << "FLOAT " << std::endl;
r.setFloatField(42.1, 0);
r.nextRow(rowSize);
r.setFloatField(43.1, 0);
r.nextRow(rowSize);
r.setUintField(joblist::FLOATNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
std::cout << "DOUBLE " << std::endl;
r.setDoubleField(42.1, 0);
r.nextRow(rowSize);
r.setDoubleField(43.1, 0);
r.nextRow(rowSize);
r.setUintField(joblist::DOUBLENULL, 0);
break;
}
case execplan::CalpontSystemCatalog::LONGDOUBLE:
{
r.setLongDoubleField(42.1, 0);
r.nextRow(rowSize);
r.setLongDoubleField(43.1, 0);
r.nextRow(rowSize);
r.setLongDoubleField(joblist::LONGDOUBLENULL, 0);
break;
}
default:
{
break;
}
}
inRG.setRowCount(3);
inRG.initRow(&r1);
inRG.initRow(&r2);
inRG.initRow(&r3);
inRG.getRow(0, &r1);
inRG.getRow(1, &r2);
inRG.getRow(2, &r3);
std::cout<< "r1 " << r1.toString() << " r2 " << r2.toString()
<< " r3 " << r3.toString() << std::endl;
ordering::IdbCompare idbCompare;
idbCompare.initialize(inRG);
ordering::OrderByData odbData = ordering::OrderByData(specVect, inRG);
bool result = odbData(r1.getPointer(), r2.getPointer());
std::cout << r1.toString() << " < " << r2.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == true);
result = odbData(r2.getPointer(), r1.getPointer());
std::cout << r2.toString() << " < " << r1.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == false);
result = odbData(r2.getPointer(), r2.getPointer());
std::cout << r2.toString() << " < " << r2.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == false);
// Compare value with NULL. if spec.fNf then NULLs goes first
result = odbData(r3.getPointer(), r1.getPointer());
std::cout << r3.toString() << " < " << r1.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == true);
// Compare NULL with NULL
result = odbData(r3.getPointer(), r1.getPointer());
std::cout << r3.toString() << " < " << r3.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == true);
}
void INT_TEST()
{
//bool generateValues = true;
bool fixedValues = false;
testComparatorWithDT(execplan::CalpontSystemCatalog::UTINYINT, 1, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::USMALLINT, 2, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::UMEDINT, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::UBIGINT, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DATETIME, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::TINYINT, 1, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::SMALLINT, 2, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::MEDINT, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DATE, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::BIGINT, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DECIMAL, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::FLOAT, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DOUBLE, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::LONGDOUBLE, 8, fixedValues);
}
void FLOAT_TEST()
{
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver);
int main( int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest( registry.makeTest() );
bool wasSuccessful = runner.run( "", false );
return (wasSuccessful ? 0 : 1);
}

View File

@ -43,32 +43,85 @@ using namespace rowgroup;
#include "idborderby.h"
#include "joblisttypes.h"
namespace ordering
{
int TinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
int8_t v1 = l->row1().getIntField(fSpec.fIndex);
int8_t v2 = l->row2().getIntField(fSpec.fIndex);
int8_t nullValue = static_cast<int8_t>(joblist::TINYINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int SmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
int16_t v1 = l->row1().getIntField(fSpec.fIndex);
int16_t v2 = l->row2().getIntField(fSpec.fIndex);
int16_t nullValue = static_cast<int16_t>(joblist::SMALLINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int ret = 0;
int32_t v1 = l->row1().getIntField(fSpec.fIndex);
int32_t v2 = l->row2().getIntField(fSpec.fIndex);
int32_t nullValue = static_cast<int32_t>(joblist::INTNULL);
if (b1 == true || b2 == true)
if (v1 == nullValue || v2 == nullValue)
{
if (b1 == false && b2 == true)
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
int64_t v1 = l->row1().getIntField(fSpec.fIndex);
int64_t v2 = l->row2().getIntField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -78,29 +131,25 @@ int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
return ret;
}
int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
int BigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int ret = 0;
int64_t v1 = l->row1().getIntField(fSpec.fIndex);
int64_t v2 = l->row2().getIntField(fSpec.fIndex);
int64_t nullValue = static_cast<int64_t>(joblist::BIGINTNULL);
if (b1 == true || b2 == true)
if (v1 == nullValue || v2 == nullValue)
{
if (b1 == false && b2 == true)
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -110,6 +159,116 @@ int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
return ret;
}
int UTinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint8_t v1 = l->row1().getUintField(fSpec.fIndex);
uint8_t v2 = l->row2().getUintField(fSpec.fIndex);
uint8_t nullValue = static_cast<uint8_t>(joblist::UTINYINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int USmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint16_t v1 = l->row1().getUintField(fSpec.fIndex);
uint16_t v2 = l->row2().getUintField(fSpec.fIndex);
uint16_t nullValue = static_cast<uint16_t>(joblist::USMALLINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int UIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint32_t v1 = l->row1().getUintField(fSpec.fIndex);
uint32_t v2 = l->row2().getUintField(fSpec.fIndex);
uint32_t nullValue = static_cast<uint32_t>(joblist::UINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int UBigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 == joblist::UBIGINTNULL || v2 == joblist::UBIGINTNULL)
{
if (v1 != joblist::UBIGINTNULL && v2 == joblist::UBIGINTNULL)
ret = fSpec.fNf;
else if (v1 == joblist::UBIGINTNULL && v2 != joblist::UBIGINTNULL)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
@ -130,41 +289,45 @@ int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
}
else
{
string v1 = l->row1().getStringField(fSpec.fIndex);
string v2 = l->row2().getStringField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
int len1 = l->row1().getStringLength(fSpec.fIndex);
int len2 = l->row2().getStringLength(fSpec.fIndex);
const char* s1 = (const char*)l->row1().getStringPointer(fSpec.fIndex);
const char* s2 = (const char*)l->row2().getStringPointer(fSpec.fIndex);
// For Japanese, coll.compare() may not be as correct as strncmp
if (JPcodePoint)
{
ret = fSpec.fAsc * strncmp(s1, s2, max(len1,len2));
}
else
{
const std::collate<char>& coll = std::use_facet<std::collate<char> >(loc);
ret = fSpec.fAsc * coll.compare(s1, s1+len1, s2, s2+len2);
}
}
return ret;
}
int DoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
uint64_t uiv1 = l->row1().getUintField(fSpec.fIndex);
uint64_t uiv2 = l->row2().getUintField(fSpec.fIndex);
int ret = 0;
if (b1 == true || b2 == true)
if (uiv1 == joblist::DOUBLENULL || uiv2 == joblist::DOUBLENULL)
{
if (b1 == false && b2 == true)
if (uiv1 != joblist::DOUBLENULL && uiv2 == joblist::DOUBLENULL)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (uiv1 == joblist::DOUBLENULL && uiv2 != joblist::DOUBLENULL)
ret = -fSpec.fNf;
}
else
{
double v1 = l->row1().getDoubleField(fSpec.fIndex);
double v2 = l->row2().getDoubleField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -179,23 +342,22 @@ int FloatCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int32_t iv1 = l->row1().getIntField(fSpec.fIndex);
int32_t iv2 = l->row2().getIntField(fSpec.fIndex);
int32_t nullValue = static_cast<int32_t>(joblist::FLOATNULL);
int ret = 0;
if (b1 == true || b2 == true)
if (iv1 == nullValue || iv2 == nullValue)
{
if (b1 == false && b2 == true)
if (iv1 != nullValue && iv2 == nullValue)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (iv1 == nullValue && iv2 != nullValue)
ret = -fSpec.fNf;
}
else
{
float v1 = l->row1().getFloatField(fSpec.fIndex);
float v2 = l->row2().getFloatField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -210,23 +372,20 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
long double v1 = l->row1().getLongDoubleField(fSpec.fIndex);
long double v2 = l->row2().getLongDoubleField(fSpec.fIndex);
int ret = 0;
if (b1 == true || b2 == true)
if (v1 == joblist::LONGDOUBLENULL || v2 == joblist::LONGDOUBLENULL)
{
if (b1 == false && b2 == true)
if (v1 != joblist::LONGDOUBLENULL && v2 == joblist::LONGDOUBLENULL)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == joblist::LONGDOUBLENULL && v2 != joblist::LONGDOUBLENULL)
ret = -fSpec.fNf;
}
else
{
long double v1 = l->row1().getLongDoubleField(fSpec.fIndex);
long double v2 = l->row2().getLongDoubleField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -236,29 +395,79 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r
return ret;
}
int DateCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint32_t v1 = l->row1().getUintField(fSpec.fIndex);
uint32_t v2 = l->row2().getUintField(fSpec.fIndex);
uint32_t nullValue = static_cast<uint32_t>(joblist::DATENULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int DatetimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 == joblist::DATETIMENULL || v2 == joblist::DATETIMENULL)
{
if (v1 != joblist::DATETIMENULL && v2 == joblist::DATETIMENULL)
ret = fSpec.fNf;
else if (v1 == joblist::DATETIMENULL && v2 != joblist::DATETIMENULL)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int ret = 0;
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (b1 == true || b2 == true)
if (v1 == joblist::TIMENULL || v2 == joblist::TIMENULL)
{
if (b1 == false && b2 == true)
if (v1 != joblist::TIMENULL && v2 == joblist::TIMENULL)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == joblist::TIMENULL && v2 != joblist::TIMENULL)
ret = -fSpec.fNf;
}
else
{
int64_t v1 = l->row1().getIntField(fSpec.fIndex);
int64_t v2 = l->row2().getIntField(fSpec.fIndex);
// ((int64_t) -00:00:26) > ((int64_t) -00:00:25)
// i.e. For 2 negative TIME values, we invert the order of
// comparison operations to force "-00:00:26" to appear before
@ -301,6 +510,16 @@ bool CompareRule::less(Row::Pointer r1, Row::Pointer r2)
return false;
}
void CompareRule::revertRules()
{
std::vector<Compare*>::iterator fCompareIter = fCompares.begin();
for(; fCompareIter!=fCompares.end(); fCompareIter++)
{
(*fCompareIter)->revertSortSpec();
}
}
void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgroup::RowGroup& rg)
{
@ -311,31 +530,80 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
switch (types[i->fIndex])
{
case CalpontSystemCatalog::TINYINT:
{
Compare* c = new TinyIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::SMALLINT:
{
Compare* c = new SmallIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
Compare* c = new IntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::BIGINT:
{
Compare* c = new BigIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
Compare* c;
uint32_t len = rg.getColumnWidth(i->fIndex);
switch (len)
{
case 1 :
c = new TinyIntCompare(*i); break;
case 2 :
c = new SmallIntCompare(*i); break;
case 4 :
c = new IntCompare(*i); break;
default:
c = new BigIntCompare(*i);
}
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::UTINYINT:
{
Compare* c = new UTinyIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::USMALLINT:
{
Compare* c = new USmallIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
{
Compare* c = new UIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::UBIGINT:
{
Compare* c = new UintCompare(*i);
Compare* c = new UBigIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::TEXT:
{
Compare* c = new StringCompare(*i);
fCompares.push_back(c);
@ -366,14 +634,18 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
}
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::TIMESTAMP:
{
Compare* c = new UintCompare(*i);
Compare* c = new DateCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::TIMESTAMP:
{
Compare* c = new DatetimeCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::TIME:
{
Compare* c = new TimeCompare(*i);
@ -417,9 +689,10 @@ OrderByData::OrderByData(const std::vector<IdbSortSpec>& spec, const rowgroup::R
// IdbOrderBy class implementation
IdbOrderBy::IdbOrderBy() :
fDistinct(false), fMemSize(0), fRowsPerRG(8192), fErrorCode(0), fRm(NULL)
{
}
fDistinct(false), fMemSize(0), fRowsPerRG(rowgroup::rgCommonSize),
fErrorCode(0),
fRm(NULL)
{ }
IdbOrderBy::~IdbOrderBy()

View File

@ -41,7 +41,6 @@
#include "hasher.h"
#include "stlpoolallocator.h"
// forward reference
namespace joblist
{
@ -52,9 +51,27 @@ class ResourceManager;
namespace ordering
{
template<typename _Tp, typename _Sequence = std::vector<_Tp>,
typename _Compare = std::less<typename _Sequence::value_type> >
class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare>
{
public:
typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type;
reservablePQ(size_type capacity = 0) { reserve(capacity); };
void reserve(size_type capacity) { this->c.reserve(capacity); }
size_type capacity() const { return this->c.capacity(); }
using std::priority_queue<_Tp, _Sequence, _Compare>::size;
using std::priority_queue<_Tp, _Sequence, _Compare>::top;
using std::priority_queue<_Tp, _Sequence, _Compare>::pop;
using std::priority_queue<_Tp, _Sequence, _Compare>::push;
using std::priority_queue<_Tp, _Sequence, _Compare>::empty;
};
// forward reference
class IdbCompare;
class OrderByRow;
typedef reservablePQ<OrderByRow> SortingPQ;
// order by specification
struct IdbSortSpec
@ -63,6 +80,7 @@ struct IdbSortSpec
// TODO There are three ordering specs since 10.2
int fAsc; // <ordering specification> ::= ASC | DESC
int fNf; // <null ordering> ::= NULLS FIRST | NULLS LAST
std::string fLocale;
IdbSortSpec() : fIndex(-1), fAsc(1), fNf(1) {}
IdbSortSpec(int i, bool b) : fIndex(i), fAsc(b ? 1 : -1), fNf(fAsc) {}
@ -75,13 +93,72 @@ struct IdbSortSpec
class Compare
{
public:
Compare(const IdbSortSpec& spec) : fSpec(spec) {}
Compare(const IdbSortSpec& spec) : fSpec(spec)
{
// Save off the current Locale in case something goes wrong.
std::string curLocale = setlocale(LC_COLLATE, NULL);
if (spec.fLocale.length() > 0)
{
fLocale = spec.fLocale;
}
else
{
fLocale = curLocale;
}
try
{
std::locale localloc(fLocale.c_str());
loc = localloc;
}
catch(...)
{
fLocale = curLocale;
std::locale localloc(fLocale.c_str());
loc = localloc;
}
if (fLocale.find("ja_JP") != std::string::npos)
{
JPcodePoint = true;
}
else
{
JPcodePoint = false;
}
}
virtual ~Compare() {}
virtual int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer) = 0;
void revertSortSpec()
{
fSpec.fAsc = -fSpec.fAsc;
fSpec.fNf = -fSpec.fNf;
}
protected:
IdbSortSpec fSpec;
std::string fLocale;
std::locale loc;
bool JPcodePoint; // code point ordering (Japanese UTF) flag
};
// Comparators for signed types
class TinyIntCompare : public Compare
{
public:
TinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class SmallIntCompare : public Compare
{
public:
SmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
@ -94,24 +171,56 @@ public:
};
class UintCompare : public Compare
class BigIntCompare : public Compare
{
public:
UintCompare(const IdbSortSpec& spec) : Compare(spec) {}
BigIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for signed types
// Comparators for unsigned types
class UTinyIntCompare : public Compare
{
public:
UTinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class StringCompare : public Compare
class USmallIntCompare : public Compare
{
public:
StringCompare(const IdbSortSpec& spec) : Compare(spec) {}
USmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class UIntCompare : public Compare
{
public:
UIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class UBigIntCompare : public Compare
{
public:
UBigIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// end of comparators for unsigned types
// Comparators for float types
class DoubleCompare : public Compare
{
public:
@ -120,6 +229,7 @@ public:
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class LongDoubleCompare : public Compare
{
public:
@ -137,6 +247,26 @@ public:
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for float types
// Comparators for temporal types
class DateCompare : public Compare
{
public:
DateCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class DatetimeCompare : public Compare
{
public:
DatetimeCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class TimeCompare : public Compare
{
@ -146,6 +276,19 @@ public:
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for temporal types
// Comparators for non-fixed size types
class StringCompare : public Compare
{
public:
StringCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for variable sized types
class CompareRule
{
@ -156,6 +299,7 @@ public:
bool less(rowgroup::Row::Pointer r1, rowgroup::Row::Pointer r2);
void compileRules(const std::vector<IdbSortSpec>&, const rowgroup::RowGroup&);
void revertRules();
std::vector<Compare*> fCompares;
IdbCompare* fIdbCompare;
@ -216,7 +360,6 @@ public:
bool operator()(rowgroup::Row::Pointer, rowgroup::Row::Pointer);
//protected:
std::vector<uint64_t> fIndex;
};
@ -263,10 +406,18 @@ public:
{
return fDistinct;
}
SortingPQ& getQueue()
{
return fOrderByQueue;
}
CompareRule &getRule()
{
return fRule;
}
SortingPQ fOrderByQueue;
protected:
std::vector<IdbSortSpec> fOrderByCond;
std::priority_queue<OrderByRow> fOrderByQueue;
rowgroup::Row fRow0;
CompareRule fRule;