1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/tdriver-agg.cpp
2022-01-21 16:43:49 +00:00

1414 lines
39 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: tdriver-agg.cpp 9210 2013-01-21 14:10:42Z rdempsey $
*
*
***********************************************************************/
#include <iostream>
#include <sstream>
#include <fstream>
#include <list>
#include <vector>
#include <pthread.h>
#include <time.h>
#include <sys/time.h>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
#include "fifo.h"
#include "wsdl.h"
#include "constantdatalist.h"
#include "bucketdl.h"
#include "elementtype.h"
#include "zdl.h"
#include "stopwatch.cpp"
#include "jobstep.h"
//#include "aggregator.h"
#include "constantcolumn.h"
#include "simplefilter.h"
#include "aggregatecolumn.h"
#include "simplecolumn.h"
#include "dataconvert.h"
#include "largehashjoin.h"
using namespace dataconvert;
// #undef CPPUNIT_ASSERT
// #define CPPUNIT_ASSERT(x)
using namespace std;
using namespace joblist;
using namespace execplan;
// Shared stopwatch; the test cases bracket their phases with timer1.start()/stop()
// and call timer1.finish() when done.
Stopwatch timer1;
// Bucket count passed to the bucket datalist constructors; the consumer tests also
// divide it by NUM_THREADS to size each thread's share.
const uint32_t NUM_BUCKETS = 256;
// Third constructor argument for the TupleBucketDataList instances built below.
const uint32_t MAX_SIZE = 0x100000;
// Third constructor argument for the BucketDL / BucketDataList instances built below.
const uint32_t MAX_ELEMENTS = 0x20000;
// Number of consumer / aggregator threads spawned by the multi-threaded tests.
const uint32_t NUM_THREADS = 4;
// Pipe-delimited input file parsed by setUp1(), tuplewsdl() and bucketdl().
// NOTE(review): hard-coded, developer-specific absolute path.
const string datapath = "/home/zzhu/genii/tools/dbbuilder/lineitem.tbl";
// const string datapath="/usr/local/mariadb/columnstore/bin/lineitem.tbl";
// Consumer count passed to the datalist constructors.
int numConsumers = 1;
// Number of passes made over the input data by the loading loops.
int numRuns = 1;
// The loaders print a progress line whenever rid is a multiple of this value.
int printInterval = numRuns * 100000;
// Passed by address to the HashJoin constructor in hashjoin().
JSTimeStamp dlTimes;
/** @brief Compute the elapsed time between two timestamps: diff = tv2 - tv1.
 *
 * @param tv1  start time
 * @param tv2  end time (expected to be >= tv1)
 * @param diff receives the difference, normalised so that
 *             0 <= diff.tv_nsec < 1000000000 whenever both inputs are normalised
 */
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff)
{
  // Work on locals so the result is still correct if diff aliases an input.
  time_t sec = tv2.tv_sec - tv1.tv_sec;
  long nsec = tv2.tv_nsec - tv1.tv_nsec;

  if (nsec < 0)
  {
    // Borrow one second. The previous code subtracted the nanoseconds the
    // wrong way round here, reporting e.g. 0.8s for a real 0.2s difference.
    --sec;
    nsec += 1000000000L;
  }

  diff.tv_sec = sec;
  diff.tv_nsec = nsec;
}
// Argument bundle handed (by address) to the pthread entry points in this file.
// Each test fills in only the fields its thread function reads.
struct ThreadParms
{
// ElementType bucket datalist; read by bucketConsumer() and BDL_producer().
BucketDL<ElementType>* bdl;
// Tuple bucket datalist; read by tupleConsumer() and TBDL_producer().
TupleBucketDataList* tbdl;
// Number of buckets each consumer thread drains.
int count;
// Thread index; consumers derive their first bucket from it and print it in log lines.
int threadNumber;
};
void* bucketConsumer(void* arg)
{
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
// BucketDL<ElementType> *bdl = reinterpret_cast<BucketDL<ElementType> *>(arg);
BucketDL<ElementType>* bdl = parms->bdl;
int numBucketsToConsume = parms->count;
int threadNumber = parms->threadNumber;
int64_t i;
ElementType val;
uint32_t it;
int bucketIndex;
int cnt = 0;
for (i = 0; i < numBucketsToConsume; i++)
{
bucketIndex = (threadNumber * numBucketsToConsume) + i;
it = bdl->getIterator(bucketIndex);
while (bdl->next(bucketIndex, it, &val))
cnt++;
}
cout << "consumer " << threadNumber << "consumed " << cnt << endl;
return NULL;
}
void* tupleConsumer(void* arg)
{
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
// BucketDL<ElementType> *bdl = reinterpret_cast<BucketDL<ElementType> *>(arg);
TupleBucketDataList* tbdl = parms->tbdl;
int numBucketsToConsume = parms->count;
int threadNumber = parms->threadNumber;
int i;
// ElementType val;
TupleType val;
uint32_t it;
int bucketIndex;
int cnt = 0;
for (i = 0; i < numBucketsToConsume; i++)
{
bucketIndex = (threadNumber * numBucketsToConsume) + i;
it = tbdl->getIterator(bucketIndex);
while (tbdl->next(bucketIndex, it, &val))
cnt++;
}
cout << "consumer " << threadNumber << "consumed " << cnt << endl;
return NULL;
}
/** @brief class TupleHasher1
 *
 * Hash functor for raw key buffers: forwards the buffer and the byte length
 * fixed at construction to the wrapped Hasher.
 */
class TupleHasher1
{
 public:
  /** @param len number of key bytes passed to the hasher on every call */
  TupleHasher1(uint32_t len) : fHashLen(len)
  {
  }

  /** @return hash of the first fHashLen bytes at v, as computed by Hasher */
  uint32_t operator()(const char* v) const
  {
    const uint32_t digest = fHasher(v, fHashLen);
    return digest;
  }
  /*
  uint32_t operator()(const uint64_t v) const
  {
  return v;
  }*/

 private:
  Hasher fHasher;
  uint32_t fHashLen;
};
/** @brief class TupleComparator1
 *
 * Equality functor for raw key buffers: two keys are equal when their first
 * fCmpLen bytes match.
 */
