1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-894 Upmerged the fist part of the patch into develop.

MCOL-894 Add default values in Compare and CSEP ctors to activate UTF-8 sorting
    properly.

MCOL-894 Unit tests to build a framework for a new parallel sorting.

MCOL-894 Finished with parallel workers invocation.
     The implementation lacks final aggregation step.

MCOL-894 TupleAnnexStep's init and destructor are now parallel execution aware.

    Implemented final merging step for parallel execution finalizeParallelOrderBy().

    Templated unit test to use it with arbitrary number of rows, threads.

Reuse LimitedOrderBy in the final step

MCOL-894 Cleaned up finalizeParallelOrderBy.

MCOL-894 Add and propagate thread variable that controls a number of threads.

    Optimized comparators used for sorting and add corresponding UTs.

    Refactored TupleAnnexStep::finalizeParallelOrderByDistinct.

    Parallel sorting methods now preallocates memory in batches.

MCOL-894 Fixed comparator for StringCompare.
This commit is contained in:
Roman Nozdrin
2019-05-27 16:27:47 +03:00
parent 0d2b0e0070
commit 7b5e5f0eb6
22 changed files with 2132 additions and 332 deletions

View File

@ -30,3 +30,8 @@ set_target_properties(windowfunction PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS windowfunction DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
if (WITH_SORTING_COMPARATORS_UT)
add_executable(comparators_tests comparators-tests.cpp)
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
install(TARGETS comparators_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform)
endif()

View File

@ -0,0 +1,425 @@
/* Copyright (C) 2019 MariaDB Corporaton.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $
#include <list>
#include <sstream>
#include <pthread.h>
#include <iomanip>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
#include <time.h>
#include <sys/time.h>
#include <limits.h>
#include "jobstep.h"
#include "funcexp.h"
#include "jlf_common.h"
#include "tupleannexstep.h"
#include "calpontsystemcatalog.h"
#include <boost/any.hpp>
#include <boost/function.hpp>
#include "bytestream.h"
#include "idborderby.h"
#define DEBUG
#define MEMORY_LIMIT 14983602176
using namespace std;
using namespace joblist;
using namespace messageqcpp;
// Timer class used by this tdriver to output elapsed times, etc.
class Timer
{
public:
void start(const string& message)
{
if (!fHeaderDisplayed)
{
header();
fHeaderDisplayed = true;
}
gettimeofday(&fTvStart, 0);
cout << timestr() << " Start " << message << endl;
}
void stop(const string& message)
{
time_t now;
time(&now);
string secondsElapsed;
getTimeElapsed(secondsElapsed);
cout << timestr() << " " << secondsElapsed << " Stop " << message << endl;
}
Timer() : fHeaderDisplayed(false) {}
private:
struct timeval fTvStart;
bool fHeaderDisplayed;
double getTimeElapsed(string& seconds)
{
struct timeval tvStop;
gettimeofday(&tvStop, 0);
double secondsElapsed =
(tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) -
(fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0));
ostringstream oss;
oss << secondsElapsed;
seconds = oss.str();
seconds.resize(8, '0');
return secondsElapsed;
}
string timestr()
{
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
ostringstream oss;
oss << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec << '.'
<< setw(6) << tv.tv_usec
;
return oss.str();
}
void header()
{
cout << endl;
cout << "Time Seconds Activity" << endl;
}
};
class FilterDriver : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(FilterDriver);
CPPUNIT_TEST(INT_TEST);
CPPUNIT_TEST(FLOAT_TEST);
CPPUNIT_TEST_SUITE_END();
private:
// The tests creates an RG with 1 column of the cscDt type
// then initialize RGData. After that it adds two numeric values (v1 < v2)and two NULL.
// Then creates comparator structores and run a number of tests. v1 < v2
void testComparatorWithDT(execplan::CalpontSystemCatalog::ColDataType cscDt,
uint32_t width,
bool generateRandValues)
{
std::cout << std::endl << "------------------------------------------------------------" << std::endl;
uint32_t oid =3001;
std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
offsets.push_back(2); offsets.push_back(2+width);
roids.push_back(oid);
tkeys.push_back(1);
types.push_back(cscDt);
cscale.push_back(0);
cprecision.push_back(20);
rowgroup::RowGroup inRG(1, //column count
offsets, //oldOffset
roids, // column oids
tkeys, //keys
types, // types
cscale, //scale
cprecision, // precision
20, // sTableThreshold
false //useStringTable
);
rowgroup::RGData rgD = rowgroup::RGData(inRG);
inRG.setData(&rgD);
rowgroup::Row r, r1, r2, r3;
inRG.initRow(&r);
uint32_t rowSize = r.getSize();
inRG.getRow(0, &r);
// Sorting spec describes sorting direction and NULL comparision
// preferences
ordering::IdbSortSpec spec = ordering::IdbSortSpec(0, // column index
true, // ascending
true); // NULLs first
std::vector<ordering::IdbSortSpec> specVect;
specVect.push_back(spec);
switch(cscDt)
{
case execplan::CalpontSystemCatalog::UTINYINT:
{
std::cout << "UTINYINT " << std::endl;
r.setUintField<1>(42, 0);
r.nextRow(rowSize);
r.setUintField<1>(43, 0);
r.nextRow(rowSize);
r.setUintField<1>(joblist::UTINYINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::USMALLINT:
{
std::cout << "USMALLINT " << std::endl;
r.setUintField<2>(42, 0);
r.nextRow(rowSize);
r.setUintField<2>(43, 0);
r.nextRow(rowSize);
r.setUintField<2>(joblist::USMALLINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
{
std::cout << "UINT " << std::endl;
r.setUintField<4>(42, 0);
r.nextRow(rowSize);
r.setUintField<4>(43, 0);
r.nextRow(rowSize);
r.setUintField<4>(joblist::UINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::DATETIME:
case execplan::CalpontSystemCatalog::UBIGINT:
{
std::cout << "UBIGINT " << std::endl;
r.setUintField<8>(42, 0);
r.nextRow(rowSize);
r.setUintField<8>(43, 0);
r.nextRow(rowSize);
r.setUintField<8>(joblist::UBIGINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::TINYINT:
{
std::cout << "TINYINT " << std::endl;
r.setIntField<1>(42, 0);
r.nextRow(rowSize);
r.setIntField<1>(43, 0);
r.nextRow(rowSize);
r.setIntField<1>(joblist::TINYINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::SMALLINT:
{
std::cout << "SMALLINT " << std::endl;
r.setIntField<2>(42, 0);
r.nextRow(rowSize);
r.setIntField<2>(43, 0);
r.nextRow(rowSize);
r.setIntField<2>(joblist::SMALLINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::DATE:
{
if (cscDt == execplan::CalpontSystemCatalog::DATE)
std::cout << "DATE" << std::endl;
else
std::cout << "INT " << std::endl;
r.setIntField<4>(42, 0);
r.nextRow(rowSize);
r.setIntField<4>(43, 0);
r.nextRow(rowSize);
if (cscDt == execplan::CalpontSystemCatalog::DATE)
r.setIntField<4>(joblist::DATENULL, 0);
else
r.setIntField<4>(joblist::INTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::BIGINT:
{
std::cout << "BIGINT " << std::endl;
r.setIntField<8>(42, 0);
r.nextRow(rowSize);
r.setIntField<8>(43, 0);
r.nextRow(rowSize);
r.setIntField<8>(joblist::BIGINTNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
std::cout << "DECIMAL " << std::endl;
switch (width)
{
case 1 :
{
r.setUintField<1>(42, 0);
r.nextRow(rowSize);
r.setUintField<1>(43, 0);
r.nextRow(rowSize);
r.setUintField<1>(joblist::TINYINTNULL, 0);
break;
}
case 2 :
{
r.setUintField<2>(42, 0);
r.nextRow(rowSize);
r.setUintField<2>(43, 0);
r.nextRow(rowSize);
r.setUintField<2>(joblist::SMALLINTNULL, 0);
break;
}
case 4 :
{
r.setUintField<4>(42, 0);
r.nextRow(rowSize);
r.setUintField<4>(43, 0);
r.nextRow(rowSize);
r.setUintField<4>(joblist::INTNULL, 0);
break;
}
default:
{
r.setUintField<8>(42, 0);
r.nextRow(rowSize);
r.setUintField<8>(43, 0);
r.nextRow(rowSize);
r.setUintField<8>(joblist::BIGINTNULL, 0);
break;
}
}
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
std::cout << "FLOAT " << std::endl;
r.setFloatField(42.1, 0);
r.nextRow(rowSize);
r.setFloatField(43.1, 0);
r.nextRow(rowSize);
r.setUintField(joblist::FLOATNULL, 0);
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
std::cout << "DOUBLE " << std::endl;
r.setDoubleField(42.1, 0);
r.nextRow(rowSize);
r.setDoubleField(43.1, 0);
r.nextRow(rowSize);
r.setUintField(joblist::DOUBLENULL, 0);
break;
}
case execplan::CalpontSystemCatalog::LONGDOUBLE:
{
r.setLongDoubleField(42.1, 0);
r.nextRow(rowSize);
r.setLongDoubleField(43.1, 0);
r.nextRow(rowSize);
r.setLongDoubleField(joblist::LONGDOUBLENULL, 0);
break;
}
default:
{
break;
}
}
inRG.setRowCount(3);
inRG.initRow(&r1);
inRG.initRow(&r2);
inRG.initRow(&r3);
inRG.getRow(0, &r1);
inRG.getRow(1, &r2);
inRG.getRow(2, &r3);
std::cout<< "r1 " << r1.toString() << " r2 " << r2.toString()
<< " r3 " << r3.toString() << std::endl;
ordering::IdbCompare idbCompare;
idbCompare.initialize(inRG);
ordering::OrderByData odbData = ordering::OrderByData(specVect, inRG);
bool result = odbData(r1.getPointer(), r2.getPointer());
std::cout << r1.toString() << " < " << r2.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == true);
result = odbData(r2.getPointer(), r1.getPointer());
std::cout << r2.toString() << " < " << r1.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == false);
result = odbData(r2.getPointer(), r2.getPointer());
std::cout << r2.toString() << " < " << r2.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == false);
// Compare value with NULL. if spec.fNf then NULLs goes first
result = odbData(r3.getPointer(), r1.getPointer());
std::cout << r3.toString() << " < " << r1.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == true);
// Compare NULL with NULL
result = odbData(r3.getPointer(), r1.getPointer());
std::cout << r3.toString() << " < " << r3.toString() << " is "
<< ((result) ? "true" : "false") << std::endl;
CPPUNIT_ASSERT(result == true);
}
void INT_TEST()
{
//bool generateValues = true;
bool fixedValues = false;
testComparatorWithDT(execplan::CalpontSystemCatalog::UTINYINT, 1, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::USMALLINT, 2, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::UMEDINT, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::UBIGINT, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DATETIME, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::TINYINT, 1, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::SMALLINT, 2, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::MEDINT, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DATE, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::BIGINT, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DECIMAL, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::FLOAT, 4, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::DOUBLE, 8, fixedValues);
testComparatorWithDT(execplan::CalpontSystemCatalog::LONGDOUBLE, 8, fixedValues);
}
void FLOAT_TEST()
{
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver);
int main( int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest( registry.makeTest() );
bool wasSuccessful = runner.run( "", false );
return (wasSuccessful ? 0 : 1);
}

View File

@ -43,32 +43,85 @@ using namespace rowgroup;
#include "idborderby.h"
#include "joblisttypes.h"
namespace ordering
{
int TinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
int8_t v1 = l->row1().getIntField(fSpec.fIndex);
int8_t v2 = l->row2().getIntField(fSpec.fIndex);
int8_t nullValue = static_cast<int8_t>(joblist::TINYINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int SmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
int16_t v1 = l->row1().getIntField(fSpec.fIndex);
int16_t v2 = l->row2().getIntField(fSpec.fIndex);
int16_t nullValue = static_cast<int16_t>(joblist::SMALLINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int ret = 0;
int32_t v1 = l->row1().getIntField(fSpec.fIndex);
int32_t v2 = l->row2().getIntField(fSpec.fIndex);
int32_t nullValue = static_cast<int32_t>(joblist::INTNULL);
if (b1 == true || b2 == true)
if (v1 == nullValue || v2 == nullValue)
{
if (b1 == false && b2 == true)
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
int64_t v1 = l->row1().getIntField(fSpec.fIndex);
int64_t v2 = l->row2().getIntField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -78,29 +131,25 @@ int IntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
return ret;
}
int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
int BigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int ret = 0;
int64_t v1 = l->row1().getIntField(fSpec.fIndex);
int64_t v2 = l->row2().getIntField(fSpec.fIndex);
int64_t nullValue = static_cast<int64_t>(joblist::BIGINTNULL);
if (b1 == true || b2 == true)
if (v1 == nullValue || v2 == nullValue)
{
if (b1 == false && b2 == true)
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -110,6 +159,116 @@ int UintCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
return ret;
}
int UTinyIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint8_t v1 = l->row1().getUintField(fSpec.fIndex);
uint8_t v2 = l->row2().getUintField(fSpec.fIndex);
uint8_t nullValue = static_cast<uint8_t>(joblist::UTINYINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int USmallIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint16_t v1 = l->row1().getUintField(fSpec.fIndex);
uint16_t v2 = l->row2().getUintField(fSpec.fIndex);
uint16_t nullValue = static_cast<uint16_t>(joblist::USMALLINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int UIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint32_t v1 = l->row1().getUintField(fSpec.fIndex);
uint32_t v2 = l->row2().getUintField(fSpec.fIndex);
uint32_t nullValue = static_cast<uint32_t>(joblist::UINTNULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int UBigIntCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 == joblist::UBIGINTNULL || v2 == joblist::UBIGINTNULL)
{
if (v1 != joblist::UBIGINTNULL && v2 == joblist::UBIGINTNULL)
ret = fSpec.fNf;
else if (v1 == joblist::UBIGINTNULL && v2 != joblist::UBIGINTNULL)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
@ -149,29 +308,26 @@ int StringCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
return ret;
}
int DoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
uint64_t uiv1 = l->row1().getUintField(fSpec.fIndex);
uint64_t uiv2 = l->row2().getUintField(fSpec.fIndex);
int ret = 0;
if (b1 == true || b2 == true)
if (uiv1 == joblist::DOUBLENULL || uiv2 == joblist::DOUBLENULL)
{
if (b1 == false && b2 == true)
if (uiv1 != joblist::DOUBLENULL && uiv2 == joblist::DOUBLENULL)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (uiv1 == joblist::DOUBLENULL && uiv2 != joblist::DOUBLENULL)
ret = -fSpec.fNf;
}
else
{
double v1 = l->row1().getDoubleField(fSpec.fIndex);
double v2 = l->row2().getDoubleField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -186,23 +342,22 @@ int FloatCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
int32_t iv1 = l->row1().getIntField(fSpec.fIndex);
int32_t iv2 = l->row2().getIntField(fSpec.fIndex);
int32_t nullValue = static_cast<int32_t>(joblist::FLOATNULL);
int ret = 0;
if (b1 == true || b2 == true)
if (iv1 == nullValue || iv2 == nullValue)
{
if (b1 == false && b2 == true)
if (iv1 != nullValue && iv2 == nullValue)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (iv1 == nullValue && iv2 != nullValue)
ret = -fSpec.fNf;
}
else
{
float v1 = l->row1().getFloatField(fSpec.fIndex);
float v2 = l->row2().getFloatField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -217,23 +372,20 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r
l->row1().setData(r1);
l->row2().setData(r2);
bool b1 = l->row1().isNullValue(fSpec.fIndex);
bool b2 = l->row2().isNullValue(fSpec.fIndex);
long double v1 = l->row1().getLongDoubleField(fSpec.fIndex);
long double v2 = l->row2().getLongDoubleField(fSpec.fIndex);
int ret = 0;
if (b1 == true || b2 == true)
if (v1 == joblist::LONGDOUBLENULL || v2 == joblist::LONGDOUBLENULL)
{
if (b1 == false && b2 == true)
if (v1 != joblist::LONGDOUBLENULL && v2 == joblist::LONGDOUBLENULL)
ret = fSpec.fNf;
else if (b1 == true && b2 == false)
else if (v1 == joblist::LONGDOUBLENULL && v2 != joblist::LONGDOUBLENULL)
ret = -fSpec.fNf;
}
else
{
long double v1 = l->row1().getLongDoubleField(fSpec.fIndex);
long double v2 = l->row2().getLongDoubleField(fSpec.fIndex);
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
@ -243,6 +395,87 @@ int LongDoubleCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r
return ret;
}
int DateCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint32_t v1 = l->row1().getUintField(fSpec.fIndex);
uint32_t v2 = l->row2().getUintField(fSpec.fIndex);
uint32_t nullValue = static_cast<uint32_t>(joblist::DATENULL);
if (v1 == nullValue || v2 == nullValue)
{
if (v1 != nullValue && v2 == nullValue)
ret = fSpec.fNf;
else if (v1 == nullValue && v2 != nullValue)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int DatetimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 == joblist::DATETIMENULL || v2 == joblist::DATETIMENULL)
{
if (v1 != joblist::DATETIMENULL && v2 == joblist::DATETIMENULL)
ret = fSpec.fNf;
else if (v1 == joblist::DATETIMENULL && v2 != joblist::DATETIMENULL)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
l->row1().setData(r1);
l->row2().setData(r2);
int ret = 0;
uint64_t v1 = l->row1().getUintField(fSpec.fIndex);
uint64_t v2 = l->row2().getUintField(fSpec.fIndex);
if (v1 == joblist::TIMENULL || v2 == joblist::TIMENULL)
{
if (v1 != joblist::TIMENULL && v2 == joblist::TIMENULL)
ret = fSpec.fNf;
else if (v1 == joblist::TIMENULL && v2 != joblist::TIMENULL)
ret = -fSpec.fNf;
}
else
{
if (v1 > v2)
ret = fSpec.fAsc;
else if (v1 < v2)
ret = -fSpec.fAsc;
}
return ret;
}
int TimeCompare::operator()(IdbCompare* l, Row::Pointer r1, Row::Pointer r2)
{
@ -308,6 +541,16 @@ bool CompareRule::less(Row::Pointer r1, Row::Pointer r2)
return false;
}
void CompareRule::revertRules()
{
std::vector<Compare*>::iterator fCompareIter = fCompares.begin();
for(; fCompareIter!=fCompares.end(); fCompareIter++)
{
(*fCompareIter)->revertSortSpec();
}
}
void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgroup::RowGroup& rg)
{
@ -318,31 +561,80 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
switch (types[i->fIndex])
{
case CalpontSystemCatalog::TINYINT:
{
Compare* c = new TinyIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::SMALLINT:
{
Compare* c = new SmallIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
Compare* c = new IntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::BIGINT:
{
Compare* c = new BigIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
Compare* c;
uint32_t len = rg.getColumnWidth(i->fIndex);
switch (len)
{
case 1 :
c = new TinyIntCompare(*i); break;
case 2 :
c = new SmallIntCompare(*i); break;
case 4 :
c = new IntCompare(*i); break;
default:
c = new BigIntCompare(*i);
}
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::UTINYINT:
{
Compare* c = new UTinyIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::USMALLINT:
{
Compare* c = new USmallIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
{
Compare* c = new UIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::UBIGINT:
{
Compare* c = new UintCompare(*i);
Compare* c = new UBigIntCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::TEXT:
{
Compare* c = new StringCompare(*i);
fCompares.push_back(c);
@ -373,10 +665,21 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
}
case CalpontSystemCatalog::DATE:
{
Compare* c = new DateCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::TIMESTAMP:
{
Compare* c = new UintCompare(*i);
Compare* c = new DatetimeCompare(*i);
fCompares.push_back(c);
break;
}
case CalpontSystemCatalog::TIME:
{
Compare* c = new TimeCompare(*i);
fCompares.push_back(c);
break;
}
@ -424,9 +727,10 @@ OrderByData::OrderByData(const std::vector<IdbSortSpec>& spec, const rowgroup::R
// IdbOrderBy class implementation
IdbOrderBy::IdbOrderBy() :
fDistinct(false), fMemSize(0), fRowsPerRG(8192), fErrorCode(0), fRm(NULL)
{
}
fDistinct(false), fMemSize(0), fRowsPerRG(rowgroup::rgCommonSize),
fErrorCode(0),
fRm(NULL)
{ }
IdbOrderBy::~IdbOrderBy()

View File

@ -41,7 +41,6 @@
#include "hasher.h"
#include "stlpoolallocator.h"
// forward reference
namespace joblist
{
@ -52,9 +51,27 @@ class ResourceManager;
namespace ordering
{
template<typename _Tp, typename _Sequence = vector<_Tp>,
typename _Compare = less<typename _Sequence::value_type> >
class reservablePQ: private std::priority_queue<_Tp, _Sequence, _Compare>
{
public:
typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type;
reservablePQ(size_type capacity = 0) { reserve(capacity); };
void reserve(size_type capacity) { this->c.reserve(capacity); }
size_type capacity() const { return this->c.capacity(); }
using std::priority_queue<_Tp, _Sequence, _Compare>::size;
using std::priority_queue<_Tp, _Sequence, _Compare>::top;
using std::priority_queue<_Tp, _Sequence, _Compare>::pop;
using std::priority_queue<_Tp, _Sequence, _Compare>::push;
using std::priority_queue<_Tp, _Sequence, _Compare>::empty;
};
// forward reference
class IdbCompare;
class OrderByRow;
typedef reservablePQ<OrderByRow> SortingPQ;
// order by specification
struct IdbSortSpec
@ -101,11 +118,21 @@ public:
loc = localloc;
}
if (fLocale.find("ja_JP") != std::string::npos)
{
JPcodePoint = true;
}
else
{
JPcodePoint = false;
}
}
virtual ~Compare() {}
virtual int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer) = 0;
void revertSortSpec()
{
fSpec.fAsc = -fSpec.fAsc;
}
protected:
IdbSortSpec fSpec;
@ -114,6 +141,25 @@ protected:
bool JPcodePoint; // code point ordering (Japanese UTF) flag
};
// Comparators for signed types
class TinyIntCompare : public Compare
{
public:
TinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class SmallIntCompare : public Compare
{
public:
SmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class IntCompare : public Compare
{
@ -124,24 +170,56 @@ public:
};
class UintCompare : public Compare
class BigIntCompare : public Compare
{
public:
UintCompare(const IdbSortSpec& spec) : Compare(spec) {}
BigIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for signed types
// Comparators for unsigned types
class UTinyIntCompare : public Compare
{
public:
UTinyIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class StringCompare : public Compare
class USmallIntCompare : public Compare
{
public:
StringCompare(const IdbSortSpec& spec) : Compare(spec) {}
USmallIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class UIntCompare : public Compare
{
public:
UIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class UBigIntCompare : public Compare
{
public:
UBigIntCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// end of comparators for unsigned types
// Comparators for float types
class DoubleCompare : public Compare
{
public:
@ -150,6 +228,7 @@ public:
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class LongDoubleCompare : public Compare
{
public:
@ -167,6 +246,48 @@ public:
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for float types
// Comparators for temporal types
class DateCompare : public Compare
{
public:
DateCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class DatetimeCompare : public Compare
{
public:
DatetimeCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
class TimeCompare : public Compare
{
public:
TimeCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for temporal types
// Comparators for non-fixed size types
class StringCompare : public Compare
{
public:
StringCompare(const IdbSortSpec& spec) : Compare(spec) {}
int operator()(IdbCompare*, rowgroup::Row::Pointer, rowgroup::Row::Pointer);
};
// End of comparators for variable sized types
class TimeCompare : public Compare
{
@ -186,6 +307,7 @@ public:
bool less(rowgroup::Row::Pointer r1, rowgroup::Row::Pointer r2);
void compileRules(const std::vector<IdbSortSpec>&, const rowgroup::RowGroup&);
void revertRules();
std::vector<Compare*> fCompares;
IdbCompare* fIdbCompare;
@ -227,7 +349,7 @@ public:
return fRule->less(fData, rhs.fData);
}
rowgroup::Row::Pointer fData;
rowgroup::Row::Pointer fData;
CompareRule* fRule;
};
@ -246,7 +368,6 @@ public:
bool operator()(rowgroup::Row::Pointer, rowgroup::Row::Pointer);
//protected:
std::vector<uint64_t> fIndex;
};
@ -293,10 +414,18 @@ public:
{
return fDistinct;
}
SortingPQ& getQueue()
{
return fOrderByQueue;
}
CompareRule &getRule()
{
return fRule;
}
SortingPQ fOrderByQueue;
protected:
std::vector<IdbSortSpec> fOrderByCond;
std::priority_queue<OrderByRow> fOrderByQueue;
rowgroup::Row fRow0;
CompareRule fRule;