/* Copyright (C) 2014 InfiniDB, Inc. * Copyright (C) 2016 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $ // /** @file */ #include #include #include #include #include #include #include #if __FreeBSD__ #include #endif using namespace std; #include using namespace boost; #include "messagequeue.h" #include "bytestream.h" using namespace messageqcpp; #include "configcpp.h" using namespace config; #include "errorids.h" #include "exceptclasses.h" #include "messagelog.h" #include "messageobj.h" #include "loggingid.h" using namespace logging; #include "liboamcpp.h" using namespace oam; #include "we_clients.h" #include "we_messages.h" using namespace WriteEngine; #include "atomicops.h" namespace { void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO) { LoggingID lid(05); MessageLog ml(lid); Message::Args args; Message m(0); args.add(file); args.add("@"); args.add(line); args.add(msg); m.format(args); switch (logto) { case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break; case LOG_TYPE_INFO: ml.logInfoMessage(m); break; case LOG_TYPE_WARNING: ml.logWarningMessage(m); break; case LOG_TYPE_ERROR: ml.logErrorMessage(m); break; case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break; } } string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig, string ipAddress) { string modulename = "pm1"; DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin(); for (; pt != moduletypeconfig.ModuleNetworkList.end(); pt++) { modulename = (*pt).DeviceName; HostConfigList::iterator pt1 = (*pt).hostConfigList.begin(); for (; pt1 != (*pt).hostConfigList.end(); pt1++) { if (ipAddress == (*pt1).IPAddr) return modulename; } } return modulename; } struct WEClientRunner { WEClientRunner(WriteEngine::WEClients* jl, boost::shared_ptr cl, uint32_t connectionIndex) : jbl(jl), client(cl), connIndex(connectionIndex) { } WriteEngine::WEClients* jbl; boost::shared_ptr client; uint32_t connIndex; void operator()() { // cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl; try { jbl->Listen(client, connIndex); // cout << "Listening connIndex " << connIndex << endl; } catch (std::exception& ex) { string what(ex.what()); cerr << "exception caught in WEClient: " << what << endl; if (what.find("St9bad_alloc") != string::npos) { writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL); // abort(); } else writeToLog(__FILE__, __LINE__, what); } catch (...) { string msg("exception caught in WEClientRunner."); writeToLog(__FILE__, __LINE__, msg); cerr << msg << endl; } } }; template struct QueueShutdown { void operator()(T& x) { x.shutdown(); } }; } // namespace namespace WriteEngine { WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), pmCount(0) { closingConnection = 0; Setup(); } WEClients::~WEClients() { Close(); } void WEClients::Setup() { makeBusy(true); joblist::ResourceManager* rm = joblist::ResourceManager::instance(); oam::Oam oam; string ipAddress; ModuleTypeConfig moduletypeconfig; try { oam.getSystemConfig("pm", moduletypeconfig); } catch (...) { writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR); throw runtime_error("Setup failed"); } uint32_t pmCountConfig = moduletypeconfig.ModuleCount; pmCount = 0; int moduleID = 1; char buff[32]; ByteStream bs, bsRead; if (fPrgmID == DDLPROC) { bs << (ByteStream::byte)WE_SVR_DDL_KEEPALIVE; bs << (ByteStream::octbyte)moduleID; } else if (fPrgmID == DMLPROC) { bs << (ByteStream::byte)WE_SVR_DML_KEEPALIVE; bs << (ByteStream::octbyte)moduleID; } else if (fPrgmID == SPLITTER) { bs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE; } else if (fPrgmID == BATCHINSERTPROC) { bs << (ByteStream::byte)WE_SVR_BATCH_KEEPALIVE; bs << (ByteStream::octbyte)moduleID; } for (unsigned i = 0; i < pmCountConfig; i++) { // Find the module id moduleID = atoi((moduletypeconfig.ModuleNetworkList[i]) .DeviceName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE) .c_str()); // cout << "setting connection to moduleid " << moduleID << endl; snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID); string fServer(buff); boost::shared_ptr cl(new MessageQueueClient(fServer, rm->getConfig())); boost::shared_ptr nl(new boost::mutex()); // Bug 5224. Take out the retrys. If connection fails, we assume the server is down. try { if (cl->connect()) { try { cl->write(bs); } catch (std::exception& ex1) { ostringstream oss; oss << "Write to WES during connect failed due to " << ex1.what(); throw runtime_error(oss.str()); } try { bsRead = cl->read(); if (bsRead.length() == 0) throw runtime_error("Got byte 0 during reading "); } catch (std::exception& ex2) { ostringstream oss; oss << "Read from WES during connect failed due to " << ex2.what() << " and this = " << this; throw runtime_error(oss.str()); } fPmConnections[moduleID] = cl; // cout << "connection is open. this = " << this << endl; // cout << "set up connection to mudule " << moduleID << endl; // assign the module name // ipAddress = sin_addr2String(cl->serv_addr().sin_addr); ipAddress = cl->addr2String(); cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, ipAddress)); StartClientListener(cl, i); pmCount++; // ostringstream oss; // oss << "WECLIENT: connected to " << fServer + " and this = " << this << " and pmcount is now " << // pmCount; writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG); } else { throw runtime_error("Connection refused"); } } catch (std::exception& ex) { writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer + ": " + ex.what(), LOG_TYPE_ERROR); cerr << "Could not connect to " << fServer << ": " << ex.what() << endl; } catch (...) { writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer, LOG_TYPE_ERROR); } } } int WEClients::Close() { makeBusy(false); closingConnection = 1; ByteStream bs; bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION; write_to_all(bs); // cout << "connection is closed. this = " << this << " and closingConnection = " << closingConnection << // endl; for (uint32_t i = 0; i < fWESReader.size(); i++) { fWESReader[i]->join(); } fWESReader.clear(); fPmConnections.clear(); pmCount = 0; // ostringstream oss; // oss << "WECLIENT: closed connection to wes and this = " << this << " and pmcount is now " << pmCount; // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG); return 0; } void WEClients::Listen(boost::shared_ptr client, uint32_t connIndex) { SBS sbs; try { while (Busy()) { // TODO: This call blocks so setting Busy() in another thread doesn't work here... sbs = client->read(); if (sbs->length() != 0) { // cout << "adding data to connIndex " << endl; addDataToOutput(sbs, connIndex); } else // got zero bytes on read, nothing more will come { if (closingConnection > 0) { return; } cerr << "WEC got 0 byte message for object " << this << endl; goto Error; } } return; } catch (std::exception& e) { cerr << "WEC Caught EXCEPTION: " << e.what() << endl; goto Error; } catch (...) { cerr << "WEC Caught UNKNOWN EXCEPT" << endl; goto Error; } Error: // error condition! push 0 length bs to messagequeuemap and // eventually let jobstep error out. boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok; sbs.reset(new ByteStream(0U)); for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); map_tok->second->queue.push(sbs); } lk.unlock(); // reset the pmconnection map { boost::mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = client->moduleName(); ClientList::iterator itor = fPmConnections.begin(); while (itor != fPmConnections.end()) { if (moduleName == (itor->second)->moduleName()) { (fPmConnections[itor->first]).reset(); pmCount--; ostringstream oss; // oss << "WECLIENT: connection to is reset and this = " << this << " and pmcount is decremented."; // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG); } itor++; } } return; } void WEClients::addQueue(uint32_t key) { bool b; boost::mutex* lock = new boost::mutex(); condition* cond = new condition(); boost::shared_ptr mqe(new MQE(pmCount)); mqe->queue = WESMsgQueue(lock, cond); boost::mutex::scoped_lock lk(fMlock); b = fSessionMessages.insert(pair >(key, mqe)).second; if (!b) { ostringstream os; os << "WEClient: attempt to add a queue with a duplicate ID " << key << endl; throw runtime_error(os.str()); } } void WEClients::removeQueue(uint32_t key) { boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) return; map_tok->second->queue.shutdown(); map_tok->second->queue.clear(); fSessionMessages.erase(map_tok); } void WEClients::shutdownQueue(uint32_t key) { boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) return; map_tok->second->queue.shutdown(); map_tok->second->queue.clear(); } void WEClients::read(uint32_t key, SBS& bs) { boost::shared_ptr mqe; // Find the StepMsgQueueList for this session boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(key); if (map_tok == fSessionMessages.end()) { ostringstream os; // cout << " reading for key " << key << " not found" << endl; os << "WEClient: attempt to read(bs) from a nonexistent queue\n"; throw runtime_error(os.str()); } mqe = map_tok->second; lk.unlock(); // this method can block: you can't hold any locks here... (void)mqe->queue.pop(&bs); if (!bs) bs.reset(new ByteStream()); } void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection) { if (pmCount == 0) { ostringstream oss; oss << "WECLIENT: There is no connection to WES and this = " << this; writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG); throw runtime_error("There is no WriteEngineServer to send message to."); } if (fPmConnections[connection] != 0) fPmConnections[connection]->write(msg); else { ostringstream os; os << "Lost connection to WriteEngineServer on pm" << connection; throw runtime_error(os.str()); } } void WEClients::write_to_all(const messageqcpp::ByteStream& msg) { if (pmCount == 0) { ostringstream oss; oss << "WECLIENT: There is no connection to WES and this = " << this; writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG); throw runtime_error("There is no WriteEngineServer to send message to."); } ClientList::iterator itor = fPmConnections.begin(); while (itor != fPmConnections.end()) { if (itor->second != NULL) { itor->second->write(msg); } itor++; } } void WEClients::StartClientListener(boost::shared_ptr cl, uint32_t connIndex) { boost::thread* thrd = new boost::thread(WEClientRunner(this, cl, connIndex)); fWESReader.push_back(thrd); } void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex) { // ISMPacketHeader *hdr = (ISMPacketHeader*)(sbs->buf()); // PrimitiveHeader *p = (PrimitiveHeader *)(hdr+1); // uint32_t uniqueId = p->UniqueID; uint64_t uniqueId = 0; *sbs >> uniqueId; boost::shared_ptr mqe; boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); if (map_tok == fSessionMessages.end()) { return; } mqe = map_tok->second; lk.unlock(); if (pmCount > 0) { atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]); } (void)mqe->queue.push(sbs); } } // namespace WriteEngine