/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: tdriver-bru.cpp 9210 2013-01-21 14:10:42Z rdempsey $ * * ***********************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "bucketdl.h" #include "elementtype.h" #include "stopwatch.cpp" #include "bucketreuse.h" // #undef CPPUNIT_ASSERT // #define CPPUNIT_ASSERT(x) using namespace std; using namespace boost; using namespace joblist; // Stopwatch timer; //------------------------------------------------------------------------------ // TestDriver class derived from CppUnit //------------------------------------------------------------------------------ class BucketReUseDriver : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(BucketReUseDriver); CPPUNIT_TEST(parseConfig); CPPUNIT_TEST(createFiles); CPPUNIT_TEST(reuseFiles); CPPUNIT_TEST(newversion); CPPUNIT_TEST(concurrent); CPPUNIT_TEST(concurrent_newversion); CPPUNIT_TEST(concurrent_race); CPPUNIT_TEST_SUITE_END(); private: public: //-------------------------------------------------------------------------- // setup method run prior to each unit test, inherited from base //-------------------------------------------------------------------------- void setUp() { clock_gettime(CLOCK_REALTIME, &ts); } //-------------------------------------------------------------------------- // validate results from a unit test, inherited from base //-------------------------------------------------------------------------- void validateResults() { } //-------------------------------------------------------------------------- // test functions //-------------------------------------------------------------------------- void parseConfig(); void createFiles(); void reuseFiles(); void newversion(); void concurrent(); void concurrent_newversion(); void concurrent_race(); private: //-------------------------------------------------------------------------- // initialize method // oid : reference to the column OID used for test //-------------------------------------------------------------------------- void initControl(execplan::CalpontSystemCatalog::OID& oid); //-------------------------------------------------------------------------- // validate results // files: the filenames to be verified if exist // exist: expect if the files are created or deleted //-------------------------------------------------------------------------- void validateFileExist(set& files, bool exist); static void* insertThread(void*); static void* readThread(void*); static void* scanThread(void*); static void* reuseThread(void*); static void* raceThread(void*); struct ThreadArg { uint64_t id; // thread id uint64_t version; // db version uint64_t buckets; // max bucket numbers uint64_t elements; // max number of elem per bucket uint64_t total; // total number of elements execplan::CalpontSystemCatalog::OID oid; // column OID BucketDataList* dl; // datalist // for sync threads bool* flag; pthread_mutex_t* mutex; pthread_cond_t* cond; // for file status check set* files; ThreadArg() : id(0) , version(0) , buckets(0) , elements(0) , total(0) , dl(NULL) , flag(NULL) , mutex(NULL) , cond(NULL) , files(NULL) { } }; static const string column; public: static struct timespec ts; }; CPPUNIT_TEST_SUITE_REGISTRATION(BucketReUseDriver); const string BucketReUseDriver::column = "tpch.lineitem.l_orderkey"; struct timespec BucketReUseDriver::ts; //------------------------------------------------------------------------------ // main entry point //------------------------------------------------------------------------------ int main(int argc, char** argv) { CppUnit::TextUi::TestRunner runner; CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); runner.addTest(registry.makeTest()); bool wasSuccessful = runner.run("", false); return (wasSuccessful ? 0 : 1); } struct timespec atTime(); ostream& operator<<(ostream& os, const struct timespec& t); void BucketReUseDriver::parseConfig() { cout << "ut: parseConfig start...\n" << endl; // before run this test, make sure "tpch.lineitem.l_orderkey" is in the Columnstore.xml BucketReuseManager* control = BucketReuseManager::instance(); ResourceManager rm; BucketReuseManager::instance()->startup(rm); // list all the predicates if any listed in the Columnstore.xml file for (BucketReuseMap::iterator it = control->fControlMap.begin(); it != control->fControlMap.end(); it++) cout << *(it->second); cout << endl; size_t schemap = column.find_first_of("."); size_t columnp = column.find_last_of("."); CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos); execplan::CalpontSystemCatalog::TableColName columnName; columnName.schema = column.substr(0, schemap); columnName.table = column.substr(schemap + 1, columnp - schemap - 1); columnName.column = column.substr(columnp + 1); string filter = "allrows"; CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end()); cout << "ut: parseConfig done!\n" << endl; } void BucketReUseDriver::createFiles() { cout << "ut: createFiles start...\n" << endl; ThreadArg arg; arg.id = 1; arg.version = 1; arg.buckets = 2; arg.elements = 2; arg.total = 1000000; execplan::CalpontSystemCatalog::OID oid; initControl(oid); arg.oid = oid; set files; arg.files = &files; pthread_t t; pthread_create(&t, NULL, scanThread, &arg); pthread_join(t, NULL); validateFileExist(files, true); cout << "ut: createFiles done!\n" << endl; } void BucketReUseDriver::reuseFiles() { cout << "ut: reuseFiles start...\n" << endl; ThreadArg arg1; arg1.id = 1; arg1.version = 1; arg1.buckets = 2; arg1.elements = 2; arg1.total = 1000000; execplan::CalpontSystemCatalog::OID oid; initControl(oid); arg1.oid = oid; set files1; arg1.files = &files1; // create the files pthread_t t1; pthread_create(&t1, NULL, scanThread, &arg1); pthread_join(t1, NULL); validateFileExist(files1, true); // use new datalist, reuse case ThreadArg arg2 = arg1; arg2.id = 2; set files2; arg2.files = &files2; pthread_t t2; pthread_create(&t2, NULL, reuseThread, &arg2); pthread_join(t2, NULL); validateFileExist(files2, true); cout << "ut: reuseFiles done!\n" << endl; } void BucketReUseDriver::newversion() { cout << "ut: newversion start...\n" << endl; ThreadArg arg1; arg1.id = 1; arg1.version = 1; arg1.buckets = 2; arg1.elements = 2; arg1.total = 1000000; execplan::CalpontSystemCatalog::OID oid; initControl(oid); arg1.oid = oid; set files1; arg1.files = &files1; // create the files pthread_t t1; pthread_create(&t1, NULL, scanThread, &arg1); pthread_join(t1, NULL); validateFileExist(files1, true); // new version ThreadArg arg2 = arg1; arg2.id = 2; arg2.version = 2; set files2; arg2.files = &files2; pthread_t t2; pthread_create(&t2, NULL, scanThread, &arg2); pthread_join(t2, NULL); pthread_yield(); validateFileExist(files1, false); validateFileExist(files2, true); // read from the new files with new datalist ThreadArg arg3 = arg2; arg3.id = 3; arg3.dl = NULL; set files3; arg3.files = &files3; pthread_t t3; pthread_create(&t3, NULL, reuseThread, &arg3); pthread_join(t3, NULL); validateFileExist(files3, true); cout << "ut: newversion done!\n" << endl; } void BucketReUseDriver::concurrent() { cout << "ut: concurrent start...\n" << endl; ThreadArg arg1; arg1.id = 1; arg1.version = 1; arg1.buckets = 2; arg1.elements = 2; arg1.total = 1000000; execplan::CalpontSystemCatalog::OID oid; initControl(oid); arg1.oid = oid; set files1; arg1.files = &files1; // create the files pthread_t t1; pthread_create(&t1, NULL, scanThread, &arg1); sleep(1); // reuse case, current thread ThreadArg arg2 = arg1; arg2.id = 2; set files2; arg2.files = &files2; pthread_t t2; pthread_create(&t2, NULL, reuseThread, &arg2); pthread_join(t1, NULL); validateFileExist(files1, true); pthread_join(t2, NULL); validateFileExist(files2, true); cout << "ut: concurrent done!\n" << endl; } void BucketReUseDriver::concurrent_newversion() { cout << "ut: concurrent_newversion start...\n" << endl; bool flag = false; pthread_mutex_t mutex; pthread_mutex_init(&mutex, 0); pthread_cond_t cond; pthread_cond_init(&cond, 0); ThreadArg arg1; arg1.id = 1; arg1.version = 1; arg1.buckets = 2; arg1.elements = 2; arg1.total = 1000000; execplan::CalpontSystemCatalog::OID oid; initControl(oid); arg1.oid = oid; set files1; arg1.files = &files1; // create the files pthread_t t1; pthread_create(&t1, NULL, scanThread, &arg1); // reuse case, current thread ThreadArg arg2 = arg1; arg2.id = 2; set files2; arg2.files = &files2; sleep(1); pthread_t t2; pthread_create(&t2, NULL, reuseThread, &arg2); // new version ThreadArg arg3 = arg1; arg3.id = 3; arg3.version = 3; arg3.flag = &flag; arg3.mutex = &mutex; arg3.cond = &cond; set files3; arg3.files = &files3; pthread_t t3; pthread_create(&t3, NULL, scanThread, &arg3); sleep(3); pthread_mutex_lock(&mutex); flag = true; pthread_cond_broadcast(&cond); pthread_mutex_unlock(&mutex); pthread_join(t1, NULL); pthread_join(t2, NULL); pthread_join(t3, NULL); // let cleanup thread do its job pthread_yield(); validateFileExist(files1, false); validateFileExist(files2, false); validateFileExist(files3, true); pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); cout << "ut: concurrent_newversion done!\n" << endl; } void BucketReUseDriver::concurrent_race() { cout << "ut: concurrent_race start...\n" << endl; ThreadArg arg1; arg1.id = 1; arg1.version = 1; arg1.buckets = 2; arg1.elements = 2; arg1.total = 2000000; execplan::CalpontSystemCatalog::OID oid; initControl(oid); arg1.oid = oid; set files1; arg1.files = &files1; ThreadArg arg2 = arg1; arg2.id = 2; set files2; arg2.files = &files2; // start the version 1 threads pthread_t t1; pthread_t t2; pthread_create(&t1, NULL, raceThread, &arg1); pthread_create(&t2, NULL, raceThread, &arg2); // let the version 1 threads register sleep(1); ThreadArg arg3 = arg1; arg3.id = 3; arg3.version = 4; arg3.total = 1000000; set files3; arg3.files = &files3; ThreadArg arg4 = arg3; arg4.id = 4; set files4; arg4.files = &files4; // start the version 4 threads pthread_t t3; pthread_t t4; pthread_create(&t3, NULL, raceThread, &arg3); pthread_create(&t4, NULL, raceThread, &arg4); pthread_join(t3, NULL); pthread_join(t4, NULL); validateFileExist(files1, true); validateFileExist(files4, true); pthread_join(t1, NULL); pthread_join(t2, NULL); validateFileExist(files1, false); validateFileExist(files4, true); cout << "ut: concurrent_race done!\n" << endl; } void BucketReUseDriver::initControl(execplan::CalpontSystemCatalog::OID& oid) { // make sure the column for testing is in the map, "column" is a class variable // to test with another column, only one place to change // -- to validate the config parsing, run parseConfig -- config::Config* cf = config::Config::makeConfig(); vector columns; cf->getConfig("HashBucketReuse", "Predicate", columns); bool found = true; for (vector::iterator it = columns.begin(); it != columns.end(); ++it) { if (it->compare(0, column.size(), column) == 0) { found = true; break; } } string filter = "allrows"; BucketReuseManager* control = BucketReuseManager::instance(); if (found == false) { cout << "tpch.lineitem.l_orderkey is not in the Columnstore.xml!)" << endl; cout << "insert it to countinue unit test" << endl; size_t schemap = column.find_first_of("."); size_t columnp = column.find_last_of("."); CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos); execplan::CalpontSystemCatalog::TableColName tcn; tcn.schema = column.substr(0, schemap); tcn.table = column.substr(schemap + 1, columnp - schemap - 1); tcn.column = column.substr(columnp + 1); control->fConfigMap.insert(pair(column, BucketFileKey(tcn, filter))); } ResourceManager rm; // now start the BucketReuseManager BucketReuseManager::instance()->startup(rm); // get the oid for registration size_t schemap = column.find_first_of("."); size_t columnp = column.find_last_of("."); CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos); execplan::CalpontSystemCatalog::TableColName columnName; columnName.schema = column.substr(0, schemap); columnName.table = column.substr(schemap + 1, columnp - schemap - 1); columnName.column = column.substr(columnp + 1); CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end()); } void BucketReUseDriver::validateFileExist(set& files, bool exist) { cout << "\ncheck if files exist or not:" << endl; for (set::iterator i = files.begin(); i != files.end(); i++) { filesystem::path p(i->c_str()); cout << (*i) << "-- "; if (exist) { CPPUNIT_ASSERT(filesystem::exists(p)); cout << "OK" << endl; } else { CPPUNIT_ASSERT(!filesystem::exists(p)); cout << "GONE" << endl; } } cout << endl; } void* BucketReUseDriver::insertThread(void* arg) { ThreadArg* a = reinterpret_cast(arg); BucketDataList* dl = a->dl; CPPUNIT_ASSERT(dl != NULL); cout << "thread " << a->id << " start at " << atTime() << endl; BucketReuseControlEntry* entry = dl->reuseControl(); for (uint64_t i = 0; i < a->buckets; i++) { stringstream ss; ss << entry->baseName() << "." << i; filesystem::path p(ss.str().c_str()); a->files->insert(ss.str()); } ElementType e; for (uint64_t i = 0; i < a->total; i++) { e.first = i; e.second = i * 10 + a->version; // include the version in values dl->insert(e); } cout << "thread[" << a->id << "] last element inserted at " << atTime() << endl; dl->endOfInput(); cout << "thread " << a->id << " finished at " << atTime() << endl; return NULL; } void* BucketReUseDriver::readThread(void* arg) { ThreadArg* a = reinterpret_cast(arg); BucketDataList* dl = a->dl; CPPUNIT_ASSERT(dl != NULL); cout << "thread " << a->id << " start at " << atTime() << endl; BucketReuseControlEntry* entry = dl->reuseControl(); for (uint64_t i = 0; i < a->buckets; i++) { stringstream ss; ss << entry->baseName() << "." << i; filesystem::path p(ss.str().c_str()); a->files->insert(ss.str()); } ElementType e; uint64_t min = 0xffffffff, max = 0, count = 0; uint64_t k[a->buckets]; // count of each bucket bool firstRead = true; for (uint64_t i = 0; i < a->buckets; i++) { k[i] = 0; uint64_t it = dl->getIterator(i); while (dl->next(i, it, &e)) { if (firstRead) { cout << "thread[" << a->id << "] first read at " << atTime() << endl; firstRead = false; } if (e.second < min) min = e.second; if (e.second > max) max = e.second; // output the first 10 of each bucket or last 10 of the datalist // if (count < 10 || (a->total - count) < 10 || k[i] < 10) if (count < 2 || (a->total - count) < 2 || k[i] < 2) cout << "thread[" << a->id << "] bucket:" << i << " e(" << e.first << ", " << e.second << ")" << endl; count++; k[i]++; } } cout << "\nthread[" << a->id << "] element: count = " << count << ", min/max = " << min << "/" << max << ", elements in each bucket: "; for (uint64_t i = 0; i < a->buckets; i++) cout << k[i] << " "; cout << endl; cout << "thread " << a->id << " finished at " << atTime() << endl; return NULL; } void* BucketReUseDriver::scanThread(void* arg) { ThreadArg* a = reinterpret_cast(arg); BucketDataList* dl = a->dl; CPPUNIT_ASSERT(dl == NULL); if (a->cond != NULL) { pthread_mutex_lock(a->mutex); while (*(a->flag) != true) pthread_cond_wait(a->cond, a->mutex); pthread_mutex_unlock(a->mutex); } cout << "thread " << a->id << " start at " << atTime() << endl; string dummy; bool scan = false; boost::shared_ptr c = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000); execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid); BucketReuseControlEntry* entry = BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan); CPPUNIT_ASSERT(scan == true); ResourceManager rm; dl = new BucketDataList(a->buckets, 1, a->elements, rm); dl->setElementMode(1); dl->reuseControl(entry, !scan); ThreadArg arg1 = *a; arg1.id *= 10; arg1.dl = dl; pthread_t t1; pthread_create(&t1, NULL, insertThread, &arg1); ThreadArg arg2 = arg1; arg2.id += 1; pthread_t t2; pthread_create(&t2, NULL, readThread, &arg2); pthread_join(t1, NULL); pthread_join(t2, NULL); delete dl; cout << "thread " << a->id << " finished at " << atTime() << endl; return NULL; } void* BucketReUseDriver::reuseThread(void* arg) { ThreadArg* a = reinterpret_cast(arg); BucketDataList* dl = a->dl; CPPUNIT_ASSERT(dl == NULL); if (a->cond != NULL) { pthread_mutex_lock(a->mutex); while (*(a->flag) != true) pthread_cond_wait(a->cond, a->mutex); pthread_mutex_unlock(a->mutex); } cout << "thread " << a->id << " start at " << atTime() << endl; string dummy; bool scan = true; boost::shared_ptr c = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000); execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid); BucketReuseControlEntry* entry = BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan); CPPUNIT_ASSERT(scan == false); ResourceManager rm; dl = new BucketDataList(a->buckets, 1, a->elements, rm); dl->setElementMode(1); dl->reuseControl(entry, !scan); ThreadArg arg1 = *a; arg1.id *= 10; arg1.dl = dl; pthread_t t1; pthread_create(&t1, NULL, readThread, &arg1); if (entry->fileStatus() == BucketReuseControlEntry::progress_c) { boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex()); dl->reuseControl()->stateChange().wait(lock); } else { CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) || (entry->fileStatus() == BucketReuseControlEntry::ready_c)); } // the bucket files are ready dl->restoreBucketInformation(); dl->endOfInput(); pthread_join(t1, NULL); delete dl; cout << "thread " << a->id << " finished at " << atTime() << endl; return NULL; } void* BucketReUseDriver::raceThread(void* arg) { ThreadArg* a = reinterpret_cast(arg); BucketDataList* dl = a->dl; CPPUNIT_ASSERT(dl == NULL); if (a->cond != NULL) { pthread_mutex_lock(a->mutex); while (*(a->flag) != true) pthread_cond_wait(a->cond, a->mutex); pthread_mutex_unlock(a->mutex); } cout << "thread " << a->id << " start at " << atTime() << endl; string dummy; bool scan = true; ResourceManager rm; BucketReuseControlEntry* entry = NULL; { boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex()); boost::shared_ptr c = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000); execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid); entry = BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan); dl = new BucketDataList(a->buckets, 1, a->elements, rm); dl->setElementMode(1); dl->reuseControl(entry, !scan); } if (scan == true) { CPPUNIT_ASSERT(entry->fileStatus() == BucketReuseControlEntry::progress_c); ThreadArg arg1 = *a; arg1.id *= 10; arg1.dl = dl; pthread_t t1; pthread_create(&t1, NULL, insertThread, &arg1); ThreadArg arg2 = arg1; arg2.id += 1; pthread_t t2; pthread_create(&t2, NULL, readThread, &arg2); pthread_join(t1, NULL); pthread_join(t2, NULL); } else { ThreadArg arg1 = *a; arg1.id *= 10; arg1.dl = dl; pthread_t t1; pthread_create(&t1, NULL, readThread, &arg1); if (entry->fileStatus() == BucketReuseControlEntry::progress_c) { boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex()); dl->reuseControl()->stateChange().wait(lock); } else { CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) || (entry->fileStatus() == BucketReuseControlEntry::ready_c)); } // the bucket files are ready dl->restoreBucketInformation(); dl->endOfInput(); pthread_join(t1, NULL); } delete dl; cout << "thread " << a->id << " finished at " << atTime() << endl; return NULL; } timespec atTime() { timespec ts, ts1, ts2; ts1 = BucketReUseDriver::ts; clock_gettime(CLOCK_REALTIME, &ts2); if (ts2.tv_nsec < ts1.tv_nsec) { ts.tv_sec = ts2.tv_sec - ts1.tv_sec - 1; ts.tv_nsec = ts2.tv_nsec + 1000000000 - ts1.tv_nsec; } else { ts.tv_sec = ts2.tv_sec - ts1.tv_sec; ts.tv_nsec = ts2.tv_nsec - ts1.tv_nsec; } return ts; } ostream& operator<<(ostream& os, const struct timespec& t) { os << t.tv_sec << "." << setw(9) << setfill('0') << t.tv_nsec << "s"; return os; }