1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/tdriver-deliver.cpp
2022-01-21 16:43:49 +00:00

850 lines
23 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*****************************************************************************
* $Id: tdriver-deliver.cpp 9210 2013-01-21 14:10:42Z rdempsey $
*
****************************************************************************/
#define BENCHMARK
#include "joblist.h"
#include "jobstep.h"
#include "distributedenginecomm.h"
#include "calpontsystemcatalog.h"
#include "tableband.h"
#include <sys/timeb.h>
#include <iostream>
#include <pthread.h>
#include <boost/scoped_ptr.hpp>
#include "stopwatch.cpp"
#include "bytestream.h"
using namespace messageqcpp;
#include "brm.h"
using namespace BRM;
using namespace std;
using namespace joblist;
using namespace execplan;
// Default schema; testSizes() lets the user override it at the prompt.
string schema("tpch100"); // dmc
// Table used by deliveryStep_1().
string table("lineitem");
// Used as the string payload in addElements(FIFO<StringElementType>*), and as
// the table name in the (commented-out) deliveryStep_2().
// Original note: create table d_strstep(c1 char(10));
string strtable("d_strstep");
// Table used by deliverTest(); testSizes() lets the user override it at the prompt.
string colstable("lineitem");
// Number of rows generated per column by the addElements() producers
// (100 million); nextBandBenchProducer() sends dataSize / 8192 row groups.
int dataSize = 100 * 1000 * 1000;
// 65536; only referenced by the commented-out deliveryStep_2().
int bandSize = 8192 * 8;
// Second constructor argument of the FIFO<UintRowGroup> input lists
// (presumably the FIFO capacity -- TODO confirm against FIFO's constructor).
int fifoSize = 128;
// Oid for lineitem.l_orderkey on your database. deliverTest() overwrites this
// with a value derived from the system catalog.
int startingOid = 3416;
uint32_t flushInterval = 16384; // interval used in flushing table bands
// Number of input columns (FIFOs / producer threads) used by the benchmarks;
// set in main().
uint32_t columns;
// Shared stopwatch used to time the tests; only used from the main thread.
Stopwatch timer;
//******************************************************************************
//******************************************************************************
// Start of Class to manage table band projection in a separate thread
// (modeled after a similar class in the exemgr directory)
#include <queue>
namespace
{
/** @brief Bounded producer/consumer queue of ByteStreams filled from a DeliveryStep.
 *
 * project() is run by the producing thread and getNextByteStream() by the
 * consuming thread; a mutex and two condition variables coordinate them.
 */
class BSQueueMgr
{
public:
/** @brief BSQueueMgr constructor.
*
* @param d (in) DeliveryStep whose nextBand() supplies the byte streams
* @param maxQueueSize (in) Max # of byte streams to be queued up
*/
explicit BSQueueMgr(DeliveryStep* d, int maxQueueSize = 1);
/** @brief BSQueueMgr destructor.
*
* Destroys the mutex and condition variables. Byte streams still in the
* queue are not deleted here.
*/
~BSQueueMgr();
/** @brief Main processing loop that projects the bands into byte streams.
*
* Blocks while the queue already holds the maximum number of elements.
* Returns after queueing the band whose row count is 0.
*/
void project();
/** @brief Called by second thread to acquire the next projected byte stream
*
* Blocks until a byte stream is available in the queue.
*
* @param rowCount (out) Row count of the returned band; 0 marks end of data
* @return Next projected byte stream, allocated with new by project();
*         the caller is expected to delete it (see queuedBSBenchmark()).
*/
messageqcpp::ByteStream* getNextByteStream(uint32_t& rowCount);
private:
// Disable these by declaring but not defining
BSQueueMgr(const BSQueueMgr& rhs);
BSQueueMgr& operator=(const BSQueueMgr& rhs);
//...One queued band: the serialized byte stream plus its row count.
struct QueueElement
{
messageqcpp::ByteStream* bs;
uint32_t rowCount;
};
//...Source of the bands; not deleted by this class.
DeliveryStep* ds;
//...project() waits while the queue holds this many elements or more.
unsigned int fMaxQueueSize;
//...Internal queue used to save byte streams as they are projected.
std::queue<QueueElement> fBSQueue;
//...Mutex to protect our internal queue, and the condition variables
//...used to signal when a byte stream has been added to, or removed
//...from the queue.
pthread_mutex_t fMutex;
pthread_cond_t fBSAdded;
pthread_cond_t fBSRemoved;
};
//------------------------------------------------------------------------------
// BSQueueMgr constructor: remembers the delivery step and the queue limit, and
// sets up the mutex and condition variables with default attributes.
//------------------------------------------------------------------------------
BSQueueMgr::BSQueueMgr(DeliveryStep* d, int maxQueueSize)
 : ds(d)
 , fMaxQueueSize(maxQueueSize)
{
  pthread_mutex_init(&fMutex, NULL);
  pthread_cond_init(&fBSAdded, NULL);
  pthread_cond_init(&fBSRemoved, NULL);
}
//------------------------------------------------------------------------------
// BSQueueMgr destructor: releases the mutex and both condition variables.
// Elements still sitting in the queue are left untouched.
//------------------------------------------------------------------------------
BSQueueMgr::~BSQueueMgr()
{
  pthread_mutex_destroy(&fMutex);

  pthread_cond_destroy(&fBSAdded);
  pthread_cond_destroy(&fBSRemoved);
}
//------------------------------------------------------------------------------
// Contains loop to project the table bands. They are stored into our internal
// queue, until we reach the queue size limit, in which case, we wait for the
// consumer to work off some of the table bands. This will in turn free us up
// to continue adding projected table bands, until we reach a table band with
// a row count of 0, which denotes the end of the table.
//------------------------------------------------------------------------------
void BSQueueMgr::project()
{
bool moreData = true;
//...Stay in loop to project table bands, until we reach a table band
//...having a row count of 0.
while (moreData)
{
uint32_t rowCount;
QueueElement qe;
qe.bs = new ByteStream;
rowCount = ds->nextBand(*(qe.bs));
qe.rowCount = rowCount;
pthread_mutex_lock(&fMutex);
//...Wait for room in our queue before adding this table band
while (fBSQueue.size() >= fMaxQueueSize)
{
pthread_cond_wait(&fBSRemoved, &fMutex);
}
fBSQueue.push(qe);
if (rowCount == 0)
moreData = false;
pthread_cond_broadcast(&fBSAdded);
pthread_mutex_unlock(&fMutex);
}
}
//------------------------------------------------------------------------------
// Consumer side: blocks until the queue is non-empty, removes the oldest
// element and wakes the producer. The element's row count is handed back
// through rowCount (0 marks the last band) and its ByteStream is returned.
//------------------------------------------------------------------------------
ByteStream* BSQueueMgr::getNextByteStream(uint32_t& rowCount)
{
  pthread_mutex_lock(&fMutex);

  //...Sleep until the producer has queued something
  while (fBSQueue.empty())
    pthread_cond_wait(&fBSAdded, &fMutex);

  const QueueElement head = fBSQueue.front();
  fBSQueue.pop();

  pthread_cond_broadcast(&fBSRemoved);
  pthread_mutex_unlock(&fMutex);

  rowCount = head.rowCount;
  return head.bs;
}
// Thread functor: when invoked, runs the producer loop (project()) of the
// BSQueueMgr it was constructed with. The manager is not owned.
struct BSProjectThread
{
  BSQueueMgr* fBSQueueMgr;

