/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /***************************************************************************** * $Id: tdriver-deliver.cpp 9210 2013-01-21 14:10:42Z rdempsey $ * ****************************************************************************/ #define BENCHMARK #include "joblist.h" #include "jobstep.h" #include "distributedenginecomm.h" #include "calpontsystemcatalog.h" #include "tableband.h" #include #include #include #include #include "stopwatch.cpp" #include "bytestream.h" using namespace messageqcpp; #include "brm.h" using namespace BRM; using namespace std; using namespace joblist; using namespace execplan; string schema("tpch100"); // dmc string table("lineitem"); // create table d_step(c1 number); string strtable("d_strstep"); // create table d_strstep(c1 char(10)); string colstable("lineitem"); // create table d_colsstep(c1 number, c2 number, c3number, c4 number, c5 // number, c6 number, c7 number, c8 number, c9 number,); int dataSize = 100 * 1000 * 1000; // 8 MB int bandSize = 8192 * 8; // 8 K int fifoSize = 128; int startingOid = 3416; // Oid for lineitem.l_orderkey on your database. // uint32_t flushInterval = 16384; // interval used in flushing table bands uint32_t flushInterval = 16384; // interval used in flushing table bands uint32_t columns; Stopwatch timer; //****************************************************************************** //****************************************************************************** // Start of Class to manage table band projection in a separate thread // (modeled after a similar class in the exemgr directory) #include namespace { class BSQueueMgr { public: /** @brief BSQueueMgr constructor. * * @param joblist (in) JobList where table bands reside * @param tableOID (in) OID of the table to be projected * @param maxQueueSize (in) Max # of table bands to be queued up */ explicit BSQueueMgr(DeliveryStep* d, int maxQueueSize = 1); /** @brief TableBandQueueMgr destructor. * */ ~BSQueueMgr(); /** @brief Main processing loop that projects the table bands. * */ void project(); /** @brief Called by second thread to acquire projected table band * * Acquires the next table band that has been projected, so that * the calling thread can serialize and forward the table band to * the next step. * * @return Next projected table band. Row count of 0 marks end of data */ messageqcpp::ByteStream* getNextByteStream(uint32_t& rowCount); private: // Disable these by declaring but not defining BSQueueMgr(const BSQueueMgr& rhs); BSQueueMgr& operator=(const BSQueueMgr& rhs); struct QueueElement { messageqcpp::ByteStream* bs; uint32_t rowCount; }; DeliveryStep* ds; unsigned int fMaxQueueSize; //...Internal queue used to save table bands as they are projected. std::queue fBSQueue; //...Mutex to protect our internal table band queue, and the //...condition variables used to signal when a table band has //...been added to, or removed from the queue. pthread_mutex_t fMutex; pthread_cond_t fBSAdded; pthread_cond_t fBSRemoved; }; BSQueueMgr::BSQueueMgr(DeliveryStep* d, int maxQueueSize) : ds(d), fMaxQueueSize(maxQueueSize) { pthread_mutex_init(&fMutex, 0); pthread_cond_init(&fBSAdded, 0); pthread_cond_init(&fBSRemoved, 0); } //------------------------------------------------------------------------------ // BSQueueMgr destructor //------------------------------------------------------------------------------ BSQueueMgr::~BSQueueMgr() { pthread_mutex_destroy(&fMutex); pthread_cond_destroy(&fBSAdded); pthread_cond_destroy(&fBSRemoved); } //------------------------------------------------------------------------------ // Contains loop to project the table bands. They are stored into our internal // queue, until we reach the queue size limit, in which case, we wait for the // consumer to work off some of the table bands. This will in turn free us up // to continue adding projected table bands, until we reach a table band with // a row count of 0, which denotes the end of the table. //------------------------------------------------------------------------------ void BSQueueMgr::project() { bool moreData = true; //...Stay in loop to project table bands, until we reach a table band //...having a row count of 0. while (moreData) { uint32_t rowCount; QueueElement qe; qe.bs = new ByteStream; rowCount = ds->nextBand(*(qe.bs)); qe.rowCount = rowCount; pthread_mutex_lock(&fMutex); //...Wait for room in our queue before adding this table band while (fBSQueue.size() >= fMaxQueueSize) { pthread_cond_wait(&fBSRemoved, &fMutex); } fBSQueue.push(qe); if (rowCount == 0) moreData = false; pthread_cond_broadcast(&fBSAdded); pthread_mutex_unlock(&fMutex); } } //------------------------------------------------------------------------------ // Extract a projected table band from our internal queue of table bands. // Returns next projected table band. A row count of 0, marks the table // band as being the last. //------------------------------------------------------------------------------ ByteStream* BSQueueMgr::getNextByteStream(uint32_t& rowCount) { QueueElement qe; pthread_mutex_lock(&fMutex); //...Wait for a table band to be added to our queue if the queue is empty while (fBSQueue.size() < 1) { pthread_cond_wait(&fBSAdded, &fMutex); } qe = fBSQueue.front(); fBSQueue.pop(); pthread_cond_broadcast(&fBSRemoved); pthread_mutex_unlock(&fMutex); rowCount = qe.rowCount; return qe.bs; } struct BSProjectThread { BSProjectThread(BSQueueMgr* pMgr) : fBSQueueMgr(pMgr) { } BSQueueMgr* fBSQueueMgr; void operator()() { fBSQueueMgr->project(); } }; // // A class that manages projection of table bands for a specific table // // This class manages table band projections for a specified table OID. // The projections can be performed in a separate thread, allowing the // serialization of the resulting table band(s) to occur concurrently. // The projected table band(s) are saved into an internal queue, with the // size of the queue being controlled through a constructor argument. // class TableBandQueueMgr { public: // // TableBandQueueMgr constructor. // // pDStep (in) Delivery step where table bands reside // maxQueueSize (in) Max # of table bands to be queued up // explicit TableBandQueueMgr(DeliveryStep* pDStep, int maxQueueSize = 1) : fDStep(pDStep), fMaxQueueSize(maxQueueSize) { pthread_mutex_init(&fMutex, 0); pthread_cond_init(&fTableBandAdded, 0); pthread_cond_init(&fTableBandRemoved, 0); } // TableBandQueueMgr destructor. ~TableBandQueueMgr() { pthread_mutex_destroy(&fMutex); pthread_cond_destroy(&fTableBandAdded); pthread_cond_destroy(&fTableBandRemoved); } // Main processing loop that projects the table bands. void project(); // Called by second thread to acquire projected table band // // Acquires the next table band that has been projected, so that // the calling thread can serialize and forward the table band to // the next step. // // Returns next projected table band. Row count of 0 marks end of data joblist::TableBand* getNextTableBand(); private: // Disable these by declaring but not defining TableBandQueueMgr(const TableBandQueueMgr& rhs); TableBandQueueMgr& operator=(const TableBandQueueMgr& rhs); DeliveryStep* fDStep; unsigned int fMaxQueueSize; // Internal queue used to save table bands as they are projected. std::queue fTblBandQueue; // Mutex to protect our internal table band queue, and the // condition variables used to signal when a table band has // been added to, or removed from the queue. pthread_mutex_t fMutex; pthread_cond_t fTableBandAdded; pthread_cond_t fTableBandRemoved; }; // // Contains loop to project the table bands. They are stored into our internal // queue, until we reach the queue size limit, in which case, we wait for the // consumer to work off some of the table bands. This will in turn free us up // to continue adding projected table bands, until we reach a table band with // a row count of 0, which denotes the end of the table. // void TableBandQueueMgr::project() { bool moreData = true; //...Stay in loop to project table bands, until we reach a table band //...having a row count of 0. while (moreData) { joblist::TableBand* pTblBand = new joblist::TableBand; *pTblBand = fDStep->nextBand(); pthread_mutex_lock(&fMutex); //...Wait for room in our queue before adding this table band while (fTblBandQueue.size() >= fMaxQueueSize) { pthread_cond_wait(&fTableBandRemoved, &fMutex); } fTblBandQueue.push(pTblBand); if (pTblBand->getRowCount() == 0) moreData = false; pthread_cond_broadcast(&fTableBandAdded); pthread_mutex_unlock(&fMutex); } } // // Extract a projected table band from our internal queue of table bands. // Returns next projected table band. A row count of 0, marks the table // band as being the last. // joblist::TableBand* TableBandQueueMgr::getNextTableBand() { joblist::TableBand* pTblBand = 0; pthread_mutex_lock(&fMutex); //...Wait for a table band to be added to our queue if the queue is empty while (fTblBandQueue.size() < 1) { pthread_cond_wait(&fTableBandAdded, &fMutex); } pTblBand = fTblBandQueue.front(); fTblBandQueue.pop(); //...If the row count is 0, there is no need to notify our producing //...thread, since that means we have reached the end of data. if (pTblBand->getRowCount() != 0) pthread_cond_broadcast(&fTableBandRemoved); pthread_mutex_unlock(&fMutex); return pTblBand; } } // end of namespace // End of Class to manage table band projection in a separate thread //****************************************************************************** //****************************************************************************** //------------------------------------------------------------------------------ // Drives thread to project table bands for delivery. //------------------------------------------------------------------------------ void* projectThreadWrapper(void* pThreadData) { TableBandQueueMgr* tableBandMgr = (TableBandQueueMgr*)pThreadData; // cout << "Starting thread to project columns..." << endl; tableBandMgr->project(); // cout << "Finished thread to project columns..." << endl; return 0; } //------------------------------------------------------------------------------ // Perform column projection and serialization for the given delivery step //------------------------------------------------------------------------------ void runStep(DeliveryStep& dstep, const string& message) { string nextBandMsg(message); nextBandMsg += " - nextBand()"; string serializeMsg(message); serializeMsg += " - serialize()"; int nextBandCount = 0; ByteStream bs; TableBand tb; //...Perform table band projection and serialization in succession #if 0 while (1) { // timer.start(nextBandMsg); tb = dstep.nextBand(); nextBandCount++; // timer.stop (nextBandMsg); // timer.start(serializeMsg); bs.reset(); tb.serialize(bs); // timer.stop( serializeMsg); if (tb.getRowCount() == 0) break; } //...Perform table band projection and serialization in parallel #else string thrCreateMsg(message); thrCreateMsg += " - serialize-thrCreate"; string thrJoinMsg(message); thrJoinMsg += " - serialize-thrJoin"; string serializeWaitMsg(message); serializeWaitMsg += " - serialize-Wait"; //...Would prefer to record this label in projectThreadWrapper, but //...Stopwatch is not threadsafe, so safer to put here in main thread, //...where the other Stopwatch times are recorded. Note that this //...time will overlap the other timestamps we are recording. // timer.start(nextBandMsg); //...Start a second thread that will allow us to perform //...table projections in parallel with band serialization // timer.start(thrCreateMsg); TableBandQueueMgr tableBandMgr(&dstep, 1); pthread_t projectionThread; pthread_create(&projectionThread, 0, projectThreadWrapper, &tableBandMgr); // timer.stop (thrCreateMsg); while (1) { //...The amount of time we spend waiting will help tell us how //...much extra time is being spent constructing the table bands // timer.start(serializeWaitMsg); boost::scoped_ptr band(tableBandMgr.getNextTableBand()); nextBandCount++; // timer.stop (serializeWaitMsg); // timer.start(serializeMsg); bs.reset(); band->serialize(bs); // timer.stop( serializeMsg); if (band->getRowCount() == 0) break; } // timer.stop(nextBandMsg); // timer.start(thrJoinMsg); pthread_join(projectionThread, 0); // timer.stop (thrJoinMsg); #endif cout << nextBandCount << " table bands delivered" << endl; } //------------------------------------------------------------------------------ // Add elements to a BandedDL (outdated version, replaced by FIFO version) //------------------------------------------------------------------------------ void addElements(BandedDL* dl1) { ElementType e; for (int i = 1; i <= dataSize; i++) { e.first = i; e.second = i; dl1->insert(e); } dl1->endOfInput(); } //------------------------------------------------------------------------------ // Add string elements to a FifoDL //------------------------------------------------------------------------------ void addElements(FIFO* dl1) { StringElementType e; for (int i = 1; i <= dataSize; i++) { e.first = i; e.second = strtable; dl1->insert(e); } dl1->endOfInput(); } //------------------------------------------------------------------------------ // Add numeric elements to a FifoDL //------------------------------------------------------------------------------ void addElements(FIFO* dl1) { ElementType e; int wrapCount = 0; UintRowGroup rg; for (int i = 1; i <= dataSize; i++) { e.first = i; e.second = i; rg.et[wrapCount] = e; wrapCount++; if (wrapCount == 8192 || i == dataSize) { rg.count = wrapCount; dl1->insert(rg); wrapCount = 0; } } dl1->endOfInput(); } //------------------------------------------------------------------------------ // Test delivery of a single numeric column //------------------------------------------------------------------------------ void deliveryStep_1() // number column { DistributedEngineComm* dec; boost::shared_ptr cat; ResourceManager rm; dec = DistributedEngineComm::instance(rm); cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1); DBRM dbrm; uint32_t uniqueID = dbrm.getUnique32(); dec->addQueue(uniqueID); JobStepAssociation inJs; AnyDataListSPtr spdl1(new AnyDataList()); FIFO* dl1 = new FIFO(1, 100); addElements(dl1); spdl1->fifoDL(dl1); inJs.outAdd(spdl1); DeliveryStep dstep(inJs, JobStepAssociation(), make_table(schema, table), cat, 1, 1, 1, flushInterval); runStep(dstep, string("deliveryStep_1")); dec->removeQueue(uniqueID); } //------------------------------------------------------------------------------ // Test delivery of a single string column //------------------------------------------------------------------------------ /* void deliveryStep_2() //string column { DistributedEngineComm* dec; boost::shared_ptr cat; dec = DistributedEngineComm::instance(); cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1); dec->addSession(12345); dec->addStep(12345, 0); JobStepAssociation inJs; AnyDataListSPtr spdl1(new AnyDataList()); FIFO* dl1 = new FIFO(1,100); addElements(dl1); StringElementType e; spdl1->stringDL(dl1); inJs.outAdd(spdl1); DeliveryStep dstep(inJs, JobStepAssociation(), make_table(schema, strtable), cat, bandSize, 1, 1, 1, flushInterval); runStep(dstep, string("deliveryStep_2")); dec->removeSession(12345); } */ //------------------------------------------------------------------------------ // Drives thread to add elements to the specified FIFO. //------------------------------------------------------------------------------ void* addElementsThreadWrapper(void* pThreadData) { FIFO* dl1 = (FIFO*)pThreadData; // cout << "Starting thread to add elements for column " << // dl1->OID() << endl; addElements(dl1); // cout << "Finished thread to add elements for column " << // dl1->OID() << endl; return 0; } //------------------------------------------------------------------------------ // Test delivery of multiple (numCols) numeric columns //------------------------------------------------------------------------------ void deliverTest(int numCols) { DistributedEngineComm* dec; boost::shared_ptr cat; ResourceManager rm; dec = DistributedEngineComm::instance(rm); cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1); // Get the oid for the first column in the table - usually lineitem.l_orderkey. CalpontSystemCatalog::TableName tableName = make_table(schema, colstable); CalpontSystemCatalog::ROPair p = cat->tableRID(tableName); startingOid = p.objnum + 1; int startingOid = 3416; // Oid for lineitem.l_orderkey on your database. DBRM dbrm; uint32_t uniqueID = dbrm.getUnique32(); dec->addQueue(uniqueID); JobStepAssociation inJs; stringstream ss; pthread_t threads[numCols]; for (int i = 0; i < numCols; ++i) { AnyDataListSPtr spdl1(new AnyDataList()); // Make FIFO big enough to contain the elements for a flush interval FIFO* dl1 = new FIFO(1, fifoSize); // BandedDL* dl1 = new BandedDL(1); dl1->OID(i + startingOid); // lineitem first col object id spdl1->fifoDL(dl1); // spdl1->bandedDL(dl1); inJs.outAdd(spdl1); pthread_create(&threads[i], 0, addElementsThreadWrapper, dl1); } DeliveryStep dstep(inJs, JobStepAssociation(), tableName, cat, 12345, 1, 1, flushInterval); ss << "DeliverStep test for " << numCols; string message = ss.str(); runStep(dstep, ss.str()); for (int i = 0; i < numCols; ++i) { pthread_join(threads[i], 0); } dec->removeQueue(uniqueID); } //------------------------------------------------------------------------------ // Perform testcases for a 1 column table, a 2 column table, etc, up to // a table having "maxCols" columns. //------------------------------------------------------------------------------ void testSizes() { // Prompt for schema. cout << "Enter Schema or Enter for " << schema << ": "; string tmpSchema; getline(cin, tmpSchema); if (tmpSchema.length() > 0) { schema = tmpSchema; } // Prompt for table. cout << "Enter Table or Enter for " << colstable << ": "; string tmpTable; getline(cin, tmpTable); if (tmpTable.length() > 0) { colstable = tmpTable; } timer.start("Total"); int maxCols = 9; cout << endl; for (int i = 7; i <= maxCols; i++) { cout << endl << "Running test " << i << " of " << maxCols << endl; stringstream ss; ss << "Delivery test for " << dataSize << " rows and " << i << " columns"; timer.start(ss.str()); deliverTest(i); timer.stop(ss.str()); } timer.stop("Total"); timer.finish(); } void* nextBandBenchProducer(void* arg) { FIFO* dl1 = (FIFO*)arg; UintRowGroup rg; uint64_t* arr; uint32_t i; arr = (uint64_t*)rg.et; for (i = 0; i < 8192; ++i) arr[i] = i; rg.count = 8192; for (i = 1; i <= dataSize / 8192; i++) { // cout << "inserting set " << i << endl; dl1->insert(rg); } dl1->endOfInput(); return NULL; } void nextBandBenchmark() { ByteStream bs; pthread_t threads[columns]; uint32_t i, rowCount = 1; JobStepAssociation inJs; for (i = 0; i < columns; ++i) { AnyDataListSPtr spdl1(new AnyDataList()); FIFO* dl1 = new FIFO(1, fifoSize); dl1->OID(i); // lineitem first col object id spdl1->fifoDL(dl1); inJs.outAdd(spdl1); pthread_create(&threads[i], 0, nextBandBenchProducer, dl1); cout << "started thread " << i << endl; } DeliveryStep ds(inJs, JobStepAssociation(), 8); stringstream ss; ss << "nextBandBenchmark with " << columns << " columns\n"; timer.start(ss.str()); while (rowCount != 0) { // cout << "getting a BS\n"; rowCount = ds.nextBand(bs); bs.restart(); // cout << "got a BS\n"; } timer.stop(ss.str()); for (i = 0; i < columns; ++i) pthread_join(threads[i], NULL); } void queuedBSBenchmark(int queueLength) { ByteStream* bs; pthread_t threads[columns]; uint32_t i, rowCount = 1; JobStepAssociation inJs; for (i = 0; i < columns; ++i) { AnyDataListSPtr spdl1(new AnyDataList()); FIFO* dl1 = new FIFO(1, fifoSize); dl1->OID(i); // lineitem first col object id spdl1->fifoDL(dl1); inJs.outAdd(spdl1); pthread_create(&threads[i], 0, nextBandBenchProducer, dl1); cout << "started thread " << i << endl; } DeliveryStep ds(inJs, JobStepAssociation(), 8); BSQueueMgr bsq(&ds, queueLength); stringstream ss; ss << "queuedBSBenchmark with " << columns << " columns and " << queueLength << " queue length\n"; timer.start(ss.str()); boost::thread(BSProjectThread(&bsq)); while (rowCount != 0) { bs = bsq.getNextByteStream(rowCount); delete bs; } timer.stop(ss.str()); for (i = 0; i < columns; ++i) pthread_join(threads[i], NULL); } //------------------------------------------------------------------------------ // Main entry point //------------------------------------------------------------------------------ int main(int argc, char** argv) { if (argc > 1) columns = atoi(argv[1]); // override default number of rows else columns = 10; while (columns > 0) { // testSizes(); nextBandBenchmark(); queuedBSBenchmark(10); queuedBSBenchmark(9); queuedBSBenchmark(8); queuedBSBenchmark(5); queuedBSBenchmark(4); queuedBSBenchmark(1); timer.finish(); columns--; } return 0; }