You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
389 lines
12 KiB
C++
389 lines
12 KiB
C++
/* Copyright (C) 2019 MariaDB Corporaton.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $
|
|
|
|
#include <list>
|
|
#include <sstream>
|
|
#include <pthread.h>
|
|
#include <iomanip>
|
|
#include <cppunit/extensions/HelperMacros.h>
|
|
#include <cppunit/extensions/TestFactoryRegistry.h>
|
|
#include <cppunit/ui/text/TestRunner.h>
|
|
|
|
#include "jobstep.h"
|
|
#include "funcexp.h"
|
|
#include "jlf_common.h"
|
|
#include "tupleannexstep.h"
|
|
#include "calpontsystemcatalog.h"
|
|
#include "resourcemanager.h"
|
|
#include <boost/any.hpp>
|
|
#include <boost/function.hpp>
|
|
#include "bytestream.h"
|
|
#include <time.h>
|
|
#include <sys/time.h>
|
|
#include <limits.h>
|
|
|
|
#define DEBUG
|
|
#define MEMORY_LIMIT 14983602176
|
|
|
|
using namespace std;
|
|
using namespace joblist;
|
|
using namespace messageqcpp;
|
|
|
|
// Timer class used by this tdriver to output elapsed times, etc.
|
|
class Timer
|
|
{
|
|
public:
|
|
void start(const string& message)
|
|
{
|
|
if (!fHeaderDisplayed)
|
|
{
|
|
header();
|
|
fHeaderDisplayed = true;
|
|
}
|
|
|
|
gettimeofday(&fTvStart, 0);
|
|
cout << timestr() << " Start " << message << endl;
|
|
}
|
|
|
|
void stop(const string& message)
|
|
{
|
|
time_t now;
|
|
time(&now);
|
|
string secondsElapsed;
|
|
getTimeElapsed(secondsElapsed);
|
|
cout << timestr() << " " << secondsElapsed << " Stop " << message << endl;
|
|
}
|
|
|
|
Timer() : fHeaderDisplayed(false)
|
|
{
|
|
}
|
|
|
|
private:
|
|
struct timeval fTvStart;
|
|
bool fHeaderDisplayed;
|
|
|
|
double getTimeElapsed(string& seconds)
|
|
{
|
|
struct timeval tvStop;
|
|
gettimeofday(&tvStop, 0);
|
|
double secondsElapsed =
|
|
(tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) - (fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0));
|
|
ostringstream oss;
|
|
oss << secondsElapsed;
|
|
seconds = oss.str();
|
|
seconds.resize(8, '0');
|
|
return secondsElapsed;
|
|
}
|
|
|
|
string timestr()
|
|
{
|
|
struct tm tm;
|
|
struct timeval tv;
|
|
|
|
gettimeofday(&tv, 0);
|
|
localtime_r(&tv.tv_sec, &tm);
|
|
|
|
ostringstream oss;
|
|
oss << setfill('0') << setw(2) << tm.tm_hour << ':' << setw(2) << tm.tm_min << ':' << setw(2) << tm.tm_sec
|
|
<< '.' << setw(6) << tv.tv_usec;
|
|
return oss.str();
|
|
}
|
|
|
|
void header()
|
|
{
|
|
cout << endl;
|
|
cout << "Time Seconds Activity" << endl;
|
|
}
|
|
};
|
|
|
|
class FilterDriver : public CppUnit::TestFixture
|
|
{
|
|
CPPUNIT_TEST_SUITE(FilterDriver);
|
|
|
|
CPPUNIT_TEST(ORDERBY_TIME_TEST);
|
|
|
|
CPPUNIT_TEST_SUITE_END();
|
|
|
|
private:
|
|
void orderByTest_nRGs(uint64_t numRows, uint64_t limit, uint64_t maxThreads, bool parallelExecution,
|
|
bool generateRandValues, bool hasDistinct)
|
|
{
|
|
Timer timer;
|
|
// This test creates TAS for single bigint column and run sorting on it.
|
|
|
|
cout << endl;
|
|
cout << "------------------------------------------------------------" << endl;
|
|
timer.start("insert");
|
|
|
|
stringstream ss2;
|
|
ss2.flush();
|
|
ss2 << "loading " << numRows << " rows into initial RowGroup.";
|
|
std::string message = ss2.str();
|
|
timer.start(message);
|
|
|
|
ResourceManager* rm = ResourceManager::instance(true);
|
|
|
|
joblist::JobInfo jobInfo = joblist::JobInfo(rm);
|
|
// 1st column is the sorting key
|
|
uint8_t tupleKey = 1;
|
|
uint8_t offset = 0;
|
|
uint16_t rowsPerRG = 8192; // hardcoded max rows in RowGroup value
|
|
uint32_t numberOfRGs = numRows / rowsPerRG;
|
|
|
|
// set sorting rules
|
|
// true - ascending order, false otherwise
|
|
jobInfo.orderByColVec.push_back(make_pair(tupleKey, false));
|
|
jobInfo.limitStart = offset;
|
|
jobInfo.limitCount = limit;
|
|
jobInfo.hasDistinct = hasDistinct;
|
|
// JobInfo doesn't set these SP in ctor
|
|
jobInfo.umMemLimit.reset(new int64_t);
|
|
*(jobInfo.umMemLimit) = MEMORY_LIMIT;
|
|
SErrorInfo errorInfo(new ErrorInfo());
|
|
jobInfo.errorInfo = errorInfo;
|
|
uint32_t oid = 3001;
|
|
// populate JobInfo.nonConstDelCols with dummy shared_pointers
|
|
// to notify TupleAnnexStep::initialise() about number of non-constant columns
|
|
execplan::SRCP srcp1, srcp2;
|
|
jobInfo.nonConstDelCols.push_back(srcp1);
|
|
jobInfo.nonConstDelCols.push_back(srcp2);
|
|
|
|
// create two columns RG. 1st is the sorting key, second is the data column
|
|
std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
|
|
std::vector<uint32_t> charSetNumVec;
|
|
std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
|
|
offsets.push_back(2);
|
|
offsets.push_back(10);
|
|
offsets.push_back(18);
|
|
roids.push_back(oid);
|
|
roids.push_back(oid);
|
|
tkeys.push_back(1);
|
|
tkeys.push_back(1);
|
|
types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
|
|
types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
|
|
cscale.push_back(0);
|
|
cscale.push_back(0);
|
|
cprecision.push_back(20);
|
|
cprecision.push_back(20);
|
|
charSetNumVec.push_back(8);
|
|
charSetNumVec.push_back(8);
|
|
rowgroup::RowGroup inRG(2, // column count
|
|
offsets, // oldOffset
|
|
roids, // column oids
|
|
tkeys, // keys
|
|
types, // types
|
|
charSetNumVec, // charset numbers
|
|
cscale, // scale
|
|
cprecision, // precision
|
|
20, // sTableThreshold
|
|
false // useStringTable
|
|
);
|
|
rowgroup::RowGroup jobInfoRG(inRG);
|
|
joblist::TupleAnnexStep tns = joblist::TupleAnnexStep(jobInfo);
|
|
tns.addOrderBy(new joblist::LimitedOrderBy());
|
|
tns.delivery(true);
|
|
// activate parallel sort
|
|
if (parallelExecution)
|
|
{
|
|
tns.setParallelOp();
|
|
}
|
|
tns.setMaxThreads(maxThreads);
|
|
tns.initialize(jobInfoRG, jobInfo);
|
|
tns.setLimit(0, limit);
|
|
|
|
// Create JobStepAssociation mediator class ins to connect DL and JS
|
|
joblist::AnyDataListSPtr spdlIn(new AnyDataList());
|
|
// joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, jobInfo.fifoSize);
|
|
joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, 21001);
|
|
dlIn->OID(oid);
|
|
spdlIn->rowGroupDL(dlIn);
|
|
joblist::JobStepAssociation jsaIn;
|
|
jsaIn.outAdd(spdlIn);
|
|
tns.inputAssociation(jsaIn);
|
|
|
|
uint64_t base = 42;
|
|
uint64_t maxInt = 0;
|
|
if (generateRandValues)
|
|
::srand(base);
|
|
uint64_t nextValue;
|
|
for (uint32_t i = 1; i <= numberOfRGs; i++)
|
|
{
|
|
// create RGData with the RG structure and manually populate RG
|
|
// Use reinint(numberOfRGs) to preset an array
|
|
rowgroup::RGData rgD = rowgroup::RGData(inRG);
|
|
inRG.setData(&rgD);
|
|
rowgroup::Row r;
|
|
inRG.initRow(&r);
|
|
uint32_t rowSize = r.getSize();
|
|
inRG.getRow(0, &r);
|
|
// populate RGData
|
|
// for(uint64_t i = rowsPerRG+1; i > 0; i--)
|
|
for (uint64_t i = 0; i < rowsPerRG; i++) // Worst case scenario for PQ
|
|
{
|
|
// TBD Use set..._offset methods to avoid offset calculation instructions
|
|
if (generateRandValues)
|
|
{
|
|
nextValue = ::rand();
|
|
}
|
|
else
|
|
{
|
|
nextValue = base + i;
|
|
}
|
|
|
|
r.setUintField<8>(nextValue, 0);
|
|
r.setUintField<8>(nextValue, 1);
|
|
if (maxInt < nextValue)
|
|
{
|
|
maxInt = nextValue;
|
|
}
|
|
r.nextRow(rowSize);
|
|
}
|
|
base += 1;
|
|
inRG.setRowCount(rowsPerRG);
|
|
|
|
// Insert RGData into input DL
|
|
dlIn->insert(rgD);
|
|
}
|
|
// end of input signal
|
|
std::cout << "orderByTest_nRGs input DataList totalSize " << dlIn->totalSize() << std::endl;
|
|
dlIn->endOfInput();
|
|
timer.stop(message);
|
|
|
|
joblist::AnyDataListSPtr spdlOut(new AnyDataList());
|
|
// Set the ring buffer big enough to take RGData-s with results b/c
|
|
// there is nobody to read out of the buffer.
|
|
joblist::RowGroupDL* dlOut = new RowGroupDL(1, numberOfRGs);
|
|
dlOut->OID(oid);
|
|
spdlOut->rowGroupDL(dlOut);
|
|
joblist::JobStepAssociation jsaOut;
|
|
jsaOut.outAdd(spdlOut);
|
|
// uint64_t outputDLIter = dlOut->getIterator();
|
|
tns.outputAssociation(jsaOut);
|
|
|
|
// Run Annex Step
|
|
message = "Sorting";
|
|
timer.start(message);
|
|
tns.run();
|
|
tns.join();
|
|
timer.stop(message);
|
|
|
|
// serialize RGData into bs and later back
|
|
// to follow ExeMgr whilst getting TAS result RowGroup
|
|
messageqcpp::ByteStream bs;
|
|
uint32_t result = 0;
|
|
rowgroup::RowGroup outRG(inRG);
|
|
rowgroup::RGData outRGData(outRG);
|
|
result = tns.nextBand(bs);
|
|
/*bool more = false;
|
|
do
|
|
{
|
|
dlOut->next(outputDLIter, &outRGData);
|
|
} while (more);*/
|
|
std::cout << "orderByTest_nRGs result " << result << std::endl;
|
|
// CPPUNIT_ASSERT( result == limit );
|
|
outRGData.deserialize(bs);
|
|
outRG.setData(&outRGData);
|
|
|
|
// std::cout << "orderByTest_nRGs output RG " << outRG.toString() << std::endl;
|
|
std::cout << "maxInt " << maxInt << std::endl;
|
|
{
|
|
rowgroup::Row r;
|
|
outRG.initRow(&r);
|
|
outRG.getRow(0, &r);
|
|
CPPUNIT_ASSERT(limit == outRG.getRowCount() || outRG.getRowCount() == 8192);
|
|
CPPUNIT_ASSERT_EQUAL(maxInt, r.getUintField(1));
|
|
}
|
|
|
|
cout << "------------------------------------------------------------" << endl;
|
|
}
|
|
|
|
void ORDERBY_TIME_TEST()
|
|
{
|
|
uint64_t numRows = 8192;
|
|
uint64_t maxThreads = 8;
|
|
// limit == 100000 is still twice as good to sort in parallel
|
|
// limit == 1000000 however is better to sort using single threaded sorting
|
|
uint64_t limit = 100000;
|
|
bool parallel = true;
|
|
bool woParallel = false;
|
|
bool generateRandValues = true;
|
|
bool hasDistinct = true;
|
|
bool noDistinct = false;
|
|
orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct);
|
|
orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct);
|
|
orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct);
|
|
orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct);
|
|
}
|
|
void QUICK_TEST()
|
|
{
|
|
float f = 1.1;
|
|
double d = 1.2;
|
|
uint64_t i = 1;
|
|
uint64_t* i_ptr = &i;
|
|
double* d_ptr = &d;
|
|
uint64_t* i2_ptr = (uint64_t*)d_ptr;
|
|
float* f_ptr = &f;
|
|
i_ptr = (uint64_t*)f_ptr;
|
|
|
|
cout << "*i_ptr=" << *i_ptr << endl;
|
|
cout << "*i2_ptr=" << *i2_ptr << endl;
|
|
f_ptr = (float*)i_ptr;
|
|
|
|
cout << "*f_ptr=" << *f_ptr << endl;
|
|
|
|
cout << endl;
|
|
|
|
if (d > i)
|
|
cout << "1.2 is greater than 1." << endl;
|
|
|
|
if (f > i)
|
|
cout << "1.1 is greater than 1." << endl;
|
|
|
|
if (d > f)
|
|
cout << "1.2 is greater than 1.1" << endl;
|
|
|
|
if (*i_ptr < *i2_ptr)
|
|
cout << "1.1 < 1.2 when represented as uint64_t." << endl;
|
|
|
|
cout << "sizeof(f) = " << sizeof(f) << endl;
|
|
cout << "sizeof(i) = " << sizeof(i) << endl;
|
|
cout << "sizeof(d) = " << sizeof(d) << endl;
|
|
|
|
double dbl = 9.7;
|
|
double dbl2 = 1.3;
|
|
i_ptr = (uint64_t*)&dbl;
|
|
i2_ptr = (uint64_t*)&dbl2;
|
|
cout << endl;
|
|
cout << "9.7 as int is " << *i_ptr << endl;
|
|
cout << "9.7 as int is " << *i2_ptr << endl;
|
|
cout << "1.2 < 9.7 is " << (*i_ptr < *i2_ptr) << endl;
|
|
}
|
|
};
|
|
|
|
CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver);
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
CppUnit::TextUi::TestRunner runner;
|
|
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
|
|
runner.addTest(registry.makeTest());
|
|
bool wasSuccessful = runner.run("", false);
|
|
return (wasSuccessful ? 0 : 1);
|
|
}
|