  BSProjectThread(BSQueueMgr* pMgr) : fBSQueueMgr(pMgr) {}

  void operator()() { fBSQueueMgr->project(); }
};
//
// Bounded producer/consumer queue of table bands for one delivery step.
//
// project() pulls table bands from the delivery step and queues them, and can
// be run in its own thread so that another thread can fetch the bands with
// getNextTableBand() and serialize them concurrently. The constructor's
// maxQueueSize argument limits how many bands may be queued at once.
//
class TableBandQueueMgr
{
 public:
  // Constructor.
  //   pDStep       (in) Delivery step that supplies the table bands
  //   maxQueueSize (in) Max # of table bands to be queued up
  explicit TableBandQueueMgr(DeliveryStep* pDStep, int maxQueueSize = 1)
   : fDStep(pDStep)
   , fMaxQueueSize(maxQueueSize)
  {
    pthread_mutex_init(&fMutex, NULL);
    pthread_cond_init(&fTableBandAdded, NULL);
    pthread_cond_init(&fTableBandRemoved, NULL);
  }

  // Destructor: releases the mutex and the condition variables.
  ~TableBandQueueMgr()
  {
    pthread_mutex_destroy(&fMutex);
    pthread_cond_destroy(&fTableBandAdded);
    pthread_cond_destroy(&fTableBandRemoved);
  }

