1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

WIP with map clean-up

This commit is contained in:
Roman Nozdrin
2022-06-27 19:05:46 +00:00
committed by Roman Nozdrin
parent 0e8014db02
commit a3c582d9fe
3 changed files with 17 additions and 2 deletions

View File

@ -1998,6 +1998,8 @@ struct ReadThread
idbassert(bs->length() >= sizeof(ISMPacketHeader)); idbassert(bs->length() >= sizeof(ISMPacketHeader));
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf()); const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
// uint64_t someVal = ismHdr->Command;
// std::cout << " PP read thread Command " << someVal << std::endl;
/* This switch is for the OOB commands */ /* This switch is for the OOB commands */
switch (ismHdr->Command) switch (ismHdr->Command)

View File

@ -36,7 +36,6 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include "threadpool.h" #include "threadpool.h"
#include "../../utils/threadpool/fair_threadpool.h"
#include "fair_threadpool.h" #include "fair_threadpool.h"
#include "messagequeue.h" #include "messagequeue.h"
#include "blockrequestprocessor.h" #include "blockrequestprocessor.h"

View File

@ -112,14 +112,23 @@ void FairThreadPool::addJob_(const Job& job, bool useLock)
void FairThreadPool::removeJobs(uint32_t id) void FairThreadPool::removeJobs(uint32_t id)
{ {
// std::cout << "FairThreadPool::removeJobs id " << id << std::endl;
std::unique_lock<std::mutex> lk(mutex); std::unique_lock<std::mutex> lk(mutex);
for (auto& txnJobsMapPair : txn2JobsListMap_) for (auto& txnJobsMapPair : txn2JobsListMap_)
{ {
ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second; ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second;
if (txnJobsList->empty())
{
txn2JobsListMap_.erase(txnJobsMapPair.first);
delete txnJobsList;
continue;
// There is no clean-up for PQ. It will happen later in threadFcn
}
auto job = txnJobsList->begin(); auto job = txnJobsList->begin();
while (job != txnJobsList->end()) while (job != txnJobsList->end())
{ {
// std::cout << "removeJobs() job->id_ " << job->id_ << std::endl;
if (job->id_ == id) if (job->id_ == id)
{ {
job = txnJobsList->erase(job); // update the job iter job = txnJobsList->erase(job); // update the job iter
@ -231,6 +240,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
// std::cout << "FairThreadPool::threadFcn(): std::exception - no reschedule but send an error" << std::endl;
if (running) if (running)
{ {
jobsRunning_.fetch_sub(1, std::memory_order_relaxed); jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
@ -254,14 +264,18 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
#endif #endif
if (running) if (running)
{
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
}
} }
catch (...) catch (...)
{ {
std::cout << "FairThreadPool::threadFcn(): std::exception - double exception: failed to send an error" << std::endl;
} }
} }
catch (...) catch (...)
{ {
// std::cout << "FairThreadPool::threadFcn(): ... exception - no reschedule but send an error" << std::endl;
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
@ -270,7 +284,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
jobsRunning_.fetch_sub(1, std::memory_order_relaxed); jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
} }
threadCounts_.fetch_sub(1, std::memory_order_relaxed); threadCounts_.fetch_sub(1, std::memory_order_relaxed);
;
#ifndef NOLOGGING #ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(6); logging::Message message(6);
@ -289,6 +302,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
} }
catch (...) catch (...)
{ {
std::cout << "FairThreadPool::threadFcn(): ... exception - double exception: failed to send an error" << std::endl;
} }
} }
} }