class TupleComparator1
{
 public:
  /** @param len number of leading bytes compared on every call */
  TupleComparator1(uint32_t len) : fCmpLen(len)
  {
  }

  /** @return true when the first fCmpLen bytes of v1 and v2 are identical */
  bool operator()(const char* v1, const char* v2) const
  {
    const int order = memcmp(v1, v2, fCmpLen);
    return order == 0;
  }
  /*
  bool operator()(const uint64_t v1, const uint64_t v2) const
  {
  return (v1 == v2);
  }*/

 private:
  uint32_t fCmpLen;
};
void* aggregator(void* arg)
{
uint32_t fHashLen = 4;
TupleType tt;
tt.second = new char[fHashLen];
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
int threadNumber = parms->threadNumber;
struct timespec ts1, ts2, diff;
uint32_t size = 585938;
#if 0
//typedef std::vector<uint64_t> RIDVec;
typedef std::pair<int64_t, Elem> Results;
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator> shmp;
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator>::iterator TupleHMIter;
//typedef boost::shared_ptr<TupleHashMap> SHMP;
typedef std::tr1::unordered_multimap<uint64_t, Results> TupleHashMap;
typedef boost::shared_ptr<TupleHashMap> SHMP;
typedef std::tr1::unordered_multimap<uint64_t, Results>::iterator TupleHMIter;
typedef std::pair<TupleHMIter, TupleHMIter> HashItPair;
HashItPair hashItPair;
SHMP shmp(new TupleHashMap());
// TupleHashMap *shmp = new TupleHashMap();
TupleHMIter iter;
uint32_t val = 0;
uint64_t hv;
Hasher hasher;
bool flag = true;
for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++)
{
clock_gettime(CLOCK_REALTIME, &ts1);
for (uint32_t j = 0; j < 4; j++)
{
for (uint32_t i = 0; i < size / 10; i++)
{
flag = true;
memcpy(tt.second, &i, 4);
hv = hasher(tt.second, fHashLen);
iter = shmp->find(hv);
if (iter != shmp->end())
{
hashItPair = shmp->equal_range(hv);
for (iter = hashItPair.first; iter != hashItPair.second; iter++)
{
if (memcmp(iter->second.second.hashStr, tt.second, fHashLen) == 0)
{
//cout << "real hit" << endl;
//updateAggResult<result_t>(tt, hashIt->second.first);
iter->second.second.rids.push_back(tt.first);
flag = false;
break;
}
}
}
if (flag)
{
Results rr;
rr.second.hashStr = new char[fHashLen];
//getAggResult<result_t>(tt, rr.first);
memcpy(rr.second.hashStr, tt.second, fHashLen);
rr.second.rids.push_back(tt.first);
shmp->insert(std::pair<uint64_t, Results> (hv, rr));
}
}
}
cout << "thread " << threadNumber << " vector " << k << " size=" << shmp->size() << endl;
// clean up hashmap memory
for (iter = shmp->begin(); iter != shmp->end(); iter++)
delete [] iter->second.second.hashStr;
shmp->clear();
// shmp.reset(new TupleHashMap());
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl;
}
#endif
#if 0
typedef std::vector<uint64_t> RIDVec;
typedef std::pair<uint64_t, RIDVec> Results;
typedef std::tr1::unordered_map<char*, Results, TupleHasher1, TupleComparator1> TupleHashMap1;
typedef std::tr1::unordered_map<char*, Results, TupleHasher1, TupleComparator1>::iterator TupleHMIter1;
typedef boost::shared_ptr<TupleHashMap1> SHMP1;
SHMP1 shmp;
TupleHasher1 tupleHash(fHashLen);
TupleComparator1 tupleComp(fHashLen);
shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp));
TupleHMIter1 iter;
Results rr;
uint32_t val = 0;
for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++)
{
clock_gettime(CLOCK_REALTIME, &ts1);
for (uint32_t j = 0; j < 4; j++)
{
for (uint32_t i = 0; i < size / 10; i++)
{
memcpy(tt.second, &i, 4);
iter = shmp->find(tt.second);
if (iter == shmp->end())
{
rr.second.clear();
//getAggResult<result_t>(tt, rr.first);
rr.second.push_back(tt.first);
char* hashStr = new char[fHashLen];
memcpy(hashStr, tt.second, fHashLen);
shmp->insert(std::pair<char*, Results> (hashStr, rr));
}
else
{
//updateAggResult<result_t>(tt, hashIt->second.first);
iter->second.second.push_back(tt.first);
}
}
}
cout << "thread " << threadNumber << " hashmap " << k << " size=" << shmp->size() << endl;
// clean up hashmap memory
for (iter = shmp->begin(); iter != shmp->end(); iter++)
delete [] iter->first;
shmp->clear();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl;
}
#endif
#if 1
// typedef std::vector<uint64_t> RIDVec;
// typedef std::pair<uint64_t, RIDVec> Results;
typedef std::tr1::unordered_map<char*, int64_t, TupleHasher1, TupleComparator1> TupleHashMap1;
typedef std::tr1::unordered_map<char*, int64_t, TupleHasher1, TupleComparator1>::iterator TupleHMIter1;
typedef boost::shared_ptr<TupleHashMap1> SHMP1;
vector<TupleType> vt;
vt.reserve(size / 10 * 4);
SHMP1 shmp;
TupleHasher1 tupleHash(fHashLen);
TupleComparator1 tupleComp(fHashLen);
shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp));
TupleHMIter1 iter;
int64_t rr;
uint32_t val = 0;
for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++)
{
clock_gettime(CLOCK_REALTIME, &ts1);
for (uint32_t j = 0; j < 4; j++)
{
for (uint32_t i = 0; i < size / 10; i++)
{
memcpy(tt.second, &i, 4);
vt.push_back(tt);
iter = shmp->find(tt.second);
if (iter == shmp->end())
{
// rr.second.clear();
// getAggResult<result_t>(tt, rr.first);
// rr.second.push_back(tt.first);
char* hashStr = new char[fHashLen];
memcpy(hashStr, tt.second, fHashLen);
shmp->insert(std::pair<char*, int64_t>(hashStr, i));
}
else
{
// updateAggResult<result_t>(tt, hashIt->second.first);
// iter->second.second.push_back(tt.first);
iter->second += i;
}
}
}
cout << "thread " << threadNumber << " hashmap " << k << " size=" << shmp->size() << endl;
// loop through vector to find match in map for rid
vector<TupleType>::iterator it;
int val = 0;
for (it = vt.begin(); it != vt.end(); it++)
{
iter = shmp->find(it->second);
if (iter != shmp->end())
val++;
}
cout << "val = " << val << endl;
// clean up hashmap memory
for (iter = shmp->begin(); iter != shmp->end(); iter++)
delete[] iter->first;
shmp->clear();
vt.clear();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec
<< " ns" << endl;
}
#endif
}
/** @brief pthread entry point that floods a TupleBucketDataList with tuples.
 *
 * @param arg pointer to a ThreadParms; only tbdl is read.
 * @return always NULL (the previous version fell off the end of a non-void
 *         function, which is undefined behaviour)
 *
 * Generates 150,000,000 tuples whose rid and 4-byte payload are both the loop
 * counter, and inserts them in batches of 8192.
 */
