You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
930 lines
23 KiB
C++
930 lines
23 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: tdriver-bru.cpp 9210 2013-01-21 14:10:42Z rdempsey $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <fstream>
|
|
#include <iomanip>
|
|
#include <vector>
|
|
#include <set>
|
|
|
|
#include <pthread.h>
|
|
#include <time.h>
|
|
#include <sys/time.h>
|
|
|
|
#include <boost/filesystem/path.hpp>
|
|
#include <boost/filesystem/operations.hpp>
|
|
#include <boost/thread/mutex.hpp>
|
|
|
|
#include <cppunit/extensions/HelperMacros.h>
|
|
#include <cppunit/extensions/TestFactoryRegistry.h>
|
|
#include <cppunit/ui/text/TestRunner.h>
|
|
|
|
#include "bucketdl.h"
|
|
#include "elementtype.h"
|
|
#include "stopwatch.cpp"
|
|
|
|
#include "bucketreuse.h"
|
|
|
|
// #undef CPPUNIT_ASSERT
|
|
// #define CPPUNIT_ASSERT(x)
|
|
|
|
using namespace std;
|
|
using namespace boost;
|
|
using namespace joblist;
|
|
|
|
// Stopwatch timer;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// TestDriver class derived from CppUnit
|
|
//------------------------------------------------------------------------------
|
|
|
|
class BucketReUseDriver : public CppUnit::TestFixture
|
|
{
|
|
CPPUNIT_TEST_SUITE(BucketReUseDriver);
|
|
CPPUNIT_TEST(parseConfig);
|
|
CPPUNIT_TEST(createFiles);
|
|
CPPUNIT_TEST(reuseFiles);
|
|
CPPUNIT_TEST(newversion);
|
|
CPPUNIT_TEST(concurrent);
|
|
CPPUNIT_TEST(concurrent_newversion);
|
|
CPPUNIT_TEST(concurrent_race);
|
|
CPPUNIT_TEST_SUITE_END();
|
|
|
|
private:
|
|
public:
|
|
//--------------------------------------------------------------------------
|
|
// setup method run prior to each unit test, inherited from base
|
|
//--------------------------------------------------------------------------
|
|
void setUp()
|
|
{
|
|
clock_gettime(CLOCK_REALTIME, &ts);
|
|
}
|
|
|
|
//--------------------------------------------------------------------------
|
|
// validate results from a unit test, inherited from base
|
|
//--------------------------------------------------------------------------
|
|
void validateResults()
|
|
{
|
|
}
|
|
|
|
//--------------------------------------------------------------------------
|
|
// test functions
|
|
//--------------------------------------------------------------------------
|
|
void parseConfig();
|
|
void createFiles();
|
|
void reuseFiles();
|
|
void newversion();
|
|
void concurrent();
|
|
void concurrent_newversion();
|
|
void concurrent_race();
|
|
|
|
private:
|
|
//--------------------------------------------------------------------------
|
|
// initialize method
|
|
// oid : reference to the column OID used for test
|
|
//--------------------------------------------------------------------------
|
|
void initControl(execplan::CalpontSystemCatalog::OID& oid);
|
|
|
|
//--------------------------------------------------------------------------
|
|
// validate results
|
|
// files: the filenames to be verified if exist
|
|
// exist: expect if the files are created or deleted
|
|
//--------------------------------------------------------------------------
|
|
void validateFileExist(set<string>& files, bool exist);
|
|
|
|
static void* insertThread(void*);
|
|
static void* readThread(void*);
|
|
|
|
static void* scanThread(void*);
|
|
static void* reuseThread(void*);
|
|
static void* raceThread(void*);
|
|
|
|
struct ThreadArg
|
|
{
|
|
uint64_t id; // thread id
|
|
uint64_t version; // db version
|
|
uint64_t buckets; // max bucket numbers
|
|
uint64_t elements; // max number of elem per bucket
|
|
uint64_t total; // total number of elements
|
|
execplan::CalpontSystemCatalog::OID oid; // column OID
|
|
BucketDataList* dl; // datalist
|
|
|
|
// for sync threads
|
|
bool* flag;
|
|
pthread_mutex_t* mutex;
|
|
pthread_cond_t* cond;
|
|
|
|
// for file status check
|
|
set<string>* files;
|
|
|
|
ThreadArg()
|
|
: id(0)
|
|
, version(0)
|
|
, buckets(0)
|
|
, elements(0)
|
|
, total(0)
|
|
, dl(NULL)
|
|
, flag(NULL)
|
|
, mutex(NULL)
|
|
, cond(NULL)
|
|
, files(NULL)
|
|
{
|
|
}
|
|
};
|
|
|
|
static const string column;
|
|
|
|
public:
|
|
static struct timespec ts;
|
|
};
|
|
|
|
CPPUNIT_TEST_SUITE_REGISTRATION(BucketReUseDriver);
|
|
|
|
const string BucketReUseDriver::column = "tpch.lineitem.l_orderkey";
|
|
struct timespec BucketReUseDriver::ts;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// main entry point
|
|
//------------------------------------------------------------------------------
|
|
int main(int argc, char** argv)
|
|
{
|
|
CppUnit::TextUi::TestRunner runner;
|
|
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
|
|
runner.addTest(registry.makeTest());
|
|
bool wasSuccessful = runner.run("", false);
|
|
return (wasSuccessful ? 0 : 1);
|
|
}
|
|
|
|
struct timespec atTime();
|
|
ostream& operator<<(ostream& os, const struct timespec& t);
|
|
|
|
void BucketReUseDriver::parseConfig()
|
|
{
|
|
cout << "ut: parseConfig start...\n" << endl;
|
|
|
|
// before run this test, make sure "tpch.lineitem.l_orderkey" is in the Columnstore.xml
|
|
BucketReuseManager* control = BucketReuseManager::instance();
|
|
ResourceManager rm;
|
|
BucketReuseManager::instance()->startup(rm);
|
|
|
|
// list all the predicates if any listed in the Columnstore.xml file
|
|
for (BucketReuseMap::iterator it = control->fControlMap.begin(); it != control->fControlMap.end(); it++)
|
|
cout << *(it->second);
|
|
|
|
cout << endl;
|
|
|
|
size_t schemap = column.find_first_of(".");
|
|
size_t columnp = column.find_last_of(".");
|
|
CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
|
|
|
|
execplan::CalpontSystemCatalog::TableColName columnName;
|
|
columnName.schema = column.substr(0, schemap);
|
|
columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
|
|
columnName.column = column.substr(columnp + 1);
|
|
|
|
string filter = "allrows";
|
|
|
|
CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end());
|
|
|
|
cout << "ut: parseConfig done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::createFiles()
|
|
{
|
|
cout << "ut: createFiles start...\n" << endl;
|
|
|
|
ThreadArg arg;
|
|
arg.id = 1;
|
|
arg.version = 1;
|
|
arg.buckets = 2;
|
|
arg.elements = 2;
|
|
arg.total = 1000000;
|
|
|
|
execplan::CalpontSystemCatalog::OID oid;
|
|
initControl(oid);
|
|
arg.oid = oid;
|
|
|
|
set<string> files;
|
|
arg.files = &files;
|
|
|
|
pthread_t t;
|
|
pthread_create(&t, NULL, scanThread, &arg);
|
|
pthread_join(t, NULL);
|
|
|
|
validateFileExist(files, true);
|
|
|
|
cout << "ut: createFiles done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::reuseFiles()
|
|
{
|
|
cout << "ut: reuseFiles start...\n" << endl;
|
|
|
|
ThreadArg arg1;
|
|
arg1.id = 1;
|
|
arg1.version = 1;
|
|
arg1.buckets = 2;
|
|
arg1.elements = 2;
|
|
arg1.total = 1000000;
|
|
|
|
execplan::CalpontSystemCatalog::OID oid;
|
|
initControl(oid);
|
|
arg1.oid = oid;
|
|
|
|
set<string> files1;
|
|
arg1.files = &files1;
|
|
|
|
// create the files
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, scanThread, &arg1);
|
|
pthread_join(t1, NULL);
|
|
|
|
validateFileExist(files1, true);
|
|
|
|
// use new datalist, reuse case
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id = 2;
|
|
|
|
set<string> files2;
|
|
arg2.files = &files2;
|
|
|
|
pthread_t t2;
|
|
pthread_create(&t2, NULL, reuseThread, &arg2);
|
|
pthread_join(t2, NULL);
|
|
|
|
validateFileExist(files2, true);
|
|
|
|
cout << "ut: reuseFiles done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::newversion()
|
|
{
|
|
cout << "ut: newversion start...\n" << endl;
|
|
|
|
ThreadArg arg1;
|
|
arg1.id = 1;
|
|
arg1.version = 1;
|
|
arg1.buckets = 2;
|
|
arg1.elements = 2;
|
|
arg1.total = 1000000;
|
|
|
|
execplan::CalpontSystemCatalog::OID oid;
|
|
initControl(oid);
|
|
arg1.oid = oid;
|
|
|
|
set<string> files1;
|
|
arg1.files = &files1;
|
|
|
|
// create the files
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, scanThread, &arg1);
|
|
pthread_join(t1, NULL);
|
|
|
|
validateFileExist(files1, true);
|
|
|
|
// new version
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id = 2;
|
|
arg2.version = 2;
|
|
|
|
set<string> files2;
|
|
arg2.files = &files2;
|
|
|
|
pthread_t t2;
|
|
pthread_create(&t2, NULL, scanThread, &arg2);
|
|
pthread_join(t2, NULL);
|
|
|
|
pthread_yield();
|
|
|
|
validateFileExist(files1, false);
|
|
validateFileExist(files2, true);
|
|
|
|
// read from the new files with new datalist
|
|
ThreadArg arg3 = arg2;
|
|
arg3.id = 3;
|
|
arg3.dl = NULL;
|
|
|
|
set<string> files3;
|
|
arg3.files = &files3;
|
|
|
|
pthread_t t3;
|
|
pthread_create(&t3, NULL, reuseThread, &arg3);
|
|
pthread_join(t3, NULL);
|
|
|
|
validateFileExist(files3, true);
|
|
|
|
cout << "ut: newversion done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::concurrent()
|
|
{
|
|
cout << "ut: concurrent start...\n" << endl;
|
|
|
|
ThreadArg arg1;
|
|
arg1.id = 1;
|
|
arg1.version = 1;
|
|
arg1.buckets = 2;
|
|
arg1.elements = 2;
|
|
arg1.total = 1000000;
|
|
|
|
execplan::CalpontSystemCatalog::OID oid;
|
|
initControl(oid);
|
|
arg1.oid = oid;
|
|
|
|
set<string> files1;
|
|
arg1.files = &files1;
|
|
|
|
// create the files
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, scanThread, &arg1);
|
|
|
|
sleep(1);
|
|
|
|
// reuse case, current thread
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id = 2;
|
|
set<string> files2;
|
|
arg2.files = &files2;
|
|
|
|
pthread_t t2;
|
|
pthread_create(&t2, NULL, reuseThread, &arg2);
|
|
|
|
pthread_join(t1, NULL);
|
|
validateFileExist(files1, true);
|
|
|
|
pthread_join(t2, NULL);
|
|
validateFileExist(files2, true);
|
|
|
|
cout << "ut: concurrent done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::concurrent_newversion()
|
|
{
|
|
cout << "ut: concurrent_newversion start...\n" << endl;
|
|
|
|
bool flag = false;
|
|
pthread_mutex_t mutex;
|
|
pthread_mutex_init(&mutex, 0);
|
|
pthread_cond_t cond;
|
|
pthread_cond_init(&cond, 0);
|
|
|
|
ThreadArg arg1;
|
|
arg1.id = 1;
|
|
arg1.version = 1;
|
|
arg1.buckets = 2;
|
|
arg1.elements = 2;
|
|
arg1.total = 1000000;
|
|
|
|
execplan::CalpontSystemCatalog::OID oid;
|
|
initControl(oid);
|
|
arg1.oid = oid;
|
|
|
|
set<string> files1;
|
|
arg1.files = &files1;
|
|
|
|
// create the files
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, scanThread, &arg1);
|
|
|
|
// reuse case, current thread
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id = 2;
|
|
set<string> files2;
|
|
arg2.files = &files2;
|
|
|
|
sleep(1);
|
|
|
|
pthread_t t2;
|
|
pthread_create(&t2, NULL, reuseThread, &arg2);
|
|
|
|
// new version
|
|
ThreadArg arg3 = arg1;
|
|
arg3.id = 3;
|
|
arg3.version = 3;
|
|
arg3.flag = &flag;
|
|
arg3.mutex = &mutex;
|
|
arg3.cond = &cond;
|
|
set<string> files3;
|
|
arg3.files = &files3;
|
|
|
|
pthread_t t3;
|
|
pthread_create(&t3, NULL, scanThread, &arg3);
|
|
|
|
sleep(3);
|
|
|
|
pthread_mutex_lock(&mutex);
|
|
flag = true;
|
|
pthread_cond_broadcast(&cond);
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
pthread_join(t1, NULL);
|
|
pthread_join(t2, NULL);
|
|
pthread_join(t3, NULL);
|
|
|
|
// let cleanup thread do its job
|
|
pthread_yield();
|
|
|
|
validateFileExist(files1, false);
|
|
validateFileExist(files2, false);
|
|
validateFileExist(files3, true);
|
|
|
|
pthread_mutex_destroy(&mutex);
|
|
pthread_cond_destroy(&cond);
|
|
|
|
cout << "ut: concurrent_newversion done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::concurrent_race()
|
|
{
|
|
cout << "ut: concurrent_race start...\n" << endl;
|
|
|
|
ThreadArg arg1;
|
|
arg1.id = 1;
|
|
arg1.version = 1;
|
|
arg1.buckets = 2;
|
|
arg1.elements = 2;
|
|
arg1.total = 2000000;
|
|
|
|
execplan::CalpontSystemCatalog::OID oid;
|
|
initControl(oid);
|
|
arg1.oid = oid;
|
|
|
|
set<string> files1;
|
|
arg1.files = &files1;
|
|
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id = 2;
|
|
set<string> files2;
|
|
arg2.files = &files2;
|
|
|
|
// start the version 1 threads
|
|
pthread_t t1;
|
|
pthread_t t2;
|
|
pthread_create(&t1, NULL, raceThread, &arg1);
|
|
pthread_create(&t2, NULL, raceThread, &arg2);
|
|
|
|
// let the version 1 threads register
|
|
sleep(1);
|
|
|
|
ThreadArg arg3 = arg1;
|
|
arg3.id = 3;
|
|
arg3.version = 4;
|
|
arg3.total = 1000000;
|
|
set<string> files3;
|
|
arg3.files = &files3;
|
|
|
|
ThreadArg arg4 = arg3;
|
|
arg4.id = 4;
|
|
set<string> files4;
|
|
arg4.files = &files4;
|
|
|
|
// start the version 4 threads
|
|
pthread_t t3;
|
|
pthread_t t4;
|
|
pthread_create(&t3, NULL, raceThread, &arg3);
|
|
pthread_create(&t4, NULL, raceThread, &arg4);
|
|
|
|
pthread_join(t3, NULL);
|
|
pthread_join(t4, NULL);
|
|
validateFileExist(files1, true);
|
|
validateFileExist(files4, true);
|
|
|
|
pthread_join(t1, NULL);
|
|
pthread_join(t2, NULL);
|
|
validateFileExist(files1, false);
|
|
validateFileExist(files4, true);
|
|
|
|
cout << "ut: concurrent_race done!\n" << endl;
|
|
}
|
|
|
|
void BucketReUseDriver::initControl(execplan::CalpontSystemCatalog::OID& oid)
|
|
{
|
|
// make sure the column for testing is in the map, "column" is a class variable
|
|
// to test with another column, only one place to change
|
|
// -- to validate the config parsing, run parseConfig --
|
|
config::Config* cf = config::Config::makeConfig();
|
|
vector<string> columns;
|
|
cf->getConfig("HashBucketReuse", "Predicate", columns);
|
|
|
|
bool found = true;
|
|
|
|
for (vector<string>::iterator it = columns.begin(); it != columns.end(); ++it)
|
|
{
|
|
if (it->compare(0, column.size(), column) == 0)
|
|
{
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
string filter = "allrows";
|
|
BucketReuseManager* control = BucketReuseManager::instance();
|
|
|
|
if (found == false)
|
|
{
|
|
cout << "tpch.lineitem.l_orderkey is not in the Columnstore.xml!)" << endl;
|
|
cout << "insert it to countinue unit test" << endl;
|
|
|
|
size_t schemap = column.find_first_of(".");
|
|
size_t columnp = column.find_last_of(".");
|
|
CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
|
|
|
|
execplan::CalpontSystemCatalog::TableColName tcn;
|
|
tcn.schema = column.substr(0, schemap);
|
|
tcn.table = column.substr(schemap + 1, columnp - schemap - 1);
|
|
tcn.column = column.substr(columnp + 1);
|
|
|
|
control->fConfigMap.insert(pair<string, BucketFileKey>(column, BucketFileKey(tcn, filter)));
|
|
}
|
|
|
|
ResourceManager rm;
|
|
// now start the BucketReuseManager
|
|
BucketReuseManager::instance()->startup(rm);
|
|
|
|
// get the oid for registration
|
|
size_t schemap = column.find_first_of(".");
|
|
size_t columnp = column.find_last_of(".");
|
|
CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
|
|
|
|
execplan::CalpontSystemCatalog::TableColName columnName;
|
|
columnName.schema = column.substr(0, schemap);
|
|
columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
|
|
columnName.column = column.substr(columnp + 1);
|
|
|
|
CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end());
|
|
}
|
|
|
|
void BucketReUseDriver::validateFileExist(set<string>& files, bool exist)
|
|
{
|
|
cout << "\ncheck if files exist or not:" << endl;
|
|
|
|
for (set<string>::iterator i = files.begin(); i != files.end(); i++)
|
|
{
|
|
filesystem::path p(i->c_str());
|
|
cout << (*i) << "-- ";
|
|
|
|
if (exist)
|
|
{
|
|
CPPUNIT_ASSERT(filesystem::exists(p));
|
|
cout << "OK" << endl;
|
|
}
|
|
else
|
|
{
|
|
CPPUNIT_ASSERT(!filesystem::exists(p));
|
|
cout << "GONE" << endl;
|
|
}
|
|
}
|
|
|
|
cout << endl;
|
|
}
|
|
|
|
void* BucketReUseDriver::insertThread(void* arg)
|
|
{
|
|
ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
|
|
BucketDataList* dl = a->dl;
|
|
CPPUNIT_ASSERT(dl != NULL);
|
|
|
|
cout << "thread " << a->id << " start at " << atTime() << endl;
|
|
|
|
BucketReuseControlEntry* entry = dl->reuseControl();
|
|
|
|
for (uint64_t i = 0; i < a->buckets; i++)
|
|
{
|
|
stringstream ss;
|
|
ss << entry->baseName() << "." << i;
|
|
filesystem::path p(ss.str().c_str());
|
|
a->files->insert(ss.str());
|
|
}
|
|
|
|
ElementType e;
|
|
|
|
for (uint64_t i = 0; i < a->total; i++)
|
|
{
|
|
e.first = i;
|
|
e.second = i * 10 + a->version; // include the version in values
|
|
dl->insert(e);
|
|
}
|
|
|
|
cout << "thread[" << a->id << "] last element inserted at " << atTime() << endl;
|
|
dl->endOfInput();
|
|
|
|
cout << "thread " << a->id << " finished at " << atTime() << endl;
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void* BucketReUseDriver::readThread(void* arg)
|
|
{
|
|
ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
|
|
BucketDataList* dl = a->dl;
|
|
CPPUNIT_ASSERT(dl != NULL);
|
|
|
|
cout << "thread " << a->id << " start at " << atTime() << endl;
|
|
|
|
BucketReuseControlEntry* entry = dl->reuseControl();
|
|
|
|
for (uint64_t i = 0; i < a->buckets; i++)
|
|
{
|
|
stringstream ss;
|
|
ss << entry->baseName() << "." << i;
|
|
filesystem::path p(ss.str().c_str());
|
|
a->files->insert(ss.str());
|
|
}
|
|
|
|
ElementType e;
|
|
uint64_t min = 0xffffffff, max = 0, count = 0;
|
|
uint64_t k[a->buckets]; // count of each bucket
|
|
bool firstRead = true;
|
|
|
|
for (uint64_t i = 0; i < a->buckets; i++)
|
|
{
|
|
k[i] = 0;
|
|
uint64_t it = dl->getIterator(i);
|
|
|
|
while (dl->next(i, it, &e))
|
|
{
|
|
if (firstRead)
|
|
{
|
|
cout << "thread[" << a->id << "] first read at " << atTime() << endl;
|
|
firstRead = false;
|
|
}
|
|
|
|
if (e.second < min)
|
|
min = e.second;
|
|
|
|
if (e.second > max)
|
|
max = e.second;
|
|
|
|
// output the first 10 of each bucket or last 10 of the datalist
|
|
// if (count < 10 || (a->total - count) < 10 || k[i] < 10)
|
|
if (count < 2 || (a->total - count) < 2 || k[i] < 2)
|
|
cout << "thread[" << a->id << "] bucket:" << i << " e(" << e.first << ", " << e.second << ")" << endl;
|
|
|
|
count++;
|
|
k[i]++;
|
|
}
|
|
}
|
|
|
|
cout << "\nthread[" << a->id << "] element: count = " << count << ", min/max = " << min << "/" << max
|
|
<< ", elements in each bucket: ";
|
|
|
|
for (uint64_t i = 0; i < a->buckets; i++)
|
|
cout << k[i] << " ";
|
|
|
|
cout << endl;
|
|
|
|
cout << "thread " << a->id << " finished at " << atTime() << endl;
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void* BucketReUseDriver::scanThread(void* arg)
|
|
{
|
|
ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
|
|
BucketDataList* dl = a->dl;
|
|
CPPUNIT_ASSERT(dl == NULL);
|
|
|
|
if (a->cond != NULL)
|
|
{
|
|
pthread_mutex_lock(a->mutex);
|
|
|
|
while (*(a->flag) != true)
|
|
pthread_cond_wait(a->cond, a->mutex);
|
|
|
|
pthread_mutex_unlock(a->mutex);
|
|
}
|
|
|
|
cout << "thread " << a->id << " start at " << atTime() << endl;
|
|
|
|
string dummy;
|
|
bool scan = false;
|
|
boost::shared_ptr<execplan::CalpontSystemCatalog> c =
|
|
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
|
|
execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
|
|
BucketReuseControlEntry* entry =
|
|
BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
|
|
CPPUNIT_ASSERT(scan == true);
|
|
|
|
ResourceManager rm;
|
|
dl = new BucketDataList(a->buckets, 1, a->elements, rm);
|
|
dl->setElementMode(1);
|
|
dl->reuseControl(entry, !scan);
|
|
|
|
ThreadArg arg1 = *a;
|
|
arg1.id *= 10;
|
|
arg1.dl = dl;
|
|
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, insertThread, &arg1);
|
|
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id += 1;
|
|
pthread_t t2;
|
|
pthread_create(&t2, NULL, readThread, &arg2);
|
|
|
|
pthread_join(t1, NULL);
|
|
pthread_join(t2, NULL);
|
|
|
|
delete dl;
|
|
|
|
cout << "thread " << a->id << " finished at " << atTime() << endl;
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void* BucketReUseDriver::reuseThread(void* arg)
|
|
{
|
|
ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
|
|
BucketDataList* dl = a->dl;
|
|
CPPUNIT_ASSERT(dl == NULL);
|
|
|
|
if (a->cond != NULL)
|
|
{
|
|
pthread_mutex_lock(a->mutex);
|
|
|
|
while (*(a->flag) != true)
|
|
pthread_cond_wait(a->cond, a->mutex);
|
|
|
|
pthread_mutex_unlock(a->mutex);
|
|
}
|
|
|
|
cout << "thread " << a->id << " start at " << atTime() << endl;
|
|
|
|
string dummy;
|
|
bool scan = true;
|
|
boost::shared_ptr<execplan::CalpontSystemCatalog> c =
|
|
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
|
|
execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
|
|
BucketReuseControlEntry* entry =
|
|
BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
|
|
CPPUNIT_ASSERT(scan == false);
|
|
|
|
ResourceManager rm;
|
|
dl = new BucketDataList(a->buckets, 1, a->elements, rm);
|
|
dl->setElementMode(1);
|
|
dl->reuseControl(entry, !scan);
|
|
|
|
ThreadArg arg1 = *a;
|
|
arg1.id *= 10;
|
|
arg1.dl = dl;
|
|
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, readThread, &arg1);
|
|
|
|
if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
|
|
{
|
|
boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
|
|
dl->reuseControl()->stateChange().wait(lock);
|
|
}
|
|
else
|
|
{
|
|
CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
|
|
(entry->fileStatus() == BucketReuseControlEntry::ready_c));
|
|
}
|
|
|
|
// the bucket files are ready
|
|
dl->restoreBucketInformation();
|
|
dl->endOfInput();
|
|
|
|
pthread_join(t1, NULL);
|
|
|
|
delete dl;
|
|
|
|
cout << "thread " << a->id << " finished at " << atTime() << endl;
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void* BucketReUseDriver::raceThread(void* arg)
|
|
{
|
|
ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
|
|
BucketDataList* dl = a->dl;
|
|
CPPUNIT_ASSERT(dl == NULL);
|
|
|
|
if (a->cond != NULL)
|
|
{
|
|
pthread_mutex_lock(a->mutex);
|
|
|
|
while (*(a->flag) != true)
|
|
pthread_cond_wait(a->cond, a->mutex);
|
|
|
|
pthread_mutex_unlock(a->mutex);
|
|
}
|
|
|
|
cout << "thread " << a->id << " start at " << atTime() << endl;
|
|
|
|
string dummy;
|
|
bool scan = true;
|
|
ResourceManager rm;
|
|
BucketReuseControlEntry* entry = NULL;
|
|
{
|
|
boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
|
|
boost::shared_ptr<execplan::CalpontSystemCatalog> c =
|
|
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
|
|
execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
|
|
entry = BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
|
|
|
|
dl = new BucketDataList(a->buckets, 1, a->elements, rm);
|
|
dl->setElementMode(1);
|
|
dl->reuseControl(entry, !scan);
|
|
}
|
|
|
|
if (scan == true)
|
|
{
|
|
CPPUNIT_ASSERT(entry->fileStatus() == BucketReuseControlEntry::progress_c);
|
|
|
|
ThreadArg arg1 = *a;
|
|
arg1.id *= 10;
|
|
arg1.dl = dl;
|
|
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, insertThread, &arg1);
|
|
|
|
ThreadArg arg2 = arg1;
|
|
arg2.id += 1;
|
|
pthread_t t2;
|
|
pthread_create(&t2, NULL, readThread, &arg2);
|
|
|
|
pthread_join(t1, NULL);
|
|
pthread_join(t2, NULL);
|
|
}
|
|
else
|
|
{
|
|
ThreadArg arg1 = *a;
|
|
arg1.id *= 10;
|
|
arg1.dl = dl;
|
|
|
|
pthread_t t1;
|
|
pthread_create(&t1, NULL, readThread, &arg1);
|
|
|
|
if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
|
|
{
|
|
boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
|
|
dl->reuseControl()->stateChange().wait(lock);
|
|
}
|
|
else
|
|
{
|
|
CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
|
|
(entry->fileStatus() == BucketReuseControlEntry::ready_c));
|
|
}
|
|
|
|
// the bucket files are ready
|
|
dl->restoreBucketInformation();
|
|
dl->endOfInput();
|
|
|
|
pthread_join(t1, NULL);
|
|
}
|
|
|
|
delete dl;
|
|
|
|
cout << "thread " << a->id << " finished at " << atTime() << endl;
|
|
|
|
return NULL;
|
|
}
|
|
|
|
timespec atTime()
|
|
{
|
|
timespec ts, ts1, ts2;
|
|
ts1 = BucketReUseDriver::ts;
|
|
clock_gettime(CLOCK_REALTIME, &ts2);
|
|
|
|
if (ts2.tv_nsec < ts1.tv_nsec)
|
|
{
|
|
ts.tv_sec = ts2.tv_sec - ts1.tv_sec - 1;
|
|
ts.tv_nsec = ts2.tv_nsec + 1000000000 - ts1.tv_nsec;
|
|
}
|
|
else
|
|
{
|
|
ts.tv_sec = ts2.tv_sec - ts1.tv_sec;
|
|
ts.tv_nsec = ts2.tv_nsec - ts1.tv_nsec;
|
|
}
|
|
|
|
return ts;
|
|
}
|
|
|
|
ostream& operator<<(ostream& os, const struct timespec& t)
|
|
{
|
|
os << t.tv_sec << "." << setw(9) << setfill('0') << t.tv_nsec << "s";
|
|
return os;
|
|
}
|