/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $ // /** @file */ #pragma once #include #include #include #include #include #include #include #include #include "bytestream.h" //#include "we_message.h" #include "threadsafequeue.h" #include "rwlock_local.h" #include "resourcemanager.h" #define EXPORT namespace WriteEngine { class WEClients { public: /** * Constructors */ EXPORT WEClients(int PrgmID); EXPORT ~WEClients(); // static boost::mutex map_mutex; EXPORT void addQueue(uint32_t key); EXPORT void removeQueue(uint32_t key); EXPORT void shutdownQueue(uint32_t key); /** @brief read a Write Engine Server response * * Returns the next message in the inbound queue for unique ids. * @param bs A pointer to the ByteStream to fill in. * @note: saves a copy vs read(uint32_t, uint32_t). */ EXPORT void read(uint32_t key, messageqcpp::SBS&); /** @brief write function to write to specified PM */ EXPORT void write(const messageqcpp::ByteStream& msg, uint32_t connection); /** @brief write function to write to all PMs */ EXPORT void write_to_all(const messageqcpp::ByteStream& msg); /** @brief Shutdown this object * * Closes all the connections created during Setup() and cleans up other stuff. */ EXPORT int Close(); /** @brief Start listening for Write Engine Server responses * * Starts the current thread listening on the client socket for Write Engine Server response messages. Will * not return until busy() returns false or a zero-length response is received. */ EXPORT void Listen(boost::shared_ptr client, uint32_t connIndex); /** @brief set/unset busy flag * * Set or unset the busy flag so Listen() can return. */ EXPORT void makeBusy(bool b) { fBusy = b; } /** @brief fBusy accessor * */ EXPORT bool Busy() const { return fBusy; } EXPORT void Setup(); uint64_t connectedWEServers() const { return fPmConnections.size(); } /** @brief accessor */ uint32_t getPmCount() { return pmCount; } private: WEClients(const WEClients& weClient); WEClients& operator=(const WEClients& weClient); typedef std::vector ReaderList; typedef std::map > ClientList; // A queue of ByteStreams coming in from Write Engine Server typedef joblist::ThreadSafeQueue WESMsgQueue; /* To keep some state associated with the connection */ struct MQE { MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount) { unackedWork.reset(new volatile uint32_t[pmCount]); memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t)); } WESMsgQueue queue; uint32_t ackSocketIndex; boost::scoped_array unackedWork; uint32_t pmCount; }; // The mapping of session ids to StepMsgQueueLists typedef std::map > MessageQueueMap; void StartClientListener(boost::shared_ptr cl, uint32_t connIndex); /** @brief Add a message to the queue * */ void addDataToOutput(messageqcpp::SBS, uint32_t connIndex); int fPrgmID; ClientList fPmConnections; // all the Write Engine servers ReaderList fWESReader; // all the reader threads for the pm servers MessageQueueMap fSessionMessages; // place to put messages from the pm server to be returned by the Read method boost::mutex fMlock; // sessionMessages mutex std::vector > fWlock; // WES socket write mutexes bool fBusy; volatile uint32_t closingConnection; uint32_t pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition boost::mutex ackLock; public: enum { DDLPROC = 0, SPLITTER, DMLPROC, BATCHINSERTPROC }; }; } // namespace WriteEngine #undef EXPORT