void* TBDL_producer(void* arg)
{
  ThreadParms* parms = static_cast<ThreadParms*>(arg);
  TupleBucketDataList* tbdl = parms->tbdl;
  const uint32_t totalTuples = 150000000;
  const size_t batchSize = 8192;
  TupleType t;
  vector<TupleType> vt;

  for (uint32_t i = 0; i < totalTuples; i++)
  {
    t.first = i;
    // NOTE(review): these payload buffers are never released in this function;
    // confirm whether insert() takes ownership or copies the bytes.
    t.second = new char[4];
    memcpy(t.second, &i, 4);
    vt.push_back(t);

    if (vt.size() == batchSize)
    {
      tbdl->insert(vt);
      vt.clear();
    }
  }

  // Flush the final partial batch.
  if (vt.size() > 0)
    tbdl->insert(vt);

  return NULL;
}
/** @brief pthread entry point that floods a BucketDL<ElementType> with elements.
 *
 * @param arg pointer to a ThreadParms; only bdl is read.
 * @return always NULL (the previous version fell off the end of a non-void
 *         function, which is undefined behaviour)
 *
 * Generates 150,000,000 elements whose first and second members are both the
 * loop counter, and inserts them in batches of 8192.
 */
void* BDL_producer(void* arg)
{
  ThreadParms* parms = static_cast<ThreadParms*>(arg);
  BucketDL<ElementType>* bdl = parms->bdl;
  const uint32_t totalElements = 150000000;
  const size_t batchSize = 8192;
  ElementType e;
  vector<ElementType> vt;

  for (uint32_t i = 0; i < totalElements; i++)
  {
    e.first = i;
    e.second = i;
    vt.push_back(e);

    if (vt.size() == batchSize)
    {
      bdl->insert(vt);
      vt.clear();
    }
  }

  // Flush the final partial batch.
  if (vt.size() > 0)
    bdl->insert(vt);

  return NULL;
}
// CppUnit fixture collecting the datalist, aggregation, hash-join and hash-map
// timing drivers. Only the tests left uncommented in the suite declaration
// below are registered with the runner.
class AggDriver : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(AggDriver);
// CPPUNIT_TEST(aggFilter_group2);
// CPPUNIT_TEST(tuplewsdl);
// CPPUNIT_TEST(bucketdl);
// CPPUNIT_TEST(aggFilter_group1);
// CPPUNIT_TEST(hashjoin);
// CPPUNIT_TEST(hashmap_tuple);
// CPPUNIT_TEST(hashmap_tuple_multi);
// CPPUNIT_TEST(hashmap_ET);
// CPPUNIT_TEST(tuplewsdl_multi);
CPPUNIT_TEST(bdl_multi);
CPPUNIT_TEST_SUITE_END();
// Shared-pointer alias used by aggFilter_group1() to hold its SimpleFilter.
typedef boost::shared_ptr<SimpleFilter> SSFP;
private:
// Resource manager passed to every datalist and job step the tests construct.
ResourceManager fRm;
public:
/** @brief Build the input/output associations for the aggregate-filter test.
 *
 * @param in  receives a TupleBucketDataList filled from `datapath`
 * @param out receives an (empty) ElementType bucket datalist for the step output
 *
 * Each input line is split on '|'; field 0 is taken as l_orderkey and field 4
 * (scaled by 100) as l_quantity. For every line, numRuns tuples are emitted
 * with the key offset by 6000000 * run, each carrying a 16-byte payload of
 * {l_orderkey, l_quantity}; tuples are inserted in batches of 2048.
 *
 * Fixes over the previous version:
 *  - the read loop is driven by getline() itself instead of `!eof()`, so a
 *    missing file or an over-long line no longer loops forever and the final
 *    (empty) read no longer inserts a stale duplicate row;
 *  - the per-run key offset is applied to the parsed key rather than
 *    accumulated, matching tuplewsdl() and bucketdl();
 *  - the parsed values are initialised, and the scratch TupleType lives on
 *    the stack.
 */
