mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
323 lines
11 KiB
C++
323 lines
11 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/******************************************************************************
|
|
* $Id$
|
|
*
|
|
*****************************************************************************/
|
|
|
|
/** @file umsocketselector.h
|
|
* Used in selecting the "next" socket/port when sending a response message
|
|
* to a UM module. UmSocketSelector is the public API class. UmModuleIPs
|
|
* is a supporting class of UmSocketSelector, and UmIPSocketConns is in turn
|
|
* a supporting class of UmModuleIPs.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <netinet/in.h>
|
|
#include <list>
|
|
#include <map>
|
|
#include <sstream>
|
|
#include <string>
|
|
#include <utility>
|
|
#include <vector>
|
|
#include <boost/shared_ptr.hpp>
|
|
#include <boost/thread.hpp>
|
|
|
|
#include "iosocket.h"
|
|
|
|
namespace primitiveprocessor
|
|
{
|
|
class UmModuleIPs;
|
|
class UmIPSocketConns;
|
|
|
|
typedef boost::shared_ptr<UmModuleIPs> SP_UM_MODIPS;
|
|
typedef boost::shared_ptr<UmIPSocketConns> SP_UM_IPCONNS;
|
|
typedef boost::shared_ptr<messageqcpp::IOSocket> SP_UM_IOSOCK;
|
|
typedef boost::shared_ptr<boost::mutex> SP_UM_MUTEX;
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief Public API class used to track and select socket for outgoing msgs.
|
|
*
|
|
* This class maintains a list of UM's and the corresponding IP addresses for
|
|
* each UM. In addition, a list of socket/port connections are maintained for
|
|
* each IP addresss. nextIOSocket() can be used to iterate through all the IP
|
|
* addresses and socket/port connections for a given UM, as response messages
|
|
* are sent to the UM. This action attempts to see that the output network
|
|
* traffic is evenly distributed among all the NIC's for a UM.
|
|
*
|
|
* This class is not entirely thread-safe, as the first call to instance() must
|
|
* be made from a single threaded envirionment as part of initialization. After
|
|
* that, the class is thread-safe through the use of a mutex in UmModuleIPs.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class UmSocketSelector
|
|
{
|
|
public:
|
|
typedef std::map<in_addr_t, unsigned int> IpAddressUmMap_t;
|
|
|
|
/** @brief Singleton accessor to UmSocketSelector instance.
|
|
*
|
|
* This method should be called once from the main thread to perform
|
|
* initialization from a single threaded environment.
|
|
*/
|
|
static UmSocketSelector* instance();
|
|
|
|
/** @brief UmSocketSelector destructor
|
|
*
|
|
*/
|
|
~UmSocketSelector() = default;
|
|
|
|
/** @brief Accessor to total number of UM IP's in Columnstore.xml.
|
|
*
|
|
* @return Number of UM IP addresses read from Columnstore.xml.
|
|
*/
|
|
uint32_t ipAddressCount() const;
|
|
|
|
/** @brief Add a socket/port connection to the connection list.
|
|
*
|
|
* @param ios (in) socket/port connection to be added.
|
|
* @param writeLock (in) mutex to use when writing to ios.
|
|
* @return boolean indicating if socket/port connection was added.
|
|
*/
|
|
bool addConnection(const SP_UM_IOSOCK& ios, const SP_UM_MUTEX& writeLock);
|
|
|
|
/** @brief Delete a socket/port connection from the connection list.
|
|
*
|
|
* @param ios (in) socket/port connection to be removed.
|
|
*/
|
|
void delConnection(const messageqcpp::IOSocket& ios);
|
|
|
|
/** @brief Get the next output IOSocket to use for the specified ios.
|
|
*
|
|
* @param ios (in) socket/port connection for the incoming message.
|
|
* @param outIos (out) socket/port connection to use for the response.
|
|
* @param writeLock (out) mutex to use when writing to outIos.
|
|
* @return boolean indicating if operation was successful.
|
|
*/
|
|
bool nextIOSocket(const messageqcpp::IOSocket& ios, SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock);
|
|
|
|
/** @brief toString method used in logging, debugging, etc.
|
|
*
|
|
*/
|
|
const std::string toString() const;
|
|
|
|
private:
|
|
//...Disable default copy constructor and assignment operator by
|
|
// declaring but not defining
|
|
UmSocketSelector(const UmSocketSelector& rhs);
|
|
UmSocketSelector& operator=(const UmSocketSelector& rhs);
|
|
|
|
UmSocketSelector();
|
|
void loadUMModuleInfo();
|
|
unsigned int findOrAddUm(const std::string& moduleName);
|
|
|
|
static UmSocketSelector* fpUmSocketSelector;
|
|
std::vector<SP_UM_MODIPS> fUmModuleIPs; // UM's and their sockets
|
|
|
|
// std::map that maps an IP address to an index into fUmModuleIPs
|
|
IpAddressUmMap_t fIpAddressUmMap;
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief Tracks and selects "next" socket/port for a UM module.
|
|
*
|
|
* This is a supporting class to UmSocketSelector.
|
|
* This class maintains a list of IP addresses (and their corrresponding
|
|
* socket/port connections) for a specific UM module. UmModuleIPs can
|
|
* be used to iterate through the available IP addresses (and socket/port
|
|
* connections) for a UM module, as response messages are sent out by PrimProc.
|
|
*
|
|
* This class is thread-safe.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class UmModuleIPs
|
|
{
|
|
public:
|
|
/** @brief UmModuleIPs constructor.
|
|
*
|
|
* @param moduleName (in) UM module name for this UmModuleIPs object.
|
|
*/
|
|
explicit UmModuleIPs(std::string moduleName)
|
|
: fUmModuleName(std::move(moduleName)), fNextUmIPSocketIdx(NEXT_IP_SOCKET_UNASSIGNED)
|
|
{
|
|
}
|
|
|
|
/** @brief UmModuleIPs destructor.
|
|
*
|
|
*/
|
|
~UmModuleIPs() = default;
|
|
|
|
/** @brief Accessor to number of IP's from Columnstore.xml for this UM.
|
|
*
|
|
* @return Number of IP addresses read from Columnstore.xml for this UM.
|
|
*/
|
|
uint32_t ipAddressCount() const
|
|
{
|
|
return fUmIPSocketConns.size();
|
|
}
|
|
|
|
/** @brief Accessor to the module name for this UmModuleIPs object.
|
|
*
|
|
* @return UM module name.
|
|
*/
|
|
const std::string& moduleName() const
|
|
{
|
|
return fUmModuleName;
|
|
}
|
|
|
|
/** @brief Add an IP address to this UM module.
|
|
*
|
|
* @param ip (in) IP address to be added (in network byte order)
|
|
*/
|
|
void addIP(in_addr_t ip);
|
|
|
|
/** @brief Add specified socket/port to the connection list for this UM.
|
|
*
|
|
* @param ioSock (in) socket/port to add to the connection list.
|
|
* @param writeLock (in) mutex to use when writing to ioSock.
|
|
* @return boolean indicating if socket/port connection was added.
|
|
*/
|
|
bool addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock);
|
|
|
|
/** @brief Delete specified socket/port from the connection list.
|
|
*
|
|
* @param ioSock (in) socket/port to delete from the connection list.
|
|
*/
|
|
void delSocketConn(const messageqcpp::IOSocket& ioSock);
|
|
|
|
/** @brief Get the "next" available socket/port for this UM module.
|
|
*
|
|
* @param outIos (out) socket/port connection to use for the response.
|
|
* @param writeLock (out) mutex to use when writing to outIos.
|
|
* @return bool flag indicating whether a socket/port was available.
|
|
*/
|
|
bool nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock);
|
|
|
|
/** @brief toString method used in logging, debugging, etc.
|
|
*
|
|
*/
|
|
const std::string toString();
|
|
|
|
private:
|
|
void advanceToNextIP();
|
|
|
|
static const int32_t NEXT_IP_SOCKET_UNASSIGNED;
|
|
std::string fUmModuleName; // UM module name
|
|
int32_t fNextUmIPSocketIdx; // index to "next" IP address
|
|
boost::mutex fUmModuleMutex;
|
|
|
|
// collection of IP addresses and their corresponding socket/port conns
|
|
std::vector<SP_UM_IPCONNS> fUmIPSocketConns;
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** @brief Tracks and selects "next" socket/port for a UM module IP address.
|
|
*
|
|
* This is a supporting class to UmModuleIPs.
|
|
* This class maintains a list of socket/port connections for a specific UM
|
|
* module IP address. UmIPSocketConns can be used to iterate through the
|
|
* available connections for an IP address, as response messages are sent out
|
|
* by PrimProc.
|
|
*
|
|
* This class by itself is not thread-safe. However, UmModulesIPs, which
|
|
* uses UmIPSocketConns, insures thread-safeness.
|
|
*/
|
|
//------------------------------------------------------------------------------
|
|
class UmIPSocketConns
|
|
{
|
|
public:
|
|
struct UmIOSocketData
|
|
{
|
|
SP_UM_IOSOCK fSock; // an output IOSocket
|
|
SP_UM_MUTEX fMutex; // mutex to be use when writing to fSock
|
|
};
|
|
|
|
/** @brief UmIPSocketConns constructor.
|
|
*
|
|
* @param ip (in) IP address for this UmIPSocketConns object.
|
|
*/
|
|
explicit UmIPSocketConns(in_addr_t ip) : fIpAddress(ip), fNextIOSocketIdx(NEXT_IOSOCKET_UNASSIGNED)
|
|
{
|
|
}
|
|
|
|
/** @brief UmIPSocketConns destructor.
|
|
*
|
|
*/
|
|
~UmIPSocketConns() = default;
|
|
|
|
/** @brief Accessor to the IP address for this UmIPSocketConns object.
|
|
*
|
|
* @return IP address (in network byte order).
|
|
*/
|
|
in_addr_t ipAddress() const
|
|
{
|
|
return fIpAddress;
|
|
}
|
|
|
|
/** @brief Accessor to socket/port connection count for this IP address.
|
|
*
|
|
* @return socket/port connection count.
|
|
*/
|
|
uint32_t count()
|
|
{
|
|
return fIOSockets.size();
|
|
}
|
|
|
|
/** @brief Add specified socket/port to the connection list.
|
|
*
|
|
* @param ioSock (in) socket/port to add to the connection list.
|
|
* @param writeLock (in) mutex to use when writing to ioSock.
|
|
*/
|
|
void addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock);
|
|
|
|
/** @brief Delete specified socket/port from the connection list.
|
|
*
|
|
* @param ioSock (in) socket/port to delete from the connection list.
|
|
*/
|
|
void delSocketConn(const messageqcpp::IOSocket& ioSock);
|
|
|
|
/** @brief Get the "next" available socket/port for this IP address.
|
|
*
|
|
* @param outIos (out) socket/port connection to use for the response.
|
|
* @param writeLock (out) mutex to use when writing to outIos.
|
|
*/
|
|
void nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock);
|
|
|
|
/** @brief Convert network byte ordered IP address to string
|
|
*
|
|
* @param ipString (out) IP address string;
|
|
* ipString must be an array of size INET_ADDRSTRLEN.
|
|
* @return IP address string (same as ipString)
|
|
*/
|
|
static char* nwToString(in_addr_t addr, char* ipString);
|
|
|
|
/** @brief toString method used in logging, debugging, etc.
|
|
*
|
|
*/
|
|
const std::string toString() const;
|
|
|
|
private:
|
|
static const int32_t NEXT_IOSOCKET_UNASSIGNED;
|
|
in_addr_t fIpAddress; // IP address in network byte order
|
|
int32_t fNextIOSocketIdx; // index to "next" socket/port
|
|
std::vector<UmIOSocketData> fIOSockets; // socket/port list for fIpAddress
|
|
};
|
|
|
|
} // namespace primitiveprocessor
|