1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

This patch removes Out-Of-Band pool from PP

This commit is contained in:
Roman Nozdrin
2022-07-01 13:40:53 +00:00
committed by Roman Nozdrin
parent 2c2e1f9924
commit 4d41a945db
5 changed files with 3 additions and 32 deletions

View File

@ -124,7 +124,6 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache();
// FIXME: there is an anon ns burried later in between 2 named namespaces... // FIXME: there is an anon ns burried later in between 2 named namespaces...
namespace primitiveprocessor namespace primitiveprocessor
{ {
boost::shared_ptr<threadpool::FairThreadPool> OOBPool;
BlockRequestProcessor** BRPp; BlockRequestProcessor** BRPp;
#ifndef _MSC_VER #ifndef _MSC_VER
@ -1236,7 +1235,6 @@ struct BPPHandler
} }
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
OOBPool->removeJobs(key);
} }
scoped.unlock(); scoped.unlock();
@ -1360,7 +1358,6 @@ struct BPPHandler
scoped.unlock(); scoped.unlock();
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
OOBPool->removeJobs(key);
return 0; return 0;
} }
@ -1660,7 +1657,6 @@ return 0;
" stepID "<< stepID << endl; " stepID "<< stepID << endl;
*/ */
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID); fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
OOBPool->removeJobs(uniqueID);
lk.unlock(); lk.unlock();
deleteDJLock(uniqueID); deleteDJLock(uniqueID);
return 0; return 0;
@ -2138,7 +2134,6 @@ struct ReadThread
txnId = *((uint32_t*)&buf[pos + 2]); txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]); stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]); uniqueID = *((uint32_t*)&buf[pos + 10]);
isSyscat = hdr->flags & IS_SYSCAT;
} }
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
{ {
@ -2154,7 +2149,6 @@ struct ReadThread
stepID = *((uint32_t*)&buf[pos + 6]); stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]); uniqueID = *((uint32_t*)&buf[pos + 10]);
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
isSyscat = bpps->isSysCat();
} }
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
procPoolPtr->addJob(job); procPoolPtr->addJob(job);
@ -2319,12 +2313,6 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0)); medPriorityThreads, lowPriorityThreads, 0));
// We're not using either the priority or the job-clustering features, just need a threadpool
// that can reschedule jobs, and an unlimited non-blocking queue
OOBPool.reset(new threadpool::FairThreadPool(1, 5, 0, 0, 1));
// Initialize a local pointer.
fOOBPool = OOBPool;
asyncCounter = 0; asyncCounter = 0;
brm = new DBRM(); brm = new DBRM();

View File

@ -48,7 +48,6 @@ extern oam::OamCache* oamCache;
namespace primitiveprocessor namespace primitiveprocessor
{ {
extern boost::shared_ptr<threadpool::FairThreadPool> OOBPool;
extern dbbc::BlockRequestProcessor** BRPp; extern dbbc::BlockRequestProcessor** BRPp;
extern BRM::DBRM* brm; extern BRM::DBRM* brm;
extern boost::mutex bppLock; extern boost::mutex bppLock;
@ -135,12 +134,6 @@ class PrimitiveServer
return fProcessorPool; return fProcessorPool;
} }
inline boost::shared_ptr<threadpool::FairThreadPool> getOOBThreadPool() const
{
return fOOBPool;
}
// int fCacheCount;
int ReadAheadBlocks() const int ReadAheadBlocks() const
{ {
return fReadAheadBlocks; return fReadAheadBlocks;
@ -173,7 +166,6 @@ class PrimitiveServer
* primitive commands * primitive commands
*/ */
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool; boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
boost::shared_ptr<threadpool::FairThreadPool> fOOBPool;
int fServerThreads; int fServerThreads;
int fServerQueueSize; int fServerQueueSize;

View File

@ -24,9 +24,8 @@ class PrimitiveServerThreadPools
{ {
public: public:
PrimitiveServerThreadPools() = default; PrimitiveServerThreadPools() = default;
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool, PrimitiveServerThreadPools(boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool)
boost::shared_ptr<threadpool::FairThreadPool> OOBThreadPool) : fPrimServerThreadPool(primServerThreadPool)
: fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool)
{ {
} }
@ -35,12 +34,6 @@ class PrimitiveServerThreadPools
return fPrimServerThreadPool; return fPrimServerThreadPool;
} }
boost::shared_ptr<threadpool::FairThreadPool> getOOBThreadPool()
{
return fOOBThreadPool;
}
private: private:
boost::shared_ptr<threadpool::FairThreadPool> fPrimServerThreadPool; boost::shared_ptr<threadpool::FairThreadPool> fPrimServerThreadPool;
boost::shared_ptr<threadpool::FairThreadPool> fOOBThreadPool;
}; };

View File

@ -725,7 +725,6 @@ int ServicePrimProc::Child()
#endif #endif
primServerThreadPool = server.getProcessorThreadPool(); primServerThreadPool = server.getProcessorThreadPool();
OOBThreadPool = server.getOOBThreadPool();
server.start(this, startupRaceLock); server.start(this, startupRaceLock);

View File

@ -520,8 +520,7 @@ namespace exemgr
statementsRunningCount->incr(stmtCounted); statementsRunningCount->incr(stmtCounted);
PrimitiveServerThreadPools primitiveServerThreadPools( PrimitiveServerThreadPools primitiveServerThreadPools(
ServicePrimProc::instance()->getPrimitiveServerThreadPool(), ServicePrimProc::instance()->getPrimitiveServerThreadPool());
ServicePrimProc::instance()->getOOBThreadPool());
if (tryTuples) if (tryTuples)
{ {