  // Producer loop; queues table bands until one with a row count of 0
  // has been queued.
  void project();

  // Consumer call; blocks until a table band is available and returns it.
  // A band with a row count of 0 marks the end of data. The band was
  // allocated with new by project(); runStep() takes ownership of it.
  joblist::TableBand* getNextTableBand();

 private:
  // Copying is disabled: declared but intentionally never defined.
  TableBandQueueMgr(const TableBandQueueMgr& rhs);
  TableBandQueueMgr& operator=(const TableBandQueueMgr& rhs);

  DeliveryStep* fDStep;        // source of the table bands (not owned)
  unsigned int fMaxQueueSize;  // project() waits while the queue is this full

  // Table bands waiting to be picked up by the consumer.
  std::queue<joblist::TableBand*> fTblBandQueue;

  // Guards fTblBandQueue; the condition variables announce that a band
  // was added to, or removed from, the queue.
  pthread_mutex_t fMutex;
  pthread_cond_t fTableBandAdded;
  pthread_cond_t fTableBandRemoved;
};
//
// Contains loop to project the table bands. They are stored into our internal
// queue, until we reach the queue size limit, in which case, we wait for the
// consumer to work off some of the table bands. This will in turn free us up
// to continue adding projected table bands, until we reach a table band with
// a row count of 0, which denotes the end of the table.
//
void TableBandQueueMgr::project()
{
bool moreData = true;
//...Stay in loop to project table bands, until we reach a table band
//...having a row count of 0.
while (moreData)
{
joblist::TableBand* pTblBand = new joblist::TableBand;
*pTblBand = fDStep->nextBand();
pthread_mutex_lock(&fMutex);
//...Wait for room in our queue before adding this table band
while (fTblBandQueue.size() >= fMaxQueueSize)
{
pthread_cond_wait(&fTableBandRemoved, &fMutex);
}
fTblBandQueue.push(pTblBand);
if (pTblBand->getRowCount() == 0)
moreData = false;
pthread_cond_broadcast(&fTableBandAdded);
pthread_mutex_unlock(&fMutex);
}
}
//
// Consumer side: blocks until the queue is non-empty, then removes and
// returns the oldest table band. A returned band with a row count of 0 is
// the last one.
//
joblist::TableBand* TableBandQueueMgr::getNextTableBand()
{
  pthread_mutex_lock(&fMutex);

  //...Sleep until the producer has queued something
  while (fTblBandQueue.empty())
    pthread_cond_wait(&fTableBandAdded, &fMutex);

  joblist::TableBand* const head = fTblBandQueue.front();
  fTblBandQueue.pop();

  //...The producer stops after queueing the 0-row band, so it only
  //...needs to be woken up for bands that actually carry rows.
  const bool endOfData = (head->getRowCount() == 0);

  if (!endOfData)
    pthread_cond_broadcast(&fTableBandRemoved);

  pthread_mutex_unlock(&fMutex);
  return head;
}
} // end of namespace
// End of Class to manage table band projection in a separate thread
//******************************************************************************
//******************************************************************************
//------------------------------------------------------------------------------
// Drives thread to project table bands for delivery.
//------------------------------------------------------------------------------
void* projectThreadWrapper(void* pThreadData)
{
TableBandQueueMgr* tableBandMgr = (TableBandQueueMgr*)pThreadData;
// cout << "Starting thread to project columns..." << endl;
tableBandMgr->project();
// cout << "Finished thread to project columns..." << endl;
return 0;
}
//------------------------------------------------------------------------------
// Perform column projection and serialization for the given delivery step.
//
// dstep   (in) delivery step whose table bands are projected and serialized
// message (in) label used to build the timer/diagnostic names for this run
//
// By default the projection runs in a helper thread (via TableBandQueueMgr)
// while this thread serializes each band; change the "#if 0" below to run
// projection and serialization in succession instead. The number of bands
// delivered is printed on cout.
//
// If the helper thread cannot be started, an error is reported on cerr and
// the function returns without delivering any bands.
//------------------------------------------------------------------------------
void runStep(DeliveryStep& dstep, const string& message)
{
  //...Labels for the (currently commented-out) Stopwatch calls
  string nextBandMsg(message);
  nextBandMsg += " - nextBand()";
  string serializeMsg(message);
  serializeMsg += " - serialize()";
  int nextBandCount = 0;
  ByteStream bs;
  //...Perform table band projection and serialization in succession
#if 0
  TableBand tb;

  while (1)
  {
    // timer.start(nextBandMsg);
    tb = dstep.nextBand();
    nextBandCount++;
    // timer.stop (nextBandMsg);
    // timer.start(serializeMsg);
    bs.reset();
    tb.serialize(bs);
    // timer.stop( serializeMsg);
    if (tb.getRowCount() == 0)
      break;
  }

  //...Perform table band projection and serialization in parallel
#else
  string thrCreateMsg(message);
  thrCreateMsg += " - serialize-thrCreate";
  string thrJoinMsg(message);
  thrJoinMsg += " - serialize-thrJoin";
  string serializeWaitMsg(message);
  serializeWaitMsg += " - serialize-Wait";
  //...Would prefer to record this label in projectThreadWrapper, but
  //...Stopwatch is not threadsafe, so safer to put here in main thread,
  //...where the other Stopwatch times are recorded. Note that this
  //...time will overlap the other timestamps we are recording.
  // timer.start(nextBandMsg);
  //...Start a second thread that will allow us to perform
  //...table projections in parallel with band serialization
  // timer.start(thrCreateMsg);
  TableBandQueueMgr tableBandMgr(&dstep, 1);
  pthread_t projectionThread;
  const int createRc = pthread_create(&projectionThread, 0, projectThreadWrapper, &tableBandMgr);

  //...Without a producer the loop below would wait forever, and joining
  //...an unset pthread_t is undefined, so give up right away.
  if (createRc != 0)
  {
    cerr << message << ": unable to start projection thread (pthread_create returned " << createRc << ")"
         << endl;
    return;
  }

  // timer.stop (thrCreateMsg);
  while (1)
  {
    //...The amount of time we spend waiting will help tell us how
    //...much extra time is being spent constructing the table bands
    // timer.start(serializeWaitMsg);
    boost::scoped_ptr<TableBand> band(tableBandMgr.getNextTableBand());
    nextBandCount++;
    // timer.stop (serializeWaitMsg);
    // timer.start(serializeMsg);
    bs.reset();
    band->serialize(bs);

    // timer.stop( serializeMsg);
    //...A band without rows is the end-of-data marker
    if (band->getRowCount() == 0)
      break;
  }

  // timer.stop(nextBandMsg);
  // timer.start(thrJoinMsg);
  pthread_join(projectionThread, 0);
  // timer.stop (thrJoinMsg);
#endif
  cout << nextBandCount << " table bands delivered" << endl;
}
//------------------------------------------------------------------------------
// Fill a BandedDL with dataSize elements whose first and second members both
// run from 1 to dataSize, then signal end of input.
// (outdated version, replaced by FIFO version)
//------------------------------------------------------------------------------
void addElements(BandedDL<ElementType>* dl1)
{
  ElementType elem;
  int rid = 1;

  while (rid <= dataSize)
  {
    elem.first = rid;
    elem.second = rid;
    dl1->insert(elem);
    ++rid;
  }

  dl1->endOfInput();
}
//------------------------------------------------------------------------------
// Fill a string FIFO with dataSize elements: first runs from 1 to dataSize and
// second is always the current value of the global strtable. Signals end of
// input when done.
//------------------------------------------------------------------------------
void addElements(FIFO<StringElementType>* dl1)
{
  StringElementType elem;
  int rid = 1;

  while (rid <= dataSize)
  {
    elem.first = rid;
    elem.second = strtable;
    dl1->insert(elem);
    ++rid;
  }

  dl1->endOfInput();
}
//------------------------------------------------------------------------------
// Fill a numeric FIFO with dataSize elements (first == second == 1..dataSize),
// packed into row groups of up to 8192 elements. A row group is inserted as
// soon as it holds 8192 elements, or when the last element has been added.
// Signals end of input when done.
//------------------------------------------------------------------------------
void addElements(FIFO<UintRowGroup>* dl1)
{
  ElementType elem;
  UintRowGroup group;
  int filled = 0;

  for (int rid = 1; rid <= dataSize; ++rid)
  {
    elem.first = rid;
    elem.second = rid;
    group.et[filled] = elem;
    ++filled;

    const bool groupFull = (filled == 8192);
    const bool lastRow = (rid == dataSize);

    //...Keep filling until the group is full or the data runs out
    if (!groupFull && !lastRow)
      continue;

    group.count = filled;
    dl1->insert(group);
    filled = 0;
  }

  dl1->endOfInput();
}
//------------------------------------------------------------------------------
// Test delivery of a single numeric column
//------------------------------------------------------------------------------
void deliveryStep_1() // number column
{
DistributedEngineComm* dec;
boost::shared_ptr<CalpontSystemCatalog> cat;
ResourceManager rm;
dec = DistributedEngineComm::instance(rm);
cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1);
DBRM dbrm;
uint32_t uniqueID = dbrm.getUnique32();
dec->addQueue(uniqueID);
JobStepAssociation inJs;
AnyDataListSPtr spdl1(new AnyDataList());
FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, 100);
addElements(dl1);
spdl1->fifoDL(dl1);
inJs.outAdd(spdl1);
DeliveryStep dstep(inJs, JobStepAssociation(), make_table(schema, table), cat, 1, 1, 1, flushInterval);
runStep(dstep, string("deliveryStep_1"));
dec->removeQueue(uniqueID);
}
//------------------------------------------------------------------------------
// Test delivery of a single string column
//------------------------------------------------------------------------------
/*
void deliveryStep_2() //string column
{
DistributedEngineComm* dec;
boost::shared_ptr<CalpontSystemCatalog> cat;
dec = DistributedEngineComm::instance();
cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1);
dec->addSession(12345);
dec->addStep(12345, 0);
JobStepAssociation inJs;
AnyDataListSPtr spdl1(new AnyDataList());
FIFO<StringElementType>* dl1 = new FIFO<StringElementType>(1,100);
addElements(dl1);
StringElementType e;
spdl1->stringDL(dl1);
inJs.outAdd(spdl1);
DeliveryStep dstep(inJs, JobStepAssociation(),
make_table(schema, strtable),
cat, bandSize, 1, 1, 1, flushInterval);
runStep(dstep, string("deliveryStep_2"));
dec->removeSession(12345);
}
*/
//------------------------------------------------------------------------------
// Drives thread to add elements to the specified FIFO.
//------------------------------------------------------------------------------
void* addElementsThreadWrapper(void* pThreadData)
{
FIFO<UintRowGroup>* dl1 = (FIFO<UintRowGroup>*)pThreadData;
// cout << "Starting thread to add elements for column " <<
// dl1->OID() << endl;
addElements(dl1);
// cout << "Finished thread to add elements for column " <<
// dl1->OID() << endl;
return 0;
}
//------------------------------------------------------------------------------
// Test delivery of multiple (numCols) numeric columns
//------------------------------------------------------------------------------
void deliverTest(int numCols)
{
DistributedEngineComm* dec;
boost::shared_ptr<CalpontSystemCatalog> cat;
ResourceManager rm;
dec = DistributedEngineComm::instance(rm);
cat = CalpontSystemCatalog::makeCalpontSystemCatalog(1);
// Get the oid for the first column in the table - usually lineitem.l_orderkey.
CalpontSystemCatalog::TableName tableName = make_table(schema, colstable);
CalpontSystemCatalog::ROPair p = cat->tableRID(tableName);
startingOid = p.objnum + 1;
int startingOid = 3416; // Oid for lineitem.l_orderkey on your database.
DBRM dbrm;
uint32_t uniqueID = dbrm.getUnique32();
dec->addQueue(uniqueID);
JobStepAssociation inJs;
stringstream ss;
pthread_t threads[numCols];
for (int i = 0; i < numCols; ++i)
{
AnyDataListSPtr spdl1(new AnyDataList());
// Make FIFO big enough to contain the elements for a flush interval
FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, fifoSize);
// BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1);
dl1->OID(i + startingOid); // lineitem first col object id
spdl1->fifoDL(dl1);
// spdl1->bandedDL(dl1);
inJs.outAdd(spdl1);
pthread_create(&threads[i], 0, addElementsThreadWrapper, dl1);
}
DeliveryStep dstep(inJs, JobStepAssociation(), tableName, cat, 12345, 1, 1, flushInterval);
ss << "DeliverStep test for " << numCols;
string message = ss.str();
runStep(dstep, ss.str());
for (int i = 0; i < numCols; ++i)
{
pthread_join(threads[i], 0);
}
dec->removeQueue(uniqueID);
}
//------------------------------------------------------------------------------
// Perform testcases for a 1 column table, a 2 column table, etc, up to
// a table having "maxCols" columns.
//------------------------------------------------------------------------------
void testSizes()
{
// Prompt for schema.
cout << "Enter Schema or Enter for " << schema << ": ";
string tmpSchema;
getline(cin, tmpSchema);
if (tmpSchema.length() > 0)
{
schema = tmpSchema;
}
// Prompt for table.
cout << "Enter Table or Enter for " << colstable << ": ";
string tmpTable;
getline(cin, tmpTable);
if (tmpTable.length() > 0)
{
colstable = tmpTable;
}
timer.start("Total");
int maxCols = 9;
cout << endl;
for (int i = 7; i <= maxCols; i++)
{
cout << endl << "Running test " << i << " of " << maxCols << endl;
stringstream ss;
ss << "Delivery test for " << dataSize << " rows and " << i << " columns";
timer.start(ss.str());
deliverTest(i);
timer.stop(ss.str());
}
timer.stop("Total");
timer.finish();
}
void* nextBandBenchProducer(void* arg)
{
FIFO<UintRowGroup>* dl1 = (FIFO<UintRowGroup>*)arg;
UintRowGroup rg;
uint64_t* arr;
uint32_t i;
arr = (uint64_t*)rg.et;
for (i = 0; i < 8192; ++i)
arr[i] = i;
rg.count = 8192;
for (i = 1; i <= dataSize / 8192; i++)
{
// cout << "inserting set " << i << endl;
dl1->insert(rg);
}
dl1->endOfInput();
return NULL;
}
void nextBandBenchmark()
{
ByteStream bs;
pthread_t threads[columns];
uint32_t i, rowCount = 1;
JobStepAssociation inJs;
for (i = 0; i < columns; ++i)
{
AnyDataListSPtr spdl1(new AnyDataList());
FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, fifoSize);
dl1->OID(i); // lineitem first col object id
spdl1->fifoDL(dl1);
inJs.outAdd(spdl1);
pthread_create(&threads[i], 0, nextBandBenchProducer, dl1);
cout << "started thread " << i << endl;
}
DeliveryStep ds(inJs, JobStepAssociation(), 8);
stringstream ss;
ss << "nextBandBenchmark with " << columns << " columns\n";
timer.start(ss.str());
while (rowCount != 0)
{
// cout << "getting a BS\n";
rowCount = ds.nextBand(bs);
bs.restart();
// cout << "got a BS\n";
}
timer.stop(ss.str());
for (i = 0; i < columns; ++i)
pthread_join(threads[i], NULL);
}
void queuedBSBenchmark(int queueLength)
{
ByteStream* bs;
pthread_t threads[columns];
uint32_t i, rowCount = 1;
JobStepAssociation inJs;
for (i = 0; i < columns; ++i)
{
AnyDataListSPtr spdl1(new AnyDataList());
FIFO<UintRowGroup>* dl1 = new FIFO<UintRowGroup>(1, fifoSize);
dl1->OID(i); // lineitem first col object id
spdl1->fifoDL(dl1);
inJs.outAdd(spdl1);
pthread_create(&threads[i], 0, nextBandBenchProducer, dl1);
cout << "started thread " << i << endl;
}
DeliveryStep ds(inJs, JobStepAssociation(), 8);
BSQueueMgr bsq(&ds, queueLength);
stringstream ss;
ss << "queuedBSBenchmark with " << columns << " columns and " << queueLength << " queue length\n";
timer.start(ss.str());
boost::thread(BSProjectThread(&bsq));
while (rowCount != 0)
{
bs = bsq.getNextByteStream(rowCount);
delete bs;
}
timer.stop(ss.str());
for (i = 0; i < columns; ++i)
pthread_join(threads[i], NULL);
}
//------------------------------------------------------------------------------
// Main entry point
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
if (argc > 1)
columns = atoi(argv[1]); // override default number of rows
else
columns = 10;
while (columns > 0)
{
// testSizes();
nextBandBenchmark();
queuedBSBenchmark(10);
queuedBSBenchmark(9);
queuedBSBenchmark(8);
queuedBSBenchmark(5);
queuedBSBenchmark(4);
queuedBSBenchmark(1);
timer.finish();
columns--;
}
return 0;
}