
fix(join,threadpool): MCOL-5565: MCOL-5636: MCOL-5645: port from develop-23.02 to [stable-23.10] (#3127)

* fix(threadpool): MCOL-5565 queries stuck in FairThreadScheduler. (#3100)

Meta Primitive Jobs, e.g. ADD_JOINER and LAST_JOINER, got stuck
	in the Fair scheduler when no out-of-band scheduler was present. Add the OOB
	scheduler back to remedy the issue.
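
  For context, a minimal sketch of the scheduling pattern the fix restores: a fair queue for data jobs plus an out-of-band lane that is always drained first, so meta jobs such as ADD_JOINER/LAST_JOINER cannot starve behind work that depends on them. All names here are illustrative, not the real FairThreadScheduler API.

  #include <condition_variable>
  #include <deque>
  #include <functional>
  #include <mutex>

  class TwoLaneScheduler
  {
   public:
    using Job = std::function<void()>;

    void postFair(Job j) { enqueue(fairQ_, std::move(j)); }
    void postOOB(Job j) { enqueue(oobQ_, std::move(j)); }  // meta jobs go here

    void runWorker()
    {
      for (;;)
      {
        Job job;
        {
          std::unique_lock<std::mutex> lk(mtx_);
          cv_.wait(lk, [&] { return !oobQ_.empty() || !fairQ_.empty(); });
          // Drain the out-of-band lane first so meta jobs cannot starve
          // behind fair-scheduled data jobs that are waiting on them.
          auto& q = !oobQ_.empty() ? oobQ_ : fairQ_;
          job = std::move(q.front());
          q.pop_front();
        }
        job();
      }
    }

   private:
    void enqueue(std::deque<Job>& q, Job j)
    {
      {
        std::lock_guard<std::mutex> lk(mtx_);
        q.push_back(std::move(j));
      }
      cv_.notify_one();
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<Job> fairQ_;
    std::deque<Job> oobQ_;
  };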

* fix(messageqcpp): MCOL-5636 same-node communication crashed when transmitting PP errors to EM because the error messaging leveraged a socket that was a nullptr. (#3106)
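
  The crash pattern, in a hedged sketch with hypothetical stand-in types (IOSocket, DECQueue, and sendErrorToEM are illustrative, not the real messageqcpp API): for a same-host connection the socket pointer is null, so the error path must route through the local queue rather than dereference it.

  #include <iostream>
  #include <memory>
  #include <string>

  // Stand-ins for the PP-side socket and the local EM-side queue.
  struct IOSocket
  {
    void write(const std::string& msg) { std::cout << "tcp: " << msg << '\n'; }
  };
  struct DECQueue
  {
    void addDataToOutput(const std::string& msg) { std::cout << "local: " << msg << '\n'; }
  };

  void sendErrorToEM(const std::shared_ptr<IOSocket>& sock, DECQueue& localDec,
                     const std::string& errMsg)
  {
    if (!sock)
    {
      // Same-host connection: there is no real socket object, so route the
      // error through the local queue instead of dereferencing a nullptr.
      localDec.addDataToOutput(errMsg);
      return;
    }
    sock->write(errMsg);
  }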

* fix(threadpool): MCOL-5645 an erroneous threadpool Job ctor implicitly set the socket shared_ptr to nullptr, causing SIGABRT when the threadpool returned an error (#3125)
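
  A hedged reconstruction of the pitfall (the real Job's fields and ctor differ; this only illustrates the shape of the bug): a constructor overload that never receives the socket leaves the shared_ptr member value-initialized to nullptr, which the error path later dereferences.

  #include <cassert>
  #include <memory>

  struct IOSocket
  {
  };

  struct Job
  {
    // Buggy shape (commented out): sock_ is implicitly nullptr, and the
    // error-reporting path aborts when it dereferences the empty pointer.
    // explicit Job(int id) : id_(id) {}

    // Safer shape: the socket must be supplied and is validated up front.
    Job(int id, std::shared_ptr<IOSocket> sock) : id_(id), sock_(std::move(sock))
    {
      assert(sock_ && "Job requires a valid socket for error reporting");
    }

    int id_;
    std::shared_ptr<IOSocket> sock_;
  };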

---------

Co-authored-by: drrtuy <roman.nozdrin@mariadb.com>
Author: Leonid Fedorov
Date: 2024-02-14 14:56:07 +03:00
Committed by: GitHub
Parent: 2f29e22ba0
Commit: 9a9b5f8036

15 changed files with 410 additions and 317 deletions


@@ -441,9 +441,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
}
}
#ifdef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
}
bs >> filterCount;
@@ -593,9 +591,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 2));
buildVSSCache(count);
#ifdef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
}
// This version of addToJoiner() is multithreaded. Values are first
@@ -834,28 +830,11 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
idbassert(bs.length() == 0);
}
void BatchPrimitiveProcessor::doneSendingJoinerData()
{
/* to get wall-time of hash table construction
if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
{
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
Logger logger;
ostringstream os;
os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
logger.logMessage(os.str());
cout << os.str() << endl;
}
*/
}
int BatchPrimitiveProcessor::endOfJoiner()
{
/* Wait for all joiner elements to be added */
uint32_t i;
size_t currentSize;
// it should be safe to run this without grabbing this lock
// boost::mutex::scoped_lock scoped(addToJoinerLock);
if (endOfJoinerRan)
return 0;
@@ -876,34 +855,38 @@ int BatchPrimitiveProcessor::endOfJoiner()
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tJoiners[i] || !tJoiners[i][j])
{
return -1;
}
else
currentSize += tJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
{
return -1;
// if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
}
else
{
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
{
if (!tlJoiners[i] || !tlJoiners[i][j])
{
return -1;
}
else
currentSize += tlJoiners[i][j]->size();
}
if (currentSize != tJoinerSizes[i])
{
return -1;
// if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
}
}
endOfJoinerRan = true;
#ifndef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
return 0;
}
@@ -1076,7 +1059,6 @@ void BatchPrimitiveProcessor::initProcessor()
{
for (i = 0; i < (uint32_t)filterCount - 1; ++i)
{
// cout << "prepping filter " << i << endl;
filterSteps[i]->setBatchPrimitiveProcessor(this);
if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP)
@@ -1087,14 +1069,12 @@ void BatchPrimitiveProcessor::initProcessor()
filterSteps[i]->prep(OT_RID, false);
}
// cout << "prepping filter " << i << endl;
filterSteps[i]->setBatchPrimitiveProcessor(this);
filterSteps[i]->prep(OT_BOTH, false);
}
for (i = 0; i < projectCount; ++i)
{
// cout << "prepping projection " << i << endl;
projectSteps[i]->setBatchPrimitiveProcessor(this);
if (noVB)
@@ -1120,7 +1100,6 @@ void BatchPrimitiveProcessor::initProcessor()
if (fAggregator.get() != NULL)
{
// fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]);
fAggRowGroupData.reinit(fAggregateRG);
fAggregateRG.setData(&fAggRowGroupData);
@@ -1944,64 +1923,6 @@ void BatchPrimitiveProcessor::execute()
}
catch (NeedToRestartJob& n)
{
#if 0
/* This block of code will flush the problematic OIDs from the
* cache. It seems to have no effect on the problem, so it's commented
* for now.
*
* This is currently thrown only on syscat queries. If we find the problem
* in user tables also, we should avoid dropping entire OIDs if possible.
*
* In local testing there was no need for flushing, because DDL flushes
* the syscat constantly. However, it can take a long time (>10 s) before
* that happens. Doing it locally should make it much more likely only
* one restart is necessary.
*/
try
{
vector<uint32_t> oids;
uint32_t oid;
for (uint32_t i = 0; i < filterCount; i++)
{
oid = filterSteps[i]->getOID();
if (oid > 0)
oids.push_back(oid);
}
for (uint32_t i = 0; i < projectCount; i++)
{
oid = projectSteps[i]->getOID();
if (oid > 0)
oids.push_back(oid);
}
#if 0
Logger logger;
ostringstream os;
os << "dropping OIDs: ";
for (int i = 0; i < oids.size(); i++)
os << oids[i] << " ";
logger.logMessage(os.str());
#endif
for (int i = 0; i < fCacheCount; i++)
{
dbbc::blockCacheClient bc(*BRPp[i]);
// bc.flushCache();
bc.flushOIDs(&oids[0], oids.size());
}
}
catch (...) { } // doesn't matter if this fails, just avoid crashing
#endif
#ifndef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
@@ -2132,21 +2053,20 @@ void BatchPrimitiveProcessor::serializeStrings()
void BatchPrimitiveProcessor::sendResponse()
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
// Here is the fast path for local EM to PM interaction. PM puts into the
// input EM DEC queue directly.
// !sock has a 'same host connection' semantics here.
if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock)))
// !writelock has a 'same host connection' semantics here.
if (initiatedByEM_ && !writelock)
{
// Flow Control now handles same node connections so the receiving DEC queue
// is limited.
if (sendThread->flowControlEnabled())
{
sendThread->sendResult({serialized, nullptr, nullptr, 0}, false);
sendThread->sendResult({serialized, sock, writelock, 0}, false);
}
else
{
exeMgrDecPtr->addDataToOutput(serialized);
sock->write(serialized);
serialized.reset();
}
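
To summarize the hunk above in a standalone, hedged sketch (the types and the function are stand-ins, and the actual delivery calls are elided as comments): the fast path triggers when the query was initiated by the EM and writelock is empty, i.e. the peer is on the same host; flow control then decides whether the result is throttled through the send thread or pushed straight into the local DEC queue.

#include <memory>
#include <mutex>

struct ByteStream
{
};
struct IOSocket
{
  void write(const ByteStream&) { /* serialize over TCP */ }
};

// !writelock carries 'same host connection' semantics, mirroring the diff.
void sendResponseSketch(bool initiatedByEM, bool flowControlEnabled,
                        const std::shared_ptr<IOSocket>& sock,
                        const std::shared_ptr<std::mutex>& writelock, ByteStream& out)
{
  if (initiatedByEM && !writelock)
  {
    if (flowControlEnabled)
    {
      // sendThread->sendResult({out, sock, writelock, 0}, false);
      // The send thread throttles, keeping the receiving DEC queue bounded.
    }
    else
    {
      // Hand the buffer to the local EM DEC queue directly; no TCP round trip.
    }
    return;
  }
  sock->write(out);  // remote EM: go through the socket
}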