1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL4841 dev port run large join without OOM

This commit is contained in:
David Hall
2022-02-09 17:33:55 -06:00
parent d30e140dc3
commit 27dea733c5
34 changed files with 821 additions and 518 deletions

View File

@ -104,6 +104,7 @@ using namespace compress;
using namespace idbdatafile;
#include "mcsconfig.h"
#include "threadnaming.h"
typedef tr1::unordered_set<BRM::OID_t> USOID;
@ -384,6 +385,7 @@ static int updateptrs(char* ptr, FdCacheType_t::iterator fdit)
void* thr_popper(ioManager* arg)
{
utils::setThreadName("thr_popper");
ioManager* iom = arg;
FileBufferMgr* fbm;
int totalRqst = 0;

View File

@ -1148,17 +1148,24 @@ void BatchPrimitiveProcessor::initProcessor()
}
/* This version does a join on projected rows */
void BatchPrimitiveProcessor::executeTupleJoin()
// In order to prevent super size result sets in the case of near cartesian joins on three or more joins,
// the startRid start at 0) is used to begin the rid loop and if we cut off processing early because of
// the size of the result set, we return the next rid to start with. If we finish ridCount rids, return 0-
uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
{
uint32_t newRowCount = 0, i, j;
vector<uint32_t> matches;
uint64_t largeKey;
uint64_t resultCount = 0;
uint32_t newStartRid = startRid;
outputRG.getRow(0, &oldRow);
outputRG.getRow(0, &newRow);
// cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl;
for (i = 0; i < ridCount && !sendThread->aborted(); i++, oldRow.nextRow())
// ridCount gets modified based on the number of Rids actually processed during this call.
// origRidCount is the number of rids for this thread after filter, which are the total
// number of rids to be processed from all calls to this function during this thread.
for (i = startRid; i < origRidCount && !sendThread->aborted(); i++, oldRow.nextRow())
{
/* Decide whether this large-side row belongs in the output. The breaks
* in the loop mean that it doesn't.
@ -1265,10 +1272,9 @@ void BatchPrimitiveProcessor::executeTupleJoin()
if (j == joinerCount)
{
uint32_t matchCount;
for (j = 0; j < joinerCount; j++)
{
uint32_t matchCount;
/* The result is already known if...
* -- anti-join with no fcnexp
* -- semi-join with no fcnexp and not scalar
@ -1356,6 +1362,8 @@ void BatchPrimitiveProcessor::executeTupleJoin()
tSmallSideMatches[j][newRowCount].push_back(-1);
matchCount = 1;
}
resultCount += matchCount;
}
/* Finally, copy the row into the output */
@ -1379,8 +1387,18 @@ void BatchPrimitiveProcessor::executeTupleJoin()
// else
// cout << "j != joinerCount\n";
}
// If we've accumulated more than maxResultCount -- 1048576 (2^20)_ of resultCounts, cut off processing.
// The caller will restart to continue where we left off.
if (resultCount >= maxResultCount)
{
newStartRid += newRowCount;
break;
}
}
if (resultCount < maxResultCount)
newStartRid = 0;
ridCount = newRowCount;
outputRG.setRowCount(ridCount);
@ -1397,6 +1415,7 @@ void BatchPrimitiveProcessor::executeTupleJoin()
}
}
*/
return newStartRid;
}
#ifdef PRIMPROC_STOPWATCH
@ -1405,6 +1424,9 @@ void BatchPrimitiveProcessor::execute(StopWatch* stopwatch)
void BatchPrimitiveProcessor::execute()
#endif
{
uint8_t sendCount = 0;
// bool smoreRGs = false;
// uint32_t sStartRid = 0;
uint32_t i, j;
try
@ -1443,6 +1465,7 @@ void BatchPrimitiveProcessor::execute()
// filters use relrids and values for intermediate results.
if (bop == BOP_AND)
{
for (j = 0; j < filterCount; ++j)
{
#ifdef PRIMPROC_STOPWATCH
@ -1453,6 +1476,7 @@ void BatchPrimitiveProcessor::execute()
filterSteps[j]->execute();
#endif
}
}
else // BOP_OR
{
/* XXXPAT: This is a hacky impl of OR logic. Each filter is configured to
@ -1542,10 +1566,12 @@ void BatchPrimitiveProcessor::execute()
// projection commands read relrids and write output directly to a rowgroup
// or the serialized bytestream
if (ot != ROW_GROUP)
{
for (j = 0; j < projectCount; ++j)
{
projectSteps[j]->project();
}
}
else
{
/* Function & Expression group 1 processing
@ -1621,15 +1647,93 @@ void BatchPrimitiveProcessor::execute()
// cout << " no target found for OID " << projectSteps[j]->getOID() <<
//endl;
}
if (fe2)
{
/* functionize this -> processFE2() */
fe2Output.resetRowGroup(baseRid);
fe2Output.getRow(0, &fe2Out);
fe2Input->getRow(0, &fe2In);
// cerr << "input row: " << fe2In.toString() << endl;
for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow())
{
if (fe2->evaluate(&fe2In))
{
applyMapping(fe2Mapping, fe2In, &fe2Out);
// cerr << " passed. output row: " << fe2Out.toString() << endl;
fe2Out.setRid(fe2In.getRelRid());
fe2Output.incRowCount();
fe2Out.nextRow();
}
}
if (!fAggregator)
{
*serialized << (uint8_t)1; // the "count this msg" var
fe2Output.setDBRoot(dbRoot);
fe2Output.serializeRGData(*serialized);
//*serialized << fe2Output.getDataSize();
// serialized->append(fe2Output.getData(), fe2Output.getDataSize());
}
}
if (fAggregator)
{
*serialized << (uint8_t)1; // the "count this msg" var
RowGroup& toAggregate = (fe2 ? fe2Output : outputRG);
// toAggregate.convertToInlineDataInPlace();
if (fe2)
fe2Output.setDBRoot(dbRoot);
else
outputRG.setDBRoot(dbRoot);
fAggregator->addRowGroup(&toAggregate);
if ((currentBlockOffset + 1) == count) // @bug4507, 8k
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k
{
fAggregator->loadEmptySet(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else // @bug4507, 8k
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
fAggregator->aggReset(); // @bug4507, 8k
} // @bug4507, 8k
}
if (!fAggregator && !fe2)
{
*serialized << (uint8_t)1; // the "count this msg" var
outputRG.setDBRoot(dbRoot);
// cerr << "serializing " << outputRG.toString() << endl;
outputRG.serializeRGData(*serialized);
//*serialized << outputRG.getDataSize();
// serialized->append(outputRG.getData(), outputRG.getDataSize());
}
#ifdef PRIMPROC_STOPWATCH
stopwatch->stop("- if(ot != ROW_GROUP) else");
#endif
}
else
else // Is doJoin
{
uint32_t startRid = 0;
ByteStream preamble = *serialized;
origRidCount = ridCount; // ridCount can get modified by executeTupleJoin(). We need to keep track of
// the original val.
/* project the key columns. If there's the filter IN the join, project everything.
Also need to project 'long' strings b/c executeTupleJoin may copy entire rows
using copyRow(), which will try to interpret the uninit'd string ptr.
Valgrind will legitimately complain about copying uninit'd values for the
other types but that is technically safe. */
for (j = 0; j < projectCount; j++)
{
if (keyColumnProj[j] ||
(projectionMap[j] != -1 && (hasJoinFEFilters || oldRow.isLongString(projectionMap[j]))))
{
@ -1639,215 +1743,176 @@ void BatchPrimitiveProcessor::execute()
stopwatch->stop("-- projectIntoRowGroup");
#else
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
}
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- executeTupleJoin()");
executeTupleJoin();
stopwatch->stop("-- executeTupleJoin()");
#else
executeTupleJoin();
#endif
/* project the non-key columns */
for (j = 0; j < projectCount; ++j)
{
if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
!oldRow.isLongString(projectionMap[j]))
{
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- projectIntoRowGroup");
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
stopwatch->stop("-- projectIntoRowGroup");
#else
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
}
}
}
/* The RowGroup is fully joined at this point.
Add additional RowGroup processing here.
TODO: Try to clean up all of the switching */
if (doJoin && (fe2 || fAggregator))
{
bool moreRGs = true;
ByteStream preamble = *serialized;
initGJRG();
while (moreRGs && !sendThread->aborted())
do // while (startRid > 0)
{
/*
generate 1 rowgroup (8192 rows max) of joined rows
if there's an FE2, run it
-pack results into a new rowgroup
-if there are < 8192 rows in the new RG, continue
if there's an agg, run it
send the result
*/
resetGJRG();
moreRGs = generateJoinedRowGroup(baseJRow);
*serialized << (uint8_t)!moreRGs;
if (fe2)
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- executeTupleJoin()");
startRid = executeTupleJoin(startRid);
stopwatch->stop("-- executeTupleJoin()");
#else
startRid = executeTupleJoin(startRid);
// sStartRid = startRid;
#endif
/* project the non-key columns */
for (j = 0; j < projectCount; ++j)
{
/* functionize this -> processFE2()*/
fe2Output.resetRowGroup(baseRid);
fe2Output.setDBRoot(dbRoot);
fe2Output.getRow(0, &fe2Out);
fe2Input->getRow(0, &fe2In);
for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow())
if (fe2->evaluate(&fe2In))
{
applyMapping(fe2Mapping, fe2In, &fe2Out);
fe2Out.setRid(fe2In.getRelRid());
fe2Output.incRowCount();
fe2Out.nextRow();
}
if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
!oldRow.isLongString(projectionMap[j]))
{
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- projectIntoRowGroup");
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
stopwatch->stop("-- projectIntoRowGroup");
#else
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
}
}
/* The RowGroup is fully joined at this point.
* Add additional RowGroup processing here.
* TODO: Try to clean up all of the switching */
RowGroup& nextRG = (fe2 ? fe2Output : joinedRG);
nextRG.setDBRoot(dbRoot);
if (fAggregator)
if (fe2 || fAggregator)
{
fAggregator->addRowGroup(&nextRG);
bool moreRGs = true;
initGJRG();
if ((currentBlockOffset + 1) == count && moreRGs == false) // @bug4507, 8k
while (moreRGs && !sendThread->aborted())
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k
/*
* generate 1 rowgroup (8192 rows max) of joined rows
* if there's an FE2, run it
* -pack results into a new rowgroup
* -if there are < 8192 rows in the new RG, continue
* if there's an agg, run it
* send the result
*/
resetGJRG();
moreRGs = generateJoinedRowGroup(baseJRow);
// smoreRGs = moreRGs;
sendCount = (uint8_t)(!moreRGs && !startRid);
// *serialized << (uint8_t)(!moreRGs && !startRid); // the "count
// this msg" var
*serialized << sendCount;
if (fe2)
{
/* functionize this -> processFE2()*/
fe2Output.resetRowGroup(baseRid);
fe2Output.setDBRoot(dbRoot);
fe2Output.getRow(0, &fe2Out);
fe2Input->getRow(0, &fe2In);
for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow())
{
if (fe2->evaluate(&fe2In))
{
applyMapping(fe2Mapping, fe2In, &fe2Out);
fe2Out.setRid(fe2In.getRelRid());
fe2Output.incRowCount();
fe2Out.nextRow();
}
}
}
RowGroup& nextRG = (fe2 ? fe2Output : joinedRG);
nextRG.setDBRoot(dbRoot);
if (fAggregator)
{
fAggregator->addRowGroup(&nextRG);
if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0) // @bug4507, 8k
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k
{
fAggregator->loadEmptySet(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else // @bug4507, 8k
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
fAggregator->aggReset(); // @bug4507, 8k
} // @bug4507, 8k
}
else
{
// cerr <<" * serialzing " << nextRG.toString() << endl;
nextRG.serializeRGData(*serialized);
}
/* send the msg & reinit the BS */
if (moreRGs)
{
sendResponse();
serialized.reset(new ByteStream());
*serialized = preamble;
}
}
if (hasSmallOuterJoin)
{
fAggregator->loadEmptySet(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else // @bug4507, 8k
// Should we happen to finish sending data rows right on the boundary of when moreRGs flips off,
// then we need to start a new buffer. I.e., it needs the count this message byte pushed.
if (serialized->length() == preamble.length())
*serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var
*serialized << ridCount;
for (i = 0; i < joinerCount; i++)
{
for (j = 0; j < ridCount; ++j)
{
serializeInlineVector<uint32_t>(*serialized, tSmallSideMatches[i][j]);
tSmallSideMatches[i][j].clear();
}
}
}
else
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
fAggregator->aggReset(); // @bug4507, 8k
} // @bug4507, 8k
// We hae no more use for this allocation
for (i = 0; i < joinerCount; i++)
for (j = 0; j < ridCount; ++j)
tSmallSideMatches[i][j].clear();
}
}
else
{
// cerr <<" * serialzing " << nextRG.toString() << endl;
nextRG.serializeRGData(*serialized);
}
*serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var
outputRG.setDBRoot(dbRoot);
// cerr << "serializing " << outputRG.toString() << endl;
outputRG.serializeRGData(*serialized);
/* send the msg & reinit the BS */
if (moreRGs)
//*serialized << outputRG.getDataSize();
// serialized->append(outputRG.getData(), outputRG.getDataSize());
for (i = 0; i < joinerCount; i++)
{
for (j = 0; j < ridCount; ++j)
{
serializeInlineVector<uint32_t>(*serialized, tSmallSideMatches[i][j]);
tSmallSideMatches[i][j].clear();
}
}
}
if (startRid > 0)
{
sendResponse();
serialized.reset(new ByteStream());
*serialized = preamble;
}
}
if (hasSmallOuterJoin)
{
*serialized << ridCount;
for (i = 0; i < joinerCount; i++)
for (j = 0; j < ridCount; ++j)
serializeInlineVector<uint32_t>(*serialized, tSmallSideMatches[i][j]);
}
} while (startRid > 0);
}
if (!doJoin && fe2)
{
/* functionize this -> processFE2() */
fe2Output.resetRowGroup(baseRid);
fe2Output.getRow(0, &fe2Out);
fe2Input->getRow(0, &fe2In);
// cerr << "input row: " << fe2In.toString() << endl;
for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow())
{
if (fe2->evaluate(&fe2In))
{
applyMapping(fe2Mapping, fe2In, &fe2Out);
// cerr << " passed. output row: " << fe2Out.toString() << endl;
fe2Out.setRid(fe2In.getRelRid());
fe2Output.incRowCount();
fe2Out.nextRow();
}
}
if (!fAggregator)
{
*serialized << (uint8_t)1; // the "count this msg" var
fe2Output.setDBRoot(dbRoot);
fe2Output.serializeRGData(*serialized);
//*serialized << fe2Output.getDataSize();
// serialized->append(fe2Output.getData(), fe2Output.getDataSize());
}
}
if (!doJoin && fAggregator)
{
*serialized << (uint8_t)1; // the "count this msg" var
RowGroup& toAggregate = (fe2 ? fe2Output : outputRG);
// toAggregate.convertToInlineDataInPlace();
if (fe2)
fe2Output.setDBRoot(dbRoot);
else
outputRG.setDBRoot(dbRoot);
fAggregator->addRowGroup(&toAggregate);
if ((currentBlockOffset + 1) == count) // @bug4507, 8k
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k
{
fAggregator->loadEmptySet(*serialized); // @bug4507, 8k
} // @bug4507, 8k
else // @bug4507, 8k
{
fAggregator->loadResult(*serialized); // @bug4507, 8k
fAggregator->aggReset(); // @bug4507, 8k
} // @bug4507, 8k
}
if (!fAggregator && !fe2)
{
*serialized << (uint8_t)1; // the "count this msg" var
outputRG.setDBRoot(dbRoot);
// cerr << "serializing " << outputRG.toString() << endl;
outputRG.serializeRGData(*serialized);
//*serialized << outputRG.getDataSize();
// serialized->append(outputRG.getData(), outputRG.getDataSize());
if (doJoin)
{
for (i = 0; i < joinerCount; i++)
{
for (j = 0; j < ridCount; ++j)
{
serializeInlineVector<uint32_t>(*serialized, tSmallSideMatches[i][j]);
}
}
}
}
// clear small side match vector
if (doJoin)
{
for (i = 0; i < joinerCount; i++)
for (j = 0; j < ridCount; ++j)
tSmallSideMatches[i][j].clear();
}
#ifdef PRIMPROC_STOPWATCH
stopwatch->stop("- if(ot != ROW_GROUP) else");
#endif
}
ridCount = origRidCount; // May not be needed, but just to be safe.
// std::cout << "end of send. startRid=" << sStartRid << " moreRG=" << smoreRGs << " sendCount=" <<
// sendCount << std::endl;
if (projectCount > 0 || ot == ROW_GROUP)
{
*serialized << cachedIO;
@ -2187,8 +2252,9 @@ int BatchPrimitiveProcessor::operator()()
if (sendThread->aborted())
break;
if (!sendThread->okToProceed())
if (sendThread->sizeTooBig())
{
// The send buffer is full of messages yet to be sent, so this thread would block anyway.
freeLargeBuffers();
return -1; // the reschedule error code
}

View File

@ -224,6 +224,7 @@ class BatchPrimitiveProcessor
int128_t wide128Values[LOGICAL_BLOCK_RIDS];
boost::scoped_array<uint64_t> absRids;
boost::scoped_array<std::string> strValues;
uint16_t origRidCount;
uint16_t ridCount;
bool needStrValues;
uint16_t wideColumnsWidths;
@ -333,7 +334,7 @@ class BatchPrimitiveProcessor
boost::shared_array<boost::shared_array<boost::shared_ptr<TJoiner>>> tJoiners;
typedef std::vector<uint32_t> MatchedData[LOGICAL_BLOCK_RIDS];
boost::shared_array<MatchedData> tSmallSideMatches;
void executeTupleJoin();
uint32_t executeTupleJoin(uint32_t startRid);
bool getTupleJoinRowGroupData;
std::vector<rowgroup::RowGroup> smallSideRGs;
rowgroup::RowGroup largeSideRG;
@ -432,6 +433,8 @@ class BatchPrimitiveProcessor
uint ptMask;
bool firstInstance;
static const uint64_t maxResultCount = 1048576; // 2^20
friend class Command;
friend class ColumnCommand;
friend class DictStep;

View File

@ -23,16 +23,14 @@
#include <unistd.h>
#include <stdexcept>
#include <mutex>
#include "bppsendthread.h"
using namespace std;
using namespace boost;
#include "atomicops.h"
#include "resourcemanager.h"
namespace primitiveprocessor
{
extern uint32_t connectionsPerUM;
extern uint32_t BPPCount;
BPPSendThread::BPPSendThread()
: die(false)
@ -44,8 +42,8 @@ BPPSendThread::BPPSendThread()
, sawAllConnections(false)
, fcEnabled(false)
, currentByteSize(0)
, maxByteSize(25000000)
{
maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue();
runner = boost::thread(Runner_t(this));
}
@ -59,36 +57,36 @@ BPPSendThread::BPPSendThread(uint32_t initMsgsLeft)
, sawAllConnections(false)
, fcEnabled(false)
, currentByteSize(0)
, maxByteSize(25000000)
{
maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue();
runner = boost::thread(Runner_t(this));
}
BPPSendThread::~BPPSendThread()
{
boost::mutex::scoped_lock sl(msgQueueLock);
boost::mutex::scoped_lock sl2(ackLock);
die = true;
queueNotEmpty.notify_one();
okToSend.notify_one();
sl.unlock();
sl2.unlock();
abort();
runner.join();
}
bool BPPSendThread::okToProceed()
{
// keep the queue size below the 100 msg threshold & below the 25MB mark,
// but at least 2 msgs so there is always 1 ready to be sent.
return ((msgQueue.size() < sizeThreshold && currentByteSize < maxByteSize) || msgQueue.size() < 3) && !die;
}
void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
{
// Wait for the queue to empty out a bit if it's stuffed full
if (sizeTooBig())
{
std::unique_lock<std::mutex> sl1(respondLock);
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
{
respondWait = true;
fProcessorPool->incBlockedThreads();
okToRespond.wait(sl1);
fProcessorPool->decBlockedThreads();
respondWait = false;
}
}
if (die)
return;
boost::mutex::scoped_lock sl(msgQueueLock);
std::unique_lock<std::mutex> sl(msgQueueLock);
if (gotException)
throw runtime_error(exceptionString);
@ -119,10 +117,23 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
{
// Wait for the queue to empty out a bit if it's stuffed full
if (sizeTooBig())
{
std::unique_lock<std::mutex> sl1(respondLock);
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
{
respondWait = true;
fProcessorPool->incBlockedThreads();
okToRespond.wait(sl1);
fProcessorPool->decBlockedThreads();
respondWait = false;
}
}
if (die)
return;
boost::mutex::scoped_lock sl(msgQueueLock);
std::unique_lock<std::mutex> sl(msgQueueLock);
if (gotException)
throw runtime_error(exceptionString);
@ -157,7 +168,7 @@ void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
void BPPSendThread::sendMore(int num)
{
boost::mutex::scoped_lock sl(ackLock);
std::unique_lock<std::mutex> sl(ackLock);
// cout << "got an ACK for " << num << " msgsLeft=" << msgsLeft << endl;
if (num == -1)
@ -170,6 +181,7 @@ void BPPSendThread::sendMore(int num)
else
(void)atomicops::atomicAdd(&msgsLeft, num);
sl.unlock();
if (waiting)
okToSend.notify_one();
}
@ -192,7 +204,7 @@ void BPPSendThread::mainLoop()
while (!die)
{
boost::mutex::scoped_lock sl(msgQueueLock);
std::unique_lock<std::mutex> sl(msgQueueLock);
if (msgQueue.empty() && !die)
{
@ -223,8 +235,7 @@ void BPPSendThread::mainLoop()
if (msgsLeft <= 0 && fcEnabled && !die)
{
boost::mutex::scoped_lock sl2(ackLock);
std::unique_lock<std::mutex> sl2(ackLock);
while (msgsLeft <= 0 && fcEnabled && !die)
{
waiting = true;
@ -267,19 +278,26 @@ void BPPSendThread::mainLoop()
(void)atomicops::atomicSub(&currentByteSize, bsSize);
msg[msgsSent].msg.reset();
}
if (respondWait && currentByteSize < maxByteSize)
{
okToRespond.notify_one();
}
}
}
}
void BPPSendThread::abort()
{
boost::mutex::scoped_lock sl(msgQueueLock);
boost::mutex::scoped_lock sl2(ackLock);
std::lock_guard<std::mutex> sl(msgQueueLock);
std::lock_guard<std::mutex> sl2(ackLock);
std::lock_guard<std::mutex> sl3(respondLock);
die = true;
queueNotEmpty.notify_one();
okToSend.notify_one();
sl.unlock();
sl2.unlock();
queueNotEmpty.notify_all();
okToSend.notify_all();
okToRespond.notify_all();
}
} // namespace primitiveprocessor

View File

@ -27,8 +27,9 @@
#include "umsocketselector.h"
#include <queue>
#include <set>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <condition_variable>
#include "threadnaming.h"
#include "prioritythreadpool.h"
namespace primitiveprocessor
{
@ -65,7 +66,14 @@ class BPPSendThread
}
};
bool okToProceed();
bool sizeTooBig()
{
// keep the queue size below the 100 msg threshold & below the 250MB mark,
// but at least 3 msgs so there is always 1 ready to be sent.
return ((msgQueue.size() > sizeThreshold) || (currentByteSize >= maxByteSize && msgQueue.size() > 3)) &&
!die;
}
void sendMore(int num);
void sendResults(const std::vector<Msg_t>& msgs, bool newConnection);
void sendResult(const Msg_t& msg, bool newConnection);
@ -76,6 +84,10 @@ class BPPSendThread
{
return die;
}
void setProcessorPool(threadpool::PriorityThreadPool* processorPool)
{
fProcessorPool = processorPool;
}
private:
BPPSendThread(const BPPSendThread&);
@ -89,21 +101,26 @@ class BPPSendThread
}
void operator()()
{
utils::setThreadName("BPPSendThread");
bppst->mainLoop();
}
};
boost::thread runner;
std::queue<Msg_t> msgQueue;
boost::mutex msgQueueLock;
boost::condition queueNotEmpty;
std::mutex msgQueueLock;
std::condition_variable queueNotEmpty;
volatile bool die, gotException, mainThreadWaiting;
std::string exceptionString;
uint32_t sizeThreshold;
volatile int32_t msgsLeft;
bool waiting;
boost::mutex ackLock;
boost::condition okToSend;
std::mutex ackLock;
std::condition_variable okToSend;
// Condition to prevent run away queue
bool respondWait;
std::mutex respondLock;
std::condition_variable okToRespond;
/* Load balancing structures */
struct Connection_t
@ -130,6 +147,9 @@ class BPPSendThread
/* secondary queue size restriction based on byte size */
volatile uint64_t currentByteSize;
uint64_t maxByteSize;
// Used to tell the PriorityThreadPool It should consider additional threads because a
// queue full event has happened and a thread has been blocked.
threadpool::PriorityThreadPool* fProcessorPool;
};
} // namespace primitiveprocessor