void setUp1(JobStepAssociation& in, JobStepAssociation& out)
{
  // sleep(20);
  // input - TupleBucket
  AnyDataListSPtr adl1(new AnyDataList());
  TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, numConsumers, MAX_SIZE, fRm);
  tbdl->setMultipleProducers(0);
  // tbdl->setElementMode(1);
  adl1->tupleBucketDL(tbdl);
  in.outAdd(adl1);

  // output - ElementType Bucket
  AnyDataListSPtr adl2(new AnyDataList());
  BucketDataList* bdl = new BucketDataList(NUM_BUCKETS, numConsumers, MAX_ELEMENTS, fRm);
  bdl->setElementMode(1);
  adl2->bucketDL(bdl);
  out.outAdd(adl2);

  // prepare the input datalist
  uint64_t l_orderkey = 0;
  uint64_t l_quantity = 0;
  char data[1000];
  uint32_t rid = 0;
  TupleType tt;
  tbdl->hashLen(sizeof(l_orderkey));
  const uint32_t size = sizeof(l_orderkey) + sizeof(l_quantity);
  // tbdl->elementLen(size + sizeof(rid));
  tbdl->elementLen(sizeof(rid), size);
  vector<TupleType> v;
  timer1.start("input");
  ifstream ifs(datapath.c_str(), ios::in);

  // getline() returning the stream makes the loop stop on EOF *and* on any
  // failure (file not opened, line longer than the buffer).
  while (ifs.getline(data, sizeof(data)))
  {
    uint64_t baseKey = l_orderkey;
    uint64_t count = 0;

    for (char* tok = strtok(data, "|"); tok != NULL; tok = strtok(NULL, "|"))
    {
      if (count == 0)
        baseKey = atol(tok);

      if (count == 4)
        l_quantity = atof(tok) * 100;

      count++;
    }

    for (int i = 0; i < numRuns; i++)
    {
      // Offset from the parsed key so every run gets a distinct key range.
      l_orderkey = baseKey + 6000000ULL * i;
      tt.first = rid;
      // NOTE(review): these payload buffers are not released in this function;
      // confirm whether insert() takes ownership or copies the bytes.
      tt.second = new char[size];
      memcpy(tt.second, &l_orderkey, sizeof(l_orderkey));
      memcpy(tt.second + sizeof(l_orderkey), &l_quantity, sizeof(l_quantity));
      v.push_back(tt);

      if (v.size() == 2048)
      {
        tbdl->insert(v);
        v.clear();
      }

      rid++;
    }

    if (rid % printInterval == 0)
      cout << rid << " " << l_orderkey << " " << l_quantity << endl;
  }

  ifs.close();

  // Flush the final partial batch.
  if (v.size() > 0)
  {
    tbdl->insert(v);
    // for (uint32_t i = 0; i < v.size(); i++)
    // delete [] (v[i].second);
    v.clear();
  }

  tbdl->endOfInput();
  timer1.stop("input");
  cout << "input size=" << tbdl->totalSize() << endl;
}
/** @brief Convert a textual constant to the 64-bit integer form used by the filter.
 *
 * @param str    textual value to convert
 * @param ct     column type that selects the conversion
 * @param isNull currently unused (the null handling is commented out)
 * @return the converted value; 0 for column types not handled below
 *
 * The string is converted with DataConvert::convertColumnData() and the
 * resulting boost::any is unpacked according to ct.colDataType. Floating-point
 * values are truncated to an integer. Character data is packed into the
 * integer by copying its leading bytes.
 *
 * Fix over the previous version: character data used to be read through a
 * cast `*((Int64*)s.c_str())`, which reads 8 bytes even when the string is
 * shorter and relies on an unaligned, type-punned access. At most
 * min(length, 8) bytes are now copied into a zero-initialised value.
 */
int64_t convertValueNum(const string& str, const CalpontSystemCatalog::ColType& ct, bool isNull)
{
  // if (str.size() == 0 || isNull ) return valueNullNum(ct);
  int64_t v = 0;
  boost::any anyVal = DataConvert::convertColumnData(ct, str, false);

  switch (ct.colDataType)
  {
    case CalpontSystemCatalog::BIT: v = boost::any_cast<bool>(anyVal); break;

    case CalpontSystemCatalog::TINYINT: v = boost::any_cast<char>(anyVal); break;

    case CalpontSystemCatalog::SMALLINT: v = boost::any_cast<int16_t>(anyVal); break;

    case CalpontSystemCatalog::MEDINT:
    case CalpontSystemCatalog::INT: v = boost::any_cast<int32_t>(anyVal); break;

    case CalpontSystemCatalog::BIGINT: v = boost::any_cast<long long>(anyVal); break;

    case CalpontSystemCatalog::FLOAT:
    {
      const float f = boost::any_cast<float>(anyVal);
      v = static_cast<Int64>(f);
    }
    break;

    case CalpontSystemCatalog::DOUBLE:
    {
      const double d = boost::any_cast<double>(anyVal);
      v = static_cast<Int64>(d);
    }
    break;

    case CalpontSystemCatalog::CHAR:
    case CalpontSystemCatalog::VARCHAR:
    case CalpontSystemCatalog::BLOB:
    case CalpontSystemCatalog::CLOB:
    {
      // v = boost::any_cast<string>(anyVal);
      const string s = boost::any_cast<string>(anyVal);
      // v is already zero, so short strings end up zero-padded.
      const size_t n = s.size() < sizeof(v) ? s.size() : sizeof(v);
      memcpy(&v, s.data(), n);
    }
    break;

    case CalpontSystemCatalog::DATE: v = boost::any_cast<uint32_t>(anyVal); break;

    case CalpontSystemCatalog::DATETIME: v = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::DECIMAL: v = boost::any_cast<long long>(anyVal); break;

    default: break;
  }

  return v;
}
void tuplewsdl_multi()
{
TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, 1, MAX_SIZE, fRm);
tbdl->setMultipleProducers(true);
tbdl->setElementMode(1); // RID_VALUE
tbdl->hashLen(4);
tbdl->elementLen(4, 12);
pthread_t producer[4];
ThreadParms producerThreadParms[4];
timer1.start("insert-tbdl");
for (uint32_t i = 0; i < 4; i++)
{
producerThreadParms[i].tbdl = tbdl;
producerThreadParms[i].threadNumber = i;
// producerThreadParms[i].count = ::count;
pthread_create(&producer[i], NULL, TBDL_producer, &producerThreadParms[i]);
}
for (uint32_t i = 0; i < 4; i++)
pthread_join(producer[i], NULL);
tbdl->endOfInput();
timer1.stop("insert-tbdl");
cout << "tbdl finish insert " << tbdl->totalSize() << endl;
// timer1.finish();
}
void bdl_multi()
{
BucketDL<ElementType>* bdl = new BucketDL<ElementType>(128, 1, MAX_ELEMENTS, fRm);
bdl->setMultipleProducers(true);
bdl->setElementMode(1); // RID_VALUE
bdl->setDiskElemSize(4, 4);
pthread_t producer[4];
ThreadParms producerThreadParms[4];
timer1.start("insert-bdl");
for (uint32_t i = 0; i < 4; i++)
{
producerThreadParms[i].bdl = bdl;
producerThreadParms[i].threadNumber = i;
// producerThreadParms[i].count = ::count;
pthread_create(&producer[i], NULL, BDL_producer, &producerThreadParms[i]);
}
for (uint32_t i = 0; i < 4; i++)
pthread_join(producer[i], NULL);
bdl->endOfInput();
timer1.stop("insert-bdl");
cout << "bdl finish insert " << bdl->totalSize() << endl;
timer1.finish();
}
/** @brief Time loading and multi-threaded draining of a TupleBucketDataList.
 *
 * Phase "tuple-input": the file at `datapath` is read numRuns times; field 0
 * of each '|'-separated line (offset by 6000000 * run) and field 4 (scaled by
 * 100) form a 16-byte payload that is inserted with a running rid, in batches
 * of 2048. Phase "tuple-consumer": NUM_THREADS tupleConsumer threads each
 * drain NUM_BUCKETS / NUM_THREADS buckets.
 *
 * Fixes over the previous version:
 *  - the read loop is driven by getline() itself instead of `!eof()`, so a
 *    missing file or an over-long line no longer loops forever and the final
 *    (empty) read no longer inserts a stale duplicate row;
 *  - parsed values are initialised; the scratch TupleType lives on the stack;
 *  - the batch-cleanup loops no longer shadow the run counter, and the thread
 *    loops use an unsigned index to match NUM_THREADS;
 *  - the unused bdl field of the thread parameters is given a defined value.
 */
