1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Leonid Fedorov 83c2408f8d
fix(join, threadpool): MCOL-5565: MCOL-5636: MCOL-5645: port from develop-23.02 to [develop] (#3128)
* fix(threadpool): MCOL-5565 queries stuck in FairThreadScheduler. (#3100)

Meta Primitive Jobs, e.g. ADD_JOINER and LAST_JOINER, get stuck
	in the Fair scheduler without an out-of-band scheduler. Add the OOB
	scheduler back to remedy the issue.

* fix(messageqcpp): MCOL-5636 same node communication crashes transmitting PP errors to EM b/c error messaging leveraged socket that was a nullptr. (#3106)

* fix(threadpool): MCOL-5645 erroneous threadpool Job ctor implicitly sets socket shared_ptr to nullptr causing sigabrt when threadpool returns an error (#3125)

---------

Co-authored-by: drrtuy <roman.nozdrin@mariadb.com>
2024-02-13 19:01:16 +03:00

375 lines
8.9 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-22 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: bppseeder.cpp 2035 2013-01-21 14:12:19Z rdempsey $
// C++ Implementation: bppseeder
//
// Description:
//
//
// Author: Patrick <pleblanc@localhost.localdomain>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <unistd.h>
#include <sstream>
#include <pthread.h>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include "bppseeder.h"
#include "primitiveserver.h"
#include "pp_logger.h"
#include "errorcodes.h"
#include "calpontsystemcatalog.h"
#include "blockcacheclient.h"
#include "mcsconfig.h"
using namespace messageqcpp;
using namespace std;
namespace primitiveprocessor
{
/**
 * One trace-log entry: an output file opened for appending together with
 * the number that was assigned to the entry when it was created.
 */
struct PTLogs
{
  /** Creates an entry with no open file; thdId is zeroed rather than left indeterminate. */
  PTLogs() : thdId(0)
  {
  }

  /**
   * Creates an entry and opens its log file.
   *
   * @param t      number stored in thdId
   * @param fname  path of the file to open; new output is appended
   *
   * The open is not checked here; callers test logFD.is_open().
   */
  PTLogs(const int t, const char* fname) : thdId(t)
  {
    logFD.open(fname, std::ios_base::app | std::ios_base::ate);
  }

  /** Closes the log file (a no-op when it was never opened). */
  ~PTLogs()
  {
    logFD.close();
  }

  int thdId;            // number assigned at construction, printed in each trace line
  std::ofstream logFD;  // the trace file itself
};
// Aliases for the trace-log entry, a shared pointer to it, and the map that
// holds one entry per thread id.
typedef PTLogs PTLogs_t;
typedef boost::shared_ptr<PTLogs_t> SPPTLogs_t;
typedef std::tr1::unordered_map<pthread_t, SPPTLogs_t> PTLogsMap_t;
// Trace logs keyed by pthread_self(); filled lazily by BPPSeeder::operator().
PTLogsMap_t gFDList;
// Not referenced anywhere in this file.
SPPTLogs_t gLogFD;
// Taken by BPPSeeder::operator() while gFDList holds fewer entries than there
// are PM threads.
boost::mutex gFDMutex;
// Number of PTLogs entries created so far; used as the next entry's thdId.
int gThdCnt = 0;
// Defined in another translation unit. flushSyscatOIDs() walks BRPp[0] through
// BRPp[fCacheCount - 1].
extern dbbc::BlockRequestProcessor** BRPp;
extern int fCacheCount;
/**
 * Stores in tm the time elapsed from tv1 to tv2, expressed in seconds.
 *
 * @param tv1  earlier timestamp
 * @param tv2  later timestamp
 * @param tm   receives (tv2 - tv1) in seconds; negative when tv2 precedes tv1
 */
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm)
{
  const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
  const double fraction = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);
  tm = wholeSeconds + fraction;
}
/**
 * Builds a seeder around a received message.
 *
 * @param b          message buffer; the job's ids are read from it
 * @param w          write lock handed on to the BPP via resetBPP()
 * @param s          socket handed on to the BPP and used for error replies
 * @param pmThreads  thread count used to decide when the trace-log map needs locking
 * @param trace      whether operator() writes a trace line for each run
 *
 * Session id, step id, unique id and priority are read as four consecutive
 * 32-bit values located directly after the ISMPacketHeader. The deadline
 * checked by operator() is set to 100 seconds from now.
 */
BPPSeeder::BPPSeeder(const SBS& b, const SP_UM_MUTEX& w, const SP_UM_IOSOCK& s, const int pmThreads,
                     const bool trace)
 : bs(b), writelock(w), sock(s), fPMThreads(pmThreads), fTrace(trace), failCount(0), firstRun(true)
{
  const uint8_t* const payload = b->buf() + sizeof(ISMPacketHeader);
  const uint32_t* const fields = (const uint32_t*)payload;

  sessionID = fields[0];
  stepID = fields[1];
  uniqueID = fields[2];
  _priority = fields[3];

  const boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time();
  dieTime = now + boost::posix_time::seconds(100);
}
/**
 * Copy constructor.
 *
 * Copies the message, lock, socket, ids, the currently held BPP and the
 * run state. The deadline (dieTime) is copied as well so that a copy gives
 * up at the same moment as the object it was made from; without it the
 * copy's dieTime would be default-constructed.
 *
 * bppv is left default-initialized, as before; operator() looks it up
 * again by uniqueID when it is empty.
 */
BPPSeeder::BPPSeeder(const BPPSeeder& b)
 : bs(b.bs)
 , writelock(b.writelock)
 , sock(b.sock)
 , fPMThreads(b.fPMThreads)
 , fTrace(b.fTrace)
 , uniqueID(b.uniqueID)
 , sessionID(b.sessionID)
 , stepID(b.stepID)
 , failCount(b.failCount)
 , bpp(b.bpp)
 , firstRun(b.firstRun)
 , _priority(b._priority)
{
  // Assigned in the body rather than the initializer list so the fix does not
  // depend on where dieTime is declared in the class.
  dieTime = b.dieTime;
}
/** Nothing to release by hand; the members clean up after themselves. */
BPPSeeder::~BPPSeeder() = default;
/**
 * Runs one scheduling attempt of this job.
 *
 * First run only: re-reads status, session id, step id, unique id and
 * priority from the message, reports a non-zero status to the sender,
 * finds the BPP list registered under uniqueID in bppMap, and takes a free
 * BPP instance from it.
 *
 * Every run: executes the BPP. A NeedToRestartJob thrown by the BPP causes
 * the syscat OIDs to be flushed and the BPP to be re-run after one second,
 * up to 120 attempts. With tracing enabled, a timing line is appended to
 * the calling thread's trace file after a run that returned 0.
 *
 * @return  0   the job finished, was aborted, timed out waiting for its BPP
 *              to be registered, or an error was caught/reported;
 *          -1  the BPP is not registered yet or all instances are busy, so
 *              the thread pool should reschedule the job;
 *          otherwise the non-zero value returned by the BPP.
 *
 * No exception leaves this function: scalar_exception is swallowed, any
 * other exception is logged and reported through catchHandler().
 */
int BPPSeeder::operator()()
{
  const uint8_t* buf = bs->buf();
  struct timespec tm;
  struct timespec tm2;
  double tm3 = 0;
  bool gotBPP = false;
  PTLogs_t* logFD = NULL;
  int ret = 0;
  // Declared outside the try block so that, as before, bppLock stays held
  // while a catch handler runs if the exception was thrown under the lock.
  boost::mutex::scoped_lock scoped(bppLock, boost::defer_lock_t());

  try
  {
    if (firstRun)
    {
      // The 16-bit status occupies the last two bytes of the header; the
      // four 32-bit ids follow it.
      uint32_t pos = sizeof(ISMPacketHeader) - 2;
      uint16_t status = *((uint16_t*)&buf[pos]);
      pos += 2;

      sessionID = *((uint32_t*)&buf[pos]);
      pos += 4;
      stepID = *((uint32_t*)&buf[pos]);
      pos += 4;
      uniqueID = *((uint32_t*)&buf[pos]);
      pos += 4;
      _priority = *((uint32_t*)&buf[pos]);

      if (0 < status)
      {
        error_handling::sendErrorMsg(status, uniqueID, stepID, sock);
        return ret;
      }

      scoped.lock();

      if (!bppv)
      {
        BPPMap::iterator it = bppMap.find(uniqueID);

        if (it == bppMap.end())
        {
          /* mitigate a small race between creation and use */
          scoped.unlock();

          if (boost::posix_time::second_clock::universal_time() > dieTime)
          {
            cout << "BPPSeeder::operator(): job for id " << uniqueID << " and session " << sessionID
                 << " has been killed." << endl;
            return 0;
          }

          return -1;
        }

        bppv = it->second;
      }

      if (bppv->aborted())
        return 0;

      bpp = bppv->next();
      scoped.unlock();

      if (!bpp)
      {
        return -1;  // all BPP instances are busy, make threadpool reschedule
      }

      gotBPP = true;
      bpp->resetBPP(*bs, writelock, sock);
      firstRun = false;
    }  // firstRun

    if (fTrace)
    {
      const pthread_t tid = pthread_self();

      {
        // only lock map while inserted objects
        // once there is an object for each thread
        // there is not need to lock
        // NOTE(review): gFDList.size() is read without holding gFDMutex, and
        // an insert by a thread beyond the first fPMThreads is unlocked too.
        boost::mutex::scoped_lock fdGuard(gFDMutex, boost::defer_lock_t());

        if (gFDList.size() < (uint32_t)fPMThreads)
          fdGuard.lock();

        PTLogsMap_t::iterator it = gFDList.find(tid);

        if (it == gFDList.end())
        {
          ostringstream LogFileName;
          SPPTLogs_t spof;
          LogFileName << MCSLOGDIR << "/trace/pt." << tid;
          spof.reset(new PTLogs_t(gThdCnt, LogFileName.str().c_str()));
          gThdCnt++;

          // TODO: add error checking
          if (spof->logFD.is_open())
          {
            gFDList[tid] = spof;
            logFD = spof.get();
          }
        }
        else
          logFD = (*it).second.get();
      }  // fdGuard releases gFDMutex here, also when an exception unwinds

      clock_gettime(CLOCK_MONOTONIC, &tm);
    }  // if (fTrace)

    uint32_t retries = 0;

    for (;;)
    {
      try
      {
        ret = (*bpp)();
        break;
      }
      catch (NeedToRestartJob& e)
      {
        // experimentally the race can exist longer than 10s.  "No way" should
        // it take 10 minutes.  If it does, the user will have to resubmit their
        // query.
        // 9/27/12 - changed the timeout to 2 mins b/c people report the system
        // is hung if it does nothing for 10 mins.  2 mins should still be more
        // than enough
        if (++retries == 120)
        {
          ostringstream os;
          os << e.what() << ": Restarted a syscat job " << retries << " times, bailing\n";
          throw NeedToRestartJob(os.str());
        }

        flushSyscatOIDs();
        bs->rewind();
        bpp->resetBPP(*bs, writelock, sock);
        sleep(1);
      }
    }

    if (ret)
      return ret;

    if (fTrace)
      if (logFD && logFD->logFD.is_open())
      {
        clock_gettime(CLOCK_MONOTONIC, &tm2);
        timespec_sub(tm, tm2, tm3);
        logFD->logFD << left << setw(3) << logFD->thdId << right << fixed
                     << ((double)(tm.tv_sec + (1.e-9 * tm.tv_nsec))) << " " << right << fixed << tm3 << " "
                     << right << setw(6) << bpp->getSessionID() << " " << right << setw(4) << bpp->getStepID()
                     << " " << right << setw(2) << bpp->FilterCount() << " " << right << setw(2)
                     << bpp->ProjectCount() << " " << right << setw(9) << bpp->PhysIOCount() << " " << right
                     << setw(9) << bpp->CachedIOCount() << " " << right << setw(9)
                     << bpp->BlocksTouchedCount() << endl;
      }  // if (logFD...
  }
  catch (const scalar_exception&)
  {
    // Not logged or reported; only the BPP taken during this call is released.
    if (gotBPP)
      bpp->busy(false);
  }
  catch (const exception& ex)
  {
    if (gotBPP)
      bpp->busy(false);

    catchHandler(ex.what(), uniqueID, stepID);
    cout << "BPPSeeder step " << stepID << " caught an exception: " << ex.what() << endl;
  }
  catch (...)
  {
    if (gotBPP)
      bpp->busy(false);

    string msg("BPPSeeder caught an unknown exception");
    catchHandler(msg, uniqueID, stepID);
    cout << msg << endl;
  }

  return ret;
}
void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step)
{
Logger log;
log.logMessage(ex);
error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock);
}
/**
 * Tells whether the top bit of the message's session id is set.
 *
 * The session id is read straight from the message buffer (the 32-bit value
 * directly after the ISMPacketHeader), not from the sessionID member.
 *
 * @return true when bit 31 of that value is set
 */
bool BPPSeeder::isSysCat()
{
  const uint8_t* const payload = bs->buf();
  const uint32_t msgSession = *((uint32_t*)&payload[sizeof(ISMPacketHeader)]);

  return (msgSession & 0x80000000) != 0;
}
uint32_t BPPSeeder::getID()
{
return uniqueID;
}
/* This is part of the syscat-retry hack. We should get rid of it once we
* track down the source of the problem.
*/
void BPPSeeder::flushSyscatOIDs()
{
vector<BRM::OID_t> syscatOIDs;
syscatOIDs = execplan::getAllSysCatOIDs();
for (int i = 0; i < fCacheCount; i++)
{
dbbc::blockCacheClient bc(*BRPp[i]);
bc.flushOIDs((const uint32_t*)&syscatOIDs[0], syscatOIDs.size());
}
}
}; // namespace primitiveprocessor