/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016-22 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // // $Id: bppseeder.cpp 2035 2013-01-21 14:12:19Z rdempsey $ // C++ Implementation: bppseeder // // Description: // // // Author: Patrick , (C) 2008 // // Copyright: See COPYING file that comes with this distribution // // #include #include #include #include #include #include "bppseeder.h" #include "primitiveserver.h" #include "pp_logger.h" #include "errorcodes.h" #include "calpontsystemcatalog.h" #include "blockcacheclient.h" #include "mcsconfig.h" using namespace messageqcpp; using namespace std; namespace primitiveprocessor { struct PTLogs { PTLogs(){}; PTLogs(const int t, const char* fname) : thdId(t) { logFD.open(fname, ios_base::app | ios_base::ate); } ~PTLogs() { logFD.close(); } int thdId; ofstream logFD; }; typedef PTLogs PTLogs_t; typedef boost::shared_ptr SPPTLogs_t; typedef std::tr1::unordered_map PTLogsMap_t; PTLogsMap_t gFDList; SPPTLogs_t gLogFD; boost::mutex gFDMutex; // pthread_mutex_t gFDMutex=PTHREAD_MUTEX_INITIALIZER; int gThdCnt = 0; extern dbbc::BlockRequestProcessor** BRPp; extern int fCacheCount; void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm) { tm = (double)(tv2.tv_sec - tv1.tv_sec) + 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec); } BPPSeeder::BPPSeeder(const SBS& b, const SP_UM_MUTEX& w, const SP_UM_IOSOCK& s, const int pmThreads, const bool trace) : bs(b), writelock(w), sock(s), fPMThreads(pmThreads), fTrace(trace), failCount(0), firstRun(true) { uint8_t* buf = b->buf(); uint32_t pos = sizeof(ISMPacketHeader); sessionID = *((uint32_t*)&buf[pos]); pos += 4; stepID = *((uint32_t*)&buf[pos]); pos += 4; uniqueID = *((uint32_t*)&buf[pos]); pos += 4; _priority = *((uint32_t*)&buf[pos]); dieTime = boost::posix_time::second_clock::universal_time() + boost::posix_time::seconds(100); } BPPSeeder::BPPSeeder(const BPPSeeder& b) : bs(b.bs) , writelock(b.writelock) , sock(b.sock) , fPMThreads(b.fPMThreads) , fTrace(b.fTrace) , uniqueID(b.uniqueID) , sessionID(b.sessionID) , stepID(b.stepID) , failCount(b.failCount) , bpp(b.bpp) , firstRun(b.firstRun) , _priority(b._priority) { } BPPSeeder::~BPPSeeder() { } int BPPSeeder::operator()() { uint32_t pos; const uint8_t* buf = bs->buf(); BPPMap::iterator it; ostringstream logData; struct timespec tm; struct timespec tm2; double tm3 = 0; bool ptLock = false; bool gotBPP = false; PTLogs_t* logFD = NULL; int ret = 0; pthread_t tid = 0; boost::mutex::scoped_lock scoped(bppLock, boost::defer_lock_t()); try { if (firstRun) { pos = sizeof(ISMPacketHeader) - 2; uint16_t status = *((uint16_t*)&buf[pos]); pos += 2; sessionID = *((uint32_t*)&buf[pos]); pos += 4; stepID = *((uint32_t*)&buf[pos]); pos += 4; uniqueID = *((uint32_t*)&buf[pos]); pos += 4; _priority = *((uint32_t*)&buf[pos]); if (0 < status) { error_handling::sendErrorMsg(status, uniqueID, stepID, sock); return ret; } scoped.lock(); if (!bppv) { it = bppMap.find(uniqueID); if (it == bppMap.end()) { /* mitigate a small race between creation and use */ scoped.unlock(); if (boost::posix_time::second_clock::universal_time() > dieTime) { cout << "BPPSeeder::operator(): job for id " << uniqueID << "and session " << sessionID << " has been killed." << endl; return 0; } return -1; } bppv = it->second; } if (bppv->aborted()) return 0; bpp = bppv->next(); scoped.unlock(); if (!bpp) { return -1; // all BPP instances are busy, make threadpool reschedule } gotBPP = true; bpp->resetBPP(*bs, writelock, sock); firstRun = false; } // firstRun if (fTrace) { PTLogsMap_t::iterator it; tid = pthread_self(); // only lock map while inserted objects // once there is an object for each thread // there is not need to lock if (gFDList.size() < (uint32_t)fPMThreads) { gFDMutex.lock(); ptLock = true; } it = gFDList.find(tid); if (it == gFDList.end()) { ostringstream LogFileName; SPPTLogs_t spof; LogFileName << MCSLOGDIR << "/trace/pt." << tid; spof.reset(new PTLogs_t(gThdCnt, LogFileName.str().c_str())); gThdCnt++; // TODO: add error checking if (spof->logFD.is_open()) { gFDList[tid] = spof; logFD = spof.get(); } } else logFD = (*it).second.get(); if (ptLock) { gFDMutex.unlock(); ptLock = false; } clock_gettime(CLOCK_MONOTONIC, &tm); } // if (fTrace) uint32_t retries = 0; restart: try { ret = (*bpp)(); } catch (NeedToRestartJob& e) { ostringstream os; // experimentally the race can exist longer than 10s. "No way" should // it take 10 minutes. If it does, the user will have to resubmit their // query. // 9/27/12 - changed the timeout to 2 mins b/c people report the system // is hung if it does nothing for 10 mins. 2 mins should still be more // than enough if (++retries == 120) { os << e.what() << ": Restarted a syscat job " << retries << " times, bailing\n"; throw NeedToRestartJob(os.str()); } flushSyscatOIDs(); bs->rewind(); bpp->resetBPP(*bs, writelock, sock); sleep(1); goto restart; } if (ret) return ret; if (fTrace) if (logFD && logFD->logFD.is_open()) { clock_gettime(CLOCK_MONOTONIC, &tm2); timespec_sub(tm, tm2, tm3); logFD->logFD << left << setw(3) << logFD->thdId << right << fixed << ((double)(tm.tv_sec + (1.e-9 * tm.tv_nsec))) << " " << right << fixed << tm3 << " " << right << setw(6) << bpp->getSessionID() << " " << right << setw(4) << bpp->getStepID() << " " << right << setw(2) << bpp->FilterCount() << " " << right << setw(2) << bpp->ProjectCount() << " " << right << setw(9) << bpp->PhysIOCount() << " " << right << setw(9) << bpp->CachedIOCount() << " " << right << setw(9) << bpp->BlocksTouchedCount() << endl; } // if (logFD... } catch (scalar_exception& se) { if (gotBPP) bpp->busy(false); if (ptLock) { gFDMutex.unlock(); ptLock = false; } } catch (exception& ex) { if (gotBPP) bpp->busy(false); if (ptLock) { gFDMutex.unlock(); ptLock = false; } catchHandler(ex.what(), uniqueID, stepID); cout << "BPPSeeder step " << stepID << " caught an exception: " << ex.what() << endl; } catch (...) { if (gotBPP) bpp->busy(false); if (ptLock) { gFDMutex.unlock(); ptLock = false; } string msg("BPPSeeder caught an unknown exception"); catchHandler(msg, uniqueID, stepID); cout << msg << endl; } return ret; } void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step) { Logger log; log.logMessage(ex); error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock); } bool BPPSeeder::isSysCat() { const uint8_t* buf; uint32_t sessionIDOffset = sizeof(ISMPacketHeader); uint32_t sessionID; buf = bs->buf(); sessionID = *((uint32_t*)&buf[sessionIDOffset]); return (sessionID & 0x80000000); } uint32_t BPPSeeder::getID() { return uniqueID; } /* This is part of the syscat-retry hack. We should get rid of it once we * track down the source of the problem. */ void BPPSeeder::flushSyscatOIDs() { vector syscatOIDs; syscatOIDs = execplan::getAllSysCatOIDs(); for (int i = 0; i < fCacheCount; i++) { dbbc::blockCacheClient bc(*BRPp[i]); bc.flushOIDs((const uint32_t*)&syscatOIDs[0], syscatOIDs.size()); } } }; // namespace primitiveprocessor