void tuplewsdl()
{
  TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, numConsumers, MAX_SIZE, fRm);

  // prepare the input datalist
  uint64_t l_orderkey = 0, l_quantity = 0;
  char data[1000];
  uint64_t rid = 0;
  TupleType tt;
  tbdl->hashLen(sizeof(l_orderkey));
  const uint32_t size = sizeof(l_orderkey) + sizeof(l_quantity);
  // tbdl->elementLen(size + sizeof(rid));
  tbdl->elementLen(sizeof(rid), size);
  vector<TupleType> v;
  timer1.start("tuple-input");

  for (int run = 0; run < numRuns; run++)
  {
    ifstream ifs(datapath.c_str(), ios::in);

    // getline() returning the stream makes the loop stop on EOF *and* on any
    // failure (file not opened, line longer than the buffer).
    while (ifs.getline(data, sizeof(data)))
    {
      uint64_t count = 0;

      for (char* tok = strtok(data, "|"); tok != NULL; tok = strtok(NULL, "|"))
      {
        if (count == 0)
          l_orderkey = 6000000 * run + atol(tok);

        if (count == 4)
          l_quantity = atof(tok) * 100;

        count++;
      }

      tt.first = rid;
      tt.second = new char[size];
      memcpy(tt.second, &l_orderkey, sizeof(l_orderkey));
      memcpy(tt.second + sizeof(l_orderkey), &l_quantity, sizeof(l_quantity));
      v.push_back(tt);

      if (v.size() == 2048)
      {
        tbdl->insert(v);

        // Release the payload buffers of the batch that was just inserted.
        for (uint32_t n = 0; n < v.size(); n++)
          v[n].deleter();

        v.clear();
      }

      rid++;

      if (rid % printInterval == 0)
        cout << rid << " " << l_orderkey << " " << l_quantity << endl;
    }

    ifs.close();
  }

  // Flush the final partial batch.
  if (v.size() > 0)
  {
    tbdl->insert(v);

    for (uint32_t n = 0; n < v.size(); n++)
      delete[] (v[n].second);

    v.clear();
  }

  tbdl->endOfInput();
  timer1.stop("tuple-input");
  cout << "input size=" << tbdl->totalSize() << endl;

  pthread_t consumer[NUM_THREADS];
  ThreadParms thParms[NUM_THREADS];
  timer1.start("tuple-consumer");

  for (uint32_t t = 0; t < NUM_THREADS; t++)
  {
    thParms[t].bdl = NULL;
    thParms[t].tbdl = tbdl;
    thParms[t].count = NUM_BUCKETS / NUM_THREADS;
    thParms[t].threadNumber = t;
    pthread_create(&consumer[t], NULL, tupleConsumer, &thParms[t]);
  }

  for (uint32_t t = 0; t < NUM_THREADS; t++)
    pthread_join(consumer[t], NULL);

  timer1.stop("tuple-consumer");
  timer1.finish();
  delete tbdl;
}
// Times an AggregateFilterStep evaluating "sum(l_quantity) > 318 group by l_orderkey"
// over the input datalist prepared by setUp1(). The step's run()+join() is measured
// under the "agg" stopwatch label.
void aggFilter_group1()
{
/*
int tm;
for (int m = 0; m < 1024; m++)
{
char *a = new char[5*1024*1024];
memset(a,0,5*1024*1024);
memcpy(&tm, a, 4);
}
*/
// for (int i = 0; i < 10; i++) {
JobStepAssociation in, out;
// Fills `in` with the tuple datalist loaded from datapath and `out` with an empty bucket list.
setUp1(in, out);
CalpontSystemCatalog::TableName tn("test", "lineitem");
string s_l_orderkey = "test.lineitem.l_orderkey";
string s_l_quantity = "test.lineitem.l_quantity";
uint32_t sessionid = 1;
uint32_t txnId = 1;
uint32_t verId = 1;
uint16_t stepID = 0;
uint32_t statementID = 1;
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionid);
SRCP srcp;
SimpleColumn* l_orderkey = new SimpleColumn(s_l_orderkey, sessionid);
SimpleColumn* l_quantity = new SimpleColumn(s_l_quantity, sessionid);
CalpontSystemCatalog::ROPair ropair = csc->tableRID(tn);
CalpontSystemCatalog::OID tableoid = ropair.objnum;
// sum(l_quantity) group by l_orderkey
// srcp takes over l_quantity; the aggregate is given a clone as its function
// parameter and the shared pointer itself as its projected column.
srcp.reset(l_quantity);
AggregateColumn* ac = new AggregateColumn("sum", srcp.get()->clone(), sessionid);
ac->addProjectCol(srcp);
// srcp is re-pointed at l_orderkey, which becomes the group-by column.
srcp.reset(l_orderkey);
ac->addGroupByCol(srcp);
// constant column 318
ConstantColumn* cc = new ConstantColumn(318, ConstantColumn::NUM);
// >
Operator* op = new Operator(">");
SOP sop(op);
// sum(l_quantity) > 318
SSFP ssfp;
SimpleFilter* sf = new SimpleFilter(sop, ac, cc);
ssfp.reset(sf);
SJSTEP step;
AggregateFilterStep* afs = new AggregateFilterStep(in, out, ac->functionName(), ac->groupByColList(),
ac->projectColList(), ac->functionParms(), tableoid,
sessionid, txnId, verId, stepID, statementID, fRm);
// output column l_orderkey;
// srcp still refers to l_orderkey here.
// NOTE(review): the dynamic_cast result is dereferenced without a null check.
afs->outputCol(dynamic_cast<SimpleColumn*>(srcp.get())->oid());
step.reset(afs);
// one column case, normalize filter value
CalpontSystemCatalog::ColType ct;
int64_t intVal;
string strVal;
// int8_t cop = op2num(sop);
// When the aggregate's parameter is exactly a SimpleColumn, convert the constant
// using that column's catalog type; otherwise pass the constant through as a
// number or literal.
if (typeid((*ac->functionParms().get())) == typeid(SimpleColumn))
{
// NOTE(review): reinterpret_cast is guarded only by the typeid test above.
SimpleColumn* sc = reinterpret_cast<SimpleColumn*>(ac->functionParms().get());
ct = csc->colType(sc->oid());
intVal = convertValueNum(cc->constval(), ct, false);
afs->addFilter(COMPARE_GT, intVal);
}
else
{
if (cc->type() == ConstantColumn::NUM)
{
intVal = atol(cc->constval().c_str());
afs->addFilter(COMPARE_GT, intVal, false);
}
else if (cc->type() == ConstantColumn::LITERAL)
afs->addFilter(COMPARE_GT, cc->constval(), false);
}
// Time the step from start to completion.
timer1.start("agg");
afs->run();
afs->join();
timer1.stop("agg");
timer1.finish();
//}
}
void bucketdl()
{
BucketDL<ElementType>* bdl = new BucketDL<ElementType>(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
bdl->setElementMode(1);
ElementType* et = new ElementType();
uint64_t l_orderkey, l_quantity;
char data[1000];
char* tok;
uint64_t count = 0, rid = 0;
TupleType* tt = new TupleType();
vector<ElementType> v;
timer1.start("bucket-input");
int i;
for (i = 0; i < numRuns; i++)
{
ifstream ifs(datapath.c_str(), ios::in);
while (!ifs.eof())
{
ifs.getline(data, 1000);
tok = strtok(data, "|");
count = 0;
while (tok != NULL)
{
if (count == 0)
l_orderkey = 6000000 * i + atol(tok);
if (count == 4)
l_quantity = atof(tok) * 100;
count++;
tok = strtok(NULL, "|");
}
et->first = rid;
et->second = rid;
// tt->second = new char[16];
// memcpy(tt->second, &l_orderkey, sizeof(l_orderkey));
// memcpy(tt->second+sizeof(l_orderkey), &l_quantity, sizeof(l_quantity));
v.push_back(*et);
if (v.size() == 2048)
{
bdl->insert(v);
v.clear();
}
rid++;
// delete [] tt->second;
if (rid % printInterval == 0)
cout << rid << " " << l_orderkey << " " << l_quantity << endl;
}
ifs.close();
}
if (v.size() > 0)
{
bdl->insert(v);
v.clear();
}
bdl->endOfInput();
timer1.stop("bucket-input");
cout << "input size=" << bdl->totalSize() << endl;
delete tt;
delete et;
ThreadParms thparms;
thparms.count = NUM_BUCKETS / NUM_THREADS;
thparms.bdl = bdl;
pthread_t consumer[NUM_THREADS];
ThreadParms thParms[NUM_THREADS];
timer1.start("bucket-consumer");
for (i = 0; i < NUM_THREADS; i++)
{
thparms.threadNumber = i;
thParms[i] = thparms;
pthread_create(&consumer[i], NULL, bucketConsumer, &thParms[i]);
}
for (i = 0; i < NUM_THREADS; i++)
pthread_join(consumer[i], NULL);
timer1.stop("bucket-consumer");
timer1.finish();
delete bdl;
}
// Builds "sum(l_quantity) > 318 group by l_orderkey, l_partkey" and runs an
// AggregateFilterStep for it. Unlike aggFilter_group1(), no datalists are added
// to `in` / `out` and no filter is registered on the step before it runs.
void aggFilter_group2()
{
CalpontSystemCatalog::TableName tn("test", "lineitem");
string s_l_orderkey = "test.lineitem.l_orderkey";
string s_l_quantity = "test.lineitem.l_quantity";
string s_l_partkey = "test.lineitem.l_partkey";
uint32_t sessionid = 1;
uint32_t txnId = 1;
uint32_t verId = 1;
uint16_t stepID = 0;
uint32_t statementID = 1;
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionid);
SRCP srcp;
SimpleColumn* l_orderkey = new SimpleColumn(s_l_orderkey, sessionid);
SimpleColumn* l_quantity = new SimpleColumn(s_l_quantity, sessionid);
SimpleColumn* l_partkey = new SimpleColumn(s_l_partkey, sessionid);
CalpontSystemCatalog::ROPair ropair = csc->tableRID(tn);
CalpontSystemCatalog::OID tableoid = ropair.objnum;
// sum(l_quantity) group by l_orderkey
// NOTE(review): l_quantity is passed raw to the AggregateColumn constructor and
// then also handed to srcp (aggFilter_group1() passes a clone instead) - confirm
// that this does not give the same object two owners.
AggregateColumn* ac = new AggregateColumn("sum", l_quantity, sessionid);
srcp.reset(l_quantity);
ac->addProjectCol(srcp);
// Two group-by columns: l_orderkey, then l_partkey.
srcp.reset(l_orderkey);
ac->addGroupByCol(srcp);
srcp.reset(l_partkey);
ac->addGroupByCol(srcp);
// constant column 318
ConstantColumn* cc = new ConstantColumn(318, ConstantColumn::NUM);
// >
Operator* op = new Operator(">");
SOP sop(op);
// sum(l_quantity) > 318
// The filter object is constructed but not referenced again in this function.
SimpleFilter* sf = new SimpleFilter(sop, ac, cc);
// aggregate filter step
JobStepAssociation in;
JobStepAssociation out;
AggregateFilterStep* afs = new AggregateFilterStep(in, out, ac->functionName(), ac->groupByColList(),
ac->projectColList(), ac->functionParms(), tableoid,
sessionid, txnId, verId, stepID, statementID, fRm);
// Run the step to completion; sf and afs are not deleted here.
afs->run();
afs->join();
}
void hashjoin()
{
BucketDL<ElementType> A(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
BucketDL<ElementType> B(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
BucketDL<ElementType> C(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
BucketDL<ElementType> D(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
A.setDiskElemSize(4, 4);
B.setDiskElemSize(4, 4);
A.setElementMode(1);
A.setHashMode(1);
B.setElementMode(1);
B.setHashMode(1);
int i;
timer1.start("insert");
for (i = 0; i < 6000000 * numRuns; i++)
A.insert(ElementType(i, i));
A.endOfInput();
for (i = 0; i < 1500000 * numRuns; i++)
B.insert(ElementType(i, i + 1000000));
B.endOfInput();
timer1.stop("insert");
C.setElementMode(1);
D.setElementMode(1);
BDLWrapper<ElementType> setA(&A);
BDLWrapper<ElementType> setB(&B);
DataList<ElementType>* resultA(&C);
DataList<ElementType>* resultB(&D);
timer1.start("hashjoin");
HashJoin<ElementType>* hj = new HashJoin<ElementType>(setA, setB, resultA, resultB, INNER, &dlTimes);
hj->performJoin();
timer1.stop("hashjoin");
timer1.finish();
cout << "hash join output size: " << resultA->totalSize() << "/" << resultB->totalSize() << endl;
}
// Value record used by the (currently disabled) multimap aggregation variants:
// a raw key buffer plus the row ids collected for that key.
struct Elem
{
  char* hashStr;
  // int64_t result;
  std::vector<uint64_t> rids;
};
// Spawns NUM_THREADS aggregator() threads and prints the wall-clock time until
// all of them have finished. Two older single-threaded variants are kept under
// `#if 0` below.
void hashmap_tuple()
{
int tm;
// Allocates and zero-fills 1024 buffers of 5 MiB each; none of them is freed.
// NOTE(review): presumably intentional memory pressure before the timing run - confirm.
for (int m = 0; m < 1024; m++)
{
char* a = new char[5 * 1024 * 1024];
memset(a, 0, 5 * 1024 * 1024);
memcpy(&tm, a, 4);
}
// The locals below are only used by the disabled variants; tt.second is not freed.
uint32_t fHashLen = 4;
TupleType tt;
tt.second = new char[fHashLen];
uint64_t size = 585937;
uint64_t i = 0, j = 0;
#if 1
pthread_t agg[NUM_THREADS];
ThreadParms thParms[NUM_THREADS];
ThreadParms thParm;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
// Only threadNumber is set before the copy; it is the only field aggregator() reads.
for (i = 0; i < NUM_THREADS; i++)
{
thParm.threadNumber = i;
thParms[i] = thParm;
pthread_create(&agg[i], NULL, aggregator, &thParms[i]);
}
for (i = 0; i < NUM_THREADS; i++)
pthread_join(agg[i], NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
#endif
// Disabled variant: unordered_multimap keyed by the hash value, with the key bytes
// and rid list stored in an Elem and collisions resolved by memcmp.
#if 0
//typedef std::vector<uint64_t> RIDVec;
typedef std::pair<int64_t, Elem> Results;
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator> shmp;
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator>::iterator TupleHMIter;
//typedef boost::shared_ptr<TupleHashMap> SHMP;
typedef std::tr1::unordered_multimap<uint64_t, Results> TupleHashMap;
typedef boost::shared_ptr<TupleHashMap> SHMP;
typedef std::tr1::unordered_multimap<uint64_t, Results>::iterator TupleHMIter;
typedef std::pair<TupleHMIter, TupleHMIter> HashItPair;
HashItPair hashItPair;
SHMP shmp(new TupleHashMap());
//TupleHashMap *shmp = new TupleHashMap();
TupleHMIter iter;
uint32_t val = 0;
uint64_t hv;
Elem elem;
Results rr;
Hasher hasher;
bool flag = true;
timer1.start("vectormap");
for (int k = 0; k < 1; k++)
{
for (j = 0; j < 4; j++)
{
for (i = 0; i < size / 10; i++)
{
flag = true;
memcpy(tt.second, &i, 4);
hv = hasher(tt.second, fHashLen);
iter = shmp->find(hv);
if (iter != shmp->end())
{
hashItPair = shmp->equal_range(hv);
for (iter = hashItPair.first; iter != hashItPair.second; iter++)
{
if (memcmp(iter->second.second.hashStr, tt.second, fHashLen) == 0)
{
//cout << "real hit" << endl;
//updateAggResult<result_t>(tt, hashIt->second.first);
iter->second.second.rids.push_back(tt.first);
flag = false;
break;
}
}
}
if (flag)
{
rr.second.hashStr = new char[fHashLen];
memcpy(rr.second.hashStr, tt.second, fHashLen);
rr.second.rids.clear();
rr.second.rids.push_back(tt.first);
shmp->insert(std::pair<uint64_t, Results> (hv, rr));
}
}
}
cout << "vector size=" << shmp->size() << endl;
// clean up hashmap memory
for (iter = shmp->begin(); iter != shmp->end(); iter++)
delete [] iter->second.second.hashStr;
shmp->clear();
}
timer1.stop("vectormap");
timer1.finish();
#endif
// Disabled variant: unordered_map keyed by the raw key buffer.
// NOTE(review): it names TupleHasher / TupleComparator, while the functors visible
// in this file are TupleHasher1 / TupleComparator1.
#if 0
typedef std::vector<uint64_t> RIDVec;
typedef std::pair<uint64_t, RIDVec> Results;
typedef std::tr1::unordered_map<char*, Results, TupleHasher, TupleComparator> TupleHashMap1;
typedef std::tr1::unordered_map<char*, Results, TupleHasher, TupleComparator>::iterator TupleHMIter1;
typedef boost::shared_ptr<TupleHashMap1> SHMP1;
SHMP1 shmp;
TupleHasher tupleHash(fHashLen);
TupleComparator tupleComp(fHashLen);
shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp));
TupleHMIter1 iter;
Results rr;
uint32_t val = 0;
timer1.start("uniquemap");
for (j = 0; j < 4; j++)
for (i = 0; i < size / 10; i++)
{
memcpy(tt.second, &i, 4);
iter = shmp->find(tt.second);
if (iter == shmp->end())
{
rr.second.clear();
//getAggResult<result_t>(tt, rr.first);
rr.second.push_back(tt.first);
char* hashStr = new char[fHashLen];
memcpy(hashStr, tt.second, fHashLen);
shmp->insert(std::pair<char*, Results> (hashStr, rr));
}
else
{
//updateAggResult<result_t>(tt, hashIt->second.first);
iter->second.second.push_back(tt.first);
}
}
timer1.stop("uniquemap");
timer1.finish();
cout << "hashmap size=" << shmp->size() << endl;
// clean up hashmap memory
for (iter = shmp->begin(); iter != shmp->end(); iter++)
delete [] iter->first;
shmp->clear();
#endif
}
// Disabled test (compiled out): times inserting 4 x 1171875 entries into an
// unordered_multimap keyed by raw 4-byte buffers ("multi_insert"), a single pass
// over the map that sums rr.first across runs of equal adjacent keys
// ("multi_result"), and a baseline loop that only allocates and fills key buffers
// ("base").
// NOTE(review): it names TupleHasher / TupleComparator, while the functors visible
// in this file are TupleHasher1 / TupleComparator1.
#if 0
void hashmap_tuple_multi()
{
uint32_t fHashLen = 4;
TupleHasher tupleHash(fHashLen);
TupleComparator tupleComp(fHashLen);
TupleType tt;
//typedef std::vector<uint64_t> RIDVec;
typedef std::pair<uint64_t, uint64_t> Results;
typedef std::tr1::unordered_multimap<char*, Results, TupleHasher, TupleComparator> TupleHashMap;
typedef std::tr1::unordered_multimap<char*, Results, TupleHasher, TupleComparator>::iterator TupleHMIter;
typedef boost::shared_ptr<TupleHashMap> SHMP;
std::pair<TupleHMIter, TupleHMIter> hashItPair;
SHMP shmp;
shmp.reset(new TupleHashMap(1, tupleHash, tupleComp));
TupleHMIter iter;
Results rr;
Hasher fHasher;
timer1.start("multi_insert");
uint32_t i = 0, j = 0;
uint32_t val = 0;
// Every entry gets its own freshly allocated key buffer; the map stores the pointer.
for ( j = 0; j < 4; j++)
for ( i = 0; i < 1171875; i++)
{
tt.second = new char[fHashLen];
memcpy(tt.second, &i, fHashLen);
rr.first = j;
rr.second = i;
shmp->insert(std::pair<char*, Results> (tt.second, rr));
}
timer1.stop("multi_insert");
cout << "hashmap size=" << shmp->size() << endl;
tt.second = new char[fHashLen];
timer1.start("multi_result");
iter = shmp->begin();
char* key;
// NOTE(review): key stays uninitialised and ++iter below advances past end() if the map is empty.
if (iter != shmp->end())
key = iter->first;
uint32_t ct = 0;
for (iter = ++iter; iter != shmp->end(); iter++)
{
if (memcmp(iter->first, key, 4) == 0)
val += iter->second.first;
else
{
key = iter->first;
val = iter->second.first;
ct++;
}
}
timer1.stop("multi_result");
timer1.start("base");
// Baseline: the same allocations and copies without any map work; the buffers are not freed.
for ( j = 0; j < 4; j++)
for ( i = 0; i < 1171875; i++)
{
tt.second = new char[fHashLen];
memcpy(tt.second, &i, fHashLen);
}
timer1.stop("base");
timer1.finish();
cout << "group val=" << val << endl;
// clean up hashmap memory
for (iter = shmp->begin(); iter != shmp->end(); iter++)
delete [] iter->first;
}
#endif
void hashmap_ET()
{
typedef std::list<ElementType> hashList_t;
typedef std::list<ElementType>::iterator hashListIter_t;
// typedef std::tr1::unordered_multimap<ElementType::second_type, ElementType::first_type> hash_t;
// typedef std::tr1::unordered_multimap<ElementType::second_type, ElementType::first_type>::iterator
// hashIter_t; typedef std::tr1::unordered_multimap<ElementType::second_type,
// ElementType::first_type>::value_type hashPair_t;
typedef std::tr1::unordered_map<ElementType::second_type, hashList_t> hash_t;
typedef std::tr1::unordered_map<ElementType::second_type, hashList_t>::iterator hashIter_t;
typedef std::tr1::unordered_map<ElementType::second_type, hashList_t>::value_type hashPair_t;
hash_t hashmap;
hashIter_t it;
hashList_t list;
timer1.start("et_insert");
ElementType et;
// while (true)
for (int i = 0; i < 1200000; i++)
{
list.push_back(et);
et.second = i;
hashmap.insert(hashPair_t(et.second, list));
list.clear();
}
timer1.stop("et_insert");
cout << "hashmap size=" << hashmap.size() << endl;
int val = 0, i, j;
timer1.start("et_find");
for (j = 0; j < 4; j++)
for (i = 0; i < 1200000; i++)
{
et.second = i;
it = hashmap.find(i);
if (it == hashmap.end())
{
list.clear();
// getAggResult<result_t>(tt, rr.first);
list.push_back(et);
hashmap.insert(hashPair_t(et.second, list));
}
else
{
it->second.push_back(et);
}
}
timer1.stop("et_find");
timer1.finish();
cout << "hashmap size=" << hashmap.size() << endl;
}
};
// Register the AggDriver suite with CppUnit's factory registry, which main() queries.
CPPUNIT_TEST_SUITE_REGISTRATION(AggDriver);
/** @brief Run every registered CppUnit suite with the text runner.
 *
 * @param argc unused
 * @param argv unused
 * @return 0 when all tests pass, 1 otherwise
 */
int main(int argc, char** argv)
{
  CppUnit::TextUi::TestRunner runner;
  runner.addTest(CppUnit::TestFactoryRegistry::getRegistry().makeTest());

  // Run all tests (empty test path) without waiting for a key press.
  const bool passed = runner.run("", false);

  if (passed)
    return 0;

  return 1;
}