View File

@ -1395,7 +1395,7 @@ struct BPPHandler
SBPPV bppv;
// make the new BPP object
bppv.reset(new BPPV());
bppv.reset(new BPPV(fPrimitiveServerPtr));
bpp.reset(new BatchPrimitiveProcessor(bs, fPrimitiveServerPtr->prefetchThreshold(), bppv->getSendThread(),
fPrimitiveServerPtr->ProcessorThreads()));
@ -1857,7 +1857,7 @@ struct ReadThread
/* Message format:
* ISMPacketHeader
* Partition count - 32 bits
* Partition set - sizeof(LogicalPartition) * count
* Partition set - sizeof(LogicalPartition) boost::shared_ptr* count
* OID count - 32 bits
* OID array - 32 bits * count
*/
@ -1948,8 +1948,7 @@ struct ReadThread
void operator()()
{
utils::setThreadName("PPReadThread");
boost::shared_ptr<threadpool::PriorityThreadPool> procPoolPtr =
fPrimitiveServerPtr->getProcessorThreadPool();
threadpool::PriorityThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
SBS bs;
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
@ -2407,8 +2406,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
fServerpool.setQueueSize(fServerQueueSize);
fServerpool.setName("PrimitiveServer");
fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0));
fProcessorPool = new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0);
// We're not using either the priority or the job-clustering features, just need a threadpool
// that can reschedule jobs, and an unlimited non-blocking queue
@ -2460,9 +2459,10 @@ void PrimitiveServer::start(Service* service)
cerr << "PrimitiveServer::start() exiting!" << endl;
}
BPPV::BPPV()
BPPV::BPPV(PrimitiveServer* ps)
{
sendThread.reset(new BPPSendThread());
sendThread->setProcessorPool(ps->getProcessorThreadPool());
v.reserve(BPPCount);
pos = 0;
joinDataReceived = false;
@ -2503,7 +2503,7 @@ const vector<boost::shared_ptr<BatchPrimitiveProcessor> >& BPPV::get()
boost::shared_ptr<BatchPrimitiveProcessor> BPPV::next()
{
uint32_t size = v.size();
uint32_t i;
uint32_t i = 0;
#if 0

View File

@ -56,10 +56,12 @@ extern uint32_t highPriorityThreads, medPriorityThreads, lowPriorityThreads;
class BPPSendThread;
class PrimitiveServer;
class BPPV
{
public:
BPPV();
BPPV(PrimitiveServer* ps);
~BPPV();
boost::shared_ptr<BatchPrimitiveProcessor> next();
void add(boost::shared_ptr<BatchPrimitiveProcessor> a);
@ -128,7 +130,7 @@ class PrimitiveServer
/** @brief get a pointer the shared processor thread pool
*/
inline boost::shared_ptr<threadpool::PriorityThreadPool> getProcessorThreadPool() const
inline threadpool::PriorityThreadPool* getProcessorThreadPool() const
{
return fProcessorPool;
}
@ -165,7 +167,7 @@ class PrimitiveServer
/** @brief the thread pool used to process
* primitive commands
*/
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
threadpool::PriorityThreadPool* fProcessorPool;
int fServerThreads;
int fServerQueueSize;

View File

@ -75,6 +75,7 @@ using namespace idbdatafile;
#include "mariadb_my_sys.h"
#include "service.h"
#include "threadnaming.h"
class Opt
{
@ -246,6 +247,7 @@ class QszMonThd
void operator()()
{
utils::setThreadName("QszMonThd");
for (;;)
{
uint32_t qd = fPsp->getProcessorThreadPool()->getWaiting();
@ -287,6 +289,7 @@ class QszMonThd
#ifdef DUMP_CACHE_CONTENTS
void* waitForSIGUSR1(void* p)
{
utils::setThreadName("waitForSIGUSR1");
#if defined(__LP64__) || defined(_MSC_VER)
ptrdiff_t tmp = reinterpret_cast<ptrdiff_t>(p);
int cacheCount = static_cast<int>(tmp);