1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2022-01-21 16:43:49 +00:00

1443 lines
35 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tdriver-datalist.cpp 9210 2013-01-21 14:10:42Z rdempsey $
#include <iostream>
#include <sstream>
#include <list>
#include <pthread.h>
#include <time.h>
#include <sys/time.h>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
#include "fifo.h"
#include "wsdl.h"
#include "constantdatalist.h"
#include "bucketdl.h"
#include "bandeddl.h"
#include "elementtype.h"
#include "stopwatch.cpp"
// #undef CPPUNIT_ASSERT
// #define CPPUNIT_ASSERT(x)
using namespace std;
using namespace joblist;
Stopwatch timer;
struct DLElement
{
public:
int i;
bool operator<(const DLElement& c) const
{
return (i < c.i);
}
const char* getHashString(int mode, int* len) const
{
*len = 4;
return (char*)&i;
}
};
istream& operator>>(istream& is, DLElement& dl)
{
is.read((char*)&dl.i, sizeof(int));
return is;
}
ostream& operator<<(ostream& os, const DLElement& dl)
{
os.write((char*)&dl.i, sizeof(int));
return os;
}
int64_t count = 150000000 /*1000000*/;
int maxElements = 32000000 /*50000*/; // max elements in memory at once for the benchmarks
int buckets = 16; // # of buckets to use in bucketDL tests.
#define MAXINT64 (uint64_t)0xffffffffffffffffLL
/* Note, this is like what we'll use for banding operations */
class BandGrouper
{
public:
uint32_t operator()(const char* data, uint32_t len) const
{
stringstream ss(string(data, len));
ElementType dle;
ss >> dle;
// cout << "bandGrouper sees " << dle.i << " returning " <<
// dle.i/(::count/buckets) << endl;
return dle.first / (::count / buckets);
}
};
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, struct timespec& diff)
{
if (tv2.tv_nsec < tv1.tv_nsec)
{
diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1;
diff.tv_nsec = tv1.tv_nsec - tv2.tv_nsec;
}
else
{
diff.tv_sec = tv2.tv_sec - tv1.tv_sec;
diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec;
}
}
int cCount = 0, pCount = 0;
void* MP_FIFO_stress_producer(void* arg)
{
FIFO<ElementType>* f = reinterpret_cast<FIFO<ElementType>*>(arg);
uint64_t i;
ElementType val;
uint64_t id = ::pCount++;
// cout << "producer " << id << " started" << endl;
for (i = 0; i < (uint32_t)::count / 10; i++)
{
val.first = (id << 60) | i;
val.second = val.first;
f->insert(val);
}
return NULL;
}
void* MP_FIFO_stress_consumer(void* arg)
{
FIFO<ElementType>* f = reinterpret_cast<FIFO<ElementType>*>(arg);
int it;
ElementType val;
bool ret;
// int id = ::cCount++;
uint64_t pCounter[10]; // 10 producers right now; make this a global var...
uint64_t pnum, pnext;
memset(pCounter, 0, 8 * 10);
// cout << "consumer " << id << " started" << endl;
it = f->getIterator();
ret = f->next(it, &val);
while (ret)
{
pnum = val.first >> 60;
pnext = pCounter[pnum]++;
// cerr << id << ": .first=0x" << hex << val.first << dec << " pnum:" << pnum << " val:"
// << pnext << endl;
CPPUNIT_ASSERT((val.first & 0xfffffffffffffffLL) == pnext);
CPPUNIT_ASSERT(val.second == val.first);
ret = f->next(it, &val);
}
return NULL;
}
void* FIFO_stress_consumer(void* arg)
{
FIFO<ElementType>* f = reinterpret_cast<FIFO<ElementType>*>(arg);
uint64_t i;
int it;
ElementType val;
bool ret;
int id = ++::cCount;
cout << "consumer " << id << " started" << endl;
it = f->getIterator();
for (i = 0; i < MAXINT64; i++)
{
// cout << id << ": " << i << " ";
ret = f->next(it, &val);
if (!ret)
{
cout << "consumer " << id << " exiting" << endl;
return NULL;
}
else
{
// cout << "first: " << val.first << " second: " << val.second << endl;
CPPUNIT_ASSERT(ret == true);
CPPUNIT_ASSERT(val.first == i && val.second == i);
}
}
return NULL;
}
void* FIFO_bench_consumer(void* arg)
{
FIFO<int>* f = reinterpret_cast<FIFO<int>*>(arg);
int i, it, val;
bool ret;
it = f->getIterator();
for (i = 0; i < ::count; i++)
{
// cout << "t " << i << endl;
ret = f->next(it, &val);
// CPPUNIT_ASSERT(ret == true);
}
return NULL;
}
void* FIFO_2_helper(void* arg)
{
FIFO<DLElement>* f = reinterpret_cast<FIFO<DLElement>*>(arg);
int i;
DLElement fe;
for (i = 0; i < ::count; i++)
{
fe.i = i;
#ifdef DEBUG
cout << "inserting " << i << endl;
#endif
f->insert(fe);
}
sleep(5); // causes the test to block on moreData until endOfInput is called
f->endOfInput();
return NULL;
}
void* WSDL_2_helper(void* arg)
{
WSDL<ElementType>* w = reinterpret_cast<WSDL<ElementType>*>(arg);
uint32_t i, id;
bool nextRet;
ElementType dle;
id = w->getIterator();
// cout << "id = " << id << endl;
for (i = 0; i < ::count; i++)
{
nextRet = w->next(id, &dle);
// cout << "tgot " << dle.i << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i && dle.second == i);
}
return NULL;
}
void* BandedDL_1_helper(void* arg)
{
BandedDL<ElementType>* w = reinterpret_cast<BandedDL<ElementType>*>(arg);
uint32_t i, id;
bool nextRet;
ElementType dle;
id = w->getIterator();
// cout << "id = " << id << endl;
for (i = 0; i < ::count; i++)
{
nextRet = w->next(id, &dle);
// cout << "tgot " << dle.i << endl;
// CPPUNIT_ASSERT(nextRet == true);
// CPPUNIT_ASSERT(dle.first == i && dle.second == i);
}
return NULL;
}
void* BandedDL_2_helper(void* arg)
{
BandedDL<StringElementType>* w = reinterpret_cast<BandedDL<StringElementType>*>(arg);
uint32_t i, id;
bool nextRet;
StringElementType dle;
id = w->getIterator();
// cout << "id = " << id << endl;
for (i = 0; i < ::count; i++)
{
ostringstream os;
os << "blah blah" << i;
nextRet = w->next(id, &dle);
// cout << "tgot " << dle.i << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i && dle.second == os.str());
}
return NULL;
}
void* WSDL_bench_helper(void* arg)
{
WSDL<ElementType>* w = reinterpret_cast<WSDL<ElementType>*>(arg);
int i, id;
bool nextRet;
ElementType e;
id = w->getIterator();
for (i = 0; i < ::count; i++)
nextRet = w->next(id, &e);
return NULL;
}
void* SWSDL_bench_helper(void* arg)
{
SWSDL<ElementType>* sw = reinterpret_cast<SWSDL<ElementType>*>(arg);
int i, id;
bool nextRet;
ElementType e;
id = sw->getIterator();
for (i = 0; i < ::count * 8; i++)
nextRet = sw->next(id, &e);
return NULL;
}
void* BDL_bench_helper(void* arg)
{
BandedDL<ElementType>* sw = reinterpret_cast<BandedDL<ElementType>*>(arg);
int i, id;
bool nextRet;
ElementType e;
id = sw->getIterator();
for (i = 0; i < ::count * 8; i++)
nextRet = sw->next(id, &e);
return NULL;
}
void* SWSDL_producer(void* arg)
{
SWSDL<ElementType>* sw = reinterpret_cast<SWSDL<ElementType>*>(arg);
for (int i = 0; i < ::count; i++)
sw->insert(ElementType(i, i));
return NULL;
}
void* BDL_producer(void* arg)
{
BandedDL<ElementType>* sw = reinterpret_cast<BandedDL<ElementType>*>(arg);
ElementType dle;
for (int i = 0; i < ::count; i++)
{
// dle.first = dle.second = i;
sw->insert(ElementType(i, i));
}
// sw->endOfInput();
// cout << "end of input size is " << sw->totalSize() << endl;
return NULL;
}
void* ZDL_producer(void* arg)
{
ZDL<ElementType>* zdl = reinterpret_cast<ZDL<ElementType>*>(arg);
for (int i = 0; i < ::count; i++)
{
zdl.insert(ElementType((int)N * rand() / (RAND_MAX + 1.0), i));
}
return NULL;
}
void* ZDL_consumer(void* arg)
{
ZDL<ElementType>* zdl = reinterpret_cast<ZDL<ElementType>*>(arg);
int i, id;
bool nextRet;
ElementType e;
id = zdl->getIterator();
nextRet = zdl->next(id, &e);
while (nextRet)
nextRet = zdl->next(id, &e);
return NULL;
}
void* SWSDL_consumer(void* arg)
{
SWSDL<ElementType>* sw = reinterpret_cast<SWSDL<ElementType>*>(arg);
int i, id;
bool nextRet;
ElementType e;
id = sw->getIterator();
nextRet = sw->next(id, &e);
while (nextRet)
nextRet = zdl->next(id, &e);
return NULL;
}
class DataListDriver : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(DataListDriver);
// CPPUNIT_TEST(FIFO_1);
// CPPUNIT_TEST(FIFO_2);
// CPPUNIT_TEST(MP_FIFO_stress); //10 consumers 10 producers
// CPPUNIT_TEST(FIFO_stress); // 10 consumers 1 producer; doesn't stop
// CPPUNIT_TEST(FIFO_bench);
// CPPUNIT_TEST(FIFO_singlethread_bench);
// CPPUNIT_TEST(WSDL_1);
// CPPUNIT_TEST(WSDL_2);
// CPPUNIT_TEST(WSDL_3); // WSDL_1 with StringElementTypes
// CPPUNIT_TEST(WSDL_bench);
// CPPUNIT_TEST(WSDL_singlethread_bench);
// CPPUNIT_TEST(CON_DL_1); // ConstantDataList
// CPPUNIT_TEST(BucketDL_1);
// CPPUNIT_TEST(BandedDL_1); // WSDL_2 ported to BandedDLs
// CPPUNIT_TEST(BandedDL_2); // BandedDL_1 that uses StringElementTypes
// make sure the DataList consumer-side interface works.
// CPPUNIT_TEST(BandedDL_as_WSDL_1);
// random inserts to bucketDL, create bandedDL, make sure it's in order
// CPPUNIT_TEST(BandedDL_as_WSDL_2);
/* tests prompted by feedback */
// CPPUNIT_TEST(polymorphism_1);
// make sure we can store basic types (int in this case)
// CPPUNIT_TEST(polymorphism_2);
// make sure we can create a FIFO, fill it, cast it to a DataList, and read the elements.
// CPPUNIT_TEST(polymorphism_3);
// CPPUNIT_TEST(SWSDL_bench);
// CPPUNIT_TEST(BDL_multiproducer_bench);
// CPPUNIT_TEST(BDL_consumer_bench);
// CPPUNIT_TEST(SWSDL_multiproducer_bench);
CPPUNIT_TEST(SWSDL_consumer_bench);
// CPPUNIT_TEST(BDL_singlethread_bench);
CPPUNIT_TEST(ZDL_bench);
CPPUNIT_TEST_SUITE_END();
private:
public:
void FIFO_1()
{
int i, it;
DLElement dummy;
FIFO<DLElement> f(1, 20);
bool nextRet;
for (i = 0; i < 10; i++)
{
dummy.i = i;
f.insert(dummy);
}
f.endOfInput();
it = f.getIterator();
CPPUNIT_ASSERT(it == 0);
for (i = 0; i < 10; i++)
{
nextRet = f.next(it, &dummy);
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dummy.i == i);
}
}
void FIFO_2()
{
int i, it;
DLElement fe;
FIFO<DLElement> fifo(1, 2000);
pthread_t thread;
bool ret;
fe.i = 0;
pthread_create(&thread, NULL, FIFO_2_helper, &fifo);
sleep(1); // make sure the thread sleeps after 2000 inserts
it = fifo.getIterator();
for (i = 0; i < ::count; i++)
{
if (i % 100000 == 0)
cout << i << "/" << ::count << endl;
ret = fifo.next(it, &fe);
// cout << "fe = " << fe.i << endl;
CPPUNIT_ASSERT(ret == true);
CPPUNIT_ASSERT(fe.i == i);
}
ret = fifo.next(it, &fe); // should sleep until the thread calls endOfInput()
CPPUNIT_ASSERT(ret == false);
pthread_join(thread, NULL);
}
void MP_FIFO_stress()
{
int64_t i;
FIFO<ElementType> f(10, 25000);
pthread_t consumer[10];
pthread_t producer[10];
f.setMultipleProducers(true);
for (i = 0; i < 10; i++)
pthread_create(&consumer[i], NULL, MP_FIFO_stress_consumer, &f);
for (i = 0; i < 10; i++)
pthread_create(&producer[i], NULL, MP_FIFO_stress_producer, &f);
for (i = 0; i < 10; i++)
pthread_join(producer[i], NULL);
f.endOfInput();
/*
// for (i = 0; i < MAXINT64; i++) {
for (i = 0; i < ::count; i++) {
if ((i % 10000000) == 0)
cout << i << endl;
f.insert(ElementType(i, i));
}
f.endOfInput();
*/
for (i = 0; i < 10; i++)
pthread_join(consumer[i], NULL);
}
void FIFO_stress()
{
int64_t i;
FIFO<ElementType> f(10, 25000);
pthread_t consumer[10];
for (i = 0; i < 10; i++)
pthread_create(&consumer[i], NULL, FIFO_stress_consumer, &f);
// for (i = 0; i < MAXINT64; i++) {
for (i = 0; i < ::count; i++)
{
if ((i % 10000000) == 0)
cout << i << endl;
f.insert(ElementType(i, i));
}
f.endOfInput();
for (i = 0; i < 10; i++)
pthread_join(consumer[i], NULL);
}
void FIFO_bench()
{
int i;
FIFO<int> f(1, ::maxElements);
pthread_t consumer;
struct timespec tv1, tv2, diff;
pthread_create(&consumer, NULL, FIFO_bench_consumer, &f);
clock_gettime(CLOCK_REALTIME, &tv1);
for (i = 0; i < ::count; i++)
{
f.insert(i);
}
f.endOfInput();
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &tv2);
timespec_sub(tv1, tv2, diff);
cout << "producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec
<< "ns" << endl;
}
void FIFO_singlethread_bench()
{
int i, it, e;
FIFO<int> f(1, ::count);
bool more;
struct timespec tv1, tv2, diff;
clock_gettime(CLOCK_REALTIME, &tv1);
for (i = 0; i < ::count; i++)
{
f.insert(i);
}
f.endOfInput();
clock_gettime(CLOCK_REALTIME, &tv2);
timespec_sub(tv1, tv2, diff);
cout << "FIFO_singlethreaded_bench: inserted " << ::count << " ints in " << diff.tv_sec << "s "
<< diff.tv_nsec << "ns" << endl;
it = f.getIterator();
for (i = 0; i < ::count; i++)
{
more = f.next(it, &e);
}
clock_gettime(CLOCK_REALTIME, &tv1);
timespec_sub(tv2, tv1, diff);
cout << "FIFO_singlethread_bench: consumed, took " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
}
void WSDL_1()
{
WSDL<ElementType> w(1, ::count / 100);
ElementType dle;
uint32_t i, id;
bool nextRet;
for (i = 0; i < ::count; i++)
{
// cout << "inserting " << i << endl;
dle.first = dle.second = i;
w.insert(dle);
}
w.endOfInput();
id = w.getIterator();
for (i = 0; i < ::count; i++)
{
nextRet = w.next(id, &dle);
// cout << i << ":got " << dle.first << " " << dle.second << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i && dle.second == i);
}
}
void WSDL_2()
{
WSDL<ElementType> w(2, ::count / 10);
ElementType dle;
uint32_t i, id, size;
bool nextRet;
pthread_t consumer1;
pthread_create(&consumer1, NULL, WSDL_2_helper, &w);
// thread should wait here waiting on endOfInput()
for (i = 0; i < ::count; i++)
{
dle.first = dle.second = i;
w.insert(dle);
}
sleep(1);
w.endOfInput();
/* let the thread consume the loaded set; make sure that the load
doesn't happen until the main thread finishes with the loaded set */
sleep(1);
size = w.totalSize();
id = w.getIterator();
for (i = 0; i < ::count; i++)
{
nextRet = w.next(id, &dle);
// cout << "got " << dle.i << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i && dle.second == i);
}
pthread_join(consumer1, NULL);
}
void WSDL_3()
{
WSDL<StringElementType> w(1, ::count / 10);
StringElementType dle;
uint32_t i, id;
bool nextRet;
for (i = 0; i < ::count; i++)
{
ostringstream os;
os << "blah blah blah" << i;
// cout << "inserting " << i << endl;
dle.first = i;
dle.second = os.str();
w.insert(dle);
}
w.endOfInput();
id = w.getIterator();
for (i = 0; i < ::count; i++)
{
ostringstream os;
os << "blah blah blah" << i;
nextRet = w.next(id, &dle);
// cout << i << ":got " << dle.first << " " << dle.second << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i && dle.second == os.str());
}
}
void WSDL_bench()
{
WSDL<ElementType> w(1, ::maxElements);
int i;
pthread_t consumer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_create(&consumer, NULL, WSDL_bench_helper, &w);
for (i = 0; i < ::count; i++)
w.insert(ElementType(i, i));
w.endOfInput();
cout << "end of input size is " << w.totalSize() << endl;
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "WSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s "
<< diff.tv_nsec << "ns" << endl;
}
void WSDL_singlethread_bench()
{
WSDL<int> w(1, ::maxElements);
int i, id, e;
bool nextRet;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
for (i = 0; i < ::count; i++)
w.insert(i);
w.endOfInput();
id = w.getIterator();
for (i = 0; i < ::count; i++)
nextRet = w.next(id, &e);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "WSDL_singlethread_bench: producer & consumer passed " << ::count << " elements in "
<< diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
}
void SWSDL_bench()
{
/*
SWSDL<ElementType> sw(1, ::maxElements);
int i;
pthread_t consumer;
pthread_t producer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_create(&producer, NULL, SWSDL_producer, &sw);
pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw);
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " <<
diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
*/
SWSDL<ElementType> sw(1, ::maxElements);
int i;
pthread_t consumer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw);
for (i = 0; i < ::count; i++)
sw.insert(ElementType(i, i));
sw.endOfInput();
cout << "end of input size is " << sw.totalSize() << endl;
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " << diff.tv_sec << "s "
<< diff.tv_nsec << "ns" << endl;
}
void SWSDL_multiproducer_bench()
{
SWSDL<ElementType> sw(1, ::maxElements);
sw.setMultipleProducers(true);
int i, id;
ElementType e;
bool nextRet;
uint32_t numOfThreads = 8;
pthread_t producer[numOfThreads];
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
for (i = 0; i < numOfThreads; i++)
pthread_create(&producer[i], NULL, SWSDL_producer, &sw);
for (i = 0; i < numOfThreads; i++)
pthread_join(producer[i], NULL);
sw.endOfInput();
cout << "end of input size is " << sw.totalSize() << endl;
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "# of Producer: " << numOfThreads << endl;
cout << "SWSDL_producer_phase_bench: producer & consumer passed " << ::count * numOfThreads
<< " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
clock_gettime(CLOCK_REALTIME, &ts1);
id = sw.getIterator();
for (i = 0; i < ::count; i++)
nextRet = sw.next(id, &e);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "SWSDL_consumer_phase_bench: producer & consumer passed " << ::count * numOfThreads
<< " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
}
void SWSDL_consumer_bench()
{
SWSDL<ElementType> sw(4);
sw.setMultipleProducers(true);
ElementType dle;
uint32_t i, id;
bool nextRet;
uint32_t numOfProducers = 8;
uint32_t numOfConsumers = 4;
pthread_t producer[numOfProducers];
pthread_t consumer[numOfConsumers];
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
for (i = 0; i < numOfProducers; i++)
pthread_create(&producer[i], NULL, SWSDL_producer, &sw);
for (i = 0; i < numOfConsumers; i++)
pthread_create(&consumer[i], NULL, SWSDL_consumer, &sw);
for (i = 0; i < numOfThreads; i++)
pthread_join(producer[i], NULL);
sw.endOfInput();
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "# of Producer: " << numOfThreads << endl;
cout << "SWSDL_consumer_bench: producer & consumer passed " << ::count * numOfThreads << " elements in "
<< diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
}
void BDL_multiproducer_bench()
{
BandedDL<ElementType> w(2);
w.setMultipleProducers(true);
ElementType dle;
uint32_t i, id;
bool nextRet;
uint32_t numOfThreads = 8;
pthread_t producer[numOfThreads];
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
for (i = 0; i < numOfThreads; i++)
pthread_create(&producer[i], NULL, BDL_producer, &w);
for (i = 0; i < numOfThreads; i++)
pthread_join(producer[i], NULL);
w.endOfInput();
cout << "end of input size is " << w.totalSize() << endl;
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "# of Producer: " << numOfThreads << endl;
cout << "BDL_producer_phase_bench: producer & consumer passed " << ::count * numOfThreads
<< " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
clock_gettime(CLOCK_REALTIME, &ts1);
id = w.getIterator();
for (i = 0; i < ::count; i++)
{
nextRet = w.next(id, &dle);
}
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "BDL_consumer_phase_bench: producer & consumer passed " << ::count * numOfThreads
<< " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
/*
SWSDL<ElementType> sw(1, ::maxElements);
int i;
pthread_t consumer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw);
for (i = 0; i < ::count; i++)
sw.insert(ElementType(i, i));
sw.endOfInput();
cout << "end of input size is " << sw.totalSize() << endl;
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " <<
diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
*/
}
void BDL_consumer_bench()
{
BandedDL<ElementType> w(2);
w.setMultipleProducers(true);
ElementType dle;
uint32_t i, id;
bool nextRet;
uint32_t numOfThreads = 8;
pthread_t producer[numOfThreads];
pthread_t consumer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
for (i = 0; i < numOfThreads; i++)
pthread_create(&producer[i], NULL, BDL_producer, &w);
for (i = 0; i < numOfThreads; i++)
pthread_join(producer[i], NULL);
w.endOfInput();
// pthread_create(&consumer, NULL, BDL_bench_helper, &w);
// cout << "end of input size is " << w.totalSize() << endl;
// clock_gettime(CLOCK_REALTIME, &ts2);
// timespec_sub(ts1, ts2, diff);
// cout << "# of Producer: " << numOfThreads << endl;
// cout << "BDL_producer_phase_bench: producer & consumer passed " <<
// ::count * numOfThreads << " elements in " << diff.tv_sec << "s " << diff.tv_nsec <<
// "ns" << endl;
// clock_gettime(CLOCK_REALTIME, &ts1);
id = w.getIterator();
for (i = 0; i < ::count * 8; i++)
{
nextRet = w.next(id, &dle);
}
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "end of input size is " << w.totalSize() << endl;
cout << "BDL_consumer_phase_bench: producer & consumer passed " << ::count * numOfThreads
<< " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
/*
SWSDL<ElementType> sw(1, ::maxElements);
int i;
pthread_t consumer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw);
for (i = 0; i < ::count; i++)
sw.insert(ElementType(i, i));
sw.endOfInput();
cout << "end of input size is " << sw.totalSize() << endl;
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "SWSDL_bench: producer & consumer passed " << ::count << " elements in " <<
diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
*/
}
void BDL_singlethread_bench()
{
/*SWSDL<ElementType> sw(1, ::maxElements);
int i;
pthread_t consumer;
pthread_t producer;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
pthread_create(&producer, NULL, SWSDL_producer, &sw);
pthread_join(producer, NULL);
pthread_create(&consumer, NULL, SWSDL_bench_helper, &sw);
pthread_join(consumer, NULL);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);*/
SWSDL<ElementType> sw(1, ::maxElements);
int i, id;
ElementType e;
bool nextRet;
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
cout << "start of input size is " << ::count << endl;
for (i = 0; i < ::count; i++)
sw.insert(ElementType(i, i));
// cout << "end of input size is " << sw.totalSize() << endl;
// sw.endOfInput();
// id = sw.getIterator();
for (i = 0; i < ::count; i++)
nextRet = sw.next(id, &e);
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
// cout << "SWSDL_consumer_phase_bench: producer & consumer passed " <<
// ::count << " elements in " << diff.tv_sec << "s " << diff.tv_nsec <<
// "ns" << endl;
}
void CON_DL_1()
{
DLElement d, d2;
int i = 0;
bool nextRet;
d.i = 1;
ConstantDataList<DLElement> c(d);
nextRet = c.next(i, &d2);
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(d.i == 1 && d2.i == 1);
d.i = 2;
c.insert(d);
nextRet = c.next(i, &d2);
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(d.i == 2 && d2.i == 2);
}
void BucketDL_1()
{
BucketDL<ElementType> bdl(16, 1, ::count / 100);
ElementType dle;
uint32_t i, it, eCount = 0, buckets;
bool nextRet;
for (i = 0; i < ::count; i++)
{
// if (i % 1000000 == 0) cout << i << endl;
dle.first = dle.second = i;
bdl.insert(dle);
}
bdl.endOfInput();
// cout << "inserted " << ::count << " elements" << endl;
buckets = bdl.bucketCount();
for (i = 0; i < buckets; i++)
{
it = bdl.getIterator(i);
do
{
nextRet = bdl.next(i, it, &dle);
if (nextRet)
{
CPPUNIT_ASSERT(dle.first >= 0 && dle.first <= (uint64_t)::count);
CPPUNIT_ASSERT(++eCount <= ::count);
}
} while (nextRet == true);
}
CPPUNIT_ASSERT(eCount == ::count);
}
void BandedDL_1()
{
BandedDL<ElementType> w(2);
ElementType dle;
uint32_t i, id, size;
bool nextRet;
pthread_t consumer1;
pthread_create(&consumer1, NULL, BandedDL_1_helper, &w);
// thread should wait here waiting on endOfInput()
for (i = 0; i < ::count; i++)
{
dle.first = dle.second = i;
w.insert(dle);
if (((i + 1) % (::count / 10)) == 0)
{
cout << "inserted " << i + 1 << "/" << ::count << endl;
w.saveBand();
}
}
sleep(1);
w.endOfInput();
/* let the thread consume the loaded set; make sure that the load
doesn't happen until the main thread finishes with the loaded set */
sleep(5);
size = w.totalSize();
id = w.getIterator();
for (i = 0; i < ::count; i++)
{
nextRet = w.next(id, &dle);
// cout << "got " << dle.i << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i);
}
pthread_join(consumer1, NULL);
}
void BandedDL_2()
{
BandedDL<StringElementType> w(2);
StringElementType dle;
uint32_t i, id, size;
bool nextRet;
pthread_t consumer1;
pthread_create(&consumer1, NULL, BandedDL_2_helper, &w);
// thread should wait here waiting on endOfInput()
for (i = 0; i < ::count; i++)
{
ostringstream os;
os << "blah blah" << i;
dle.first = i;
dle.second = os.str();
w.insert(dle);
if (((i + 1) % (::count / 10)) == 0)
{
cout << "inserted " << i + 1 << "/" << ::count << endl;
w.saveBand();
}
}
sleep(1);
w.endOfInput();
/* let the thread consume the loaded set; make sure that the load
doesn't happen until the main thread finishes with the loaded set */
cout << "endofInput finished\n";
sleep(5);
size = w.totalSize();
id = w.getIterator();
for (i = 0; i < ::count; i++)
{
ostringstream os;
os << "blah blah" << i;
nextRet = w.next(id, &dle);
// cout << "got " << dle.i << endl;
CPPUNIT_ASSERT(nextRet == true);
CPPUNIT_ASSERT(dle.first == i && dle.second == os.str());
}
pthread_join(consumer1, NULL);
}
void BandedDL_as_WSDL_1()
{
BandedDL<ElementType> bdl(1);
ElementType e;
uint32_t i, it;
bool more;
for (i = 1; i <= ::count; i++)
{
e.first = e.second = i;
bdl.insert(e);
if (i % (::count / 10) == 0)
bdl.saveBand();
}
bdl.endOfInput();
it = bdl.getIterator();
for (i = 1; i <= ::count; i++)
{
more = bdl.next(it, &e);
CPPUNIT_ASSERT(more == true);
CPPUNIT_ASSERT(e.first == i && e.second == i);
}
more = bdl.next(it, &e);
CPPUNIT_ASSERT(more == false);
bdl.restart();
/* make sure it can be read again */
for (i = 1; i <= ::count; i++)
{
more = bdl.next(it, &e);
if (!more)
cerr << i << endl;
CPPUNIT_ASSERT(more == true);
CPPUNIT_ASSERT(e.first == i && e.second == i);
}
more = bdl.next(it, &e);
CPPUNIT_ASSERT(more == false);
}
void BandedDL_as_WSDL_2()
{
BucketDL<ElementType> bucketDL(::buckets, 1, 1000, BandGrouper());
uint32_t last, i, it;
ElementType dle;
bool more;
srand(time(NULL));
for (i = 0; i < ::count; i++)
{
dle.first = dle.second = rand() % ::count; // artificial max
bucketDL.insert(dle);
}
bucketDL.endOfInput();
// cout << "made bucketdl" << endl;
BandedDL<ElementType> bdl(bucketDL, 1);
// cout << "made bandeddl" << endl;
it = bdl.getIterator();
last = 0;
// duplicates were removed, there won't be ::count elements in bdl
more = bdl.next(it, &dle);
while (more)
{
CPPUNIT_ASSERT(dle.first < (uint64_t)::count);
// cout << dle.first << " >= " << last << endl;
CPPUNIT_ASSERT(dle.first >= last);
last = dle.first;
more = bdl.next(it, &dle);
}
// cerr << "read through bandeddl" << endl;
more = bdl.next(it, &dle);
CPPUNIT_ASSERT(more == false);
}
void ZDL_bench()
{
ZDL<ElementType> zdl(4);
zdl.setMultipleProducers(true);
ElementType dle;
uint32_t i, id;
bool nextRet;
uint32_t numOfProducers = 8;
uint32_t numOfConsumers = 4;
pthread_t producer[numOfProducers];
pthread_t consumer[numOfConsumers];
struct timespec ts1, ts2, diff;
clock_gettime(CLOCK_REALTIME, &ts1);
for (i = 0; i < numOfProducers; i++)
pthread_create(&producer[i], NULL, ZDL_producer, &zdl);
for (i = 0; i < numOfConsumers; i++)
pthread_create(&consumer[i], NULL, ZDL_consumer, &zdl);
for (i = 0; i < numOfThreads; i++)
pthread_join(producer[i], NULL);
zdl.endOfInput();
// clock_gettime(CLOCK_REALTIME, &ts1);
id = w.getIterator();
for (i = 0; i < ::count * 8; i++)
{
nextRet = w.next(id, &dle);
}
clock_gettime(CLOCK_REALTIME, &ts2);
timespec_sub(ts1, ts2, diff);
cout << "end of input size is " << w.totalSize() << endl;
cout << "ZDL_consumer_producer_bench: producer & consumer passed " << ::count * numOfProducers
<< " elements in " << diff.tv_sec << "s " << diff.tv_nsec << "ns" << endl;
}
/* Sanity test */
void polymorphism_1()
{
DataList<DLElement>* dl;
FIFO<DLElement>* f;
DLElement dle;
int it, i;
bool more;
f = new FIFO<DLElement>(1, 20);
dl = (DataList<DLElement>*)f;
it = dl->getIterator();
for (i = 0; i < 10; i++)
{
dle.i = i;
dl->insert(dle);
}
dl->endOfInput();
for (i = 0; i < 10; i++)
{
more = dl->next(it, &dle);
// cout << dle.i << endl;
CPPUNIT_ASSERT(more == true);
CPPUNIT_ASSERT(dle.i == i);
}
delete f;
}
/* Sanity test 2. Can we stuff basic datatypes into DataLists? Apparently not. WHY?*/
void polymorphism_2()
{
DataList<int>* dl;
FIFO<int>* f = new FIFO<int>(1, 20);
int it, i, dle;
bool more;
dl = (DataList<int>*)f;
it = dl->getIterator();
for (i = 0; i < 10; i++)
dl->insert(i);
dl->endOfInput();
for (i = 0; i < 10; i++)
{
more = dl->next(it, &dle);
CPPUNIT_ASSERT(more == true);
CPPUNIT_ASSERT(dle == i);
// cout << "dle[" << i << "]: " << dle << endl;
}
delete f;
}
void polymorphism_3()
{
DataList<DLElement>* dl;
FIFO<DLElement>* f;
int it, i;
DLElement dle;
bool more;
f = new FIFO<DLElement>(1, 20);
dl = (DataList<DLElement>*)f;
it = dl->getIterator();
for (i = 0; i < 10; i++)
{
dle.i = i;
f->insert(dle);
}
f->endOfInput();
for (i = 0; i < 10; i++)
{
// cout << "next" << endl;
more = dl->next(it, &dle);
CPPUNIT_ASSERT(more == true);
CPPUNIT_ASSERT(dle.i == i);
// cout << "dle[" << i << "]: " << dle.i << endl;
}
delete f;
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(DataListDriver);
int main(int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest(registry.makeTest());
bool wasSuccessful = runner.run("", false);
return (wasSuccessful ? 0 : 1);
}