mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
* fix(threadpool): MCOL-5565 queries stuck in FairThreadScheduler. (#3100) Meta Primitive Jobs, e.g. ADD_JOINER, LAST_JOINER, got stuck in the Fair scheduler without an out-of-band scheduler. Add the OOB scheduler back to remedy the issue. * fix(messageqcpp): MCOL-5636 same-node communication crashes when transmitting PP errors to EM because error messaging leveraged a socket that was a nullptr. (#3106) * fix(threadpool): MCOL-5645 erroneous threadpool Job ctor implicitly sets socket shared_ptr to nullptr, causing SIGABRT when threadpool returns an error (#3125) --------- Co-authored-by: drrtuy <roman.nozdrin@mariadb.com>
375 lines
8.9 KiB
C++
375 lines
8.9 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016-22 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
|
|
|
|
//
// $Id: bppseeder.cpp 2035 2013-01-21 14:12:19Z rdempsey $
// C++ Implementation: bppseeder
//
// Description:
//
//
// Author: Patrick <pleblanc@localhost.localdomain>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
|
|
|
|
#include <unistd.h>
|
|
#include <sstream>
|
|
#include <pthread.h>
|
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
#include <boost/thread.hpp>
|
|
|
|
#include "bppseeder.h"
|
|
#include "primitiveserver.h"
|
|
#include "pp_logger.h"
|
|
#include "errorcodes.h"
|
|
#include "calpontsystemcatalog.h"
|
|
#include "blockcacheclient.h"
|
|
#include "mcsconfig.h"
|
|
|
|
using namespace messageqcpp;
|
|
using namespace std;
|
|
|
|
namespace primitiveprocessor
|
|
{
|
|
/** Trace-log record: an output file stream plus the sequential id assigned to
 *  the thread that writes to it.
 */
struct PTLogs
{
  /** Creates a record with no open file and a thread id of 0. */
  PTLogs() : thdId(0)
  {
  }

  /** Creates a record for thread id @p t and opens @p fname for appending.
   *
   *  Opening may fail; callers are expected to check logFD.is_open().
   */
  PTLogs(const int t, const char* fname) : thdId(t)
  {
    logFD.open(fname, std::ios_base::app | std::ios_base::ate);
  }

  /** Closes the log file (a no-op on the stream's buffer if it was never opened). */
  ~PTLogs()
  {
    logFD.close();
  }

  int thdId;            // sequential id written at the start of each trace line
  std::ofstream logFD;  // destination stream for the trace lines
};
|
|
|
|
typedef PTLogs PTLogs_t;
|
|
typedef boost::shared_ptr<PTLogs_t> SPPTLogs_t;
|
|
typedef std::tr1::unordered_map<pthread_t, SPPTLogs_t> PTLogsMap_t;
|
|
|
|
PTLogsMap_t gFDList;
|
|
SPPTLogs_t gLogFD;
|
|
boost::mutex gFDMutex; // pthread_mutex_t gFDMutex=PTHREAD_MUTEX_INITIALIZER;
|
|
int gThdCnt = 0;
|
|
|
|
extern dbbc::BlockRequestProcessor** BRPp;
|
|
extern int fCacheCount;
|
|
|
|
/** Stores in @p tm the elapsed time, in seconds, from @p tv1 to @p tv2
 *  (i.e. tv2 - tv1; negative when tv2 precedes tv1).
 */
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm)
{
  const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
  const double fractionalSeconds = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);

  tm = wholeSeconds + fractionalSeconds;
}
|
|
|
|
/** Builds a seeder for the request held in @p b.
 *
 *  @param b         byte stream containing the request; four 32-bit fields are
 *                   read from it immediately after the ISMPacketHeader
 *  @param w         write lock handed to the BPP when the job is run
 *  @param s         socket handed to the BPP and used for error replies
 *  @param pmThreads thread count stored in fPMThreads
 *  @param trace     enables per-thread trace logging in operator()
 */
BPPSeeder::BPPSeeder(const SBS& b, const SP_UM_MUTEX& w, const SP_UM_IOSOCK& s, const int pmThreads,
                     const bool trace)
 : bs(b), writelock(w), sock(s), fPMThreads(pmThreads), fTrace(trace), failCount(0), firstRun(true)
{
  // The four fields sit back to back right after the ISM header, in this order.
  const uint8_t* const message = b->buf();
  const uint32_t* const fields = (const uint32_t*)(message + sizeof(ISMPacketHeader));

  sessionID = fields[0];
  stepID = fields[1];
  uniqueID = fields[2];
  _priority = fields[3];

  // operator() gives up on the job once this point in time has passed
  // without a matching entry appearing in bppMap.
  const boost::posix_time::ptime createdAt = boost::posix_time::second_clock::universal_time();
  dieTime = createdAt + boost::posix_time::seconds(100);
}
|
|
|
|
/** Copy constructor: duplicates the request handles, the parsed ids, the
 *  current BPP and the run state of @p b.
 *
 *  bppv is not copied; operator() looks it up again in bppMap when it is unset
 *  and firstRun is still true.
 */
