You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			850 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			850 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /*****************************************************************************
 | |
|  * $Id: tdriver-deliver.cpp 9210 2013-01-21 14:10:42Z rdempsey $
 | |
|  *
 | |
|  ****************************************************************************/
 | |
| 
 | |
| #define BENCHMARK
 | |
| 
 | |
| #include "joblist.h"
 | |
| #include "jobstep.h"
 | |
| #include "distributedenginecomm.h"
 | |
| #include "calpontsystemcatalog.h"
 | |
| #include "tableband.h"
 | |
| #include <sys/timeb.h>
 | |
| #include <iostream>
 | |
| #include <pthread.h>
 | |
| #include <boost/scoped_ptr.hpp>
 | |
| 
 | |
| #include "stopwatch.cpp"
 | |
| 
 | |
| #include "bytestream.h"
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| #include "brm.h"
 | |
| using namespace BRM;
 | |
| 
 | |
| using namespace std;
 | |
| using namespace joblist;
 | |
| using namespace execplan;
 | |
| 
 | |
| string schema("tpch100");      // dmc
 | |
| string table("lineitem");      // create table d_step(c1 number);
 | |
| string strtable("d_strstep");  // create table d_strstep(c1 char(10));
 | |
| string colstable("lineitem");  // create table d_colsstep(c1 number, c2 number, c3number, c4 number, c5
 | |
|                                // number, c6 number, c7 number, c8 number, c9 number,);
 | |
| 
 | |
| int dataSize = 100 * 1000 * 1000;  // 8 MB
 | |
| int bandSize = 8192 * 8;           // 8 K
 | |
| int fifoSize = 128;
 | |
| int startingOid = 3416;  // Oid for lineitem.l_orderkey on your database.
 | |
| // uint32_t flushInterval = 16384; // interval used in flushing table bands
 | |
| uint32_t flushInterval = 16384;  // interval used in flushing table bands
 | |
| uint32_t columns;
 | |
| 
 | |
| Stopwatch timer;
 | |
| 
 | |
| //******************************************************************************
 | |
| //******************************************************************************
 | |
| // Start of Class to manage table band projection in a separate thread
 | |
| // (modeled after a similar class in the exemgr directory)
 | |
| 
 | |
| #include <queue>
 | |
| 
 | |
| namespace
 | |
