1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1474 Add error handling to PTP

PriorityThreadPool didn't have very good error handling. If something
failed it would just ignore whatever was being processed. This could
lead to a query continuing without retrieving all of the required data.

This patch adds error handling, sending a message back to the client
and a log message. It also destroys and recreates the pool thread.
This commit is contained in:
Andrew Hutchings
2018-06-14 16:28:06 +01:00
parent 250d90a9bc
commit 40405c792a
3 changed files with 98 additions and 8 deletions

View File

@ -1818,12 +1818,22 @@ struct ReadThread
switch(ismHdr->Command) { switch(ismHdr->Command) {
case DICT_CREATE_EQUALITY_FILTER: { case DICT_CREATE_EQUALITY_FILTER: {
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new CreateEqualityFilter(bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new CreateEqualityFilter(bs));
OOBPool->addJob(job); OOBPool->addJob(job);
break; break;
} }
case DICT_DESTROY_EQUALITY_FILTER: { case DICT_DESTROY_EQUALITY_FILTER: {
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new DestroyEqualityFilter(bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new DestroyEqualityFilter(bs));
OOBPool->addJob(job); OOBPool->addJob(job);
break; break;
@ -1851,6 +1861,11 @@ struct ReadThread
job.id = hdr->Hdr.UniqueID; job.id = hdr->Hdr.UniqueID;
job.weight = LOGICAL_BLOCK_RIDS; job.weight = LOGICAL_BLOCK_RIDS;
job.priority = hdr->Hdr.Priority; job.priority = hdr->Hdr.Priority;
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
if (hdr->flags & IS_SYSCAT) { if (hdr->flags & IS_SYSCAT) {
//boost::thread t(DictScanJob(outIos, bs, writeLock)); //boost::thread t(DictScanJob(outIos, bs, writeLock));
// using already-existing threads may cut latency // using already-existing threads may cut latency
@ -1889,6 +1904,12 @@ struct ReadThread
job.id = bpps->getID(); job.id = bpps->getID();
job.weight = ismHdr->Size; job.weight = ismHdr->Size;
job.priority = bpps->priority(); job.priority = bpps->priority();
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
if (bpps->isSysCat()) { if (bpps->isSysCat()) {
//boost::thread t(*bpps); //boost::thread t(*bpps);
// using already-existing threads may cut latency // using already-existing threads may cut latency
@ -1904,6 +1925,11 @@ struct ReadThread
case BATCH_PRIMITIVE_CREATE: { case BATCH_PRIMITIVE_CREATE: {
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Create(fBPPHandler, bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Create(fBPPHandler, bs));
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
OOBPool->addJob(job); OOBPool->addJob(job);
//fBPPHandler->createBPP(*bs); //fBPPHandler->createBPP(*bs);
break; break;
@ -1912,6 +1938,11 @@ struct ReadThread
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::AddJoiner(fBPPHandler, bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::AddJoiner(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
OOBPool->addJob(job); OOBPool->addJob(job);
//fBPPHandler->addJoinerToBPP(*bs); //fBPPHandler->addJoinerToBPP(*bs);
break; break;
@ -1923,6 +1954,11 @@ struct ReadThread
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::LastJoiner(fBPPHandler, bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::LastJoiner(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
OOBPool->addJob(job); OOBPool->addJob(job);
break; break;
} }
@ -1932,6 +1968,11 @@ struct ReadThread
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Destroy(fBPPHandler, bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Destroy(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
OOBPool->addJob(job); OOBPool->addJob(job);
//fBPPHandler->destroyBPP(*bs); //fBPPHandler->destroyBPP(*bs);
break; break;
@ -1946,6 +1987,11 @@ struct ReadThread
PriorityThreadPool::Job job; PriorityThreadPool::Job job;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Abort(fBPPHandler, bs)); job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Abort(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t *buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t *) &buf[pos+6]);
job.uniqueID = *((uint32_t *) &buf[pos+10]);
job.sock = outIos;
OOBPool->addJob(job); OOBPool->addJob(job);
break; break;
} }

View File

@ -33,6 +33,8 @@ using namespace logging;
#include "prioritythreadpool.h" #include "prioritythreadpool.h"
using namespace boost; using namespace boost;
#include "dbcon/joblist/primitivemsg.h"
namespace threadpool namespace threadpool
{ {
@ -48,9 +50,9 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads
threads.create_thread(ThreadHelper(this, LOW)); threads.create_thread(ThreadHelper(this, LOW));
cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads
<< " low.\n"; << " low.\n";
threadCounts[HIGH] = highThreads; defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads;
threadCounts[MEDIUM] = midThreads; defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads;
threadCounts[LOW] = lowThreads; defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads;
} }
PriorityThreadPool::~PriorityThreadPool() PriorityThreadPool::~PriorityThreadPool()
@ -65,6 +67,23 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock)
if (useLock) if (useLock)
lk.lock(); lk.lock();
// Create any missing threads
if (defaultThreadCounts[HIGH] != threadCounts[HIGH])
{
threads.create_thread(ThreadHelper(this, HIGH));
threadCounts[HIGH]++;
}
if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM])
{
threads.create_thread(ThreadHelper(this, MEDIUM));
threadCounts[MEDIUM]++;
}
if (defaultThreadCounts[LOW] != threadCounts[LOW])
{
threads.create_thread(ThreadHelper(this, LOW));
threadCounts[LOW]++;
}
if (job.priority > 66) if (job.priority > 66)
jobQueues[HIGH].push_back(job); jobQueues[HIGH].push_back(job);
else if (job.priority > 33) else if (job.priority > 33)
@ -110,6 +129,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
vector<bool> reschedule; vector<bool> reschedule;
uint32_t rescheduleCount; uint32_t rescheduleCount;
uint32_t queueSize; uint32_t queueSize;
bool running = false;
try try
{ {
@ -143,15 +163,12 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
reschedule.resize(runList.size()); reschedule.resize(runList.size());
rescheduleCount = 0; rescheduleCount = 0;
for (i = 0; i < runList.size() && !_stop; i++) { for (i = 0; i < runList.size() && !_stop; i++) {
try {
reschedule[i] = false; reschedule[i] = false;
running = true;
reschedule[i] = (*(runList[i].functor))(); reschedule[i] = (*(runList[i].functor))();
running = false;
if (reschedule[i]) if (reschedule[i])
rescheduleCount++; rescheduleCount++;
}
catch (std::exception &e) {
cerr << e.what() << endl;
}
} }
// no real work was done, prevent intensive busy waiting // no real work was done, prevent intensive busy waiting
@ -177,6 +194,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
threadCounts[queue]--;
#ifndef NOLOGGING #ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(5); logging::Message message(5);
@ -190,6 +208,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
ml.logErrorMessage( message ); ml.logErrorMessage( message );
#endif #endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
} }
catch (...) catch (...)
{ {
@ -201,6 +221,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
threadCounts[queue]--;
#ifndef NOLOGGING #ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(6); logging::Message message(6);
@ -213,6 +234,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
ml.logErrorMessage( message ); ml.logErrorMessage( message );
#endif #endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
} }
catch (...) catch (...)
{ {
@ -220,6 +243,21 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
} }
} }
void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0};
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t *) &ism, sizeof(ism));
msg.append((uint8_t *) &ph, sizeof(ph));
sock->write(msg);
}
void PriorityThreadPool::stop() void PriorityThreadPool::stop()
{ {
_stop = true; _stop = true;

View File

@ -36,6 +36,7 @@
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
#include "../winport/winport.h" #include "../winport/winport.h"
#include "primitives/primproc/umsocketselector.h"
namespace threadpool namespace threadpool
{ {
@ -60,6 +61,9 @@ public:
uint32_t weight; uint32_t weight;
uint32_t priority; uint32_t priority;
uint32_t id; uint32_t id;
uint32_t uniqueID;
uint32_t stepID;
primitiveprocessor::SP_UM_IOSOCK sock;
}; };
enum Priority { enum Priority {
@ -105,9 +109,11 @@ private:
Priority pickAQueue(Priority preference); Priority pickAQueue(Priority preference);
void threadFcn(const Priority preferredQueue) throw(); void threadFcn(const Priority preferredQueue) throw();
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
std::list<Job> jobQueues[3]; // higher indexes = higher priority std::list<Job> jobQueues[3]; // higher indexes = higher priority
uint32_t threadCounts[3]; uint32_t threadCounts[3];
uint32_t defaultThreadCounts[3];
boost::mutex mutex; boost::mutex mutex;
boost::condition newJob; boost::condition newJob;
boost::thread_group threads; boost::thread_group threads;