BPPSeeder::BPPSeeder(const BPPSeeder& b)
 : bs(b.bs)
 , writelock(b.writelock)
 , sock(b.sock)
 , fPMThreads(b.fPMThreads)
 , fTrace(b.fTrace)
 , uniqueID(b.uniqueID)
 , sessionID(b.sessionID)
 , stepID(b.stepID)
 , failCount(b.failCount)
 , bpp(b.bpp)
 , firstRun(b.firstRun)
 , _priority(b._priority)
{
  // Carry the kill deadline over as well.  Without this the copy would hold a
  // default-constructed deadline instead of the one set when the job was
  // created, and the dieTime check in operator() would not reflect the
  // original job's 100-second limit.  Assigned in the body (as the primary
  // constructor does) so the member-initializer order is left untouched.
  dieTime = b.dieTime;
}
|
|
|
|
/** Destructor.  The body is empty: no explicit cleanup is performed here. */
BPPSeeder::~BPPSeeder()
{
}
|
|
|
|
int BPPSeeder::operator()()
|
|
{
|
|
uint32_t pos;
|
|
const uint8_t* buf = bs->buf();
|
|
BPPMap::iterator it;
|
|
ostringstream logData;
|
|
struct timespec tm;
|
|
struct timespec tm2;
|
|
double tm3 = 0;
|
|
bool ptLock = false;
|
|
bool gotBPP = false;
|
|
PTLogs_t* logFD = NULL;
|
|
int ret = 0;
|
|
pthread_t tid = 0;
|
|
boost::mutex::scoped_lock scoped(bppLock, boost::defer_lock_t());
|
|
|
|
try
|
|
{
|
|
if (firstRun)
|
|
{
|
|
pos = sizeof(ISMPacketHeader) - 2;
|
|
uint16_t status = *((uint16_t*)&buf[pos]);
|
|
pos += 2;
|
|
|
|
sessionID = *((uint32_t*)&buf[pos]);
|
|
pos += 4;
|
|
stepID = *((uint32_t*)&buf[pos]);
|
|
pos += 4;
|
|
uniqueID = *((uint32_t*)&buf[pos]);
|
|
pos += 4;
|
|
_priority = *((uint32_t*)&buf[pos]);
|
|
|
|
if (0 < status)
|
|
{
|
|
error_handling::sendErrorMsg(status, uniqueID, stepID, sock);
|
|
return ret;
|
|
}
|
|
|
|
scoped.lock();
|
|
|
|
if (!bppv)
|
|
{
|
|
it = bppMap.find(uniqueID);
|
|
|
|
if (it == bppMap.end())
|
|
{
|
|
/* mitigate a small race between creation and use */
|
|
scoped.unlock();
|
|
|
|
if (boost::posix_time::second_clock::universal_time() > dieTime)
|
|
{
|
|
cout << "BPPSeeder::operator(): job for id " << uniqueID << "and session " << sessionID
|
|
<< " has been killed." << endl;
|
|
return 0;
|
|
}
|
|
|
|
return -1;
|
|
}
|
|
|
|
bppv = it->second;
|
|
}
|
|
|
|
if (bppv->aborted())
|
|
return 0;
|
|
|
|
bpp = bppv->next();
|
|
scoped.unlock();
|
|
|
|
if (!bpp)
|
|
{
|
|
return -1; // all BPP instances are busy, make threadpool reschedule
|
|
}
|
|
|
|
gotBPP = true;
|
|
bpp->resetBPP(*bs, writelock, sock);
|
|
firstRun = false;
|
|
} // firstRun
|
|
|
|
if (fTrace)
|
|
{
|
|
PTLogsMap_t::iterator it;
|
|
tid = pthread_self();
|
|
|
|
// only lock map while inserted objects
|
|
// once there is an object for each thread
|
|
// there is not need to lock
|
|
if (gFDList.size() < (uint32_t)fPMThreads)
|
|
{
|
|
gFDMutex.lock();
|
|
ptLock = true;
|
|
}
|
|
|
|
it = gFDList.find(tid);
|
|
|
|
if (it == gFDList.end())
|
|
{
|
|
ostringstream LogFileName;
|
|
SPPTLogs_t spof;
|
|
LogFileName << MCSLOGDIR << "/trace/pt." << tid;
|
|
spof.reset(new PTLogs_t(gThdCnt, LogFileName.str().c_str()));
|
|
gThdCnt++;
|
|
|
|
// TODO: add error checking
|
|
if (spof->logFD.is_open())
|
|
{
|
|
gFDList[tid] = spof;
|
|
logFD = spof.get();
|
|
}
|
|
}
|
|
else
|
|
logFD = (*it).second.get();
|
|
|
|
if (ptLock)
|
|
{
|
|
gFDMutex.unlock();
|
|
ptLock = false;
|
|
}
|
|
|
|
clock_gettime(CLOCK_MONOTONIC, &tm);
|
|
} // if (fTrace)
|
|
|
|
uint32_t retries = 0;
|
|
restart:
|
|
|
|
try
|
|
{
|
|
ret = (*bpp)();
|
|
}
|
|
catch (NeedToRestartJob& e)
|
|
{
|
|
ostringstream os;
|
|
// experimentally the race can exist longer than 10s. "No way" should
|
|
// it take 10 minutes. If it does, the user will have to resubmit their
|
|
// query.
|
|
|
|
// 9/27/12 - changed the timeout to 2 mins b/c people report the system
|
|
// is hung if it does nothing for 10 mins. 2 mins should still be more
|
|
// than enough
|
|
if (++retries == 120)
|
|
{
|
|
os << e.what() << ": Restarted a syscat job " << retries << " times, bailing\n";
|
|
throw NeedToRestartJob(os.str());
|
|
}
|
|
|
|
flushSyscatOIDs();
|
|
bs->rewind();
|
|
bpp->resetBPP(*bs, writelock, sock);
|
|
sleep(1);
|
|
goto restart;
|
|
}
|
|
|
|
if (ret)
|
|
return ret;
|
|
|
|
if (fTrace)
|
|
if (logFD && logFD->logFD.is_open())
|
|
{
|
|
clock_gettime(CLOCK_MONOTONIC, &tm2);
|
|
timespec_sub(tm, tm2, tm3);
|
|
logFD->logFD << left << setw(3) << logFD->thdId << right << fixed
|
|
<< ((double)(tm.tv_sec + (1.e-9 * tm.tv_nsec))) << " " << right << fixed << tm3 << " "
|
|
<< right << setw(6) << bpp->getSessionID() << " " << right << setw(4) << bpp->getStepID()
|
|
<< " " << right << setw(2) << bpp->FilterCount() << " " << right << setw(2)
|
|
<< bpp->ProjectCount() << " " << right << setw(9) << bpp->PhysIOCount() << " " << right
|
|
<< setw(9) << bpp->CachedIOCount() << " " << right << setw(9)
|
|
<< bpp->BlocksTouchedCount() << endl;
|
|
} // if (logFD...
|
|
}
|
|
catch (scalar_exception& se)
|
|
{
|
|
if (gotBPP)
|
|
bpp->busy(false);
|
|
|
|
if (ptLock)
|
|
{
|
|
gFDMutex.unlock();
|
|
ptLock = false;
|
|
}
|
|
}
|
|
catch (exception& ex)
|
|
{
|
|
if (gotBPP)
|
|
bpp->busy(false);
|
|
|
|
if (ptLock)
|
|
{
|
|
gFDMutex.unlock();
|
|
ptLock = false;
|
|
}
|
|
|
|
catchHandler(ex.what(), uniqueID, stepID);
|
|
cout << "BPPSeeder step " << stepID << " caught an exception: " << ex.what() << endl;
|
|
}
|
|
catch (...)
|
|
{
|
|
if (gotBPP)
|
|
bpp->busy(false);
|
|
|
|
if (ptLock)
|
|
{
|
|
gFDMutex.unlock();
|
|
ptLock = false;
|
|
}
|
|
|
|
string msg("BPPSeeder caught an unknown exception");
|
|
catchHandler(msg, uniqueID, stepID);
|
|
cout << msg << endl;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step)
|
|
{
|
|
Logger log;
|
|
log.logMessage(ex);
|
|
|
|
error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock);
|
|
}
|
|
|
|
bool BPPSeeder::isSysCat()
|
|
{
|
|
const uint8_t* buf;
|
|
uint32_t sessionIDOffset = sizeof(ISMPacketHeader);
|
|
uint32_t sessionID;
|
|
|
|
buf = bs->buf();
|
|
sessionID = *((uint32_t*)&buf[sessionIDOffset]);
|
|
return (sessionID & 0x80000000);
|
|
}
|
|
|
|
uint32_t BPPSeeder::getID()
|
|
{
|
|
return uniqueID;
|
|
}
|
|
|
|
/* This is part of the syscat-retry hack. We should get rid of it once we
|
|
* track down the source of the problem.
|
|
*/
|
|
void BPPSeeder::flushSyscatOIDs()
|
|
{
|
|
vector<BRM::OID_t> syscatOIDs;
|
|
|
|
syscatOIDs = execplan::getAllSysCatOIDs();
|
|
|
|
for (int i = 0; i < fCacheCount; i++)
|
|
{
|
|
dbbc::blockCacheClient bc(*BRPp[i]);
|
|
bc.flushOIDs((const uint32_t*)&syscatOIDs[0], syscatOIDs.size());
|
|
}
|
|
}
|
|
|
|
}; // namespace primitiveprocessor
|