| {
 | |
| class BSQueueMgr
 | |
| {
 | |
|  public:
 | |
|   /** @brief BSQueueMgr constructor.
 | |
|    *
 | |
|    * @param joblist      (in) JobList where table bands reside
 | |
|    * @param tableOID     (in) OID of the table to be projected
 | |
|    * @param maxQueueSize (in) Max # of table bands to be queued up
 | |
|    */
 | |
|   explicit BSQueueMgr(DeliveryStep* d, int maxQueueSize = 1);
 | |
| 
 | |
|   /** @brief TableBandQueueMgr destructor.
 | |
|    *
 | |
|    */
 | |
|   ~BSQueueMgr();
 | |
| 
 | |
|   /** @brief Main processing loop that projects the table bands.
 | |
|    *
 | |
|    */
 | |
|   void project();
 | |
| 
 | |
|   /** @brief Called by second thread to acquire projected table band
 | |
|    *
 | |
|    * Acquires the next table band that has been projected, so that
 | |
|    * the calling thread can serialize and forward the table band to
 | |
|    * the next step.
 | |
|    *
 | |
|    * @return Next projected table band. Row count of 0 marks end of data
 | |
|    */
 | |
|   messageqcpp::ByteStream* getNextByteStream(uint32_t& rowCount);
 | |
| 
 | |
|  private:
 | |
|   // Disable these by declaring but not defining
 | |
|   BSQueueMgr(const BSQueueMgr& rhs);
 | |
|   BSQueueMgr& operator=(const BSQueueMgr& rhs);
 | |
| 
 | |
|   struct QueueElement
 | |
|   {
 | |
|     messageqcpp::ByteStream* bs;
 | |
|     uint32_t rowCount;
 | |
|   };
 | |
| 
 | |
|   DeliveryStep* ds;
 | |
|   unsigned int fMaxQueueSize;
 | |
| 
 | |
|   //...Internal queue used to save table bands as they are projected.
 | |
|   std::queue<QueueElement> fBSQueue;
 | |
| 
 | |
|   //...Mutex to protect our internal table band queue, and the
 | |
|   //...condition variables used to signal when a table band has
 | |
|   //...been added to, or removed from the queue.
 | |
|   pthread_mutex_t fMutex;
 | |
|   pthread_cond_t fBSAdded;
 | |
|   pthread_cond_t fBSRemoved;
 | |
| };
 | |
| 
 | |
| BSQueueMgr::BSQueueMgr(DeliveryStep* d, int maxQueueSize) : ds(d), fMaxQueueSize(maxQueueSize)
 | |
| {
 | |
|   pthread_mutex_init(&fMutex, 0);
 | |
|   pthread_cond_init(&fBSAdded, 0);
 | |
|   pthread_cond_init(&fBSRemoved, 0);
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // BSQueueMgr destructor
 | |
| //------------------------------------------------------------------------------
 | |
| BSQueueMgr::~BSQueueMgr()
 | |
| {
 | |
|   pthread_mutex_destroy(&fMutex);
 | |
|   pthread_cond_destroy(&fBSAdded);
 | |
|   pthread_cond_destroy(&fBSRemoved);
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Contains loop to project the table bands.  They are stored into our internal
 | |
| // queue, until we reach the queue size limit, in which case, we wait for the
 | |
| // consumer to work off some of the table bands.  This will in turn free us up
 | |
| // to continue adding projected table bands, until we reach a table band with
 | |
| // a row count of 0, which denotes the end of the table.
 | |
| //------------------------------------------------------------------------------
 | |
| void BSQueueMgr::project()
 | |
| {
 | |
|   bool moreData = true;
 | |
| 
 | |
|   //...Stay in loop to project table bands, until we reach a table band
 | |
|   //...having a row count of 0.
 | |
| 
 | |
|   while (moreData)
 | |
|   {
 | |
|     uint32_t rowCount;
 | |
|     QueueElement qe;
 | |
| 
 | |
|     qe.bs = new ByteStream;
 | |
| 
 | |
|     rowCount = ds->nextBand(*(qe.bs));
 | |
|     qe.rowCount = rowCount;
 | |
| 
 | |
|     pthread_mutex_lock(&fMutex);
 | |
| 
 | |
|     //...Wait for room in our queue before adding this table band
 | |
|     while (fBSQueue.size() >= fMaxQueueSize)
 | |
|     {
 | |
|       pthread_cond_wait(&fBSRemoved, &fMutex);
 | |
|     }
 | |
| 
 | |
|     fBSQueue.push(qe);
 | |
| 
 | |
|     if (rowCount == 0)
 | |
|       moreData = false;
 | |
| 
 | |
|     pthread_cond_broadcast(&fBSAdded);
 | |
| 
 | |
|     pthread_mutex_unlock(&fMutex);
 | |
|   }
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Extract a projected table band from our internal queue of table bands.
 | |
| // Returns next projected table band.  A row count of 0, marks the table
 | |
| // band as being the last.
 | |
| //------------------------------------------------------------------------------
 | |
| ByteStream* BSQueueMgr::getNextByteStream(uint32_t& rowCount)
 | |
| {
 | |
|   QueueElement qe;
 | |
| 
 | |
|   pthread_mutex_lock(&fMutex);
 | |
| 
 | |
|   //...Wait for a table band to be added to our queue if the queue is empty
 | |
|   while (fBSQueue.size() < 1)
 | |
|   {
 | |
|     pthread_cond_wait(&fBSAdded, &fMutex);
 | |
|   }
 | |
| 
 | |
|   qe = fBSQueue.front();
 | |
|   fBSQueue.pop();
 | |
|   pthread_cond_broadcast(&fBSRemoved);
 | |
| 
 | |
|   pthread_mutex_unlock(&fMutex);
 | |
| 
 | |
|   rowCount = qe.rowCount;
 | |
|   return qe.bs;
 | |
| }
 | |
| 
 | |
| struct BSProjectThread
 | |
| {
 | |
|   BSProjectThread(BSQueueMgr* pMgr) : fBSQueueMgr(pMgr)
 | |
|   {
 | |
|   }
 | |
| 
 | |
|   BSQueueMgr* fBSQueueMgr;
 | |
| 
 | |
|   void operator()()
 | |
|   {
 | |
|     fBSQueueMgr->project();
 | |
|   }
 | |
| };
 | |
| 
 | |
| //
 | |
| // A class that manages projection of table bands for a specific table
 | |
| //
 | |
| // This class manages table band projections for a specified table OID.
 | |
| // The projections can be performed in a separate thread, allowing the
 | |
| // serialization of the resulting table band(s) to occur concurrently.
 | |
| // The projected table band(s) are saved into an internal queue, with the
 | |
| // size of the queue being controlled through a constructor argument.
 | |
| //
 | |
| class TableBandQueueMgr
 | |
| {
 | |
|  public:
 | |
|   //
 | |
|   // TableBandQueueMgr constructor.
 | |
|   //
 | |
|   // pDStep       (in) Delivery step where table bands reside
 | |
|   // maxQueueSize (in) Max # of table bands to be queued up
 | |
|   //
 | |
|   explicit TableBandQueueMgr(DeliveryStep* pDStep, int maxQueueSize = 1)
 | |
|    : fDStep(pDStep), fMaxQueueSize(maxQueueSize)
 | |
|   {
 | |
|     pthread_mutex_init(&fMutex, 0);
 | |
|     pthread_cond_init(&fTableBandAdded, 0);
 | |
|     pthread_cond_init(&fTableBandRemoved, 0);
 | |
|   }
 | |
| 
 | |
|   // TableBandQueueMgr destructor.
 | |
|   ~TableBandQueueMgr()
 | |
|   {
 | |
|     pthread_mutex_destroy(&fMutex);
 | |
|     pthread_cond_destroy(&fTableBandAdded);
 | |
|     pthread_cond_destroy(&fTableBandRemoved);
 | |
|   }
 | |
| 
 | |
|   // Main processing loop that projects the table bands.
 | |
|   void project();
 | |
| 
 | |
|   // Called by second thread to acquire projected table band
 | |
|   //
 | |
|   // Acquires the next table band that has been projected, so that
 | |
|   // the calling thread can serialize and forward the table band to
 | |
|   // the next step.
 | |
|   //
 | |
|   // Returns next projected table band. Row count of 0 marks end of data
 | |
|   joblist::TableBand* getNextTableBand();
 | |
| 
 | |
|  private:
 | |
|   // Disable these by declaring but not defining
 | |
|   TableBandQueueMgr(const TableBandQueueMgr& rhs);
 | |
|   TableBandQueueMgr& operator=(const TableBandQueueMgr& rhs);
 | |
| 
 | |
|   DeliveryStep* fDStep;
 | |
|   unsigned int fMaxQueueSize;
 | |
| 
 | |
|   // Internal queue used to save table bands as they are projected.
 | |
|   std::queue<joblist::TableBand*> fTblBandQueue;
 | |
| 
 | |
|   // Mutex to protect our internal table band queue, and the
 | |
|   // condition variables used to signal when a table band has
 | |
|   // been added to, or removed from the queue.
 | |
|   pthread_mutex_t fMutex;
 | |
|   pthread_cond_t fTableBandAdded;
 | |
|   pthread_cond_t fTableBandRemoved;
 | |
| };
 | |
| 
 | |
| //
 | |
| // Contains loop to project the table bands.  They are stored into our internal
 | |
| // queue, until we reach the queue size limit, in which case, we wait for the
 | |
| // consumer to work off some of the table bands.  This will in turn free us up
 | |
| // to continue adding projected table bands, until we reach a table band with
 | |
| // a row count of 0, which denotes the end of the table.
 | |
| //
 | |
| void TableBandQueueMgr::project()
 | |
| {
 | |
|   bool moreData = true;
 | |
| 
 | |
|   //...Stay in loop to project table bands, until we reach a table band
 | |
|   //...having a row count of 0.
 | |
| 
 | |
|   while (moreData)
 | |
|   {
 | |
|     joblist::TableBand* pTblBand = new joblist::TableBand;
 | |
|     *pTblBand = fDStep->nextBand();
 | |
| 
 | |
|     pthread_mutex_lock(&fMutex);
 | |
| 
 | |
|     //...Wait for room in our queue before adding this table band
 | |
|     while (fTblBandQueue.size() >= fMaxQueueSize)
 | |
|     {
 | |
|       pthread_cond_wait(&fTableBandRemoved, &fMutex);
 | |
|     }
 | |
| 
 | |
|     fTblBandQueue.push(pTblBand);
 | |
| 
 | |
|     if (pTblBand->getRowCount() == 0)
 | |
|       moreData = false;
 | |
| 
 | |
|     pthread_cond_broadcast(&fTableBandAdded);
 | |
| 
 | |
|     pthread_mutex_unlock(&fMutex);
 | |
|   }
 | |
| }
 | |
| 
 | |
| //
 | |
| // Extract a projected table band from our internal queue of table bands.
 | |
| // Returns next projected table band.  A row count of 0, marks the table
 | |
| // band as being the last.
 | |
| //
 | |
| joblist::TableBand* TableBandQueueMgr::getNextTableBand()
 | |
| {
 | |
|   joblist::TableBand* pTblBand = 0;
 | |
| 
 | |
|   pthread_mutex_lock(&fMutex);
 | |
| 
 | |
|   //...Wait for a table band to be added to our queue if the queue is empty
 | |
|   while (fTblBandQueue.size() < 1)
 | |
|   {
 | |
|     pthread_cond_wait(&fTableBandAdded, &fMutex);
 | |
|   }
 | |
| 
 | |
|   pTblBand = fTblBandQueue.front();
 | |
|   fTblBandQueue.pop();
 | |
| 
 | |
|   //...If the row count is 0, there is no need to notify our producing
 | |
|   //...thread, since that means we have reached the end of data.
 | |
|   if (pTblBand->getRowCount() != 0)
 | |
|     pthread_cond_broadcast(&fTableBandRemoved);
 | |
| 
 | |
|   pthread_mutex_unlock(&fMutex);
 | |
| 
 | |
|   return pTblBand;
 | |
| }
 | |
| 
 | |
| }  // end of namespace
 | |
| 
 | |
| // End of Class to manage table band projection in a separate thread
 | |
| //******************************************************************************
 | |
| //******************************************************************************
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Drives thread to project table bands for delivery.
 | |
| //------------------------------------------------------------------------------
 | |
| void* projectThreadWrapper(void* pThreadData)
 | |
| {
 | |
|   TableBandQueueMgr* tableBandMgr = (TableBandQueueMgr*)pThreadData;
 | |
| 
 | |
|   // cout << "Starting thread to project columns..." << endl;
 | |
|   tableBandMgr->project();
 | |
|   // cout << "Finished thread to project columns..." << endl;
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Perform column projection and serialization for the given delivery step
 | |
| //------------------------------------------------------------------------------
 | |
| void runStep(DeliveryStep& dstep, const string& message)
 | |
| {
 | |
|   string nextBandMsg(message);
 | |
|   nextBandMsg += " - nextBand()";
 | |
|   string serializeMsg(message);
 | |
|   serializeMsg += " - serialize()";
 | |
|   int nextBandCount = 0;
 | |
| 
 | |
|   ByteStream bs;
 | |
|   TableBand tb;
 | |
| 
 | |
| //...Perform table band projection and serialization in succession
 | |
| #if 0
 | |
| 
 | |
|     while (1)
 | |
|     {
 | |
|         // timer.start(nextBandMsg);
 | |
|         tb = dstep.nextBand();
 | |
|         nextBandCount++;
 | |
|         // timer.stop (nextBandMsg);
 | |
| 
 | |
|         // timer.start(serializeMsg);
 | |
|         bs.reset();
 | |
|         tb.serialize(bs);
 | |
|         // timer.stop( serializeMsg);
 | |
| 
 | |
|         if (tb.getRowCount() == 0)
 | |
|             break;
 | |
|     }
 | |
| 
 | |
| //...Perform table band projection and serialization in parallel
 | |
| #else
 | |
|   string thrCreateMsg(message);
 | |
|   thrCreateMsg += " - serialize-thrCreate";
 | |
|   string thrJoinMsg(message);
 | |
|   thrJoinMsg += " - serialize-thrJoin";
 | |
|   string serializeWaitMsg(message);
 | |
|   serializeWaitMsg += " - serialize-Wait";
 | |
| 
 | |
|   //...Would prefer to record this label in projectThreadWrapper, but
 | |
|   //...Stopwatch is not threadsafe, so safer to put here in main thread,
 | |
|   //...where the other Stopwatch times are recorded.  Note that this
 | |
|   //...time will overlap the other timestamps we are recording.
 | |
|   // timer.start(nextBandMsg);
 | |
| 
 | |
|   //...Start a second thread that will allow us to perform
 | |
|   //...table projections in parallel with band serialization
 | |
|   // timer.start(thrCreateMsg);
 | |
|   TableBandQueueMgr tableBandMgr(&dstep, 1);
 | |
|   pthread_t projectionThread;
 | |
|   pthread_create(&projectionThread, 0, projectThreadWrapper, &tableBandMgr);
 | |
|   // timer.stop (thrCreateMsg);
 | |
| 
 | |
|   while (1)
 | |
|   {
 | |
|     //...The amount of time we spend waiting will help tell us how
 | |
|     //...much extra time is being spent constructing the table bands
 | |
|     // timer.start(serializeWaitMsg);
 | |
|     boost::scoped_ptr<TableBand> band(tableBandMgr.getNextTableBand());
 | |
|     nextBandCount++;
 | |
|     // timer.stop (serializeWaitMsg);
 | |
| 
 | |
|     // timer.start(serializeMsg);
 | |
|     bs.reset();
 | |
|     band->serialize(bs);
 | |
|     // timer.stop( serializeMsg);
 | |
| 
 | |
|     if (band->getRowCount() == 0)
 | |
|       break;
 | |
|   }
 | |
| 
 | |
|   // timer.stop(nextBandMsg);
 | |
| 
 | |
|   // timer.start(thrJoinMsg);
 | |
|   pthread_join(projectionThread, 0);
 | |
|   // timer.stop (thrJoinMsg);
 | |
| #endif
 | |
|   cout << nextBandCount << " table bands delivered" << endl;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Add elements to a BandedDL (outdated version, replaced by FIFO version)
 | |
| //------------------------------------------------------------------------------
 | |
| void addElements(BandedDL<ElementType>* dl1)
 | |
| {
 | |
|   ElementType e;
 | |
| 
 | |
|   for (int i = 1; i <= dataSize; i++)
 | |
|   {
 | |
|     e.first = i;
 | |
|     e.second = i;
 | |
|     dl1->insert(e);
 | |
|   }
 | |
| 
 | |
|   dl1->endOfInput();
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Add string elements to a FifoDL
 | |
| //------------------------------------------------------------------------------
 | |
| void addElements(FIFO<StringElementType>* dl1)
 | |
| {
 | |
|   StringElementType e;
 | |
| 
 | |
|   for (int i = 1; i <= dataSize; i++)
 | |
|   {
 | |
|     e.first = i;
 | |
|     e.second = strtable;
 | |
|     dl1->insert(e);
 | |
|   }
 | |
| 
 | |
|   dl1->endOfInput();
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Add numeric elements to a FifoDL
 | |
| //------------------------------------------------------------------------------
 | |
| void addElements(FIFO<UintRowGroup>* dl1)
 | |
| {
 | |
|   ElementType e;
 | |
|   int wrapCount = 0;
 | |
|   UintRowGroup rg;
 | |
| 
 | |
|   for (int i = 1; i <= dataSize; i++)
 | |
|   {
 | |
|     e.first = i;
 | |
|     e.second = i;
 | |
| 
 | |
|     rg.et[wrapCount] = e;
 | |
|     wrapCount++;
 | |
| 
 | |
|     if (wrapCount == 8192 || i == dataSize)
 | |
|     {
 | |
|       rg.count = wrapCount;
 | |
|       dl1->insert(rg);
 | |
|       wrapCount = 0;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   dl1->endOfInput();
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Test delivery of a single numeric column
 | |
| //------------------------------------------------------------------------------
 | |
| void deliveryStep_1()  // number column
 | |
| {
 | |
|   DistributedEngineComm* dec;
 | |
|   boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
| 
 | |
|   ResourceManager rm;
 | |
|   dec = DistributedEngineComm::instance(rm);
 | |
|   cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1);
 | |
| 
 | |
|   DBRM dbrm;
 | |
|   uint32_t uniqueID = dbrm.getUnique32();
 | |
|   dec->addQueue(uniqueID);
 | |
| 
 | |
|   JobStepAssociation inJs;
 | |
| 
 | |
|   AnyDataListSPtr spdl1(new AnyDataList());
 | |
|   FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, 100);
 | |
|   addElements(dl1);
 | |
| 
 | |
|   spdl1->fifoDL(dl1);
 | |
| 
 | |
|   inJs.outAdd(spdl1);
 | |
| 
 | |
|   DeliveryStep dstep(inJs, JobStepAssociation(), make_table(schema, table), cat, 1, 1, 1, flushInterval);
 | |
| 
 | |
|   runStep(dstep, string("deliveryStep_1"));
 | |
| 
 | |
|   dec->removeQueue(uniqueID);
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Test delivery of a single string column
 | |
| //------------------------------------------------------------------------------
 | |
| /*
 | |
| void deliveryStep_2()   //string column
 | |
| {
 | |
|                 DistributedEngineComm* dec;
 | |
|                 boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
| 
 | |
|                 dec = DistributedEngineComm::instance();
 | |
|                 cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1);
 | |
| 
 | |
|                 dec->addSession(12345);
 | |
|                 dec->addStep(12345, 0);
 | |
| 
 | |
|                 JobStepAssociation inJs;
 | |
| 
 | |
|                 AnyDataListSPtr spdl1(new AnyDataList());
 | |
|                 FIFO<StringElementType>* dl1 = new FIFO<StringElementType>(1,100);
 | |
| 
 | |
|                 addElements(dl1);
 | |
|                 StringElementType e;
 | |
| 
 | |
|                 spdl1->stringDL(dl1);
 | |
| 
 | |
|                 inJs.outAdd(spdl1);
 | |
| 
 | |
|                 DeliveryStep dstep(inJs, JobStepAssociation(),
 | |
|                         make_table(schema, strtable),
 | |
|                         cat, bandSize, 1, 1, 1, flushInterval);
 | |
| 
 | |
|                 runStep(dstep, string("deliveryStep_2"));
 | |
| 
 | |
|                 dec->removeSession(12345);
 | |
| }
 | |
| */
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Drives thread to add elements to the specified FIFO.
 | |
| //------------------------------------------------------------------------------
 | |
| void* addElementsThreadWrapper(void* pThreadData)
 | |
| {
 | |
|   FIFO<UintRowGroup>* dl1 = (FIFO<UintRowGroup>*)pThreadData;
 | |
| 
 | |
|   // cout << "Starting thread to add elements for column " <<
 | |
|   //	dl1->OID() << endl;
 | |
|   addElements(dl1);
 | |
|   // cout << "Finished thread to add elements for column " <<
 | |
|   //	dl1->OID() << endl;
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Test delivery of multiple (numCols) numeric columns
 | |
| //------------------------------------------------------------------------------
 | |
| void deliverTest(int numCols)
 | |
| {
 | |
|   DistributedEngineComm* dec;
 | |
|   boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|   ResourceManager rm;
 | |
|   dec = DistributedEngineComm::instance(rm);
 | |
|   cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1);
 | |
| 
 | |
|   // Get the oid for the first column in the table - usually lineitem.l_orderkey.
 | |
|   CalpontSystemCatalog::TableName tableName = make_table(schema, colstable);
 | |
|   CalpontSystemCatalog::ROPair p = cat->tableRID(tableName);
 | |
|   startingOid = p.objnum + 1;
 | |
| 
 | |
|   int startingOid = 3416;  // Oid for lineitem.l_orderkey on your database.
 | |
| 
 | |
|   DBRM dbrm;
 | |
|   uint32_t uniqueID = dbrm.getUnique32();
 | |
|   dec->addQueue(uniqueID);
 | |
| 
 | |
|   JobStepAssociation inJs;
 | |
| 
 | |
|   stringstream ss;
 | |
|   pthread_t threads[numCols];
 | |
| 
 | |
|   for (int i = 0; i < numCols; ++i)
 | |
|   {
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
| 
 | |
|     // Make FIFO big enough to contain the elements for a flush interval
 | |
|     FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, fifoSize);
 | |
|     //			BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1);
 | |
| 
 | |
|     dl1->OID(i + startingOid);  // lineitem first col object id
 | |
|     spdl1->fifoDL(dl1);
 | |
|     //			spdl1->bandedDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     pthread_create(&threads[i], 0, addElementsThreadWrapper, dl1);
 | |
|   }
 | |
| 
 | |
|   DeliveryStep dstep(inJs, JobStepAssociation(), tableName, cat, 12345, 1, 1, flushInterval);
 | |
| 
 | |
|   ss << "DeliverStep test for " << numCols;
 | |
| 
 | |
|   string message = ss.str();
 | |
| 
 | |
|   runStep(dstep, ss.str());
 | |
| 
 | |
|   for (int i = 0; i < numCols; ++i)
 | |
|   {
 | |
|     pthread_join(threads[i], 0);
 | |
|   }
 | |
| 
 | |
|   dec->removeQueue(uniqueID);
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Perform testcases for a 1 column table, a 2 column table, etc, up to
 | |
| // a table having "maxCols" columns.
 | |
| //------------------------------------------------------------------------------
 | |
| void testSizes()
 | |
| {
 | |
|   // Prompt for schema.
 | |
|   cout << "Enter Schema or Enter for " << schema << ":  ";
 | |
|   string tmpSchema;
 | |
|   getline(cin, tmpSchema);
 | |
| 
 | |
|   if (tmpSchema.length() > 0)
 | |
|   {
 | |
|     schema = tmpSchema;
 | |
|   }
 | |
| 
 | |
|   // Prompt for table.
 | |
|   cout << "Enter Table or Enter for " << colstable << ":  ";
 | |
|   string tmpTable;
 | |
|   getline(cin, tmpTable);
 | |
| 
 | |
|   if (tmpTable.length() > 0)
 | |
|   {
 | |
|     colstable = tmpTable;
 | |
|   }
 | |
| 
 | |
|   timer.start("Total");
 | |
|   int maxCols = 9;
 | |
|   cout << endl;
 | |
| 
 | |
|   for (int i = 7; i <= maxCols; i++)
 | |
|   {
 | |
|     cout << endl << "Running test " << i << " of " << maxCols << endl;
 | |
|     stringstream ss;
 | |
|     ss << "Delivery test for " << dataSize << " rows and " << i << " columns";
 | |
|     timer.start(ss.str());
 | |
|     deliverTest(i);
 | |
|     timer.stop(ss.str());
 | |
|   }
 | |
| 
 | |
|   timer.stop("Total");
 | |
|   timer.finish();
 | |
| }
 | |
| 
 | |
| void* nextBandBenchProducer(void* arg)
 | |
| {
 | |
|   FIFO<UintRowGroup>* dl1 = (FIFO<UintRowGroup>*)arg;
 | |
|   UintRowGroup rg;
 | |
|   uint64_t* arr;
 | |
|   uint32_t i;
 | |
| 
 | |
|   arr = (uint64_t*)rg.et;
 | |
| 
 | |
|   for (i = 0; i < 8192; ++i)
 | |
|     arr[i] = i;
 | |
| 
 | |
|   rg.count = 8192;
 | |
| 
 | |
|   for (i = 1; i <= dataSize / 8192; i++)
 | |
|   {
 | |
|     //		cout << "inserting set " << i << endl;
 | |
|     dl1->insert(rg);
 | |
|   }
 | |
| 
 | |
|   dl1->endOfInput();
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| void nextBandBenchmark()
 | |
| {
 | |
|   ByteStream bs;
 | |
|   pthread_t threads[columns];
 | |
|   uint32_t i, rowCount = 1;
 | |
|   JobStepAssociation inJs;
 | |
| 
 | |
|   for (i = 0; i < columns; ++i)
 | |
|   {
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
| 
 | |
|     FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, fifoSize);
 | |
| 
 | |
|     dl1->OID(i);  // lineitem first col object id
 | |
|     spdl1->fifoDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     pthread_create(&threads[i], 0, nextBandBenchProducer, dl1);
 | |
|     cout << "started thread " << i << endl;
 | |
|   }
 | |
| 
 | |
|   DeliveryStep ds(inJs, JobStepAssociation(), 8);
 | |
|   stringstream ss;
 | |
| 
 | |
|   ss << "nextBandBenchmark with " << columns << " columns\n";
 | |
| 
 | |
|   timer.start(ss.str());
 | |
| 
 | |
|   while (rowCount != 0)
 | |
|   {
 | |
|     //		cout << "getting a BS\n";
 | |
|     rowCount = ds.nextBand(bs);
 | |
|     bs.restart();
 | |
|     //		cout << "got a BS\n";
 | |
|   }
 | |
| 
 | |
|   timer.stop(ss.str());
 | |
| 
 | |
|   for (i = 0; i < columns; ++i)
 | |
|     pthread_join(threads[i], NULL);
 | |
| }
 | |
| 
 | |
| void queuedBSBenchmark(int queueLength)
 | |
| {
 | |
|   ByteStream* bs;
 | |
|   pthread_t threads[columns];
 | |
|   uint32_t i, rowCount = 1;
 | |
|   JobStepAssociation inJs;
 | |
| 
 | |
|   for (i = 0; i < columns; ++i)
 | |
|   {
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
| 
 | |
|     FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, fifoSize);
 | |
| 
 | |
|     dl1->OID(i);  // lineitem first col object id
 | |
|     spdl1->fifoDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     pthread_create(&threads[i], 0, nextBandBenchProducer, dl1);
 | |
|     cout << "started thread " << i << endl;
 | |
|   }
 | |
| 
 | |
|   DeliveryStep ds(inJs, JobStepAssociation(), 8);
 | |
|   BSQueueMgr bsq(&ds, queueLength);
 | |
|   stringstream ss;
 | |
|   ss << "queuedBSBenchmark with " << columns << " columns and " << queueLength << " queue length\n";
 | |
| 
 | |
|   timer.start(ss.str());
 | |
|   boost::thread(BSProjectThread(&bsq));
 | |
| 
 | |
|   while (rowCount != 0)
 | |
|   {
 | |
|     bs = bsq.getNextByteStream(rowCount);
 | |
|     delete bs;
 | |
|   }
 | |
| 
 | |
|   timer.stop(ss.str());
 | |
| 
 | |
|   for (i = 0; i < columns; ++i)
 | |
|     pthread_join(threads[i], NULL);
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Main entry point
 | |
| //------------------------------------------------------------------------------
 | |
| int main(int argc, char** argv)
 | |
| {
 | |
|   if (argc > 1)
 | |
|     columns = atoi(argv[1]);  // override default number of rows
 | |
|   else
 | |
|     columns = 10;
 | |
| 
 | |
|   while (columns > 0)
 | |
|   {
 | |
|     //   testSizes();
 | |
|     nextBandBenchmark();
 | |
| 
 | |
|     queuedBSBenchmark(10);
 | |
|     queuedBSBenchmark(9);
 | |
|     queuedBSBenchmark(8);
 | |
|     queuedBSBenchmark(5);
 | |
|     queuedBSBenchmark(4);
 | |
|     queuedBSBenchmark(1);
 | |
|     timer.finish();
 | |
|     columns--;
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 |