/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /****************************************************************************** * $Id$ * *****************************************************************************/ /** @file umsocketselector.h * Used in selecting the "next" socket/port when sending a response message * to a UM module. UmSocketSelector is the public API class. UmModuleIPs * is a supporting class of UmSocketSelector, and UmIPSocketConns is in turn * a supporting class of UmModuleIPs. */ #pragma once #include #include #include #include #include #include #include #include #include #include "iosocket.h" namespace primitiveprocessor { class UmModuleIPs; class UmIPSocketConns; typedef boost::shared_ptr SP_UM_MODIPS; typedef boost::shared_ptr SP_UM_IPCONNS; typedef boost::shared_ptr SP_UM_IOSOCK; typedef boost::shared_ptr SP_UM_MUTEX; //------------------------------------------------------------------------------ /** @brief Public API class used to track and select socket for outgoing msgs. * * This class maintains a list of UM's and the corresponding IP addresses for * each UM. In addition, a list of socket/port connections are maintained for * each IP addresss. nextIOSocket() can be used to iterate through all the IP * addresses and socket/port connections for a given UM, as response messages * are sent to the UM. This action attempts to see that the output network * traffic is evenly distributed among all the NIC's for a UM. * * This class is not entirely thread-safe, as the first call to instance() must * be made from a single threaded envirionment as part of initialization. After * that, the class is thread-safe through the use of a mutex in UmModuleIPs. */ //------------------------------------------------------------------------------ class UmSocketSelector { public: typedef std::map IpAddressUmMap_t; /** @brief Singleton accessor to UmSocketSelector instance. * * This method should be called once from the main thread to perform * initialization from a single threaded environment. */ static UmSocketSelector* instance(); /** @brief UmSocketSelector destructor * */ ~UmSocketSelector() = default; /** @brief Accessor to total number of UM IP's in Columnstore.xml. * * @return Number of UM IP addresses read from Columnstore.xml. */ uint32_t ipAddressCount() const; /** @brief Add a socket/port connection to the connection list. * * @param ios (in) socket/port connection to be added. * @param writeLock (in) mutex to use when writing to ios. * @return boolean indicating if socket/port connection was added. */ bool addConnection(const SP_UM_IOSOCK& ios, const SP_UM_MUTEX& writeLock); /** @brief Delete a socket/port connection from the connection list. * * @param ios (in) socket/port connection to be removed. */ void delConnection(const messageqcpp::IOSocket& ios); /** @brief Get the next output IOSocket to use for the specified ios. * * @param ios (in) socket/port connection for the incoming message. * @param outIos (out) socket/port connection to use for the response. * @param writeLock (out) mutex to use when writing to outIos. * @return boolean indicating if operation was successful. */ bool nextIOSocket(const messageqcpp::IOSocket& ios, SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock); /** @brief toString method used in logging, debugging, etc. * */ const std::string toString() const; private: //...Disable default copy constructor and assignment operator by // declaring but not defining UmSocketSelector(const UmSocketSelector& rhs); UmSocketSelector& operator=(const UmSocketSelector& rhs); UmSocketSelector(); void loadUMModuleInfo(); unsigned int findOrAddUm(const std::string& moduleName); static UmSocketSelector* fpUmSocketSelector; std::vector fUmModuleIPs; // UM's and their sockets // std::map that maps an IP address to an index into fUmModuleIPs IpAddressUmMap_t fIpAddressUmMap; }; //------------------------------------------------------------------------------ /** @brief Tracks and selects "next" socket/port for a UM module. * * This is a supporting class to UmSocketSelector. * This class maintains a list of IP addresses (and their corrresponding * socket/port connections) for a specific UM module. UmModuleIPs can * be used to iterate through the available IP addresses (and socket/port * connections) for a UM module, as response messages are sent out by PrimProc. * * This class is thread-safe. */ //------------------------------------------------------------------------------ class UmModuleIPs { public: /** @brief UmModuleIPs constructor. * * @param moduleName (in) UM module name for this UmModuleIPs object. */ explicit UmModuleIPs(std::string moduleName) : fUmModuleName(std::move(moduleName)), fNextUmIPSocketIdx(NEXT_IP_SOCKET_UNASSIGNED) { } /** @brief UmModuleIPs destructor. * */ ~UmModuleIPs() = default; /** @brief Accessor to number of IP's from Columnstore.xml for this UM. * * @return Number of IP addresses read from Columnstore.xml for this UM. */ uint32_t ipAddressCount() const { return fUmIPSocketConns.size(); } /** @brief Accessor to the module name for this UmModuleIPs object. * * @return UM module name. */ const std::string& moduleName() const { return fUmModuleName; } /** @brief Add an IP address to this UM module. * * @param ip (in) IP address to be added (in network byte order) */ void addIP(in_addr_t ip); /** @brief Add specified socket/port to the connection list for this UM. * * @param ioSock (in) socket/port to add to the connection list. * @param writeLock (in) mutex to use when writing to ioSock. * @return boolean indicating if socket/port connection was added. */ bool addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock); /** @brief Delete specified socket/port from the connection list. * * @param ioSock (in) socket/port to delete from the connection list. */ void delSocketConn(const messageqcpp::IOSocket& ioSock); /** @brief Get the "next" available socket/port for this UM module. * * @param outIos (out) socket/port connection to use for the response. * @param writeLock (out) mutex to use when writing to outIos. * @return bool flag indicating whether a socket/port was available. */ bool nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock); /** @brief toString method used in logging, debugging, etc. * */ const std::string toString(); private: void advanceToNextIP(); static const int32_t NEXT_IP_SOCKET_UNASSIGNED; std::string fUmModuleName; // UM module name int32_t fNextUmIPSocketIdx; // index to "next" IP address boost::mutex fUmModuleMutex; // collection of IP addresses and their corresponding socket/port conns std::vector fUmIPSocketConns; }; //------------------------------------------------------------------------------ /** @brief Tracks and selects "next" socket/port for a UM module IP address. * * This is a supporting class to UmModuleIPs. * This class maintains a list of socket/port connections for a specific UM * module IP address. UmIPSocketConns can be used to iterate through the * available connections for an IP address, as response messages are sent out * by PrimProc. * * This class by itself is not thread-safe. However, UmModulesIPs, which * uses UmIPSocketConns, insures thread-safeness. */ //------------------------------------------------------------------------------ class UmIPSocketConns { public: struct UmIOSocketData { SP_UM_IOSOCK fSock; // an output IOSocket SP_UM_MUTEX fMutex; // mutex to be use when writing to fSock }; /** @brief UmIPSocketConns constructor. * * @param ip (in) IP address for this UmIPSocketConns object. */ explicit UmIPSocketConns(in_addr_t ip) : fIpAddress(ip), fNextIOSocketIdx(NEXT_IOSOCKET_UNASSIGNED) { } /** @brief UmIPSocketConns destructor. * */ ~UmIPSocketConns() = default; /** @brief Accessor to the IP address for this UmIPSocketConns object. * * @return IP address (in network byte order). */ in_addr_t ipAddress() const { return fIpAddress; } /** @brief Accessor to socket/port connection count for this IP address. * * @return socket/port connection count. */ uint32_t count() { return fIOSockets.size(); } /** @brief Add specified socket/port to the connection list. * * @param ioSock (in) socket/port to add to the connection list. * @param writeLock (in) mutex to use when writing to ioSock. */ void addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock); /** @brief Delete specified socket/port from the connection list. * * @param ioSock (in) socket/port to delete from the connection list. */ void delSocketConn(const messageqcpp::IOSocket& ioSock); /** @brief Get the "next" available socket/port for this IP address. * * @param outIos (out) socket/port connection to use for the response. * @param writeLock (out) mutex to use when writing to outIos. */ void nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock); /** @brief Convert network byte ordered IP address to string * * @param ipString (out) IP address string; * ipString must be an array of size INET_ADDRSTRLEN. * @return IP address string (same as ipString) */ static char* nwToString(in_addr_t addr, char* ipString); /** @brief toString method used in logging, debugging, etc. * */ const std::string toString() const; private: static const int32_t NEXT_IOSOCKET_UNASSIGNED; in_addr_t fIpAddress; // IP address in network byte order int32_t fNextIOSocketIdx; // index to "next" socket/port std::vector fIOSockets; // socket/port list for fIpAddress }; } // namespace primitiveprocessor