You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			930 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			930 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /***********************************************************************
 | |
|  *   $Id: tdriver-bru.cpp 9210 2013-01-21 14:10:42Z rdempsey $
 | |
|  *
 | |
|  *
 | |
|  ***********************************************************************/
 | |
#include <iostream>
#include <sstream>
#include <fstream>
#include <iomanip>
#include <vector>
#include <set>

#include <pthread.h>
#include <sched.h>
#include <time.h>
#include <sys/time.h>

#include <boost/filesystem/path.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/thread/mutex.hpp>

#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>

#include "bucketdl.h"
#include "elementtype.h"
#include "stopwatch.cpp"

#include "bucketreuse.h"
 | |
| 
 | |
| // #undef CPPUNIT_ASSERT
 | |
| // #define CPPUNIT_ASSERT(x)
 | |
| 
 | |
| using namespace std;
 | |
| using namespace boost;
 | |
| using namespace joblist;
 | |
| 
 | |
| // Stopwatch timer;
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // TestDriver class derived from CppUnit
 | |
| //------------------------------------------------------------------------------
 | |
| 
 | |
| class BucketReUseDriver : public CppUnit::TestFixture
 | |
| {
 | |
|   CPPUNIT_TEST_SUITE(BucketReUseDriver);
 | |
|   CPPUNIT_TEST(parseConfig);
 | |
|   CPPUNIT_TEST(createFiles);
 | |
|   CPPUNIT_TEST(reuseFiles);
 | |
|   CPPUNIT_TEST(newversion);
 | |
|   CPPUNIT_TEST(concurrent);
 | |
|   CPPUNIT_TEST(concurrent_newversion);
 | |
|   CPPUNIT_TEST(concurrent_race);
 | |
|   CPPUNIT_TEST_SUITE_END();
 | |
| 
 | |
|  private:
 | |
|  public:
 | |
|   //--------------------------------------------------------------------------
 | |
|   // setup method run prior to each unit test, inherited from base
 | |
|   //--------------------------------------------------------------------------
 | |
|   void setUp()
 | |
|   {
 | |
|     clock_gettime(CLOCK_REALTIME, &ts);
 | |
|   }
 | |
| 
 | |
|   //--------------------------------------------------------------------------
 | |
|   // validate results from a unit test, inherited from base
 | |
|   //--------------------------------------------------------------------------
 | |
|   void validateResults()
 | |
|   {
 | |
|   }
 | |
| 
 | |
|   //--------------------------------------------------------------------------
 | |
|   // test functions
 | |
|   //--------------------------------------------------------------------------
 | |
|   void parseConfig();
 | |
|   void createFiles();
 | |
|   void reuseFiles();
 | |
|   void newversion();
 | |
|   void concurrent();
 | |
|   void concurrent_newversion();
 | |
|   void concurrent_race();
 | |
| 
 | |
|  private:
 | |
|   //--------------------------------------------------------------------------
 | |
|   // initialize method
 | |
|   // oid : reference to the column OID used for test
 | |
|   //--------------------------------------------------------------------------
 | |
|   void initControl(execplan::CalpontSystemCatalog::OID& oid);
 | |
| 
 | |
|   //--------------------------------------------------------------------------
 | |
|   // validate results
 | |
|   // files: the filenames to be verified if exist
 | |
|   // exist: expect if the files are created or deleted
 | |
|   //--------------------------------------------------------------------------
 | |
|   void validateFileExist(set<string>& files, bool exist);
 | |
| 
 | |
|   static void* insertThread(void*);
 | |
|   static void* readThread(void*);
 | |
| 
 | |
|   static void* scanThread(void*);
 | |
|   static void* reuseThread(void*);
 | |
|   static void* raceThread(void*);
 | |
| 
 | |
|   struct ThreadArg
 | |
|   {
 | |
|     uint64_t id;                              // thread id
 | |
|     uint64_t version;                         // db version
 | |
|     uint64_t buckets;                         // max bucket numbers
 | |
|     uint64_t elements;                        // max number of elem per bucket
 | |
|     uint64_t total;                           // total number of elements
 | |
|     execplan::CalpontSystemCatalog::OID oid;  // column OID
 | |
|     BucketDataList* dl;                       // datalist
 | |
| 
 | |
|     // for sync threads
 | |
|     bool* flag;
 | |
|     pthread_mutex_t* mutex;
 | |
|     pthread_cond_t* cond;
 | |
| 
 | |
|     // for file status check
 | |
|     set<string>* files;
 | |
| 
 | |
|     ThreadArg()
 | |
|      : id(0)
 | |
|      , version(0)
 | |
|      , buckets(0)
 | |
|      , elements(0)
 | |
|      , total(0)
 | |
|      , dl(NULL)
 | |
|      , flag(NULL)
 | |
|      , mutex(NULL)
 | |
|      , cond(NULL)
 | |
|      , files(NULL)
 | |
|     {
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   static const string column;
 | |
| 
 | |
|  public:
 | |
|   static struct timespec ts;
 | |
| };
 | |
| 
 | |
| CPPUNIT_TEST_SUITE_REGISTRATION(BucketReUseDriver);
 | |
| 
 | |
| const string BucketReUseDriver::column = "tpch.lineitem.l_orderkey";
 | |
| struct timespec BucketReUseDriver::ts;
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // main entry point
 | |
| //------------------------------------------------------------------------------
 | |
| int main(int argc, char** argv)
 | |
| {
 | |
|   CppUnit::TextUi::TestRunner runner;
 | |
|   CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
 | |
|   runner.addTest(registry.makeTest());
 | |
|   bool wasSuccessful = runner.run("", false);
 | |
|   return (wasSuccessful ? 0 : 1);
 | |
| }
 | |
| 
 | |
| struct timespec atTime();
 | |
| ostream& operator<<(ostream& os, const struct timespec& t);
 | |
| 
 | |
| void BucketReUseDriver::parseConfig()
 | |
| {
 | |
|   cout << "ut: parseConfig start...\n" << endl;
 | |
| 
 | |
|   // before run this test, make sure "tpch.lineitem.l_orderkey" is in the Columnstore.xml
 | |
|   BucketReuseManager* control = BucketReuseManager::instance();
 | |
|   ResourceManager rm;
 | |
|   BucketReuseManager::instance()->startup(rm);
 | |
| 
 | |
|   // list all the predicates if any listed in the Columnstore.xml file
 | |
|   for (BucketReuseMap::iterator it = control->fControlMap.begin(); it != control->fControlMap.end(); it++)
 | |
|     cout << *(it->second);
 | |
| 
 | |
|   cout << endl;
 | |
| 
 | |
|   size_t schemap = column.find_first_of(".");
 | |
|   size_t columnp = column.find_last_of(".");
 | |
|   CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::TableColName columnName;
 | |
|   columnName.schema = column.substr(0, schemap);
 | |
|   columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
 | |
|   columnName.column = column.substr(columnp + 1);
 | |
| 
 | |
|   string filter = "allrows";
 | |
| 
 | |
|   CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end());
 | |
| 
 | |
|   cout << "ut: parseConfig done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::createFiles()
 | |
| {
 | |
|   cout << "ut: createFiles start...\n" << endl;
 | |
| 
 | |
|   ThreadArg arg;
 | |
|   arg.id = 1;
 | |
|   arg.version = 1;
 | |
|   arg.buckets = 2;
 | |
|   arg.elements = 2;
 | |
|   arg.total = 1000000;
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::OID oid;
 | |
|   initControl(oid);
 | |
|   arg.oid = oid;
 | |
| 
 | |
|   set<string> files;
 | |
|   arg.files = &files;
 | |
| 
 | |
|   pthread_t t;
 | |
|   pthread_create(&t, NULL, scanThread, &arg);
 | |
|   pthread_join(t, NULL);
 | |
| 
 | |
|   validateFileExist(files, true);
 | |
| 
 | |
|   cout << "ut: createFiles done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::reuseFiles()
 | |
| {
 | |
|   cout << "ut: reuseFiles start...\n" << endl;
 | |
| 
 | |
|   ThreadArg arg1;
 | |
|   arg1.id = 1;
 | |
|   arg1.version = 1;
 | |
|   arg1.buckets = 2;
 | |
|   arg1.elements = 2;
 | |
|   arg1.total = 1000000;
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::OID oid;
 | |
|   initControl(oid);
 | |
|   arg1.oid = oid;
 | |
| 
 | |
|   set<string> files1;
 | |
|   arg1.files = &files1;
 | |
| 
 | |
|   // create the files
 | |
|   pthread_t t1;
 | |
|   pthread_create(&t1, NULL, scanThread, &arg1);
 | |
|   pthread_join(t1, NULL);
 | |
| 
 | |
|   validateFileExist(files1, true);
 | |
| 
 | |
|   // use new datalist, reuse case
 | |
|   ThreadArg arg2 = arg1;
 | |
|   arg2.id = 2;
 | |
| 
 | |
|   set<string> files2;
 | |
|   arg2.files = &files2;
 | |
| 
 | |
|   pthread_t t2;
 | |
|   pthread_create(&t2, NULL, reuseThread, &arg2);
 | |
|   pthread_join(t2, NULL);
 | |
| 
 | |
|   validateFileExist(files2, true);
 | |
| 
 | |
|   cout << "ut: reuseFiles done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::newversion()
 | |
| {
 | |
|   cout << "ut: newversion start...\n" << endl;
 | |
| 
 | |
|   ThreadArg arg1;
 | |
|   arg1.id = 1;
 | |
|   arg1.version = 1;
 | |
|   arg1.buckets = 2;
 | |
|   arg1.elements = 2;
 | |
|   arg1.total = 1000000;
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::OID oid;
 | |
|   initControl(oid);
 | |
|   arg1.oid = oid;
 | |
| 
 | |
|   set<string> files1;
 | |
|   arg1.files = &files1;
 | |
| 
 | |
|   // create the files
 | |
|   pthread_t t1;
 | |
|   pthread_create(&t1, NULL, scanThread, &arg1);
 | |
|   pthread_join(t1, NULL);
 | |
| 
 | |
|   validateFileExist(files1, true);
 | |
| 
 | |
|   // new version
 | |
|   ThreadArg arg2 = arg1;
 | |
|   arg2.id = 2;
 | |
|   arg2.version = 2;
 | |
| 
 | |
|   set<string> files2;
 | |
|   arg2.files = &files2;
 | |
| 
 | |
|   pthread_t t2;
 | |
|   pthread_create(&t2, NULL, scanThread, &arg2);
 | |
|   pthread_join(t2, NULL);
 | |
| 
 | |
|   pthread_yield();
 | |
| 
 | |
|   validateFileExist(files1, false);
 | |
|   validateFileExist(files2, true);
 | |
| 
 | |
|   // read from the new files with new datalist
 | |
|   ThreadArg arg3 = arg2;
 | |
|   arg3.id = 3;
 | |
|   arg3.dl = NULL;
 | |
| 
 | |
|   set<string> files3;
 | |
|   arg3.files = &files3;
 | |
| 
 | |
|   pthread_t t3;
 | |
|   pthread_create(&t3, NULL, reuseThread, &arg3);
 | |
|   pthread_join(t3, NULL);
 | |
| 
 | |
|   validateFileExist(files3, true);
 | |
| 
 | |
|   cout << "ut: newversion done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::concurrent()
 | |
| {
 | |
|   cout << "ut: concurrent start...\n" << endl;
 | |
| 
 | |
|   ThreadArg arg1;
 | |
|   arg1.id = 1;
 | |
|   arg1.version = 1;
 | |
|   arg1.buckets = 2;
 | |
|   arg1.elements = 2;
 | |
|   arg1.total = 1000000;
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::OID oid;
 | |
|   initControl(oid);
 | |
|   arg1.oid = oid;
 | |
| 
 | |
|   set<string> files1;
 | |
|   arg1.files = &files1;
 | |
| 
 | |
|   // create the files
 | |
|   pthread_t t1;
 | |
|   pthread_create(&t1, NULL, scanThread, &arg1);
 | |
| 
 | |
|   sleep(1);
 | |
| 
 | |
|   // reuse case, current thread
 | |
|   ThreadArg arg2 = arg1;
 | |
|   arg2.id = 2;
 | |
|   set<string> files2;
 | |
|   arg2.files = &files2;
 | |
| 
 | |
|   pthread_t t2;
 | |
|   pthread_create(&t2, NULL, reuseThread, &arg2);
 | |
| 
 | |
|   pthread_join(t1, NULL);
 | |
|   validateFileExist(files1, true);
 | |
| 
 | |
|   pthread_join(t2, NULL);
 | |
|   validateFileExist(files2, true);
 | |
| 
 | |
|   cout << "ut: concurrent done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::concurrent_newversion()
 | |
| {
 | |
|   cout << "ut: concurrent_newversion start...\n" << endl;
 | |
| 
 | |
|   bool flag = false;
 | |
|   pthread_mutex_t mutex;
 | |
|   pthread_mutex_init(&mutex, 0);
 | |
|   pthread_cond_t cond;
 | |
|   pthread_cond_init(&cond, 0);
 | |
| 
 | |
|   ThreadArg arg1;
 | |
|   arg1.id = 1;
 | |
|   arg1.version = 1;
 | |
|   arg1.buckets = 2;
 | |
|   arg1.elements = 2;
 | |
|   arg1.total = 1000000;
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::OID oid;
 | |
|   initControl(oid);
 | |
|   arg1.oid = oid;
 | |
| 
 | |
|   set<string> files1;
 | |
|   arg1.files = &files1;
 | |
| 
 | |
|   // create the files
 | |
|   pthread_t t1;
 | |
|   pthread_create(&t1, NULL, scanThread, &arg1);
 | |
| 
 | |
|   // reuse case, current thread
 | |
|   ThreadArg arg2 = arg1;
 | |
|   arg2.id = 2;
 | |
|   set<string> files2;
 | |
|   arg2.files = &files2;
 | |
| 
 | |
|   sleep(1);
 | |
| 
 | |
|   pthread_t t2;
 | |
|   pthread_create(&t2, NULL, reuseThread, &arg2);
 | |
| 
 | |
|   // new version
 | |
|   ThreadArg arg3 = arg1;
 | |
|   arg3.id = 3;
 | |
|   arg3.version = 3;
 | |
|   arg3.flag = &flag;
 | |
|   arg3.mutex = &mutex;
 | |
|   arg3.cond = &cond;
 | |
|   set<string> files3;
 | |
|   arg3.files = &files3;
 | |
| 
 | |
|   pthread_t t3;
 | |
|   pthread_create(&t3, NULL, scanThread, &arg3);
 | |
| 
 | |
|   sleep(3);
 | |
| 
 | |
|   pthread_mutex_lock(&mutex);
 | |
|   flag = true;
 | |
|   pthread_cond_broadcast(&cond);
 | |
|   pthread_mutex_unlock(&mutex);
 | |
| 
 | |
|   pthread_join(t1, NULL);
 | |
|   pthread_join(t2, NULL);
 | |
|   pthread_join(t3, NULL);
 | |
| 
 | |
|   // let cleanup thread do its job
 | |
|   pthread_yield();
 | |
| 
 | |
|   validateFileExist(files1, false);
 | |
|   validateFileExist(files2, false);
 | |
|   validateFileExist(files3, true);
 | |
| 
 | |
|   pthread_mutex_destroy(&mutex);
 | |
|   pthread_cond_destroy(&cond);
 | |
| 
 | |
|   cout << "ut: concurrent_newversion done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::concurrent_race()
 | |
| {
 | |
|   cout << "ut: concurrent_race start...\n" << endl;
 | |
| 
 | |
|   ThreadArg arg1;
 | |
|   arg1.id = 1;
 | |
|   arg1.version = 1;
 | |
|   arg1.buckets = 2;
 | |
|   arg1.elements = 2;
 | |
|   arg1.total = 2000000;
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::OID oid;
 | |
|   initControl(oid);
 | |
|   arg1.oid = oid;
 | |
| 
 | |
|   set<string> files1;
 | |
|   arg1.files = &files1;
 | |
| 
 | |
|   ThreadArg arg2 = arg1;
 | |
|   arg2.id = 2;
 | |
|   set<string> files2;
 | |
|   arg2.files = &files2;
 | |
| 
 | |
|   // start the version 1 threads
 | |
|   pthread_t t1;
 | |
|   pthread_t t2;
 | |
|   pthread_create(&t1, NULL, raceThread, &arg1);
 | |
|   pthread_create(&t2, NULL, raceThread, &arg2);
 | |
| 
 | |
|   // let the version 1 threads register
 | |
|   sleep(1);
 | |
| 
 | |
|   ThreadArg arg3 = arg1;
 | |
|   arg3.id = 3;
 | |
|   arg3.version = 4;
 | |
|   arg3.total = 1000000;
 | |
|   set<string> files3;
 | |
|   arg3.files = &files3;
 | |
| 
 | |
|   ThreadArg arg4 = arg3;
 | |
|   arg4.id = 4;
 | |
|   set<string> files4;
 | |
|   arg4.files = &files4;
 | |
| 
 | |
|   // start the version 4 threads
 | |
|   pthread_t t3;
 | |
|   pthread_t t4;
 | |
|   pthread_create(&t3, NULL, raceThread, &arg3);
 | |
|   pthread_create(&t4, NULL, raceThread, &arg4);
 | |
| 
 | |
|   pthread_join(t3, NULL);
 | |
|   pthread_join(t4, NULL);
 | |
|   validateFileExist(files1, true);
 | |
|   validateFileExist(files4, true);
 | |
| 
 | |
|   pthread_join(t1, NULL);
 | |
|   pthread_join(t2, NULL);
 | |
|   validateFileExist(files1, false);
 | |
|   validateFileExist(files4, true);
 | |
| 
 | |
|   cout << "ut: concurrent_race done!\n" << endl;
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::initControl(execplan::CalpontSystemCatalog::OID& oid)
 | |
| {
 | |
|   // make sure the column for testing is in the map, "column" is a class variable
 | |
|   // to test with another column, only one place to change
 | |
|   // -- to validate the config parsing, run parseConfig --
 | |
|   config::Config* cf = config::Config::makeConfig();
 | |
|   vector<string> columns;
 | |
|   cf->getConfig("HashBucketReuse", "Predicate", columns);
 | |
| 
 | |
|   bool found = true;
 | |
| 
 | |
|   for (vector<string>::iterator it = columns.begin(); it != columns.end(); ++it)
 | |
|   {
 | |
|     if (it->compare(0, column.size(), column) == 0)
 | |
|     {
 | |
|       found = true;
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   string filter = "allrows";
 | |
|   BucketReuseManager* control = BucketReuseManager::instance();
 | |
| 
 | |
|   if (found == false)
 | |
|   {
 | |
|     cout << "tpch.lineitem.l_orderkey is not in the Columnstore.xml!)" << endl;
 | |
|     cout << "insert it to countinue unit test" << endl;
 | |
| 
 | |
|     size_t schemap = column.find_first_of(".");
 | |
|     size_t columnp = column.find_last_of(".");
 | |
|     CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
 | |
| 
 | |
|     execplan::CalpontSystemCatalog::TableColName tcn;
 | |
|     tcn.schema = column.substr(0, schemap);
 | |
|     tcn.table = column.substr(schemap + 1, columnp - schemap - 1);
 | |
|     tcn.column = column.substr(columnp + 1);
 | |
| 
 | |
|     control->fConfigMap.insert(pair<string, BucketFileKey>(column, BucketFileKey(tcn, filter)));
 | |
|   }
 | |
| 
 | |
|   ResourceManager rm;
 | |
|   // now  start the BucketReuseManager
 | |
|   BucketReuseManager::instance()->startup(rm);
 | |
| 
 | |
|   // get the oid for registration
 | |
|   size_t schemap = column.find_first_of(".");
 | |
|   size_t columnp = column.find_last_of(".");
 | |
|   CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
 | |
| 
 | |
|   execplan::CalpontSystemCatalog::TableColName columnName;
 | |
|   columnName.schema = column.substr(0, schemap);
 | |
|   columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
 | |
|   columnName.column = column.substr(columnp + 1);
 | |
| 
 | |
|   CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end());
 | |
| }
 | |
| 
 | |
| void BucketReUseDriver::validateFileExist(set<string>& files, bool exist)
 | |
| {
 | |
|   cout << "\ncheck if files exist or not:" << endl;
 | |
| 
 | |
|   for (set<string>::iterator i = files.begin(); i != files.end(); i++)
 | |
|   {
 | |
|     filesystem::path p(i->c_str());
 | |
|     cout << (*i) << "-- ";
 | |
| 
 | |
|     if (exist)
 | |
|     {
 | |
|       CPPUNIT_ASSERT(filesystem::exists(p));
 | |
|       cout << "OK" << endl;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       CPPUNIT_ASSERT(!filesystem::exists(p));
 | |
|       cout << "GONE" << endl;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   cout << endl;
 | |
| }
 | |
| 
 | |
| void* BucketReUseDriver::insertThread(void* arg)
 | |
| {
 | |
|   ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
 | |
|   BucketDataList* dl = a->dl;
 | |
|   CPPUNIT_ASSERT(dl != NULL);
 | |
| 
 | |
|   cout << "thread " << a->id << " start at " << atTime() << endl;
 | |
| 
 | |
|   BucketReuseControlEntry* entry = dl->reuseControl();
 | |
| 
 | |
|   for (uint64_t i = 0; i < a->buckets; i++)
 | |
|   {
 | |
|     stringstream ss;
 | |
|     ss << entry->baseName() << "." << i;
 | |
|     filesystem::path p(ss.str().c_str());
 | |
|     a->files->insert(ss.str());
 | |
|   }
 | |
| 
 | |
|   ElementType e;
 | |
| 
 | |
|   for (uint64_t i = 0; i < a->total; i++)
 | |
|   {
 | |
|     e.first = i;
 | |
|     e.second = i * 10 + a->version;  // include the version in values
 | |
|     dl->insert(e);
 | |
|   }
 | |
| 
 | |
|   cout << "thread[" << a->id << "] last element inserted at " << atTime() << endl;
 | |
|   dl->endOfInput();
 | |
| 
 | |
|   cout << "thread " << a->id << " finished at " << atTime() << endl;
 | |
| 
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| void* BucketReUseDriver::readThread(void* arg)
 | |
| {
 | |
|   ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
 | |
|   BucketDataList* dl = a->dl;
 | |
|   CPPUNIT_ASSERT(dl != NULL);
 | |
| 
 | |
|   cout << "thread " << a->id << " start at " << atTime() << endl;
 | |
| 
 | |
|   BucketReuseControlEntry* entry = dl->reuseControl();
 | |
| 
 | |
|   for (uint64_t i = 0; i < a->buckets; i++)
 | |
|   {
 | |
|     stringstream ss;
 | |
|     ss << entry->baseName() << "." << i;
 | |
|     filesystem::path p(ss.str().c_str());
 | |
|     a->files->insert(ss.str());
 | |
|   }
 | |
| 
 | |
|   ElementType e;
 | |
|   uint64_t min = 0xffffffff, max = 0, count = 0;
 | |
|   uint64_t k[a->buckets];  // count of each bucket
 | |
|   bool firstRead = true;
 | |
| 
 | |
|   for (uint64_t i = 0; i < a->buckets; i++)
 | |
|   {
 | |
|     k[i] = 0;
 | |
|     uint64_t it = dl->getIterator(i);
 | |
| 
 | |
|     while (dl->next(i, it, &e))
 | |
|     {
 | |
|       if (firstRead)
 | |
|       {
 | |
|         cout << "thread[" << a->id << "] first read at " << atTime() << endl;
 | |
|         firstRead = false;
 | |
|       }
 | |
| 
 | |
|       if (e.second < min)
 | |
|         min = e.second;
 | |
| 
 | |
|       if (e.second > max)
 | |
|         max = e.second;
 | |
| 
 | |
|       // output the first 10 of each bucket or last 10 of the datalist
 | |
|       // if (count < 10 || (a->total - count) < 10 || k[i] < 10)
 | |
|       if (count < 2 || (a->total - count) < 2 || k[i] < 2)
 | |
|         cout << "thread[" << a->id << "] bucket:" << i << " e(" << e.first << ", " << e.second << ")" << endl;
 | |
| 
 | |
|       count++;
 | |
|       k[i]++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   cout << "\nthread[" << a->id << "] element: count = " << count << ", min/max = " << min << "/" << max
 | |
|        << ", elements in each bucket: ";
 | |
| 
 | |
|   for (uint64_t i = 0; i < a->buckets; i++)
 | |
|     cout << k[i] << " ";
 | |
| 
 | |
|   cout << endl;
 | |
| 
 | |
|   cout << "thread " << a->id << " finished at " << atTime() << endl;
 | |
| 
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| void* BucketReUseDriver::scanThread(void* arg)
 | |
| {
 | |
|   ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
 | |
|   BucketDataList* dl = a->dl;
 | |
|   CPPUNIT_ASSERT(dl == NULL);
 | |
| 
 | |
|   if (a->cond != NULL)
 | |
|   {
 | |
|     pthread_mutex_lock(a->mutex);
 | |
| 
 | |
|     while (*(a->flag) != true)
 | |
|       pthread_cond_wait(a->cond, a->mutex);
 | |
| 
 | |
|     pthread_mutex_unlock(a->mutex);
 | |
|   }
 | |
| 
 | |
|   cout << "thread " << a->id << " start at " << atTime() << endl;
 | |
| 
 | |
|   string dummy;
 | |
|   bool scan = false;
 | |
|   boost::shared_ptr<execplan::CalpontSystemCatalog> c =
 | |
|       execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
 | |
|   execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
 | |
|   BucketReuseControlEntry* entry =
 | |
|       BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
 | |
|   CPPUNIT_ASSERT(scan == true);
 | |
| 
 | |
|   ResourceManager rm;
 | |
|   dl = new BucketDataList(a->buckets, 1, a->elements, rm);
 | |
|   dl->setElementMode(1);
 | |
|   dl->reuseControl(entry, !scan);
 | |
| 
 | |
|   ThreadArg arg1 = *a;
 | |
|   arg1.id *= 10;
 | |
|   arg1.dl = dl;
 | |
| 
 | |
|   pthread_t t1;
 | |
|   pthread_create(&t1, NULL, insertThread, &arg1);
 | |
| 
 | |
|   ThreadArg arg2 = arg1;
 | |
|   arg2.id += 1;
 | |
|   pthread_t t2;
 | |
|   pthread_create(&t2, NULL, readThread, &arg2);
 | |
| 
 | |
|   pthread_join(t1, NULL);
 | |
|   pthread_join(t2, NULL);
 | |
| 
 | |
|   delete dl;
 | |
| 
 | |
|   cout << "thread " << a->id << " finished at " << atTime() << endl;
 | |
| 
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| void* BucketReUseDriver::reuseThread(void* arg)
 | |
| {
 | |
|   ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
 | |
|   BucketDataList* dl = a->dl;
 | |
|   CPPUNIT_ASSERT(dl == NULL);
 | |
| 
 | |
|   if (a->cond != NULL)
 | |
|   {
 | |
|     pthread_mutex_lock(a->mutex);
 | |
| 
 | |
|     while (*(a->flag) != true)
 | |
|       pthread_cond_wait(a->cond, a->mutex);
 | |
| 
 | |
|     pthread_mutex_unlock(a->mutex);
 | |
|   }
 | |
| 
 | |
|   cout << "thread " << a->id << " start at " << atTime() << endl;
 | |
| 
 | |
|   string dummy;
 | |
|   bool scan = true;
 | |
|   boost::shared_ptr<execplan::CalpontSystemCatalog> c =
 | |
|       execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
 | |
|   execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
 | |
|   BucketReuseControlEntry* entry =
 | |
|       BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
 | |
|   CPPUNIT_ASSERT(scan == false);
 | |
| 
 | |
|   ResourceManager rm;
 | |
|   dl = new BucketDataList(a->buckets, 1, a->elements, rm);
 | |
|   dl->setElementMode(1);
 | |
|   dl->reuseControl(entry, !scan);
 | |
| 
 | |
|   ThreadArg arg1 = *a;
 | |
|   arg1.id *= 10;
 | |
|   arg1.dl = dl;
 | |
| 
 | |
|   pthread_t t1;
 | |
|   pthread_create(&t1, NULL, readThread, &arg1);
 | |
| 
 | |
|   if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
 | |
|   {
 | |
|     boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
 | |
|     dl->reuseControl()->stateChange().wait(lock);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
 | |
|                    (entry->fileStatus() == BucketReuseControlEntry::ready_c));
 | |
|   }
 | |
| 
 | |
|   // the bucket files are ready
 | |
|   dl->restoreBucketInformation();
 | |
|   dl->endOfInput();
 | |
| 
 | |
|   pthread_join(t1, NULL);
 | |
| 
 | |
|   delete dl;
 | |
| 
 | |
|   cout << "thread " << a->id << " finished at " << atTime() << endl;
 | |
| 
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| void* BucketReUseDriver::raceThread(void* arg)
 | |
| {
 | |
|   ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
 | |
|   BucketDataList* dl = a->dl;
 | |
|   CPPUNIT_ASSERT(dl == NULL);
 | |
| 
 | |
|   if (a->cond != NULL)
 | |
|   {
 | |
|     pthread_mutex_lock(a->mutex);
 | |
| 
 | |
|     while (*(a->flag) != true)
 | |
|       pthread_cond_wait(a->cond, a->mutex);
 | |
| 
 | |
|     pthread_mutex_unlock(a->mutex);
 | |
|   }
 | |
| 
 | |
|   cout << "thread " << a->id << " start at " << atTime() << endl;
 | |
| 
 | |
|   string dummy;
 | |
|   bool scan = true;
 | |
|   ResourceManager rm;
 | |
|   BucketReuseControlEntry* entry = NULL;
 | |
|   {
 | |
|     boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
 | |
|     boost::shared_ptr<execplan::CalpontSystemCatalog> c =
 | |
|         execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
 | |
|     execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
 | |
|     entry = BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
 | |
| 
 | |
|     dl = new BucketDataList(a->buckets, 1, a->elements, rm);
 | |
|     dl->setElementMode(1);
 | |
|     dl->reuseControl(entry, !scan);
 | |
|   }
 | |
| 
 | |
|   if (scan == true)
 | |
|   {
 | |
|     CPPUNIT_ASSERT(entry->fileStatus() == BucketReuseControlEntry::progress_c);
 | |
| 
 | |
|     ThreadArg arg1 = *a;
 | |
|     arg1.id *= 10;
 | |
|     arg1.dl = dl;
 | |
| 
 | |
|     pthread_t t1;
 | |
|     pthread_create(&t1, NULL, insertThread, &arg1);
 | |
| 
 | |
|     ThreadArg arg2 = arg1;
 | |
|     arg2.id += 1;
 | |
|     pthread_t t2;
 | |
|     pthread_create(&t2, NULL, readThread, &arg2);
 | |
| 
 | |
|     pthread_join(t1, NULL);
 | |
|     pthread_join(t2, NULL);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     ThreadArg arg1 = *a;
 | |
|     arg1.id *= 10;
 | |
|     arg1.dl = dl;
 | |
| 
 | |
|     pthread_t t1;
 | |
|     pthread_create(&t1, NULL, readThread, &arg1);
 | |
| 
 | |
|     if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
 | |
|     {
 | |
|       boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
 | |
|       dl->reuseControl()->stateChange().wait(lock);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
 | |
|                      (entry->fileStatus() == BucketReuseControlEntry::ready_c));
 | |
|     }
 | |
| 
 | |
|     // the bucket files are ready
 | |
|     dl->restoreBucketInformation();
 | |
|     dl->endOfInput();
 | |
| 
 | |
|     pthread_join(t1, NULL);
 | |
|   }
 | |
| 
 | |
|   delete dl;
 | |
| 
 | |
|   cout << "thread " << a->id << " finished at " << atTime() << endl;
 | |
| 
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| timespec atTime()
 | |
| {
 | |
|   timespec ts, ts1, ts2;
 | |
|   ts1 = BucketReUseDriver::ts;
 | |
|   clock_gettime(CLOCK_REALTIME, &ts2);
 | |
| 
 | |
|   if (ts2.tv_nsec < ts1.tv_nsec)
 | |
|   {
 | |
|     ts.tv_sec = ts2.tv_sec - ts1.tv_sec - 1;
 | |
|     ts.tv_nsec = ts2.tv_nsec + 1000000000 - ts1.tv_nsec;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     ts.tv_sec = ts2.tv_sec - ts1.tv_sec;
 | |
|     ts.tv_nsec = ts2.tv_nsec - ts1.tv_nsec;
 | |
|   }
 | |
| 
 | |
|   return ts;
 | |
| }
 | |
| 
 | |
// Prints a timespec as "<seconds>.<nanoseconds zero-padded to 9 digits>s".
// std::setfill is sticky, so the stream's previous fill character is
// restored afterwards. Otherwise every later setw() output on the same stream
// (e.g. std::cout) would be padded with '0'.
std::ostream& operator<<(std::ostream& os, const struct timespec& t)
{
  os << t.tv_sec << ".";
  const char oldFill = os.fill('0');
  os << std::setw(9) << t.tv_nsec;
  os.fill(oldFill);
  os << "s";
  return os;
}
 |