/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: tdriver-agg.cpp 9210 2013-01-21 14:10:42Z rdempsey $ * * ***********************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include "fifo.h" #include "wsdl.h" #include "constantdatalist.h" #include "bucketdl.h" #include "elementtype.h" #include "zdl.h" #include "stopwatch.cpp" #include "jobstep.h" //#include "aggregator.h" #include "constantcolumn.h" #include "simplefilter.h" #include "aggregatecolumn.h" #include "simplecolumn.h" #include "dataconvert.h" using namespace dataconvert; // #undef CPPUNIT_ASSERT // #define CPPUNIT_ASSERT(x) using namespace std; using namespace joblist; using namespace execplan; Stopwatch timer1; const uint32_t NUM_BUCKETS = 256; const uint32_t MAX_SIZE = 0x100000; const uint32_t MAX_ELEMENTS = 0x20000; const uint32_t NUM_THREADS = 4; const string datapath = "/home/zzhu/genii/tools/dbbuilder/lineitem.tbl"; // const string datapath="/usr/local/mariadb/columnstore/bin/lineitem.tbl"; int numConsumers = 1; int numRuns = 1; int printInterval = numRuns * 100000; JSTimeStamp dlTimes; void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff) { if (tv2.tv_nsec < tv1.tv_nsec) { diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1; diff.tv_nsec = tv1.tv_nsec - tv2.tv_nsec; } else { diff.tv_sec = tv2.tv_sec - tv1.tv_sec; diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec; } } struct ThreadParms { BucketDL* bdl; TupleBucketDataList* tbdl; int count; int threadNumber; }; void* bucketConsumer(void* arg) { ThreadParms* parms = reinterpret_cast(arg); // BucketDL *bdl = reinterpret_cast *>(arg); BucketDL* bdl = parms->bdl; int numBucketsToConsume = parms->count; int threadNumber = parms->threadNumber; int64_t i; ElementType val; uint32_t it; int bucketIndex; int cnt = 0; for (i = 0; i < numBucketsToConsume; i++) { bucketIndex = (threadNumber * numBucketsToConsume) + i; it = bdl->getIterator(bucketIndex); while (bdl->next(bucketIndex, it, &val)) cnt++; } cout << "consumer " << threadNumber << "consumed " << cnt << endl; return NULL; } void* tupleConsumer(void* arg) { ThreadParms* parms = reinterpret_cast(arg); // BucketDL *bdl = reinterpret_cast *>(arg); TupleBucketDataList* tbdl = parms->tbdl; int numBucketsToConsume = parms->count; int threadNumber = parms->threadNumber; int i; // ElementType val; TupleType val; uint32_t it; int bucketIndex; int cnt = 0; for (i = 0; i < numBucketsToConsume; i++) { bucketIndex = (threadNumber * numBucketsToConsume) + i; it = tbdl->getIterator(bucketIndex); while (tbdl->next(bucketIndex, it, &val)) cnt++; } cout << "consumer " << threadNumber << "consumed " << cnt << endl; return NULL; } /** @brief class TupleHasher * */ class TupleHasher1 { private: Hasher fHasher; uint32_t fHashLen; public: TupleHasher1(uint32_t len) : fHashLen(len) { } uint32_t operator()(const char* v) const { return fHasher(v, fHashLen); } /* uint32_t operator()(const uint64_t v) const { return v; }*/ }; /** @brief class TupleComparator * */ class TupleComparator1 { private: uint32_t fCmpLen; public: TupleComparator1(uint32_t len) : fCmpLen(len) { } bool operator()(const char* v1, const char* v2) const { return (memcmp(v1, v2, fCmpLen) == 0); } /* bool operator()(const uint64_t v1, const uint64_t v2) const { return (v1 == v2); }*/ }; void* aggregator(void* arg) { uint32_t fHashLen = 4; TupleType tt; tt.second = new char[fHashLen]; ThreadParms* parms = reinterpret_cast(arg); int threadNumber = parms->threadNumber; struct timespec ts1, ts2, diff; uint32_t size = 585938; #if 0 //typedef std::vector RIDVec; typedef std::pair Results; //typedef std::tr1::unordered_map shmp; //typedef std::tr1::unordered_map::iterator TupleHMIter; //typedef boost::shared_ptr SHMP; typedef std::tr1::unordered_multimap TupleHashMap; typedef boost::shared_ptr SHMP; typedef std::tr1::unordered_multimap::iterator TupleHMIter; typedef std::pair HashItPair; HashItPair hashItPair; SHMP shmp(new TupleHashMap()); // TupleHashMap *shmp = new TupleHashMap(); TupleHMIter iter; uint32_t val = 0; uint64_t hv; Hasher hasher; bool flag = true; for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++) { clock_gettime(CLOCK_REALTIME, &ts1); for (uint32_t j = 0; j < 4; j++) { for (uint32_t i = 0; i < size / 10; i++) { flag = true; memcpy(tt.second, &i, 4); hv = hasher(tt.second, fHashLen); iter = shmp->find(hv); if (iter != shmp->end()) { hashItPair = shmp->equal_range(hv); for (iter = hashItPair.first; iter != hashItPair.second; iter++) { if (memcmp(iter->second.second.hashStr, tt.second, fHashLen) == 0) { //cout << "real hit" << endl; //updateAggResult(tt, hashIt->second.first); iter->second.second.rids.push_back(tt.first); flag = false; break; } } } if (flag) { Results rr; rr.second.hashStr = new char[fHashLen]; //getAggResult(tt, rr.first); memcpy(rr.second.hashStr, tt.second, fHashLen); rr.second.rids.push_back(tt.first); shmp->insert(std::pair (hv, rr)); } } } cout << "thread " << threadNumber << " vector " << k << " size=" << shmp->size() << endl; // clean up hashmap memory for (iter = shmp->begin(); iter != shmp->end(); iter++) delete [] iter->second.second.hashStr; shmp->clear(); // shmp.reset(new TupleHashMap()); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl; } #endif #if 0 typedef std::vector RIDVec; typedef std::pair Results; typedef std::tr1::unordered_map TupleHashMap1; typedef std::tr1::unordered_map::iterator TupleHMIter1; typedef boost::shared_ptr SHMP1; SHMP1 shmp; TupleHasher1 tupleHash(fHashLen); TupleComparator1 tupleComp(fHashLen); shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp)); TupleHMIter1 iter; Results rr; uint32_t val = 0; for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++) { clock_gettime(CLOCK_REALTIME, &ts1); for (uint32_t j = 0; j < 4; j++) { for (uint32_t i = 0; i < size / 10; i++) { memcpy(tt.second, &i, 4); iter = shmp->find(tt.second); if (iter == shmp->end()) { rr.second.clear(); //getAggResult(tt, rr.first); rr.second.push_back(tt.first); char* hashStr = new char[fHashLen]; memcpy(hashStr, tt.second, fHashLen); shmp->insert(std::pair (hashStr, rr)); } else { //updateAggResult(tt, hashIt->second.first); iter->second.second.push_back(tt.first); } } } cout << "thread " << threadNumber << " hashmap " << k << " size=" << shmp->size() << endl; // clean up hashmap memory for (iter = shmp->begin(); iter != shmp->end(); iter++) delete [] iter->first; shmp->clear(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl; } #endif #if 1 // typedef std::vector RIDVec; // typedef std::pair Results; typedef std::tr1::unordered_map TupleHashMap1; typedef std::tr1::unordered_map::iterator TupleHMIter1; typedef boost::shared_ptr SHMP1; vector vt; vt.reserve(size / 10 * 4); SHMP1 shmp; TupleHasher1 tupleHash(fHashLen); TupleComparator1 tupleComp(fHashLen); shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp)); TupleHMIter1 iter; int64_t rr; uint32_t val = 0; for (uint32_t k = 0; k < NUM_BUCKETS / 4; k++) { clock_gettime(CLOCK_REALTIME, &ts1); for (uint32_t j = 0; j < 4; j++) { for (uint32_t i = 0; i < size / 10; i++) { memcpy(tt.second, &i, 4); vt.push_back(tt); iter = shmp->find(tt.second); if (iter == shmp->end()) { // rr.second.clear(); // getAggResult(tt, rr.first); // rr.second.push_back(tt.first); char* hashStr = new char[fHashLen]; memcpy(hashStr, tt.second, fHashLen); shmp->insert(std::pair(hashStr, i)); } else { // updateAggResult(tt, hashIt->second.first); // iter->second.second.push_back(tt.first); iter->second += i; } } } cout << "thread " << threadNumber << " hashmap " << k << " size=" << shmp->size() << endl; // loop through vector to find match in map for rid vector::iterator it; int val = 0; for (it = vt.begin(); it != vt.end(); it++) { iter = shmp->find(it->second); if (iter != shmp->end()) val++; } cout << "val = " << val << endl; // clean up hashmap memory for (iter = shmp->begin(); iter != shmp->end(); iter++) delete[] iter->first; shmp->clear(); vt.clear(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "thread " << threadNumber << "do aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << " ns" << endl; } #endif } void* TBDL_producer(void* arg) { ThreadParms* parms = reinterpret_cast(arg); TupleBucketDataList* tbdl = parms->tbdl; TupleType t; vector vt; for (uint32_t i = 0; i < 150000000; i++) { t.first = i; t.second = new char[4]; memcpy(t.second, &i, 4); vt.push_back(t); if (vt.size() == 8192) { tbdl->insert(vt); vt.clear(); } } if (vt.size() > 0) tbdl->insert(vt); } void* BDL_producer(void* arg) { ThreadParms* parms = reinterpret_cast(arg); BucketDL* bdl = parms->bdl; TupleType t; ElementType e; vector vt; for (uint32_t i = 0; i < 150000000; i++) { e.first = i; e.second = i; // t.second = new char[4]; // memcpy(t.second, &i, 4); vt.push_back(e); if (vt.size() == 8192) { bdl->insert(vt); vt.clear(); } } if (vt.size() > 0) bdl->insert(vt); } class AggDriver : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(AggDriver); // CPPUNIT_TEST(aggFilter_group2); // CPPUNIT_TEST(tuplewsdl); // CPPUNIT_TEST(bucketdl); // CPPUNIT_TEST(aggFilter_group1); // CPPUNIT_TEST(hashjoin); // CPPUNIT_TEST(hashmap_tuple); // CPPUNIT_TEST(hashmap_tuple_multi); // CPPUNIT_TEST(hashmap_ET); // CPPUNIT_TEST(tuplewsdl_multi); CPPUNIT_TEST(bdl_multi); CPPUNIT_TEST_SUITE_END(); typedef boost::shared_ptr SSFP; private: ResourceManager fRm; public: void setUp1(JobStepAssociation& in, JobStepAssociation& out) { // sleep(20); // input - TupleBucket AnyDataListSPtr adl1(new AnyDataList()); TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, numConsumers, MAX_SIZE, fRm); tbdl->setMultipleProducers(0); // tbdl->setElementMode(1); adl1->tupleBucketDL(tbdl); in.outAdd(adl1); // output - ElementType Bucket AnyDataListSPtr adl2(new AnyDataList()); BucketDataList* bdl = new BucketDataList(NUM_BUCKETS, numConsumers, MAX_ELEMENTS, fRm); bdl->setElementMode(1); adl2->bucketDL(bdl); out.outAdd(adl2); // prepare the input datalist // ifstream ifs(datapath.c_str(), ios::in); uint64_t l_orderkey; uint64_t l_quantity; char data[1000]; // char row[50]; char* tok; uint64_t count = 0; uint32_t rid = 0; TupleType* tt = new TupleType(); tbdl->hashLen(sizeof(l_orderkey)); uint32_t size = sizeof(l_orderkey) + sizeof(l_quantity); // tbdl->elementLen(size + sizeof(rid)); tbdl->elementLen(sizeof(rid), size); // char* second = new char[size]; vector v; timer1.start("input"); ifstream ifs(datapath.c_str(), ios::in); while (!ifs.eof()) { ifs.getline(data, 1000); tok = strtok(data, "|"); count = 0; while (tok != NULL) { if (count == 0) l_orderkey = atol(tok); if (count == 4) l_quantity = atof(tok) * 100; count++; tok = strtok(NULL, "|"); } for (int i = 0; i < numRuns; i++) { l_orderkey += 6000000 * i; tt->first = rid; tt->second = new char[size]; memcpy(tt->second, &l_orderkey, sizeof(l_orderkey)); memcpy(tt->second + sizeof(l_orderkey), &l_quantity, sizeof(l_quantity)); v.push_back(*tt); if (v.size() == 2048) { tbdl->insert(v); v.clear(); } rid++; } if (rid % printInterval == 0) cout << rid << " " << l_orderkey << " " << l_quantity << endl; } ifs.close(); if (v.size() > 0) { tbdl->insert(v); // for (uint32_t i = 0; i < v.size(); i++) // delete [] (v[i].second); v.clear(); } tbdl->endOfInput(); timer1.stop("input"); cout << "input size=" << tbdl->totalSize() << endl; // ifs.close(); delete tt; } int64_t convertValueNum(const string& str, const CalpontSystemCatalog::ColType& ct, bool isNull) { // if (str.size() == 0 || isNull ) return valueNullNum(ct); int64_t v = 0; boost::any anyVal = DataConvert::convertColumnData(ct, str, false); switch (ct.colDataType) { case CalpontSystemCatalog::BIT: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::TINYINT: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::SMALLINT: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::BIGINT: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::FLOAT: { float i = boost::any_cast(anyVal); v = (Int64)i; } break; case CalpontSystemCatalog::DOUBLE: { double i = boost::any_cast(anyVal); v = (Int64)i; } break; case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::CLOB: { // v = boost::any_cast(anyVal); string i = boost::any_cast(anyVal); v = *((Int64*)i.c_str()); } break; case CalpontSystemCatalog::DATE: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::DATETIME: v = boost::any_cast(anyVal); break; case CalpontSystemCatalog::DECIMAL: v = boost::any_cast(anyVal); break; default: break; } return v; } void tuplewsdl_multi() { TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, 1, MAX_SIZE, fRm); tbdl->setMultipleProducers(true); tbdl->setElementMode(1); // RID_VALUE tbdl->hashLen(4); tbdl->elementLen(4, 12); pthread_t producer[4]; ThreadParms producerThreadParms[4]; timer1.start("insert-tbdl"); for (uint32_t i = 0; i < 4; i++) { producerThreadParms[i].tbdl = tbdl; producerThreadParms[i].threadNumber = i; // producerThreadParms[i].count = ::count; pthread_create(&producer[i], NULL, TBDL_producer, &producerThreadParms[i]); } for (uint32_t i = 0; i < 4; i++) pthread_join(producer[i], NULL); tbdl->endOfInput(); timer1.stop("insert-tbdl"); cout << "tbdl finish insert " << tbdl->totalSize() << endl; // timer1.finish(); } void bdl_multi() { BucketDL* bdl = new BucketDL(128, 1, MAX_ELEMENTS, fRm); bdl->setMultipleProducers(true); bdl->setElementMode(1); // RID_VALUE bdl->setDiskElemSize(4, 4); pthread_t producer[4]; ThreadParms producerThreadParms[4]; timer1.start("insert-bdl"); for (uint32_t i = 0; i < 4; i++) { producerThreadParms[i].bdl = bdl; producerThreadParms[i].threadNumber = i; // producerThreadParms[i].count = ::count; pthread_create(&producer[i], NULL, BDL_producer, &producerThreadParms[i]); } for (uint32_t i = 0; i < 4; i++) pthread_join(producer[i], NULL); bdl->endOfInput(); timer1.stop("insert-bdl"); cout << "bdl finish insert " << bdl->totalSize() << endl; timer1.finish(); } void tuplewsdl() { TupleBucketDataList* tbdl = new TupleBucketDataList(NUM_BUCKETS, numConsumers, MAX_SIZE, fRm); // prepare the input datalist // ifstream ifs(datapath.c_str(), ios::in); uint64_t l_orderkey, l_quantity; char data[1000]; // char row[50]; char* tok; uint64_t count = 0, rid = 0; TupleType* tt = new TupleType(); tbdl->hashLen(sizeof(l_orderkey)); uint32_t size = sizeof(l_orderkey) + sizeof(l_quantity); // tbdl->elementLen(size + sizeof(rid)); tbdl->elementLen(sizeof(rid), size); // char* second = new char[size]; vector v; timer1.start("tuple-input"); int i; for (i = 0; i < numRuns; i++) { ifstream ifs(datapath.c_str(), ios::in); while (!ifs.eof()) { ifs.getline(data, 1000); tok = strtok(data, "|"); count = 0; while (tok != NULL) { if (count == 0) l_orderkey = 6000000 * i + atol(tok); if (count == 4) l_quantity = atof(tok) * 100; count++; tok = strtok(NULL, "|"); } tt->first = rid; tt->second = new char[size]; memcpy(tt->second, &l_orderkey, sizeof(l_orderkey)); memcpy(tt->second + sizeof(l_orderkey), &l_quantity, sizeof(l_quantity)); v.push_back(*tt); if (v.size() == 2048) { tbdl->insert(v); for (uint32_t i = 0; i < v.size(); i++) v[i].deleter(); v.clear(); } rid++; if (rid % printInterval == 0) cout << rid << " " << l_orderkey << " " << l_quantity << endl; } ifs.close(); } if (v.size() > 0) { tbdl->insert(v); for (uint32_t i = 0; i < v.size(); i++) delete[](v[i].second); v.clear(); } tbdl->endOfInput(); timer1.stop("tuple-input"); cout << "input size=" << tbdl->totalSize() << endl; // ifs.close(); delete tt; ThreadParms thparms; thparms.count = NUM_BUCKETS / NUM_THREADS; thparms.tbdl = tbdl; pthread_t consumer[NUM_THREADS]; ThreadParms thParms[NUM_THREADS]; timer1.start("tuple-consumer"); for (i = 0; i < NUM_THREADS; i++) { thparms.threadNumber = i; thParms[i] = thparms; pthread_create(&consumer[i], NULL, tupleConsumer, &thParms[i]); } for (i = 0; i < NUM_THREADS; i++) pthread_join(consumer[i], NULL); timer1.stop("tuple-consumer"); timer1.finish(); delete tbdl; } void aggFilter_group1() { /* int tm; for (int m = 0; m < 1024; m++) { char *a = new char[5*1024*1024]; memset(a,0,5*1024*1024); memcpy(&tm, a, 4); } */ // for (int i = 0; i < 10; i++) { JobStepAssociation in, out; setUp1(in, out); CalpontSystemCatalog::TableName tn("test", "lineitem"); string s_l_orderkey = "test.lineitem.l_orderkey"; string s_l_quantity = "test.lineitem.l_quantity"; uint32_t sessionid = 1; uint32_t txnId = 1; uint32_t verId = 1; uint16_t stepID = 0; uint32_t statementID = 1; boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionid); SRCP srcp; SimpleColumn* l_orderkey = new SimpleColumn(s_l_orderkey, sessionid); SimpleColumn* l_quantity = new SimpleColumn(s_l_quantity, sessionid); CalpontSystemCatalog::ROPair ropair = csc->tableRID(tn); CalpontSystemCatalog::OID tableoid = ropair.objnum; // sum(l_quantity) group by l_ordereky srcp.reset(l_quantity); AggregateColumn* ac = new AggregateColumn("sum", srcp.get()->clone(), sessionid); ac->addProjectCol(srcp); srcp.reset(l_orderkey); ac->addGroupByCol(srcp); // constant column 318 ConstantColumn* cc = new ConstantColumn(318, ConstantColumn::NUM); // > Operator* op = new Operator(">"); SOP sop(op); // sum(l_quantity) > 318 SSFP ssfp; SimpleFilter* sf = new SimpleFilter(sop, ac, cc); ssfp.reset(sf); SJSTEP step; AggregateFilterStep* afs = new AggregateFilterStep(in, out, ac->functionName(), ac->groupByColList(), ac->projectColList(), ac->functionParms(), tableoid, sessionid, txnId, verId, stepID, statementID, fRm); // output column l_orderkey; afs->outputCol(dynamic_cast(srcp.get())->oid()); step.reset(afs); // one column case, nomalize filter value CalpontSystemCatalog::ColType ct; int64_t intVal; string strVal; // int8_t cop = op2num(sop); if (typeid((*ac->functionParms().get())) == typeid(SimpleColumn)) { SimpleColumn* sc = reinterpret_cast(ac->functionParms().get()); ct = csc->colType(sc->oid()); intVal = convertValueNum(cc->constval(), ct, false); afs->addFilter(COMPARE_GT, intVal); } else { if (cc->type() == ConstantColumn::NUM) { intVal = atol(cc->constval().c_str()); afs->addFilter(COMPARE_GT, intVal, false); } else if (cc->type() == ConstantColumn::LITERAL) afs->addFilter(COMPARE_GT, cc->constval(), false); } timer1.start("agg"); afs->run(); afs->join(); timer1.stop("agg"); timer1.finish(); //} } void bucketdl() { BucketDL* bdl = new BucketDL(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm); bdl->setElementMode(1); ElementType* et = new ElementType(); uint64_t l_orderkey, l_quantity; char data[1000]; char* tok; uint64_t count = 0, rid = 0; TupleType* tt = new TupleType(); vector v; timer1.start("bucket-input"); int i; for (i = 0; i < numRuns; i++) { ifstream ifs(datapath.c_str(), ios::in); while (!ifs.eof()) { ifs.getline(data, 1000); tok = strtok(data, "|"); count = 0; while (tok != NULL) { if (count == 0) l_orderkey = 6000000 * i + atol(tok); if (count == 4) l_quantity = atof(tok) * 100; count++; tok = strtok(NULL, "|"); } et->first = rid; et->second = rid; // tt->second = new char[16]; // memcpy(tt->second, &l_orderkey, sizeof(l_orderkey)); // memcpy(tt->second+sizeof(l_orderkey), &l_quantity, sizeof(l_quantity)); v.push_back(*et); if (v.size() == 2048) { bdl->insert(v); v.clear(); } rid++; // delete [] tt->second; if (rid % printInterval == 0) cout << rid << " " << l_orderkey << " " << l_quantity << endl; } ifs.close(); } if (v.size() > 0) { bdl->insert(v); v.clear(); } bdl->endOfInput(); timer1.stop("bucket-input"); cout << "input size=" << bdl->totalSize() << endl; delete tt; delete et; ThreadParms thparms; thparms.count = NUM_BUCKETS / NUM_THREADS; thparms.bdl = bdl; pthread_t consumer[NUM_THREADS]; ThreadParms thParms[NUM_THREADS]; timer1.start("bucket-consumer"); for (i = 0; i < NUM_THREADS; i++) { thparms.threadNumber = i; thParms[i] = thparms; pthread_create(&consumer[i], NULL, bucketConsumer, &thParms[i]); } for (i = 0; i < NUM_THREADS; i++) pthread_join(consumer[i], NULL); timer1.stop("bucket-consumer"); timer1.finish(); delete bdl; } void aggFilter_group2() { CalpontSystemCatalog::TableName tn("test", "lineitem"); string s_l_orderkey = "test.lineitem.l_orderkey"; string s_l_quantity = "test.lineitem.l_quantity"; string s_l_partkey = "test.lineitem.l_partkey"; uint32_t sessionid = 1; uint32_t txnId = 1; uint32_t verId = 1; uint16_t stepID = 0; uint32_t statementID = 1; boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionid); SRCP srcp; SimpleColumn* l_orderkey = new SimpleColumn(s_l_orderkey, sessionid); SimpleColumn* l_quantity = new SimpleColumn(s_l_quantity, sessionid); SimpleColumn* l_partkey = new SimpleColumn(s_l_partkey, sessionid); CalpontSystemCatalog::ROPair ropair = csc->tableRID(tn); CalpontSystemCatalog::OID tableoid = ropair.objnum; // sum(l_quantity) group by l_ordereky AggregateColumn* ac = new AggregateColumn("sum", l_quantity, sessionid); srcp.reset(l_quantity); ac->addProjectCol(srcp); srcp.reset(l_orderkey); ac->addGroupByCol(srcp); srcp.reset(l_partkey); ac->addGroupByCol(srcp); // constant column 318 ConstantColumn* cc = new ConstantColumn(318, ConstantColumn::NUM); // > Operator* op = new Operator(">"); SOP sop(op); // sum(l_quantity) > 318 SimpleFilter* sf = new SimpleFilter(sop, ac, cc); // aggregate filter step /** @brief Constructor */ JobStepAssociation in; JobStepAssociation out; AggregateFilterStep* afs = new AggregateFilterStep(in, out, ac->functionName(), ac->groupByColList(), ac->projectColList(), ac->functionParms(), tableoid, sessionid, txnId, verId, stepID, statementID, fRm); afs->run(); afs->join(); } void hashjoin() { BucketDL A(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm); BucketDL B(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm); BucketDL C(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm); BucketDL D(NUM_BUCKETS, 1, MAX_ELEMENTS, fRm); A.setDiskElemSize(4, 4); B.setDiskElemSize(4, 4); A.setElementMode(1); A.setHashMode(1); B.setElementMode(1); B.setHashMode(1); int i; timer1.start("insert"); for (i = 0; i < 6000000 * numRuns; i++) A.insert(ElementType(i, i)); A.endOfInput(); for (i = 0; i < 1500000 * numRuns; i++) B.insert(ElementType(i, i + 1000000)); B.endOfInput(); timer1.stop("insert"); C.setElementMode(1); D.setElementMode(1); BDLWrapper setA(&A); BDLWrapper setB(&B); DataList* resultA(&C); DataList* resultB(&D); timer1.start("hashjoin"); HashJoin* hj = new HashJoin(setA, setB, resultA, resultB, INNER, &dlTimes); hj->performJoin(); timer1.stop("hashjoin"); timer1.finish(); cout << "hash join output size: " << resultA->totalSize() << "/" << resultB->totalSize() << endl; } typedef struct Elem { char* hashStr; // int64_t result; vector rids; } Elem; void hashmap_tuple() { int tm; for (int m = 0; m < 1024; m++) { char* a = new char[5 * 1024 * 1024]; memset(a, 0, 5 * 1024 * 1024); memcpy(&tm, a, 4); } uint32_t fHashLen = 4; TupleType tt; tt.second = new char[fHashLen]; uint64_t size = 585937; uint64_t i = 0, j = 0; #if 1 pthread_t agg[NUM_THREADS]; ThreadParms thParms[NUM_THREADS]; ThreadParms thParm; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < NUM_THREADS; i++) { thParm.threadNumber = i; thParms[i] = thParm; pthread_create(&agg[i], NULL, aggregator, &thParms[i]); } for (i = 0; i < NUM_THREADS; i++) pthread_join(agg[i], NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "aggregation took " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; #endif #if 0 //typedef std::vector RIDVec; typedef std::pair Results; //typedef std::tr1::unordered_map shmp; //typedef std::tr1::unordered_map::iterator TupleHMIter; //typedef boost::shared_ptr SHMP; typedef std::tr1::unordered_multimap TupleHashMap; typedef boost::shared_ptr SHMP; typedef std::tr1::unordered_multimap::iterator TupleHMIter; typedef std::pair HashItPair; HashItPair hashItPair; SHMP shmp(new TupleHashMap()); //TupleHashMap *shmp = new TupleHashMap(); TupleHMIter iter; uint32_t val = 0; uint64_t hv; Elem elem; Results rr; Hasher hasher; bool flag = true; timer1.start("vectormap"); for (int k = 0; k < 1; k++) { for (j = 0; j < 4; j++) { for (i = 0; i < size / 10; i++) { flag = true; memcpy(tt.second, &i, 4); hv = hasher(tt.second, fHashLen); iter = shmp->find(hv); if (iter != shmp->end()) { hashItPair = shmp->equal_range(hv); for (iter = hashItPair.first; iter != hashItPair.second; iter++) { if (memcmp(iter->second.second.hashStr, tt.second, fHashLen) == 0) { //cout << "real hit" << endl; //updateAggResult(tt, hashIt->second.first); iter->second.second.rids.push_back(tt.first); flag = false; break; } } } if (flag) { rr.second.hashStr = new char[fHashLen]; memcpy(rr.second.hashStr, tt.second, fHashLen); rr.second.rids.clear(); rr.second.rids.push_back(tt.first); shmp->insert(std::pair (hv, rr)); } } } cout << "vector size=" << shmp->size() << endl; // clean up hashmap memory for (iter = shmp->begin(); iter != shmp->end(); iter++) delete [] iter->second.second.hashStr; shmp->clear(); } timer1.stop("vectormap"); timer1.finish(); #endif #if 0 typedef std::vector RIDVec; typedef std::pair Results; typedef std::tr1::unordered_map TupleHashMap1; typedef std::tr1::unordered_map::iterator TupleHMIter1; typedef boost::shared_ptr SHMP1; SHMP1 shmp; TupleHasher tupleHash(fHashLen); TupleComparator tupleComp(fHashLen); shmp.reset(new TupleHashMap1(1, tupleHash, tupleComp)); TupleHMIter1 iter; Results rr; uint32_t val = 0; timer1.start("uniquemap"); for (j = 0; j < 4; j++) for (i = 0; i < size / 10; i++) { memcpy(tt.second, &i, 4); iter = shmp->find(tt.second); if (iter == shmp->end()) { rr.second.clear(); //getAggResult(tt, rr.first); rr.second.push_back(tt.first); char* hashStr = new char[fHashLen]; memcpy(hashStr, tt.second, fHashLen); shmp->insert(std::pair (hashStr, rr)); } else { //updateAggResult(tt, hashIt->second.first); iter->second.second.push_back(tt.first); } } timer1.stop("uniquemap"); timer1.finish(); cout << "hashmap size=" << shmp->size() << endl; // clean up hashmap memory for (iter = shmp->begin(); iter != shmp->end(); iter++) delete [] iter->first; shmp->clear(); #endif } #if 0 void hashmap_tuple_multi() { uint32_t fHashLen = 4; TupleHasher tupleHash(fHashLen); TupleComparator tupleComp(fHashLen); TupleType tt; //typedef std::vector RIDVec; typedef std::pair Results; typedef std::tr1::unordered_multimap TupleHashMap; typedef std::tr1::unordered_multimap::iterator TupleHMIter; typedef boost::shared_ptr SHMP; std::pair hashItPair; SHMP shmp; shmp.reset(new TupleHashMap(1, tupleHash, tupleComp)); TupleHMIter iter; Results rr; Hasher fHasher; timer1.start("multi_insert"); uint32_t i = 0, j = 0; uint32_t val = 0; for ( j = 0; j < 4; j++) for ( i = 0; i < 1171875; i++) { tt.second = new char[fHashLen]; memcpy(tt.second, &i, fHashLen); rr.first = j; rr.second = i; shmp->insert(std::pair (tt.second, rr)); } timer1.stop("multi_insert"); cout << "hashmap size=" << shmp->size() << endl; tt.second = new char[fHashLen]; timer1.start("multi_result"); iter = shmp->begin(); char* key; if (iter != shmp->end()) key = iter->first; uint32_t ct = 0; for (iter = ++iter; iter != shmp->end(); iter++) { if (memcmp(iter->first, key, 4) == 0) val += iter->second.first; else { key = iter->first; val = iter->second.first; ct++; } } timer1.stop("multi_result"); timer1.start("base"); for ( j = 0; j < 4; j++) for ( i = 0; i < 1171875; i++) { tt.second = new char[fHashLen]; memcpy(tt.second, &i, fHashLen); } timer1.stop("base"); timer1.finish(); cout << "group val=" << val << endl; // clean up hashmap memory for (iter = shmp->begin(); iter != shmp->end(); iter++) delete [] iter->first; } #endif void hashmap_ET() { typedef std::list hashList_t; typedef std::list::iterator hashListIter_t; // typedef std::tr1::unordered_multimap hash_t; // typedef std::tr1::unordered_multimap::iterator // hashIter_t; typedef std::tr1::unordered_multimap::value_type hashPair_t; typedef std::tr1::unordered_map hash_t; typedef std::tr1::unordered_map::iterator hashIter_t; typedef std::tr1::unordered_map::value_type hashPair_t; hash_t hashmap; hashIter_t it; hashList_t list; timer1.start("et_insert"); ElementType et; // while (true) for (int i = 0; i < 1200000; i++) { list.push_back(et); et.second = i; hashmap.insert(hashPair_t(et.second, list)); list.clear(); } timer1.stop("et_insert"); cout << "hashmap size=" << hashmap.size() << endl; int val = 0, i, j; timer1.start("et_find"); for (j = 0; j < 4; j++) for (i = 0; i < 1200000; i++) { et.second = i; it = hashmap.find(i); if (it == hashmap.end()) { list.clear(); // getAggResult(tt, rr.first); list.push_back(et); hashmap.insert(hashPair_t(et.second, list)); } else { it->second.push_back(et); } } timer1.stop("et_find"); timer1.finish(); cout << "hashmap size=" << hashmap.size() << endl; } }; CPPUNIT_TEST_SUITE_REGISTRATION(AggDriver); int main(int argc, char** argv) { CppUnit::TextUi::TestRunner runner; CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); runner.addTest(registry.makeTest()); bool wasSuccessful = runner.run("", false); return (wasSuccessful ? 0 : 1); }