You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
This patch improves the handling of NULLs in text fields in ColumnStore. Previously, empty strings were treated as NULLs, which could be a problem when the data schema allows empty strings. This was also one of the major reasons for behavioral differences between ColumnStore and the other engines in the MariaDB family. The patch also fixes some other bugs and incorrect behavior; for example, the comparison "column <= ''", which before this patch evaluated to a constant True.
1413 lines
39 KiB
C++
1413 lines
39 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2016 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: tdriver-agg.cpp 9210 2013-01-21 14:10:42Z rdempsey $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <fstream>
|
|
#include <list>
|
|
#include <vector>
|
|
|
|
#include <pthread.h>
|
|
#include <time.h>
|
|
#include <sys/time.h>
|
|
|
|
#include <cppunit/extensions/HelperMacros.h>
|
|
#include <cppunit/extensions/TestFactoryRegistry.h>
|
|
#include <cppunit/ui/text/TestRunner.h>
|
|
|
|
#include "fifo.h"
|
|
#include "wsdl.h"
|
|
#include "constantdatalist.h"
|
|
#include "bucketdl.h"
|
|
#include "elementtype.h"
|
|
#include "zdl.h"
|
|
#include "stopwatch.cpp"
|
|
#include "jobstep.h"
|
|
//#include "aggregator.h"
|
|
#include "constantcolumn.h"
|
|
#include "simplefilter.h"
|
|
#include "aggregatecolumn.h"
|
|
#include "simplecolumn.h"
|
|
#include "dataconvert.h"
|
|
|
|
using namespace dataconvert;
|
|
|
|
// #undef CPPUNIT_ASSERT
|
|
// #define CPPUNIT_ASSERT(x)
|
|
|
|
using namespace std;
|
|
using namespace joblist;
|
|
using namespace execplan;
|
|
|
|
Stopwatch timer1;
|
|
const uint32_t NUM_BUCKETS = 256;
|
|
const uint32_t MAX_SIZE = 0x100000;
|
|
const uint32_t MAX_ELEMENTS = 0x20000;
|
|
const uint32_t NUM_THREADS = 4;
|
|
const string datapath = "/home/zzhu/genii/tools/dbbuilder/lineitem.tbl";
|
|
// const string datapath="/usr/local/mariadb/columnstore/bin/lineitem.tbl";
|
|
int numConsumers = 1;
|
|
int numRuns = 1;
|
|
int printInterval = numRuns * 100000;
|
|
JSTimeStamp dlTimes;
|
|
|
|
/** @brief Computes the elapsed time diff = tv2 - tv1.
 *
 * Both inputs are assumed normalized (0 <= tv_nsec < 1e9) with tv2 >= tv1;
 * the result is then normalized the same way.
 *
 * @param tv1  earlier timestamp
 * @param tv2  later timestamp
 * @param diff receives tv2 - tv1
 */
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff)
{
  const long NSEC_PER_SEC = 1000000000L;

  if (tv2.tv_nsec < tv1.tv_nsec)
  {
    // Borrow one second: the remaining nanoseconds are (1s + tv2.nsec) - tv1.nsec.
    // (The previous tv1.nsec - tv2.nsec produced the complement of the right value.)
    diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1;
    diff.tv_nsec = NSEC_PER_SEC + tv2.tv_nsec - tv1.tv_nsec;
  }
  else
  {
    diff.tv_sec = tv2.tv_sec - tv1.tv_sec;
    diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec;
  }
}
|
|
|
|
struct ThreadParms
|
|
{
|
|
BucketDL<ElementType>* bdl;
|
|
TupleBucketDataList* tbdl;
|
|
int count;
|
|
int threadNumber;
|
|
};
|
|
|
|
void* bucketConsumer(void* arg)
|
|
{
|
|
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
|
|
// BucketDL<ElementType> *bdl = reinterpret_cast<BucketDL<ElementType> *>(arg);
|
|
BucketDL<ElementType>* bdl = parms->bdl;
|
|
int numBucketsToConsume = parms->count;
|
|
int threadNumber = parms->threadNumber;
|
|
int64_t i;
|
|
ElementType val;
|
|
uint32_t it;
|
|
int bucketIndex;
|
|
|
|
int cnt = 0;
|
|
|
|
for (i = 0; i < numBucketsToConsume; i++)
|
|
{
|
|
bucketIndex = (threadNumber * numBucketsToConsume) + i;
|
|
it = bdl->getIterator(bucketIndex);
|
|
|
|
while (bdl->next(bucketIndex, it, &val))
|
|
cnt++;
|
|
}
|
|
|
|
cout << "consumer " << threadNumber << "consumed " << cnt << endl;
|
|
return NULL;
|
|
}
|
|
|
|
void* tupleConsumer(void* arg)
|
|
{
|
|
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
|
|
// BucketDL<ElementType> *bdl = reinterpret_cast<BucketDL<ElementType> *>(arg);
|
|
TupleBucketDataList* tbdl = parms->tbdl;
|
|
int numBucketsToConsume = parms->count;
|
|
int threadNumber = parms->threadNumber;
|
|
int i;
|
|
// ElementType val;
|
|
TupleType val;
|
|
uint32_t it;
|
|
int bucketIndex;
|
|
|
|
int cnt = 0;
|
|
|
|
for (i = 0; i < numBucketsToConsume; i++)
|
|
{
|
|
bucketIndex = (threadNumber * numBucketsToConsume) + i;
|
|
it = tbdl->getIterator(bucketIndex);
|
|
|
|
while (tbdl->next(bucketIndex, it, &val))
|
|
cnt++;
|
|
}
|
|
|
|
cout << "consumer " << threadNumber << "consumed " << cnt << endl;
|
|
return NULL;
|
|
}
|
|
|
|
/** @brief class TupleHasher
|
|
*
|
|
*/
|
|
class TupleHasher1
|
|
{
|
|
private:
|
|
Hasher fHasher;
|
|
uint32_t fHashLen;
|
|
|
|
public:
|
|
TupleHasher1(uint32_t len) : fHashLen(len)
|
|
{
|
|
}
|
|
uint32_t operator()(const char* v) const
|
|
{
|
|
return fHasher(v, fHashLen);
|
|
}
|
|
/*
|
|
uint32_t operator()(const uint64_t v) const
|
|
{
|
|
return v;
|
|
}*/
|
|
};
|
|
|
|
/** @brief Equality functor for fixed-width char* keys.
 *
 * Two keys are equal when their first `len` bytes (set at construction)
 * are identical.
 */
class TupleComparator1
{
 public:
  TupleComparator1(uint32_t len) : fCmpLen(len)
  {
  }

  bool operator()(const char* v1, const char* v2) const
  {
    const int diff = memcmp(v1, v2, fCmpLen);
    return !diff;
  }

