/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: tdriver-datalist.cpp 9210 2013-01-21 14:10:42Z rdempsey $ #include #include #include #include #include #include #include #include #include #include "fifo.h" #include "wsdl.h" #include "constantdatalist.h" #include "bucketdl.h" #include "bandeddl.h" #include "elementtype.h" #include "stopwatch.cpp" // #undef CPPUNIT_ASSERT // #define CPPUNIT_ASSERT(x) using namespace std; using namespace joblist; Stopwatch timer; struct DLElement { public: int i; bool operator<(const DLElement& c) const { return (i < c.i); } const char* getHashString(int mode, int* len) const { *len = 4; return (char*)&i; } }; istream& operator>>(istream& is, DLElement& dl) { is.read((char*)&dl.i, sizeof(int)); return is; } ostream& operator<<(ostream& os, const DLElement& dl) { os.write((char*)&dl.i, sizeof(int)); return os; } int64_t count = 150000000 /*1000000*/; int maxElements = 32000000 /*50000*/; // max elements in memory at once for the benchmarks int buckets = 16; // # of buckets to use in bucketDL tests. #define MAXINT64 (uint64_t)0xffffffffffffffffLL /* Note, this is like what we'll use for banding operations */ class BandGrouper { public: uint32_t operator()(const char* data, uint32_t len) const { stringstream ss(string(data, len)); ElementType dle; ss >> dle; // cout << "bandGrouper sees " << dle.i << " returning " << // dle.i/(::count/buckets) << endl; return dle.first / (::count / buckets); } }; void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff) { if (tv2.tv_nsec < tv1.tv_nsec) { diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1; diff.tv_nsec = tv1.tv_nsec - tv2.tv_nsec; } else { diff.tv_sec = tv2.tv_sec - tv1.tv_sec; diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec; } } int cCount = 0, pCount = 0; void* MP_FIFO_stress_producer(void* arg) { FIFO* f = reinterpret_cast*>(arg); uint64_t i; ElementType val; uint64_t id = ::pCount++; // cout << "producer " << id << " started" << endl; for (i = 0; i < (uint32_t)::count / 10; i++) { val.first = (id << 60) | i; val.second = val.first; f->insert(val); } return NULL; } void* MP_FIFO_stress_consumer(void* arg) { FIFO* f = reinterpret_cast*>(arg); int it; ElementType val; bool ret; // int id = ::cCount++; uint64_t pCounter[10]; // 10 producers right now; make this a global var... uint64_t pnum, pnext; memset(pCounter, 0, 8 * 10); // cout << "consumer " << id << " started" << endl; it = f->getIterator(); ret = f->next(it, &val); while (ret) { pnum = val.first >> 60; pnext = pCounter[pnum]++; // cerr << id << ": .first=0x" << hex << val.first << dec << " pnum:" << pnum << " val:" // << pnext << endl; CPPUNIT_ASSERT((val.first & 0xfffffffffffffffLL) == pnext); CPPUNIT_ASSERT(val.second == val.first); ret = f->next(it, &val); } return NULL; } void* FIFO_stress_consumer(void* arg) { FIFO* f = reinterpret_cast*>(arg); uint64_t i; int it; ElementType val; bool ret; int id = ++::cCount; cout << "consumer " << id << " started" << endl; it = f->getIterator(); for (i = 0; i < MAXINT64; i++) { // cout << id << ": " << i << " "; ret = f->next(it, &val); if (!ret) { cout << "consumer " << id << " exiting" << endl; return NULL; } else { // cout << "first: " << val.first << " second: " << val.second << endl; CPPUNIT_ASSERT(ret == true); CPPUNIT_ASSERT(val.first == i && val.second == i); } } return NULL; } void* FIFO_bench_consumer(void* arg) { FIFO* f = reinterpret_cast*>(arg); int i, it, val; bool ret; it = f->getIterator(); for (i = 0; i < ::count; i++) { // cout << "t " << i << endl; ret = f->next(it, &val); // CPPUNIT_ASSERT(ret == true); } return NULL; } void* FIFO_2_helper(void* arg) { FIFO* f = reinterpret_cast*>(arg); int i; DLElement fe; for (i = 0; i < ::count; i++) { fe.i = i; #ifdef DEBUG cout << "inserting " << i << endl; #endif f->insert(fe); } sleep(5); // causes the test to block on moreData until endOfInput is called f->endOfInput(); return NULL; } void* WSDL_2_helper(void* arg) { WSDL* w = reinterpret_cast*>(arg); uint32_t i, id; bool nextRet; ElementType dle; id = w->getIterator(); // cout << "id = " << id << endl; for (i = 0; i < ::count; i++) { nextRet = w->next(id, &dle); // cout << "tgot " << dle.i << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i && dle.second == i); } return NULL; } void* BandedDL_1_helper(void* arg) { BandedDL* w = reinterpret_cast*>(arg); uint32_t i, id; bool nextRet; ElementType dle; id = w->getIterator(); // cout << "id = " << id << endl; for (i = 0; i < ::count; i++) { nextRet = w->next(id, &dle); // cout << "tgot " << dle.i << endl; // CPPUNIT_ASSERT(nextRet == true); // CPPUNIT_ASSERT(dle.first == i && dle.second == i); } return NULL; } void* BandedDL_2_helper(void* arg) { BandedDL* w = reinterpret_cast*>(arg); uint32_t i, id; bool nextRet; StringElementType dle; id = w->getIterator(); // cout << "id = " << id << endl; for (i = 0; i < ::count; i++) { ostringstream os; os << "blah blah" << i; nextRet = w->next(id, &dle); // cout << "tgot " << dle.i << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i && dle.second == os.str()); } return NULL; } void* WSDL_bench_helper(void* arg) { WSDL* w = reinterpret_cast*>(arg); int i, id; bool nextRet; ElementType e; id = w->getIterator(); for (i = 0; i < ::count; i++) nextRet = w->next(id, &e); return NULL; } void* SWSDL_bench_helper(void* arg) { SWSDL* sw = reinterpret_cast*>(arg); int i, id; bool nextRet; ElementType e; id = sw->getIterator(); for (i = 0; i < ::count * 8; i++) nextRet = sw->next(id, &e); return NULL; } void* BDL_bench_helper(void* arg) { BandedDL* sw = reinterpret_cast*>(arg); int i, id; bool nextRet; ElementType e; id = sw->getIterator(); for (i = 0; i < ::count * 8; i++) nextRet = sw->next(id, &e); return NULL; } void* SWSDL_producer(void* arg) { SWSDL* sw = reinterpret_cast*>(arg); for (int i = 0; i < ::count; i++) sw->insert(ElementType(i, i)); return NULL; } void* BDL_producer(void* arg) { BandedDL* sw = reinterpret_cast*>(arg); ElementType dle; for (int i = 0; i < ::count; i++) { // dle.first = dle.second = i; sw->insert(ElementType(i, i)); } // sw->endOfInput(); // cout << "end of input size is " << sw->totalSize() << endl; return NULL; } void* ZDL_producer(void* arg) { ZDL* zdl = reinterpret_cast*>(arg); for (int i = 0; i < ::count; i++) { zdl.insert(ElementType((int)N * rand() / (RAND_MAX + 1.0), i)); } return NULL; } void* ZDL_consumer(void* arg) { ZDL* zdl = reinterpret_cast*>(arg); int i, id; bool nextRet; ElementType e; id = zdl->getIterator(); nextRet = zdl->next(id, &e); while (nextRet) nextRet = zdl->next(id, &e); return NULL; } void* SWSDL_consumer(void* arg) { SWSDL* sw = reinterpret_cast*>(arg); int i, id; bool nextRet; ElementType e; id = sw->getIterator(); nextRet = sw->next(id, &e); while (nextRet) nextRet = zdl->next(id, &e); return NULL; } class DataListDriver : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(DataListDriver); // CPPUNIT_TEST(FIFO_1); // CPPUNIT_TEST(FIFO_2); // CPPUNIT_TEST(MP_FIFO_stress); //10 consumers 10 producers // CPPUNIT_TEST(FIFO_stress); // 10 consumers 1 producer; doesn't stop // CPPUNIT_TEST(FIFO_bench); // CPPUNIT_TEST(FIFO_singlethread_bench); // CPPUNIT_TEST(WSDL_1); // CPPUNIT_TEST(WSDL_2); // CPPUNIT_TEST(WSDL_3); // WSDL_1 with StringElementTypes // CPPUNIT_TEST(WSDL_bench); // CPPUNIT_TEST(WSDL_singlethread_bench); // CPPUNIT_TEST(CON_DL_1); // ConstantDataList // CPPUNIT_TEST(BucketDL_1); // CPPUNIT_TEST(BandedDL_1); // WSDL_2 ported to BandedDLs // CPPUNIT_TEST(BandedDL_2); // BandedDL_1 that uses StringElementTypes // make sure the DataList consumer-side interface works. // CPPUNIT_TEST(BandedDL_as_WSDL_1); // random inserts to bucketDL, create bandedDL, make sure it's in order // CPPUNIT_TEST(BandedDL_as_WSDL_2); /* tests prompted by feedback */ // CPPUNIT_TEST(polymorphism_1); // make sure we can store basic types (int in this case) // CPPUNIT_TEST(polymorphism_2); // make sure we can create a FIFO, fill it, cast it to a DataList, and read the elements. // CPPUNIT_TEST(polymorphism_3); // CPPUNIT_TEST(SWSDL_bench); // CPPUNIT_TEST(BDL_multiproducer_bench); // CPPUNIT_TEST(BDL_consumer_bench); // CPPUNIT_TEST(SWSDL_multiproducer_bench); CPPUNIT_TEST(SWSDL_consumer_bench); // CPPUNIT_TEST(BDL_singlethread_bench); CPPUNIT_TEST(ZDL_bench); CPPUNIT_TEST_SUITE_END(); private: public: void FIFO_1() { int i, it; DLElement dummy; FIFO f(1, 20); bool nextRet; for (i = 0; i < 10; i++) { dummy.i = i; f.insert(dummy); } f.endOfInput(); it = f.getIterator(); CPPUNIT_ASSERT(it == 0); for (i = 0; i < 10; i++) { nextRet = f.next(it, &dummy); CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dummy.i == i); } } void FIFO_2() { int i, it; DLElement fe; FIFO fifo(1, 2000); pthread_t thread; bool ret; fe.i = 0; pthread_create(&thread, NULL, FIFO_2_helper, &fifo); sleep(1); // make sure the thread sleeps after 2000 inserts it = fifo.getIterator(); for (i = 0; i < ::count; i++) { if (i % 100000 == 0) cout << i << "/" << ::count << endl; ret = fifo.next(it, &fe); // cout << "fe = " << fe.i << endl; CPPUNIT_ASSERT(ret == true); CPPUNIT_ASSERT(fe.i == i); } ret = fifo.next(it, &fe); // should sleep until the thread calls endOfInput() CPPUNIT_ASSERT(ret == false); pthread_join(thread, NULL); } void MP_FIFO_stress() { int64_t i; FIFO f(10, 25000); pthread_t consumer[10]; pthread_t producer[10]; f.setMultipleProducers(true); for (i = 0; i < 10; i++) pthread_create(&consumer[i], NULL, MP_FIFO_stress_consumer, &f); for (i = 0; i < 10; i++) pthread_create(&producer[i], NULL, MP_FIFO_stress_producer, &f); for (i = 0; i < 10; i++) pthread_join(producer[i], NULL); f.endOfInput(); /* // for (i = 0; i < MAXINT64; i++) { for (i = 0; i < ::count; i++) { if ((i % 10000000) == 0) cout << i << endl; f.insert(ElementType(i, i)); } f.endOfInput(); */ for (i = 0; i < 10; i++) pthread_join(consumer[i], NULL); } void FIFO_stress() { int64_t i; FIFO f(10, 25000); pthread_t consumer[10]; for (i = 0; i < 10; i++) pthread_create(&consumer[i], NULL, FIFO_stress_consumer, &f); // for (i = 0; i < MAXINT64; i++) { for (i = 0; i < ::count; i++) { if ((i % 10000000) == 0) cout << i << endl; f.insert(ElementType(i, i)); } f.endOfInput(); for (i = 0; i < 10; i++) pthread_join(consumer[i], NULL); } void FIFO_bench() { int i; FIFO f(1, ::maxElements); pthread_t consumer; struct timespec tv1, tv2, diff; pthread_create(&consumer, NULL, FIFO_bench_consumer, &f); clock_gettime(CLOCK_REALTIME, &tv1); for (i = 0; i < ::count; i++) { f.insert(i); } f.endOfInput(); pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &tv2); timespec_sub(tv1, tv2, diff); cout << "producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void FIFO_singlethread_bench() { int i, it, e; FIFO f(1, ::count); bool more; struct timespec tv1, tv2, diff; clock_gettime(CLOCK_REALTIME, &tv1); for (i = 0; i < ::count; i++) { f.insert(i); } f.endOfInput(); clock_gettime(CLOCK_REALTIME, &tv2); timespec_sub(tv1, tv2, diff); cout << "FIFO_singlethreaded_bench: inserted " << ::count << " ints in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; it = f.getIterator(); for (i = 0; i < ::count; i++) { more = f.next(it, &e); } clock_gettime(CLOCK_REALTIME, &tv1); timespec_sub(tv2, tv1, diff); cout << "FIFO_singlethread_bench: consumed, took " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void WSDL_1() { WSDL w(1, ::count / 100); ElementType dle; uint32_t i, id; bool nextRet; for (i = 0; i < ::count; i++) { // cout << "inserting " << i << endl; dle.first = dle.second = i; w.insert(dle); } w.endOfInput(); id = w.getIterator(); for (i = 0; i < ::count; i++) { nextRet = w.next(id, &dle); // cout << i << ":got " << dle.first << " " << dle.second << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i && dle.second == i); } } void WSDL_2() { WSDL w(2, ::count / 10); ElementType dle; uint32_t i, id, size; bool nextRet; pthread_t consumer1; pthread_create(&consumer1, NULL, WSDL_2_helper, &w); // thread should wait here waiting on endOfInput() for (i = 0; i < ::count; i++) { dle.first = dle.second = i; w.insert(dle); } sleep(1); w.endOfInput(); /* let the thread consume the loaded set; make sure that the load doesn't happen until the main thread finishes with the loaded set */ sleep(1); size = w.totalSize(); id = w.getIterator(); for (i = 0; i < ::count; i++) { nextRet = w.next(id, &dle); // cout << "got " << dle.i << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i && dle.second == i); } pthread_join(consumer1, NULL); } void WSDL_3() { WSDL w(1, ::count / 10); StringElementType dle; uint32_t i, id; bool nextRet; for (i = 0; i < ::count; i++) { ostringstream os; os << "blah blah blah" << i; // cout << "inserting " << i << endl; dle.first = i; dle.second = os.str(); w.insert(dle); } w.endOfInput(); id = w.getIterator(); for (i = 0; i < ::count; i++) { ostringstream os; os << "blah blah blah" << i; nextRet = w.next(id, &dle); // cout << i << ":got " << dle.first << " " << dle.second << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i && dle.second == os.str()); } } void WSDL_bench() { WSDL w(1, ::maxElements); int i; pthread_t consumer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); pthread_create(&consumer, NULL, WSDL_bench_helper, &w); for (i = 0; i < ::count; i++) w.insert(ElementType(i, i)); w.endOfInput(); cout << "end of input size is " << w.totalSize() << endl; pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "WSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void WSDL_singlethread_bench() { WSDL w(1, ::maxElements); int i, id, e; bool nextRet; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < ::count; i++) w.insert(i); w.endOfInput(); id = w.getIterator(); for (i = 0; i < ::count; i++) nextRet = w.next(id, &e); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "WSDL_singlethread_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void SWSDL_bench() { /* SWSDL sw(1, ::maxElements); int i; pthread_t consumer; pthread_t producer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); pthread_create(&producer, NULL, SWSDL_producer, &sw); pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw); pthread_join(producer, NULL); pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; */ SWSDL sw(1, ::maxElements); int i; pthread_t consumer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw); for (i = 0; i < ::count; i++) sw.insert(ElementType(i, i)); sw.endOfInput(); cout << "end of input size is " << sw.totalSize() << endl; pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void SWSDL_multiproducer_bench() { SWSDL sw(1, ::maxElements); sw.setMultipleProducers(true); int i, id; ElementType e; bool nextRet; uint32_t numOfThreads = 8; pthread_t producer[numOfThreads]; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < numOfThreads; i++) pthread_create(&producer[i], NULL, SWSDL_producer, &sw); for (i = 0; i < numOfThreads; i++) pthread_join(producer[i], NULL); sw.endOfInput(); cout << "end of input size is " << sw.totalSize() << endl; clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "# of Producer: " << numOfThreads << endl; cout << "SWSDL_producer_phase_bench: producer & consumer passed " << ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; clock_gettime(CLOCK_REALTIME, &ts1); id = sw.getIterator(); for (i = 0; i < ::count; i++) nextRet = sw.next(id, &e); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "SWSDL_consumer_phase_bench: producer & consumer passed " << ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void SWSDL_consumer_bench() { SWSDL sw(4); sw.setMultipleProducers(true); ElementType dle; uint32_t i, id; bool nextRet; uint32_t numOfProducers = 8; uint32_t numOfConsumers = 4; pthread_t producer[numOfProducers]; pthread_t consumer[numOfConsumers]; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < numOfProducers; i++) pthread_create(&producer[i], NULL, SWSDL_producer, &sw); for (i = 0; i < numOfConsumers; i++) pthread_create(&consumer[i], NULL, SWSDL_consumer, &sw); for (i = 0; i < numOfThreads; i++) pthread_join(producer[i], NULL); sw.endOfInput(); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "# of Producer: " << numOfThreads << endl; cout << "SWSDL_consumer_bench: producer & consumer passed " << ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } void BDL_multiproducer_bench() { BandedDL w(2); w.setMultipleProducers(true); ElementType dle; uint32_t i, id; bool nextRet; uint32_t numOfThreads = 8; pthread_t producer[numOfThreads]; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < numOfThreads; i++) pthread_create(&producer[i], NULL, BDL_producer, &w); for (i = 0; i < numOfThreads; i++) pthread_join(producer[i], NULL); w.endOfInput(); cout << "end of input size is " << w.totalSize() << endl; clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "# of Producer: " << numOfThreads << endl; cout << "BDL_producer_phase_bench: producer & consumer passed " << ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; clock_gettime(CLOCK_REALTIME, &ts1); id = w.getIterator(); for (i = 0; i < ::count; i++) { nextRet = w.next(id, &dle); } clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "BDL_consumer_phase_bench: producer & consumer passed " << ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; /* SWSDL sw(1, ::maxElements); int i; pthread_t consumer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw); for (i = 0; i < ::count; i++) sw.insert(ElementType(i, i)); sw.endOfInput(); cout << "end of input size is " << sw.totalSize() << endl; pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; */ } void BDL_consumer_bench() { BandedDL w(2); w.setMultipleProducers(true); ElementType dle; uint32_t i, id; bool nextRet; uint32_t numOfThreads = 8; pthread_t producer[numOfThreads]; pthread_t consumer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < numOfThreads; i++) pthread_create(&producer[i], NULL, BDL_producer, &w); for (i = 0; i < numOfThreads; i++) pthread_join(producer[i], NULL); w.endOfInput(); // pthread_create(&consumer, NULL, BDL_bench_helper, &w); // cout << "end of input size is " << w.totalSize() << endl; // clock_gettime(CLOCK_REALTIME, &ts2); // timespec_sub(ts1, ts2, diff); // cout << "# of Producer: " << numOfThreads << endl; // cout << "BDL_producer_phase_bench: producer & consumer passed " << // ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << // "ns" << endl; // clock_gettime(CLOCK_REALTIME, &ts1); id = w.getIterator(); for (i = 0; i < ::count * 8; i++) { nextRet = w.next(id, &dle); } clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "end of input size is " << w.totalSize() << endl; cout << "BDL_consumer_phase_bench: producer & consumer passed " << ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; /* SWSDL sw(1, ::maxElements); int i; pthread_t consumer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw); for (i = 0; i < ::count; i++) sw.insert(ElementType(i, i)); sw.endOfInput(); cout << "end of input size is " << sw.totalSize() << endl; pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; */ } void BDL_singlethread_bench() { /*SWSDL sw(1, ::maxElements); int i; pthread_t consumer; pthread_t producer; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); pthread_create(&producer, NULL, SWSDL_producer, &sw); pthread_join(producer, NULL); pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw); pthread_join(consumer, NULL); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff);*/ SWSDL sw(1, ::maxElements); int i, id; ElementType e; bool nextRet; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); cout << "start of input size is " << ::count << endl; for (i = 0; i < ::count; i++) sw.insert(ElementType(i, i)); // cout << "end of input size is " << sw.totalSize() << endl; // sw.endOfInput(); // id = sw.getIterator(); for (i = 0; i < ::count; i++) nextRet = sw.next(id, &e); clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); // cout << "SWSDL_consumer_phase_bench: producer & consumer passed " << // ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << // "ns" << endl; } void CON_DL_1() { DLElement d, d2; int i = 0; bool nextRet; d.i = 1; ConstantDataList c(d); nextRet = c.next(i, &d2); CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(d.i == 1 && d2.i == 1); d.i = 2; c.insert(d); nextRet = c.next(i, &d2); CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(d.i == 2 && d2.i == 2); } void BucketDL_1() { BucketDL bdl(16, 1, ::count / 100); ElementType dle; uint32_t i, it, eCount = 0, buckets; bool nextRet; for (i = 0; i < ::count; i++) { // if (i % 1000000 == 0) cout << i << endl; dle.first = dle.second = i; bdl.insert(dle); } bdl.endOfInput(); // cout << "inserted " << ::count << " elements" << endl; buckets = bdl.bucketCount(); for (i = 0; i < buckets; i++) { it = bdl.getIterator(i); do { nextRet = bdl.next(i, it, &dle); if (nextRet) { CPPUNIT_ASSERT(dle.first >= 0 && dle.first <= (uint64_t)::count); CPPUNIT_ASSERT(++eCount <= ::count); } } while (nextRet == true); } CPPUNIT_ASSERT(eCount == ::count); } void BandedDL_1() { BandedDL w(2); ElementType dle; uint32_t i, id, size; bool nextRet; pthread_t consumer1; pthread_create(&consumer1, NULL, BandedDL_1_helper, &w); // thread should wait here waiting on endOfInput() for (i = 0; i < ::count; i++) { dle.first = dle.second = i; w.insert(dle); if (((i + 1) % (::count / 10)) == 0) { cout << "inserted " << i + 1 << "/" << ::count << endl; w.saveBand(); } } sleep(1); w.endOfInput(); /* let the thread consume the loaded set; make sure that the load doesn't happen until the main thread finishes with the loaded set */ sleep(5); size = w.totalSize(); id = w.getIterator(); for (i = 0; i < ::count; i++) { nextRet = w.next(id, &dle); // cout << "got " << dle.i << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i); } pthread_join(consumer1, NULL); } void BandedDL_2() { BandedDL w(2); StringElementType dle; uint32_t i, id, size; bool nextRet; pthread_t consumer1; pthread_create(&consumer1, NULL, BandedDL_2_helper, &w); // thread should wait here waiting on endOfInput() for (i = 0; i < ::count; i++) { ostringstream os; os << "blah blah" << i; dle.first = i; dle.second = os.str(); w.insert(dle); if (((i + 1) % (::count / 10)) == 0) { cout << "inserted " << i + 1 << "/" << ::count << endl; w.saveBand(); } } sleep(1); w.endOfInput(); /* let the thread consume the loaded set; make sure that the load doesn't happen until the main thread finishes with the loaded set */ cout << "endofInput finished\n"; sleep(5); size = w.totalSize(); id = w.getIterator(); for (i = 0; i < ::count; i++) { ostringstream os; os << "blah blah" << i; nextRet = w.next(id, &dle); // cout << "got " << dle.i << endl; CPPUNIT_ASSERT(nextRet == true); CPPUNIT_ASSERT(dle.first == i && dle.second == os.str()); } pthread_join(consumer1, NULL); } void BandedDL_as_WSDL_1() { BandedDL bdl(1); ElementType e; uint32_t i, it; bool more; for (i = 1; i <= ::count; i++) { e.first = e.second = i; bdl.insert(e); if (i % (::count / 10) == 0) bdl.saveBand(); } bdl.endOfInput(); it = bdl.getIterator(); for (i = 1; i <= ::count; i++) { more = bdl.next(it, &e); CPPUNIT_ASSERT(more == true); CPPUNIT_ASSERT(e.first == i && e.second == i); } more = bdl.next(it, &e); CPPUNIT_ASSERT(more == false); bdl.restart(); /* make sure it can be read again */ for (i = 1; i <= ::count; i++) { more = bdl.next(it, &e); if (!more) cerr << i << endl; CPPUNIT_ASSERT(more == true); CPPUNIT_ASSERT(e.first == i && e.second == i); } more = bdl.next(it, &e); CPPUNIT_ASSERT(more == false); } void BandedDL_as_WSDL_2() { BucketDL bucketDL(::buckets, 1, 1000, BandGrouper()); uint32_t last, i, it; ElementType dle; bool more; srand(time(NULL)); for (i = 0; i < ::count; i++) { dle.first = dle.second = rand() % ::count; // artificial max bucketDL.insert(dle); } bucketDL.endOfInput(); // cout << "made bucketdl" << endl; BandedDL bdl(bucketDL, 1); // cout << "made bandeddl" << endl; it = bdl.getIterator(); last = 0; // duplicates were removed, there won't be ::count elements in bdl more = bdl.next(it, &dle); while (more) { CPPUNIT_ASSERT(dle.first < (uint64_t)::count); // cout << dle.first << " >= " << last << endl; CPPUNIT_ASSERT(dle.first >= last); last = dle.first; more = bdl.next(it, &dle); } // cerr << "read through bandeddl" << endl; more = bdl.next(it, &dle); CPPUNIT_ASSERT(more == false); } void ZDL_bench() { ZDL zdl(4); zdl.setMultipleProducers(true); ElementType dle; uint32_t i, id; bool nextRet; uint32_t numOfProducers = 8; uint32_t numOfConsumers = 4; pthread_t producer[numOfProducers]; pthread_t consumer[numOfConsumers]; struct timespec ts1, ts2, diff; clock_gettime(CLOCK_REALTIME, &ts1); for (i = 0; i < numOfProducers; i++) pthread_create(&producer[i], NULL, ZDL_producer, &zdl); for (i = 0; i < numOfConsumers; i++) pthread_create(&consumer[i], NULL, ZDL_consumer, &zdl); for (i = 0; i < numOfThreads; i++) pthread_join(producer[i], NULL); zdl.endOfInput(); // clock_gettime(CLOCK_REALTIME, &ts1); id = w.getIterator(); for (i = 0; i < ::count * 8; i++) { nextRet = w.next(id, &dle); } clock_gettime(CLOCK_REALTIME, &ts2); timespec_sub(ts1, ts2, diff); cout << "end of input size is " << w.totalSize() << endl; cout << "ZDL_consumer_producer_bench: producer & consumer passed " << ::count * numOfProducers << " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl; } /* Sanity test */ void polymorphism_1() { DataList* dl; FIFO* f; DLElement dle; int it, i; bool more; f = new FIFO(1, 20); dl = (DataList*)f; it = dl->getIterator(); for (i = 0; i < 10; i++) { dle.i = i; dl->insert(dle); } dl->endOfInput(); for (i = 0; i < 10; i++) { more = dl->next(it, &dle); // cout << dle.i << endl; CPPUNIT_ASSERT(more == true); CPPUNIT_ASSERT(dle.i == i); } delete f; } /* Sanity test 2. Can we stuff basic datatypes into DataLists? Apparently not. WHY?*/ void polymorphism_2() { DataList* dl; FIFO* f = new FIFO(1, 20); int it, i, dle; bool more; dl = (DataList*)f; it = dl->getIterator(); for (i = 0; i < 10; i++) dl->insert(i); dl->endOfInput(); for (i = 0; i < 10; i++) { more = dl->next(it, &dle); CPPUNIT_ASSERT(more == true); CPPUNIT_ASSERT(dle == i); // cout << "dle[" << i << "]: " << dle << endl; } delete f; } void polymorphism_3() { DataList* dl; FIFO* f; int it, i; DLElement dle; bool more; f = new FIFO(1, 20); dl = (DataList*)f; it = dl->getIterator(); for (i = 0; i < 10; i++) { dle.i = i; f->insert(dle); } f->endOfInput(); for (i = 0; i < 10; i++) { // cout << "next" << endl; more = dl->next(it, &dle); CPPUNIT_ASSERT(more == true); CPPUNIT_ASSERT(dle.i == i); // cout << "dle[" << i << "]: " << dle.i << endl; } delete f; } }; CPPUNIT_TEST_SUITE_REGISTRATION(DataListDriver); int main(int argc, char** argv) { CppUnit::TextUi::TestRunner runner; CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); runner.addTest(registry.makeTest()); bool wasSuccessful = runner.run("", false); return (wasSuccessful ? 0 : 1); }