 private:
  uint32_t fCmpLen;  // number of leading bytes compared
};
|
|
|
|
void* aggregator(void* arg)
|
|
{
|
|
uint32_t fHashLen = 4;
|
|
TupleType tt;
|
|
tt.second = new char[fHashLen];
|
|
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
|
|
int threadNumber = parms->threadNumber;
|
|
struct timespec ts1, ts2, diff;
|
|
uint32_t size = 585938;
|
|
#if 0
|
|
//typedef std::vector<uint64_t> RIDVec;
|
|
typedef std::pair<int64_t, Elem> Results;
|
|
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator> shmp;
|
|
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator>::iterator TupleHMIter;
|
|
//typedef boost::shared_ptr<TupleHashMap> SHMP;
|
|
typedef std::tr1::unordered_multimap<uint64_t, Results> TupleHashMap;
|
|
typedef boost::shared_ptr<TupleHashMap> SHMP;
|
|
typedef std::tr1::unordered_multimap<uint64_t, Results>::iterator TupleHMIter;
|
|
typedef std::pair<TupleHMIter, TupleHMIter> HashItPair;
|
|
HashItPair hashItPair;
|
|
|
|
SHMP shmp(new TupleHashMap());
|
|
// TupleHashMap *shmp = new TupleHashMap();
|
|
TupleHMIter iter;
|
|
uint32_t val = 0;
|
|
uint64_t hv;
|
|
Hasher hasher;
|
|
bool flag = true;
|
|
|
|
for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++)
|
|
{
|
|
clock_gettime(CLOCK_REALTIME, &ts1);
|
|
|
|
for (uint32_t j = 0; j < 4; j++)
|
|
{
|
|
for (uint32_t i = 0; i < size / 10; i++)
|
|
{
|
|
flag = true;
|
|
memcpy(tt.second, &i, 4);
|
|
hv = hasher(tt.second, fHashLen);
|
|
iter = shmp->find(hv);
|
|
|
|
if (iter != shmp->end())
|
|
{
|
|
hashItPair = shmp->equal_range(hv);
|
|
|
|
for (iter = hashItPair.first; iter != hashItPair.second; iter++)
|
|
{
|
|
if (memcmp(iter->second.second.hashStr, tt.second, fHashLen) == 0)
|
|
{
|
|
//cout << "real hit" << endl;
|
|
//updateAggResult<result_t>(tt, hashIt->second.first);
|
|
iter->second.second.rids.push_back(tt.first);
|
|
flag = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (flag)
|
|
{
|
|
Results rr;
|
|
rr.second.hashStr = new char[fHashLen];
|
|
//getAggResult<result_t>(tt, rr.first);
|
|
memcpy(rr.second.hashStr, tt.second, fHashLen);
|
|
rr.second.rids.push_back(tt.first);
|
|
shmp->insert(std::pair<uint64_t, Results> (hv, rr));
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
cout << "thread " << threadNumber << " vector " << k << " size=" << shmp->size() << endl;
|
|
|
|
// clean up hashmap memory
|
|
for (iter = shmp->begin(); iter != shmp->end(); iter++)
|
|
delete [] iter->second.second.hashStr;
|
|
|
|
shmp->clear();
|
|
// shmp.reset(new TupleHashMap());
|
|
clock_gettime(CLOCK_REALTIME, &ts2);
|
|
timespec_sub(ts1, ts2, diff);
|
|
cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl;
|
|
}
|
|
|
|
#endif
|
|
|
|
#if 0
|
|
typedef std::vector<uint64_t> RIDVec;
|
|
typedef std::pair<uint64_t, RIDVec> Results;
|
|
typedef std::tr1::unordered_map<char*, Results, TupleHasher1, TupleComparator1> TupleHashMap1;
|
|
typedef std::tr1::unordered_map<char*, Results, TupleHasher1, TupleComparator1>::iterator TupleHMIter1;
|
|
typedef boost::shared_ptr<TupleHashMap1> SHMP1;
|
|
|
|
SHMP1 shmp;
|
|
TupleHasher1 tupleHash(fHashLen);
|
|
TupleComparator1 tupleComp(fHashLen);
|
|
|
|
shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp));
|
|
TupleHMIter1 iter;
|
|
Results rr;
|
|
|
|
uint32_t val = 0;
|
|
|
|
for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++)
|
|
{
|
|
clock_gettime(CLOCK_REALTIME, &ts1);
|
|
|
|
for (uint32_t j = 0; j < 4; j++)
|
|
{
|
|
for (uint32_t i = 0; i < size / 10; i++)
|
|
{
|
|
memcpy(tt.second, &i, 4);
|
|
iter = shmp->find(tt.second);
|
|
|
|
if (iter == shmp->end())
|
|
{
|
|
rr.second.clear();
|
|
//getAggResult<result_t>(tt, rr.first);
|
|
rr.second.push_back(tt.first);
|
|
char* hashStr = new char[fHashLen];
|
|
memcpy(hashStr, tt.second, fHashLen);
|
|
shmp->insert(std::pair<char*, Results> (hashStr, rr));
|
|
}
|
|
else
|
|
{
|
|
//updateAggResult<result_t>(tt, hashIt->second.first);
|
|
iter->second.second.push_back(tt.first);
|
|
}
|
|
}
|
|
}
|
|
|
|
cout << "thread " << threadNumber << " hashmap " << k << " size=" << shmp->size() << endl;
|
|
|
|
// clean up hashmap memory
|
|
for (iter = shmp->begin(); iter != shmp->end(); iter++)
|
|
delete [] iter->first;
|
|
|
|
shmp->clear();
|
|
clock_gettime(CLOCK_REALTIME, &ts2);
|
|
timespec_sub(ts1, ts2, diff);
|
|
cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl;
|
|
}
|
|
|
|
#endif
|
|
|
|
#if 1
|
|
// typedef std::vector<uint64_t> RIDVec;
|
|
// typedef std::pair<uint64_t, RIDVec> Results;
|
|
typedef std::tr1::unordered_map<char*, int64_t, TupleHasher1, TupleComparator1> TupleHashMap1;
|
|
typedef std::tr1::unordered_map<char*, int64_t, TupleHasher1, TupleComparator1>::iterator TupleHMIter1;
|
|
typedef boost::shared_ptr<TupleHashMap1> SHMP1;
|
|
vector<TupleType> vt;
|
|
vt.reserve(size / 10 * 4);
|
|
|
|
SHMP1 shmp;
|
|
TupleHasher1 tupleHash(fHashLen);
|
|
TupleComparator1 tupleComp(fHashLen);
|
|
|
|
shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp));
|
|
TupleHMIter1 iter;
|
|
int64_t rr;
|
|
|
|
uint32_t val = 0;
|
|
|
|
for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++)
|
|
{
|
|
clock_gettime(CLOCK_REALTIME, &ts1);
|
|
|
|
for (uint32_t j = 0; j < 4; j++)
|
|
{
|
|
for (uint32_t i = 0; i < size / 10; i++)
|
|
{
|
|
memcpy(tt.second, &i, 4);
|
|
vt.push_back(tt);
|
|
|
|
iter = shmp->find(tt.second);
|
|
|
|
if (iter == shmp->end())
|
|
{
|
|
// rr.second.clear();
|
|
// getAggResult<result_t>(tt, rr.first);
|
|
// rr.second.push_back(tt.first);
|
|
char* hashStr = new char[fHashLen];
|
|
memcpy(hashStr, tt.second, fHashLen);
|
|
shmp->insert(std::pair<char*, int64_t>(hashStr, i));
|
|
}
|
|
else
|
|
{
|
|
// updateAggResult<result_t>(tt, hashIt->second.first);
|
|
// iter->second.second.push_back(tt.first);
|
|
iter->second += i;
|
|
}
|
|
}
|
|
}
|
|
|
|
cout << "thread " << threadNumber << " hashmap " << k << " size=" << shmp->size() << endl;
|
|
|
|
// loop through vector to find match in map for rid
|
|
vector<TupleType>::iterator it;
|
|
int val = 0;
|
|
|
|
for (it = vt.begin(); it != vt.end(); it++)
|
|
{
|
|
iter = shmp->find(it->second);
|
|
|
|
if (iter != shmp->end())
|
|
val++;
|
|
}
|
|
|
|
cout << "val = " << val << endl;
|
|
|
|
// clean up hashmap memory
|
|
for (iter = shmp->begin(); iter != shmp->end(); iter++)
|
|
delete[] iter->first;
|
|
|
|
shmp->clear();
|
|
vt.clear();
|
|
clock_gettime(CLOCK_REALTIME, &ts2);
|
|
timespec_sub(ts1, ts2, diff);
|
|
cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec
|
|
<< " ns" << endl;
|
|
}
|
|
|
|
#endif
|
|
}
|
|
|
|
void* TBDL_producer(void* arg)
|
|
{
|
|
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
|
|
TupleBucketDataList* tbdl = parms->tbdl;
|
|
TupleType t;
|
|
vector<TupleType> vt;
|
|
|
|
for (uint32_t i = 0; i < 150000000; i++)
|
|
{
|
|
t.first = i;
|
|
t.second = new char[4];
|
|
memcpy(t.second, &i, 4);
|
|
vt.push_back(t);
|
|
|
|
if (vt.size() == 8192)
|
|
{
|
|
tbdl->insert(vt);
|
|
vt.clear();
|
|
}
|
|
}
|
|
|
|
if (vt.size() > 0)
|
|
tbdl->insert(vt);
|
|
}
|
|
|
|
void* BDL_producer(void* arg)
|
|
{
|
|
ThreadParms* parms = reinterpret_cast<ThreadParms*>(arg);
|
|
BucketDL<ElementType>* bdl = parms->bdl;
|
|
TupleType t;
|
|
ElementType e;
|
|
vector<ElementType> vt;
|
|
|
|
for (uint32_t i = 0; i < 150000000; i++)
|
|
{
|
|
e.first = i;
|
|
e.second = i;
|
|
// t.second = new char[4];
|
|
// memcpy(t.second, &i, 4);
|
|
vt.push_back(e);
|
|
|
|
if (vt.size() == 8192)
|
|
{
|
|
bdl->insert(vt);
|
|
vt.clear();
|
|
}
|
|
}
|
|
|
|
if (vt.size() > 0)
|
|
bdl->insert(vt);
|
|
}
|
|
|
|
class AggDriver : public CppUnit::TestFixture
|
|
{
|
|
CPPUNIT_TEST_SUITE(AggDriver);
|
|
// CPPUNIT_TEST(aggFilter_group2);
|
|
// CPPUNIT_TEST(tuplewsdl);
|
|
// CPPUNIT_TEST(bucketdl);
|
|
// CPPUNIT_TEST(aggFilter_group1);
|
|
// CPPUNIT_TEST(hashjoin);
|
|
// CPPUNIT_TEST(hashmap_tuple);
|
|
// CPPUNIT_TEST(hashmap_tuple_multi);
|
|
// CPPUNIT_TEST(hashmap_ET);
|
|
// CPPUNIT_TEST(tuplewsdl_multi);
|
|
CPPUNIT_TEST(bdl_multi);
|
|
CPPUNIT_TEST_SUITE_END();
|
|
typedef boost::shared_ptr<SimpleFilter> SSFP;
|
|
|
|
private:
|
|
ResourceManager fRm;
|
|
|
|
public:
|
|
/** @brief Builds the input and output datalists for aggFilter_group1().
 *
 * Input (added to @p in): a TupleBucketDataList filled from the
 * pipe-delimited file `datapath`. Field 0 becomes l_orderkey and field 4
 * becomes l_quantity * 100. Every row is replicated numRuns times, and
 * replica i has l_orderkey shifted by 6000000 * i.
 * Output (added to @p out): an empty BucketDataList in element mode 1.
 *
 * @param in  receives the tuple-bucket input datalist
 * @param out receives the element-bucket output datalist
 */
void setUp1(JobStepAssociation& in, JobStepAssociation& out)
{
  // input - TupleBucket
  AnyDataListSPtr adl1(new AnyDataList());
  TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, numConsumers, MAX_SIZE, fRm);
  tbdl->setMultipleProducers(0);
  adl1->tupleBucketDL(tbdl);
  in.outAdd(adl1);

  // output - ElementType Bucket
  AnyDataListSPtr adl2(new AnyDataList());
  BucketDataList* bdl = new BucketDataList(NUM_BUCKETS, numConsumers, MAX_ELEMENTS, fRm);
  bdl->setElementMode(1);
  adl2->bucketDL(bdl);
  out.outAdd(adl2);

  // prepare the input datalist
  uint64_t l_orderkey = 0;
  uint64_t l_quantity = 0;
  char data[1000];
  uint32_t rid = 0;
  TupleType tt = TupleType();
  tbdl->hashLen(sizeof(l_orderkey));
  const uint32_t size = sizeof(l_orderkey) + sizeof(l_quantity);
  tbdl->elementLen(sizeof(rid), size);
  vector<TupleType> v;

  timer1.start("input");
  ifstream ifs(datapath.c_str(), ios::in);

  // Loop on getline's result instead of eof(). If the file fails to open, or
  // a line is too long, failbit is set without eofbit and the eof() loop spun
  // forever. The eof() form also re-inserted a stale row after the last line.
  while (ifs.getline(data, sizeof(data)))
  {
    uint64_t count = 0;

    for (char* tok = strtok(data, "|"); tok != NULL; tok = strtok(NULL, "|"), count++)
    {
      if (count == 0)
        l_orderkey = atol(tok);

      if (count == 4)
        l_quantity = atof(tok) * 100;
    }

    for (int i = 0; i < numRuns; i++)
    {
      // Fixed per-replica offset, matching tuplewsdl(). The old code did
      // `l_orderkey += 6000000 * i`, which accumulated offsets across replicas.
      const uint64_t key = l_orderkey + 6000000 * i;
      tt.first = rid;
      // NOTE(review): these buffers are not freed after insert() here, while
      // tuplewsdl() frees its own. Confirm the datalist's ownership semantics.
      tt.second = new char[size];
      memcpy(tt.second, &key, sizeof(key));
      memcpy(tt.second + sizeof(key), &l_quantity, sizeof(l_quantity));
      v.push_back(tt);

      if (v.size() == 2048)
      {
        tbdl->insert(v);
        v.clear();
      }

      rid++;
    }

    if (rid % printInterval == 0)
      cout << rid << " " << l_orderkey << " " << l_quantity << endl;
  }

  ifs.close();

  if (!v.empty())
  {
    tbdl->insert(v);
    v.clear();
  }

  tbdl->endOfInput();
  timer1.stop("input");
  cout << "input size=" << tbdl->totalSize() << endl;
}
|
|
|
|
/** @brief Converts a constant's string form into the int64_t value used as a filter operand.
 *
 * The string is parsed with DataConvert::convertColumnData for column type
 * @p ct, and the result is widened to int64_t:
 *  - FLOAT/DOUBLE are truncated toward zero;
 *  - CHAR/VARCHAR/BLOB/CLOB pack up to the first 8 bytes of the string
 *    (zero-padded) in native byte order;
 *  - unlisted types yield 0.
 *
 * @param str    textual value
 * @param ct     column type that drives the conversion
 * @param isNull currently unused (the NULL short-circuit is commented out)
 * @return the converted value
 */
int64_t convertValueNum(const string& str, const CalpontSystemCatalog::ColType& ct, bool isNull)
{
  // if (str.size() == 0 || isNull ) return valueNullNum(ct);
  (void)isNull;

  int64_t v = 0;

  boost::any anyVal = DataConvert::convertColumnData(ct, str, false);

  switch (ct.colDataType)
  {
    case CalpontSystemCatalog::BIT: v = boost::any_cast<bool>(anyVal); break;

    case CalpontSystemCatalog::TINYINT: v = boost::any_cast<char>(anyVal); break;

    case CalpontSystemCatalog::SMALLINT: v = boost::any_cast<int16_t>(anyVal); break;

    case CalpontSystemCatalog::MEDINT:
    case CalpontSystemCatalog::INT: v = boost::any_cast<int32_t>(anyVal); break;

    case CalpontSystemCatalog::BIGINT: v = boost::any_cast<long long>(anyVal); break;

    case CalpontSystemCatalog::FLOAT: v = static_cast<int64_t>(boost::any_cast<float>(anyVal)); break;

    case CalpontSystemCatalog::DOUBLE: v = static_cast<int64_t>(boost::any_cast<double>(anyVal)); break;

    case CalpontSystemCatalog::CHAR:
    case CalpontSystemCatalog::VARCHAR:
    case CalpontSystemCatalog::BLOB:
    case CalpontSystemCatalog::CLOB:
    {
      // Copy at most 8 bytes into v, which is already zero. The previous
      // *(Int64*)c_str() read past the end of strings shorter than 7 bytes and
      // broke strict aliasing. For strings of 7 or more bytes the value is unchanged.
      const string s = boost::any_cast<string>(anyVal);
      memcpy(&v, s.data(), s.size() < sizeof(v) ? s.size() : sizeof(v));
      break;
    }

    case CalpontSystemCatalog::DATE: v = boost::any_cast<uint32_t>(anyVal); break;

    case CalpontSystemCatalog::DATETIME: v = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::DECIMAL: v = boost::any_cast<long long>(anyVal); break;

    default: break;
  }

  return v;
}
|
|
|
|
void tuplewsdl_multi()
|
|
{
|
|
TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, 1, MAX_SIZE, fRm);
|
|
tbdl->setMultipleProducers(true);
|
|
tbdl->setElementMode(1); // RID_VALUE
|
|
tbdl->hashLen(4);
|
|
tbdl->elementLen(4, 12);
|
|
pthread_t producer[4];
|
|
ThreadParms producerThreadParms[4];
|
|
|
|
timer1.start("insert-tbdl");
|
|
|
|
for (uint32_t i = 0; i < 4; i++)
|
|
{
|
|
producerThreadParms[i].tbdl = tbdl;
|
|
producerThreadParms[i].threadNumber = i;
|
|
// producerThreadParms[i].count = ::count;
|
|
pthread_create(&producer[i], NULL, TBDL_producer, &producerThreadParms[i]);
|
|
}
|
|
|
|
for (uint32_t i = 0; i < 4; i++)
|
|
pthread_join(producer[i], NULL);
|
|
|
|
tbdl->endOfInput();
|
|
timer1.stop("insert-tbdl");
|
|
cout << "tbdl finish insert " << tbdl->totalSize() << endl;
|
|
// timer1.finish();
|
|
}
|
|
|
|
void bdl_multi()
|
|
{
|
|
BucketDL<ElementType>* bdl = new BucketDL<ElementType>(128, 1, MAX_ELEMENTS, fRm);
|
|
bdl->setMultipleProducers(true);
|
|
bdl->setElementMode(1); // RID_VALUE
|
|
bdl->setDiskElemSize(4, 4);
|
|
pthread_t producer[4];
|
|
ThreadParms producerThreadParms[4];
|
|
|
|
timer1.start("insert-bdl");
|
|
|
|
for (uint32_t i = 0; i < 4; i++)
|
|
{
|
|
producerThreadParms[i].bdl = bdl;
|
|
producerThreadParms[i].threadNumber = i;
|
|
// producerThreadParms[i].count = ::count;
|
|
pthread_create(&producer[i], NULL, BDL_producer, &producerThreadParms[i]);
|
|
}
|
|
|
|
for (uint32_t i = 0; i < 4; i++)
|
|
pthread_join(producer[i], NULL);
|
|
|
|
bdl->endOfInput();
|
|
timer1.stop("insert-bdl");
|
|
cout << "bdl finish insert " << bdl->totalSize() << endl;
|
|
timer1.finish();
|
|
}
|
|
|
|
void tuplewsdl()
|
|
{
|
|
TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, numConsumers, MAX_SIZE, fRm);
|
|
// prepare the input datalist
|
|
// ifstream ifs(datapath.c_str(), ios::in);
|
|
uint64_t l_orderkey, l_quantity;
|
|
char data[1000];
|
|
// char row[50];
|
|
char* tok;
|
|
uint64_t count = 0, rid = 0;
|
|
TupleType* tt = new TupleType();
|
|
tbdl->hashLen(sizeof(l_orderkey));
|
|
uint32_t size = sizeof(l_orderkey) + sizeof(l_quantity);
|
|
// tbdl->elementLen(size + sizeof(rid));
|
|
tbdl->elementLen(sizeof(rid), size);
|
|
// char* second = new char[size];
|
|
vector<TupleType> v;
|
|
|
|
timer1.start("tuple-input");
|
|
int i;
|
|
|
|
for (i = 0; i < numRuns; i++)
|
|
{
|
|
ifstream ifs(datapath.c_str(), ios::in);
|
|
|
|
while (!ifs.eof())
|
|
{
|
|
ifs.getline(data, 1000);
|
|
tok = strtok(data, "|");
|
|
count = 0;
|
|
|
|
while (tok != NULL)
|
|
{
|
|
if (count == 0)
|
|
l_orderkey = 6000000 * i + atol(tok);
|
|
|
|
if (count == 4)
|
|
l_quantity = atof(tok) * 100;
|
|
|
|
count++;
|
|
tok = strtok(NULL, "|");
|
|
}
|
|
|
|
tt->first = rid;
|
|
tt->second = new char[size];
|
|
memcpy(tt->second, &l_orderkey, sizeof(l_orderkey));
|
|
memcpy(tt->second + sizeof(l_orderkey), &l_quantity, sizeof(l_quantity));
|
|
v.push_back(*tt);
|
|
|
|
if (v.size() == 2048)
|
|
{
|
|
tbdl->insert(v);
|
|
|
|
for (uint32_t i = 0; i < v.size(); i++)
|
|
v[i].deleter();
|
|
|
|
v.clear();
|
|
}
|
|
|
|
rid++;
|
|
|
|
if (rid % printInterval == 0)
|
|
cout << rid << " " << l_orderkey << " " << l_quantity << endl;
|
|
}
|
|
|
|
ifs.close();
|
|
}
|
|
|
|
if (v.size() > 0)
|
|
{
|
|
tbdl->insert(v);
|
|
|
|
for (uint32_t i = 0; i < v.size(); i++)
|
|
delete[](v[i].second);
|
|
|
|
v.clear();
|
|
}
|
|
|
|
tbdl->endOfInput();
|
|
timer1.stop("tuple-input");
|
|
cout << "input size=" << tbdl->totalSize() << endl;
|
|
// ifs.close();
|
|
delete tt;
|
|
|
|
ThreadParms thparms;
|
|
thparms.count = NUM_BUCKETS / NUM_THREADS;
|
|
thparms.tbdl = tbdl;
|
|
pthread_t consumer[NUM_THREADS];
|
|
|
|
ThreadParms thParms[NUM_THREADS];
|
|
timer1.start("tuple-consumer");
|
|
|
|
for (i = 0; i < NUM_THREADS; i++)
|
|
{
|
|
thparms.threadNumber = i;
|
|
thParms[i] = thparms;
|
|
pthread_create(&consumer[i], NULL, tupleConsumer, &thParms[i]);
|
|
}
|
|
|
|
for (i = 0; i < NUM_THREADS; i++)
|
|
pthread_join(consumer[i], NULL);
|
|
|
|
timer1.stop("tuple-consumer");
|
|
|
|
timer1.finish();
|
|
delete tbdl;
|
|
}
|
|
|
|
void aggFilter_group1()
|
|
{
|
|
/*
|
|
int tm;
|
|
for (int m = 0; m < 1024; m++)
|
|
{
|
|
char *a = new char[5*1024*1024];
|
|
memset(a,0,5*1024*1024);
|
|
memcpy(&tm, a, 4);
|
|
}
|
|
*/
|
|
// for (int i = 0; i < 10; i++) {
|
|
JobStepAssociation in, out;
|
|
setUp1(in, out);
|
|
CalpontSystemCatalog::TableName tn("test", "lineitem");
|
|
string s_l_orderkey = "test.lineitem.l_orderkey";
|
|
string s_l_quantity = "test.lineitem.l_quantity";
|
|
uint32_t sessionid = 1;
|
|
uint32_t txnId = 1;
|
|
uint32_t verId = 1;
|
|
uint16_t stepID = 0;
|
|
uint32_t statementID = 1;
|
|
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionid);
|
|
|
|
SRCP srcp;
|
|
SimpleColumn* l_orderkey = new SimpleColumn(s_l_orderkey, sessionid);
|
|
SimpleColumn* l_quantity = new SimpleColumn(s_l_quantity, sessionid);
|
|
CalpontSystemCatalog::ROPair ropair = csc->tableRID(tn);
|
|
CalpontSystemCatalog::OID tableoid = ropair.objnum;
|
|
|
|
// sum(l_quantity) group by l_ordereky
|
|
srcp.reset(l_quantity);
|
|
AggregateColumn* ac = new AggregateColumn("sum", srcp.get()->clone(), sessionid);
|
|
|
|
ac->addProjectCol(srcp);
|
|
srcp.reset(l_orderkey);
|
|
ac->addGroupByCol(srcp);
|
|
|
|
// constant column 318
|
|
ConstantColumn* cc = new ConstantColumn(318, ConstantColumn::NUM);
|
|
// >
|
|
Operator* op = new Operator(">");
|
|
SOP sop(op);
|
|
// sum(l_quantity) > 318
|
|
SSFP ssfp;
|
|
SimpleFilter* sf = new SimpleFilter(sop, ac, cc);
|
|
ssfp.reset(sf);
|
|
SJSTEP step;
|
|
|
|
AggregateFilterStep* afs = new AggregateFilterStep(in, out, ac->functionName(), ac->groupByColList(),
|
|
ac->projectColList(), ac->functionParms(), tableoid,
|
|
sessionid, txnId, verId, stepID, statementID, fRm);
|
|
|
|
// output column l_orderkey;
|
|
afs->outputCol(dynamic_cast<SimpleColumn*>(srcp.get())->oid());
|
|
step.reset(afs);
|
|
// one column case, nomalize filter value
|
|
CalpontSystemCatalog::ColType ct;
|
|
int64_t intVal;
|
|
string strVal;
|
|
|
|
// int8_t cop = op2num(sop);
|
|
if (typeid((*ac->functionParms().get())) == typeid(SimpleColumn))
|
|
{
|
|
SimpleColumn* sc = reinterpret_cast<SimpleColumn*>(ac->functionParms().get());
|
|
ct = csc->colType(sc->oid());
|
|
intVal = convertValueNum(cc->constval(), ct, false);
|
|
afs->addFilter(COMPARE_GT, intVal);
|
|
}
|
|
else
|
|
{
|
|
if (cc->type() == ConstantColumn::NUM)
|
|
{
|
|
intVal = atol(cc->constval().c_str());
|
|
afs->addFilter(COMPARE_GT, intVal, false);
|
|
}
|
|
else if (cc->type() == ConstantColumn::LITERAL)
|
|
afs->addFilter(COMPARE_GT, cc->constval(), false);
|
|
}
|
|
|
|
timer1.start("agg");
|
|
afs->run();
|
|
afs->join();
|
|
timer1.stop("agg");
|
|
timer1.finish();
|
|
//}
|
|
}
|
|
|
|
void bucketdl()
|
|
{
|
|
BucketDL<ElementType>* bdl = new BucketDL<ElementType>(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
|
|
bdl->setElementMode(1);
|
|
ElementType* et = new ElementType();
|
|
|
|
uint64_t l_orderkey, l_quantity;
|
|
char data[1000];
|
|
char* tok;
|
|
uint64_t count = 0, rid = 0;
|
|
TupleType* tt = new TupleType();
|
|
|
|
vector<ElementType> v;
|
|
timer1.start("bucket-input");
|
|
int i;
|
|
|
|
for (i = 0; i < numRuns; i++)
|
|
{
|
|
ifstream ifs(datapath.c_str(), ios::in);
|
|
|
|
while (!ifs.eof())
|
|
{
|
|
ifs.getline(data, 1000);
|
|
tok = strtok(data, "|");
|
|
count = 0;
|
|
|
|
while (tok != NULL)
|
|
{
|
|
if (count == 0)
|
|
l_orderkey = 6000000 * i + atol(tok);
|
|
|
|
if (count == 4)
|
|
l_quantity = atof(tok) * 100;
|
|
|
|
count++;
|
|
tok = strtok(NULL, "|");
|
|
}
|
|
|
|
et->first = rid;
|
|
et->second = rid;
|
|
// tt->second = new char[16];
|
|
// memcpy(tt->second, &l_orderkey, sizeof(l_orderkey));
|
|
// memcpy(tt->second+sizeof(l_orderkey), &l_quantity, sizeof(l_quantity));
|
|
v.push_back(*et);
|
|
|
|
if (v.size() == 2048)
|
|
{
|
|
bdl->insert(v);
|
|
v.clear();
|
|
}
|
|
|
|
rid++;
|
|
|
|
// delete [] tt->second;
|
|
if (rid % printInterval == 0)
|
|
cout << rid << " " << l_orderkey << " " << l_quantity << endl;
|
|
}
|
|
|
|
ifs.close();
|
|
}
|
|
|
|
if (v.size() > 0)
|
|
{
|
|
bdl->insert(v);
|
|
v.clear();
|
|
}
|
|
|
|
bdl->endOfInput();
|
|
timer1.stop("bucket-input");
|
|
cout << "input size=" << bdl->totalSize() << endl;
|
|
delete tt;
|
|
delete et;
|
|
|
|
ThreadParms thparms;
|
|
thparms.count = NUM_BUCKETS / NUM_THREADS;
|
|
thparms.bdl = bdl;
|
|
pthread_t consumer[NUM_THREADS];
|
|
|
|
ThreadParms thParms[NUM_THREADS];
|
|
timer1.start("bucket-consumer");
|
|
|
|
for (i = 0; i < NUM_THREADS; i++)
|
|
{
|
|
thparms.threadNumber = i;
|
|
thParms[i] = thparms;
|
|
pthread_create(&consumer[i], NULL, bucketConsumer, &thParms[i]);
|
|
}
|
|
|
|
for (i = 0; i < NUM_THREADS; i++)
|
|
pthread_join(consumer[i], NULL);
|
|
|
|
timer1.stop("bucket-consumer");
|
|
|
|
timer1.finish();
|
|
delete bdl;
|
|
}
|
|
|
|
void aggFilter_group2()
|
|
{
|
|
CalpontSystemCatalog::TableName tn("test", "lineitem");
|
|
string s_l_orderkey = "test.lineitem.l_orderkey";
|
|
string s_l_quantity = "test.lineitem.l_quantity";
|
|
string s_l_partkey = "test.lineitem.l_partkey";
|
|
uint32_t sessionid = 1;
|
|
uint32_t txnId = 1;
|
|
uint32_t verId = 1;
|
|
uint16_t stepID = 0;
|
|
uint32_t statementID = 1;
|
|
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionid);
|
|
|
|
SRCP srcp;
|
|
SimpleColumn* l_orderkey = new SimpleColumn(s_l_orderkey, sessionid);
|
|
SimpleColumn* l_quantity = new SimpleColumn(s_l_quantity, sessionid);
|
|
SimpleColumn* l_partkey = new SimpleColumn(s_l_partkey, sessionid);
|
|
CalpontSystemCatalog::ROPair ropair = csc->tableRID(tn);
|
|
CalpontSystemCatalog::OID tableoid = ropair.objnum;
|
|
|
|
// sum(l_quantity) group by l_ordereky
|
|
AggregateColumn* ac = new AggregateColumn("sum", l_quantity, sessionid);
|
|
srcp.reset(l_quantity);
|
|
ac->addProjectCol(srcp);
|
|
srcp.reset(l_orderkey);
|
|
ac->addGroupByCol(srcp);
|
|
srcp.reset(l_partkey);
|
|
ac->addGroupByCol(srcp);
|
|
|
|
// constant column 318
|
|
ConstantColumn* cc = new ConstantColumn(318, ConstantColumn::NUM);
|
|
// >
|
|
Operator* op = new Operator(">");
|
|
SOP sop(op);
|
|
// sum(l_quantity) > 318
|
|
SimpleFilter* sf = new SimpleFilter(sop, ac, cc);
|
|
|
|
// aggregate filter step
|
|
/** @brief Constructor */
|
|
JobStepAssociation in;
|
|
JobStepAssociation out;
|
|
AggregateFilterStep* afs = new AggregateFilterStep(in, out, ac->functionName(), ac->groupByColList(),
|
|
ac->projectColList(), ac->functionParms(), tableoid,
|
|
sessionid, txnId, verId, stepID, statementID, fRm);
|
|
afs->run();
|
|
afs->join();
|
|
}
|
|
|
|
void hashjoin()
|
|
{
|
|
BucketDL<ElementType> A(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
|
|
BucketDL<ElementType> B(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
|
|
BucketDL<ElementType> C(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
|
|
BucketDL<ElementType> D(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm);
|
|
A.setDiskElemSize(4, 4);
|
|
B.setDiskElemSize(4, 4);
|
|
A.setElementMode(1);
|
|
A.setHashMode(1);
|
|
B.setElementMode(1);
|
|
B.setHashMode(1);
|
|
int i;
|
|
timer1.start("insert");
|
|
|
|
for (i = 0; i < 6000000 * numRuns; i++)
|
|
A.insert(ElementType(i, i));
|
|
|
|
A.endOfInput();
|
|
|
|
for (i = 0; i < 1500000 * numRuns; i++)
|
|
B.insert(ElementType(i, i + 1000000));
|
|
|
|
B.endOfInput();
|
|
timer1.stop("insert");
|
|
C.setElementMode(1);
|
|
D.setElementMode(1);
|
|
BDLWrapper<ElementType> setA(&A);
|
|
BDLWrapper<ElementType> setB(&B);
|
|
DataList<ElementType>* resultA(&C);
|
|
DataList<ElementType>* resultB(&D);
|
|
timer1.start("hashjoin");
|
|
HashJoin<ElementType>* hj = new HashJoin<ElementType>(setA, setB, resultA, resultB, INNER, &dlTimes);
|
|
hj->performJoin();
|
|
timer1.stop("hashjoin");
|
|
timer1.finish();
|
|
cout << "hash join output size: " << resultA->totalSize() << "/" << resultB->totalSize() << endl;
|
|
}
|
|
|
|
typedef struct Elem
|
|
{
|
|
char* hashStr;
|
|
// int64_t result;
|
|
vector<uint64_t> rids;
|
|
} Elem;
|
|
|
|
void hashmap_tuple()
|
|
{
|
|
int tm;
|
|
|
|
for (int m = 0; m < 1024; m++)
|
|
{
|
|
char* a = new char[5 * 1024 * 1024];
|
|
memset(a, 0, 5 * 1024 * 1024);
|
|
memcpy(&tm, a, 4);
|
|
}
|
|
|
|
uint32_t fHashLen = 4;
|
|
TupleType tt;
|
|
tt.second = new char[fHashLen];
|
|
uint64_t size = 585937;
|
|
uint64_t i = 0, j = 0;
|
|
#if 1
|
|
pthread_t agg[NUM_THREADS];
|
|
ThreadParms thParms[NUM_THREADS];
|
|
ThreadParms thParm;
|
|
struct timespec ts1, ts2, diff;
|
|
|
|
clock_gettime(CLOCK_REALTIME, &ts1);
|
|
|
|
for (i = 0; i < NUM_THREADS; i++)
|
|
{
|
|
thParm.threadNumber = i;
|
|
thParms[i] = thParm;
|
|
pthread_create(&agg[i], NULL, aggregator, &thParms[i]);
|
|
}
|
|
|
|
for (i = 0; i < NUM_THREADS; i++)
|
|
pthread_join(agg[i], NULL);
|
|
|
|
clock_gettime(CLOCK_REALTIME, &ts2);
|
|
timespec_sub(ts1, ts2, diff);
|
|
cout << "aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
|
|
#endif
|
|
#if 0
|
|
//typedef std::vector<uint64_t> RIDVec;
|
|
typedef std::pair<int64_t, Elem> Results;
|
|
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator> shmp;
|
|
//typedef std::tr1::unordered_map<uint64_t, Results, TupleHasher, TupleComparator>::iterator TupleHMIter;
|
|
//typedef boost::shared_ptr<TupleHashMap> SHMP;
|
|
typedef std::tr1::unordered_multimap<uint64_t, Results> TupleHashMap;
|
|
typedef boost::shared_ptr<TupleHashMap> SHMP;
|
|
typedef std::tr1::unordered_multimap<uint64_t, Results>::iterator TupleHMIter;
|
|
typedef std::pair<TupleHMIter, TupleHMIter> HashItPair;
|
|
HashItPair hashItPair;
|
|
|
|
SHMP shmp(new TupleHashMap());
|
|
//TupleHashMap *shmp = new TupleHashMap();
|
|
TupleHMIter iter;
|
|
uint32_t val = 0;
|
|
uint64_t hv;
|
|
Elem elem;
|
|
Results rr;
|
|
Hasher hasher;
|
|
bool flag = true;
|
|
timer1.start("vectormap");
|
|
|
|
for (int k = 0; k < 1; k++)
|
|
{
|
|
for (j = 0; j < 4; j++)
|
|
{
|
|
for (i = 0; i < size / 10; i++)
|
|
{
|
|
flag = true;
|
|
memcpy(tt.second, &i, 4);
|
|
hv = hasher(tt.second, fHashLen);
|
|
iter = shmp->find(hv);
|
|
|
|
if (iter != shmp->end())
|
|
{
|
|
hashItPair = shmp->equal_range(hv);
|
|
|
|
for (iter = hashItPair.first; iter != hashItPair.second; iter++)
|
|
{
|
|
if (memcmp(iter->second.second.hashStr, tt.second, fHashLen) == 0)
|
|
{
|
|
//cout << "real hit" << endl;
|
|
//updateAggResult<result_t>(tt, hashIt->second.first);
|
|
iter->second.second.rids.push_back(tt.first);
|
|
flag = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (flag)
|
|
{
|
|
rr.second.hashStr = new char[fHashLen];
|
|
memcpy(rr.second.hashStr, tt.second, fHashLen);
|
|
rr.second.rids.clear();
|
|
rr.second.rids.push_back(tt.first);
|
|
shmp->insert(std::pair<uint64_t, Results> (hv, rr));
|
|
}
|
|
}
|
|
}
|
|
|
|
cout << "vector size=" << shmp->size() << endl;
|
|
|
|
// clean up hashmap memory
|
|
for (iter = shmp->begin(); iter != shmp->end(); iter++)
|
|
delete [] iter->second.second.hashStr;
|
|
|
|
shmp->clear();
|
|
}
|
|
|
|
timer1.stop("vectormap");
|
|
timer1.finish();
|
|
|
|
#endif
|
|
#if 0
|
|
typedef std::vector<uint64_t> RIDVec;
|
|
typedef std::pair<uint64_t, RIDVec> Results;
|
|
typedef std::tr1::unordered_map<char*, Results, TupleHasher, TupleComparator> TupleHashMap1;
|
|
typedef std::tr1::unordered_map<char*, Results, TupleHasher, TupleComparator>::iterator TupleHMIter1;
|
|
typedef boost::shared_ptr<TupleHashMap1> SHMP1;
|
|
|
|
SHMP1 shmp;
|
|
TupleHasher tupleHash(fHashLen);
|
|
TupleComparator tupleComp(fHashLen);
|
|
|
|
shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp));
|
|
TupleHMIter1 iter;
|
|
Results rr;
|
|
|
|
uint32_t val = 0;
|
|
timer1.start("uniquemap");
|
|
|
|
for (j = 0; j < 4; j++)
|
|
for (i = 0; i < size / 10; i++)
|
|
{
|
|
memcpy(tt.second, &i, 4);
|
|
iter = shmp->find(tt.second);
|
|
|
|
if (iter == shmp->end())
|
|
{
|
|
rr.second.clear();
|
|
//getAggResult<result_t>(tt, rr.first);
|
|
rr.second.push_back(tt.first);
|
|
char* hashStr = new char[fHashLen];
|
|
memcpy(hashStr, tt.second, fHashLen);
|
|
shmp->insert(std::pair<char*, Results> (hashStr, rr));
|
|
}
|
|
else
|
|
{
|
|
//updateAggResult<result_t>(tt, hashIt->second.first);
|
|
iter->second.second.push_back(tt.first);
|
|
}
|
|
}
|
|
|
|
timer1.stop("uniquemap");
|
|
timer1.finish();
|
|
cout << "hashmap size=" << shmp->size() << endl;
|
|
|
|
// clean up hashmap memory
|
|
for (iter = shmp->begin(); iter != shmp->end(); iter++)
|
|
delete [] iter->first;
|
|
|
|
shmp->clear();
|
|
#endif
|
|
}
|
|
|
|
#if 0
/**
 * Benchmark (disabled): inserts 4 x 1171875 entries into an unordered_multimap
 * keyed by heap-allocated 4-byte key strings ("multi_insert"), then walks the
 * map once, summing Results::first within each run of equal keys
 * ("multi_result"). Finally it times the key allocations alone as a baseline
 * ("base").
 */
void hashmap_tuple_multi()
{
    uint32_t fHashLen = 4;
    TupleHasher tupleHash(fHashLen);
    TupleComparator tupleComp(fHashLen);
    TupleType tt;

    //typedef std::vector<uint64_t> RIDVec;
    typedef std::pair<uint64_t, uint64_t> Results;
    typedef std::tr1::unordered_multimap<char*, Results, TupleHasher, TupleComparator> TupleHashMap;
    typedef std::tr1::unordered_multimap<char*, Results, TupleHasher, TupleComparator>::iterator TupleHMIter;
    typedef boost::shared_ptr<TupleHashMap> SHMP;

    SHMP shmp;
    shmp.reset(new TupleHashMap(1, tupleHash, tupleComp));
    TupleHMIter iter;
    Results rr;

    timer1.start("multi_insert");
    uint32_t i = 0, j = 0;
    uint32_t val = 0;

    for (j = 0; j < 4; j++)
        for (i = 0; i < 1171875; i++)
        {
            // The map keeps this key buffer; it is released in the cleanup loop at the end.
            tt.second = new char[fHashLen];
            memcpy(tt.second, &i, fHashLen);
            rr.first = j;
            rr.second = i;
            shmp->insert(std::pair<char*, Results> (tt.second, rr));
        }

    timer1.stop("multi_insert");
    cout << "hashmap size=" << shmp->size() << endl;

    timer1.start("multi_result");

    uint32_t ct = 0;
    iter = shmp->begin();

    // An unordered_multimap keeps equivalent keys adjacent, so one pass visits
    // each group contiguously. Guarded so an empty map never touches end().
    if (iter != shmp->end())
    {
        char* key = iter->first;
        val = iter->second.first;  // the first element belongs to the first group
        ct = 1;

        for (++iter; iter != shmp->end(); ++iter)
        {
            if (memcmp(iter->first, key, fHashLen) == 0)
                val += iter->second.first;
            else
            {
                key = iter->first;
                val = iter->second.first;
                ct++;
            }
        }
    }

    timer1.stop("multi_result");

    // Baseline: the same number of key allocations without the map insert.
    // Buffers are collected and released after the timer stops so the timed
    // work stays comparable while nothing is leaked.
    std::vector<char*> baseline;
    baseline.reserve(4 * 1171875);

    timer1.start("base");

    for (j = 0; j < 4; j++)
        for (i = 0; i < 1171875; i++)
        {
            tt.second = new char[fHashLen];
            memcpy(tt.second, &i, fHashLen);
            baseline.push_back(tt.second);
        }

    timer1.stop("base");
    timer1.finish();
    cout << "group val=" << val << endl;
    cout << "group count=" << ct << endl;

    for (size_t b = 0; b < baseline.size(); b++)
        delete [] baseline[b];

    // clean up hashmap memory
    for (iter = shmp->begin(); iter != shmp->end(); ++iter)
        delete [] iter->first;
}
#endif
|
|
void hashmap_ET()
|
|
{
|
|
typedef std::list<ElementType> hashList_t;
|
|
typedef std::list<ElementType>::iterator hashListIter_t;
|
|
|
|
// typedef std::tr1::unordered_multimap<ElementType::second_type, ElementType::first_type> hash_t;
|
|
// typedef std::tr1::unordered_multimap<ElementType::second_type, ElementType::first_type>::iterator
|
|
// hashIter_t; typedef std::tr1::unordered_multimap<ElementType::second_type,
|
|
// ElementType::first_type>::value_type hashPair_t;
|
|
|
|
typedef std::tr1::unordered_map<ElementType::second_type, hashList_t> hash_t;
|
|
typedef std::tr1::unordered_map<ElementType::second_type, hashList_t>::iterator hashIter_t;
|
|
typedef std::tr1::unordered_map<ElementType::second_type, hashList_t>::value_type hashPair_t;
|
|
|
|
hash_t hashmap;
|
|
hashIter_t it;
|
|
hashList_t list;
|
|
|
|
timer1.start("et_insert");
|
|
ElementType et;
|
|
|
|
// while (true)
|
|
for (int i = 0; i < 1200000; i++)
|
|
{
|
|
list.push_back(et);
|
|
et.second = i;
|
|
hashmap.insert(hashPair_t(et.second, list));
|
|
list.clear();
|
|
}
|
|
|
|
timer1.stop("et_insert");
|
|
cout << "hashmap size=" << hashmap.size() << endl;
|
|
|
|
int val = 0, i, j;
|
|
timer1.start("et_find");
|
|
|
|
for (j = 0; j < 4; j++)
|
|
for (i = 0; i < 1200000; i++)
|
|
{
|
|
et.second = i;
|
|
it = hashmap.find(i);
|
|
|
|
if (it == hashmap.end())
|
|
{
|
|
list.clear();
|
|
// getAggResult<result_t>(tt, rr.first);
|
|
list.push_back(et);
|
|
hashmap.insert(hashPair_t(et.second, list));
|
|
}
|
|
else
|
|
{
|
|
it->second.push_back(et);
|
|
}
|
|
}
|
|
|
|
timer1.stop("et_find");
|
|
timer1.finish();
|
|
cout << "hashmap size=" << hashmap.size() << endl;
|
|
}
|
|
};
|
|
|
|
// Register the AggDriver fixture with CppUnit's default test-factory registry,
// which main() below queries through TestFactoryRegistry::getRegistry().
CPPUNIT_TEST_SUITE_REGISTRATION(AggDriver);
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
CppUnit::TextUi::TestRunner runner;
|
|
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
|
|
runner.addTest(registry.makeTest());
|
|
bool wasSuccessful = runner.run("", false);
|
|
return (wasSuccessful ? 0 : 1);
|
|
}
|