mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
605 lines
21 KiB
C++
605 lines
21 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/******************************************************************************
|
|
* $Id$
|
|
*
|
|
*****************************************************************************/
|
|
|
|
/** @file umsocketselector.cpp
|
|
* Used to iterate through available socket/port connections for a given UM,
|
|
* when sending response messages back to the UM.
|
|
*/
|
|
|
|
#include "umsocketselector.h"
|
|
#include <arpa/inet.h>
|
|
#include <sys/socket.h>
|
|
#include <iostream>
|
|
#include <climits>
|
|
#include <sstream>
|
|
#include <cstring>
|
|
#include <limits>
|
|
|
|
#include "liboamcpp.h"
|
|
using namespace oam;
|
|
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
|
|
//#define LOAD_MODULE_DEBUG 1
|
|
//#define MOD_CONN_DEBUG 1
|
|
//#define SEL_CONN_DEBUG 1
|
|
|
|
namespace primitiveprocessor
|
|
{
|
|
/*static*/ const int32_t UmIPSocketConns::NEXT_IOSOCKET_UNASSIGNED = -1;
|
|
/*static*/ const int32_t UmModuleIPs::NEXT_IP_SOCKET_UNASSIGNED = -1;
|
|
/*static*/ UmSocketSelector* UmSocketSelector::fpUmSocketSelector = 0;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// UmSocketSelector methods
|
|
//------------------------------------------------------------------------------
|
|
// UmSocketSelector Singleton accessor
|
|
//------------------------------------------------------------------------------
|
|
/* static */ UmSocketSelector* UmSocketSelector::instance()
|
|
{
|
|
if (fpUmSocketSelector == 0)
|
|
{
|
|
fpUmSocketSelector = new UmSocketSelector();
|
|
}
|
|
|
|
return fpUmSocketSelector;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// UmSocketSelector constructor
|
|
//------------------------------------------------------------------------------
|
|
UmSocketSelector::UmSocketSelector()
|
|
{
|
|
loadUMModuleInfo();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Returns the number of IP addresses defined in the Columnstore.xml file
|
|
// return - uint32_t; the total number of IP addresses in the Columnstore.xml file
|
|
//------------------------------------------------------------------------------
|
|
uint32_t UmSocketSelector::ipAddressCount() const
|
|
{
|
|
uint32_t ipCount = 0;
|
|
|
|
for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i)
|
|
{
|
|
ipCount += fUmModuleIPs[i]->ipAddressCount();
|
|
}
|
|
|
|
return ipCount;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Loads the UM module information from the Columnstore.xml file, so that we have
|
|
// the list of IP addresses that are valid for each UM. Note that this method
|
|
// does not insure thread safeness, but that's okay because it is assumed that
|
|
// it will only be called once from the first call to instance().
|
|
//------------------------------------------------------------------------------
|
|
void UmSocketSelector::loadUMModuleInfo()
|
|
{
|
|
Oam oam;
|
|
ModuleTypeConfig moduleTypeConfig;
|
|
const std::string UM_MODTYPE("um");
|
|
|
|
oam.getSystemConfig(UM_MODTYPE, moduleTypeConfig);
|
|
|
|
int moduleCount = moduleTypeConfig.ModuleCount;
|
|
std::string moduleType = moduleTypeConfig.ModuleType;
|
|
|
|
#ifdef LOAD_MODULE_DEBUG
|
|
std::cout << "ModuleConfig for type: " << UM_MODTYPE << std::endl;
|
|
std::cout << "ModuleDesc = " << moduleTypeConfig.ModuleDesc << std::endl;
|
|
std::cout << "ModuleCount = " << moduleCount << std::endl;
|
|
#endif
|
|
|
|
if (moduleCount > 0)
|
|
{
|
|
//..Loop through the list of UM modules
|
|
for (DeviceNetworkList::iterator iter1 = moduleTypeConfig.ModuleNetworkList.begin();
|
|
(iter1 != moduleTypeConfig.ModuleNetworkList.end()); ++iter1)
|
|
{
|
|
std::string moduleName = iter1->DeviceName;
|
|
|
|
#ifdef LOAD_MODULE_DEBUG
|
|
std::cout << "ModuleName-" << moduleName << std::endl;
|
|
#endif
|
|
|
|
//..Assign the UM index based on whether it is a new UM or one
|
|
// we have seen before
|
|
unsigned int umIdx = findOrAddUm(moduleName);
|
|
|
|
//..Get the list of IP addresses (NIC's) for this UM module
|
|
for (HostConfigList::iterator iter2 = iter1->hostConfigList.begin();
|
|
(iter2 != iter1->hostConfigList.end()); ++iter2)
|
|
{
|
|
std::string ipAddr = iter2->IPAddr;
|
|
|
|
#ifdef LOAD_MODULE_DEBUG
|
|
std::cout << " NIC-" << iter2->NicID << "; host-" << iter2->HostName << "; IP-" << ipAddr
|
|
<< std::endl;
|
|
#endif
|
|
|
|
struct in_addr ip;
|
|
|
|
if (inet_aton(ipAddr.c_str(), &ip))
|
|
{
|
|
fIpAddressUmMap[ip.s_addr] = umIdx;
|
|
fUmModuleIPs[umIdx]->addIP(ip.s_addr);
|
|
}
|
|
else
|
|
{
|
|
std::cerr << "Invalid IP address in SystemModuleConfig "
|
|
"section: "
|
|
<< ipAddr << std::endl;
|
|
}
|
|
} // loop through the IP addresses for a UM module
|
|
} // loop through the list of UM modules
|
|
} // moduleCount > 0
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Search for the specified moduleName, and return the applicable index from
|
|
// fUmModuleIPs, if it is found. Else add the new moduleName.
|
|
// return - unsigned int for the index into fUmModuleIPs for moduleName.
|
|
//------------------------------------------------------------------------------
|
|
unsigned int UmSocketSelector::findOrAddUm(const std::string& moduleName)
|
|
{
|
|
unsigned int umIdx = std::numeric_limits<unsigned int>::max();
|
|
|
|
for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i)
|
|
{
|
|
if (fUmModuleIPs[i]->moduleName() == moduleName)
|
|
{
|
|
umIdx = i;
|
|
return umIdx;
|
|
}
|
|
}
|
|
|
|
//..We have encountered a new UM module we should add to the list
|
|
fUmModuleIPs.push_back(SP_UM_MODIPS(new UmModuleIPs(moduleName)));
|
|
umIdx = fUmModuleIPs.size() - 1;
|
|
|
|
return umIdx;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Add a new socket/port connection. It will be grouped with other
|
|
// socket/port connections belonging to the same UM.
|
|
// ios (in) - socket/port connection to be added
|
|
// writeLock (in) - mutex to use when writing to ios.
|
|
// return - boolean indicating whether socket/port connection was added.
|
|
//------------------------------------------------------------------------------
|
|
bool UmSocketSelector::addConnection(const SP_UM_IOSOCK& ios, const SP_UM_MUTEX& writeLock)
|
|
{
|
|
bool bConnAdded = false;
|
|
|
|
sockaddr sa = ios->sa();
|
|
const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
|
|
IpAddressUmMap_t::iterator mapIter = fIpAddressUmMap.find(sinp->sin_addr.s_addr);
|
|
|
|
// Add this socket/port connection to the UM connection list it belongs to.
|
|
if (mapIter != fIpAddressUmMap.end())
|
|
{
|
|
unsigned int umIdx = mapIter->second;
|
|
bConnAdded = fUmModuleIPs[umIdx]->addSocketConn(ios, writeLock);
|
|
}
|
|
|
|
if (!bConnAdded)
|
|
{
|
|
#ifdef SEL_CONN_DEBUG
|
|
std::ostringstream oss;
|
|
oss << "No UM/IP match found to add connection " << ios->toString() << std::endl;
|
|
std::cout << oss.str();
|
|
#endif
|
|
}
|
|
|
|
return bConnAdded;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Delete a socket/port connection from the UM for which it belongs.
|
|
// ioSock (in) - socket/port connection to be deleted
|
|
//------------------------------------------------------------------------------
|
|
void UmSocketSelector::delConnection(const messageqcpp::IOSocket& ios)
|
|
{
|
|
sockaddr sa = ios.sa();
|
|
const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
|
|
IpAddressUmMap_t::iterator mapIter = fIpAddressUmMap.find(sinp->sin_addr.s_addr);
|
|
|
|
if (mapIter != fIpAddressUmMap.end())
|
|
{
|
|
unsigned int umIdx = mapIter->second;
|
|
fUmModuleIPs[umIdx]->delSocketConn(ios);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get the next socket/port connection belonging to the same UM as ios.
|
|
// The selected socket/port connection will be returned in outIos. It can
|
|
// then be used for sending the next response message back to the applicable
|
|
// UM module.
|
|
// ios (in) - socket/port connection where a UM request originated from
|
|
// outIos (out) - socket/port connection to use in sending the
|
|
// corresponding response
|
|
// writelock (out) - mutex lock to be used when writing to outIos
|
|
// return - bool indicating if socket/port connection was assigned to
|
|
// outIos
|
|
//------------------------------------------------------------------------------
|
|
bool UmSocketSelector::nextIOSocket(const messageqcpp::IOSocket& ios, SP_UM_IOSOCK& outIos,
|
|
SP_UM_MUTEX& writeLock)
|
|
{
|
|
sockaddr sa = ios.sa();
|
|
const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
|
|
IpAddressUmMap_t::iterator mapIter = fIpAddressUmMap.find(sinp->sin_addr.s_addr);
|
|
|
|
if (mapIter != fIpAddressUmMap.end())
|
|
{
|
|
unsigned int umIdx = mapIter->second;
|
|
|
|
if (fUmModuleIPs[umIdx]->nextIOSocket(outIos, writeLock))
|
|
{
|
|
#ifdef SEL_CONN_DEBUG
|
|
std::ostringstream oss;
|
|
oss << "UM " << fUmModuleIPs[umIdx]->moduleName() << "; in: " << ios.toString()
|
|
<< "; selected out: " << outIos->toString() << std::endl;
|
|
std::cout << oss.str();
|
|
#endif
|
|
|
|
return true;
|
|
}
|
|
}
|
|
|
|
//..This should not happen. Application is asking for next socket/port for
|
|
// a connection not in our UM module list.
|
|
return false;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Convert contents to string for logging, debugging, etc.
|
|
//------------------------------------------------------------------------------
|
|
const std::string UmSocketSelector::toString() const
|
|
{
|
|
std::ostringstream oss;
|
|
|
|
oss << "IP Address to UM index map:" << std::endl;
|
|
|
|
for (IpAddressUmMap_t::const_iterator mapIter = fIpAddressUmMap.begin(); (mapIter != fIpAddressUmMap.end());
|
|
++mapIter)
|
|
{
|
|
char ipString[INET_ADDRSTRLEN];
|
|
oss << " IPAddress: " << UmIPSocketConns::nwToString(mapIter->first, ipString)
|
|
<< " maps to UM: " << mapIter->second << std::endl;
|
|
}
|
|
|
|
for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i)
|
|
{
|
|
oss << std::endl << fUmModuleIPs[i]->toString();
|
|
}
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// UmModuleIPs methods
|
|
//------------------------------------------------------------------------------
|
|
// Add an IP address to be associated with this UM module.
|
|
// ip (in) - IP address to associate with this UM module (in network byte order)
|
|
//------------------------------------------------------------------------------
|
|
void UmModuleIPs::addIP(in_addr_t ip)
|
|
{
|
|
boost::mutex::scoped_lock lock(fUmModuleMutex);
|
|
|
|
#ifdef MOD_CONN_DEBUG
|
|
std::ostringstream oss;
|
|
char ipString[INET_ADDRSTRLEN];
|
|
oss << " UM " << fUmModuleName << "; adding IP: " << UmIPSocketConns::nwToString(ip, ipString)
|
|
<< std::endl;
|
|
std::cout << oss.str();
|
|
#endif
|
|
|
|
fUmIPSocketConns.push_back(SP_UM_IPCONNS(new UmIPSocketConns(ip)));
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Add a new socket/port connection to this UM. It will be grouped with other
|
|
// socket/port connections having the same IP address for this UM.
|
|
// ioSock (in) - socket/port connection to be added
|
|
// writeLock (in) - mutex to use when writing to ioSock.
|
|
// return - boolean indicating whether socket/port connection was added.
|
|
//------------------------------------------------------------------------------
|
|
bool UmModuleIPs::addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock)
|
|
{
|
|
bool bConnAdded = false;
|
|
|
|
boost::mutex::scoped_lock lock(fUmModuleMutex);
|
|
|
|
for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i)
|
|
{
|
|
sockaddr sa = ioSock->sa();
|
|
const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
|
|
|
|
if (fUmIPSocketConns[i]->ipAddress() == sinp->sin_addr.s_addr)
|
|
{
|
|
#ifdef MOD_CONN_DEBUG
|
|
std::ostringstream oss;
|
|
oss << "UM " << fUmModuleName << "; adding connection " << ioSock->toString() << std::endl;
|
|
std::cout << oss.str();
|
|
#endif
|
|
|
|
fUmIPSocketConns[i]->addSocketConn(ioSock, writeLock);
|
|
bConnAdded = true;
|
|
|
|
//..Initialize fNextUmIPSocketIdx if this is the first socket/port
|
|
// connection for this UM.
|
|
if (fNextUmIPSocketIdx == NEXT_IP_SOCKET_UNASSIGNED)
|
|
fNextUmIPSocketIdx = i;
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
return bConnAdded;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Delete a socket/port connection from this UM.
|
|
// ioSock (in) - socket/port connection to be deleted
|
|
//------------------------------------------------------------------------------
|
|
void UmModuleIPs::delSocketConn(const messageqcpp::IOSocket& ioSock)
|
|
{
|
|
boost::mutex::scoped_lock lock(fUmModuleMutex);
|
|
|
|
for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i)
|
|
{
|
|
sockaddr sa = ioSock.sa();
|
|
const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
|
|
|
|
if (fUmIPSocketConns[i]->ipAddress() == sinp->sin_addr.s_addr)
|
|
{
|
|
#ifdef MOD_CONN_DEBUG
|
|
std::ostringstream oss;
|
|
oss << "UM " << fUmModuleName << "; deleting connection " << ioSock.toString() << std::endl;
|
|
std::cout << oss.str();
|
|
#endif
|
|
|
|
fUmIPSocketConns[i]->delSocketConn(ioSock);
|
|
|
|
//..If we just deleted the last connection for this IP, then ad-
|
|
// vance fNextUmIPSocketIdx to an IP address having connections.
|
|
if (fUmIPSocketConns[i]->count() == 0)
|
|
{
|
|
advanceToNextIP();
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get the next socket/port connection for this UM to use in sending a message
|
|
// to the applicable UM module.
|
|
// outIos (out) - socket/port connection to use in sending next msg
|
|
// writelock (out) - mutex lock to be used when writing to outIos
|
|
// return - bool indicating if socket/port connection was assigned to
|
|
// outIos
|
|
//------------------------------------------------------------------------------
|
|
bool UmModuleIPs::nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock)
|
|
{
|
|
bool found = false;
|
|
|
|
boost::mutex::scoped_lock lock(fUmModuleMutex);
|
|
|
|
if ((fUmIPSocketConns.size() > 0) && (fNextUmIPSocketIdx != NEXT_IP_SOCKET_UNASSIGNED))
|
|
{
|
|
assert(fNextUmIPSocketIdx < static_cast<int>(fUmIPSocketConns.size()));
|
|
fUmIPSocketConns[fNextUmIPSocketIdx]->nextIOSocket(outIos, writeLock);
|
|
advanceToNextIP();
|
|
found = true;
|
|
}
|
|
|
|
return found;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Advance to the "next" available IP (for this UM), skipping over
|
|
// any IP's that have 0 socket/port connections. No mutex locking done, as
|
|
// the calling methods will have already locked a mutex.
|
|
//------------------------------------------------------------------------------
|
|
void UmModuleIPs::advanceToNextIP()
|
|
{
|
|
if (fUmIPSocketConns.size() > 1)
|
|
{
|
|
//..Search to end of IP list for an IP having 1 or more connections
|
|
for (int i = fNextUmIPSocketIdx + 1; i < static_cast<int>(fUmIPSocketConns.size()); ++i)
|
|
{
|
|
if (fUmIPSocketConns[i]->count() > 0)
|
|
{
|
|
fNextUmIPSocketIdx = i;
|
|
return;
|
|
}
|
|
}
|
|
|
|
//..Wrap back around to the start of the list, to continue the search
|
|
for (int i = 0; i < fNextUmIPSocketIdx; ++i)
|
|
{
|
|
if (fUmIPSocketConns[i]->count() > 0)
|
|
{
|
|
fNextUmIPSocketIdx = i;
|
|
return;
|
|
}
|
|
}
|
|
|
|
fNextUmIPSocketIdx = NEXT_IP_SOCKET_UNASSIGNED;
|
|
}
|
|
else // special logic to handle 0 fUmIPSocketConns, or a single empty one
|
|
{
|
|
if ((fUmIPSocketConns.size() == 0) || (fUmIPSocketConns[0]->count() == 0))
|
|
{
|
|
fNextUmIPSocketIdx = NEXT_IP_SOCKET_UNASSIGNED;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Convert contents to string for logging, debugging, etc.
|
|
//------------------------------------------------------------------------------
|
|
const std::string UmModuleIPs::toString()
|
|
{
|
|
std::ostringstream oss;
|
|
|
|
boost::mutex::scoped_lock lock(fUmModuleMutex);
|
|
oss << "UM module name: " << fUmModuleName << "; nextUmIPIdx: " << fNextUmIPSocketIdx << std::endl;
|
|
|
|
for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i)
|
|
{
|
|
oss << fUmIPSocketConns[i]->toString();
|
|
}
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// UmIPSocketConns methods
|
|
//------------------------------------------------------------------------------
|
|
// Add the specified socket/port to the connection list for the IP address
|
|
// represented by this UmIPSocketConns object.
|
|
// ioSock (in) - socket/port connection to be added
|
|
// writeLock (in) - mutex to use when writing to ioSock
|
|
//------------------------------------------------------------------------------
|
|
void UmIPSocketConns::addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock)
|
|
{
|
|
UmIOSocketData sockData = {SP_UM_IOSOCK(ioSock), SP_UM_MUTEX(writeLock)};
|
|
fIOSockets.push_back(sockData);
|
|
|
|
//..Initialize fNextIOSocketIdx when we add first connection
|
|
if (fIOSockets.size() == 1)
|
|
fNextIOSocketIdx = 0;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Delete the specified socket/port from the connection list for the IP address
|
|
// referenced by this UmIPSocketConns object.
|
|
// ioSock (in) - socket/port connection to be deleted
|
|
//
|
|
// Not normally a good thing to be using a std::vector if we are going to be
|
|
// deleting elements in the middle of the collection. Very inefficient.
|
|
// But this method won't be called often, and we are only dealing with a small
|
|
// collection. Plus we want to use a vector over a list, so that nextIOSocket()
|
|
// can benefit from quick random access.
|
|
//------------------------------------------------------------------------------
|
|
void UmIPSocketConns::delSocketConn(const messageqcpp::IOSocket& ioSock)
|
|
{
|
|
for (unsigned int i = 0; i < fIOSockets.size(); ++i)
|
|
{
|
|
sockaddr sa1 = fIOSockets[i].fSock->sa();
|
|
const sockaddr_in* sinp1 = reinterpret_cast<const sockaddr_in*>(&sa1);
|
|
sockaddr sa2 = ioSock.sa();
|
|
const sockaddr_in* sinp2 = reinterpret_cast<const sockaddr_in*>(&sa2);
|
|
|
|
if (sinp1->sin_port == sinp2->sin_port)
|
|
{
|
|
fIOSockets.erase(fIOSockets.begin() + i);
|
|
|
|
//..Adjust fNextIOSocketIdx
|
|
// 1a. decrement if fNextIOSocketIdx is after deleted connection
|
|
// 1b. reset to start of vector if we deleted the last connection
|
|
// 2. reset fNextIOSocketIdx to -1 if we have no more connections
|
|
if (fIOSockets.size() > 0)
|
|
{
|
|
if (fNextIOSocketIdx > static_cast<int>(i))
|
|
fNextIOSocketIdx--;
|
|
|
|
if (fNextIOSocketIdx >= static_cast<int>(fIOSockets.size()))
|
|
fNextIOSocketIdx = 0;
|
|
}
|
|
else
|
|
{
|
|
fNextIOSocketIdx = NEXT_IOSOCKET_UNASSIGNED;
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get the next socket/port connection for this IP to use in sending a message
|
|
// to the applicable UM module.
|
|
// outIos (out) - socket/port connection to use in sending next msg
|
|
// writelock (out) - mutex lock to be used when writing to outIos
|
|
//------------------------------------------------------------------------------
|
|
void UmIPSocketConns::nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock)
|
|
{
|
|
assert(fIOSockets.size() > 0);
|
|
assert(fNextIOSocketIdx != NEXT_IOSOCKET_UNASSIGNED);
|
|
assert(fNextIOSocketIdx < static_cast<int>(fIOSockets.size()));
|
|
|
|
outIos = fIOSockets[fNextIOSocketIdx].fSock;
|
|
writeLock = fIOSockets[fNextIOSocketIdx].fMutex;
|
|
|
|
//..Update "next" index, being sure to wrap around to the start
|
|
// whenever we reach the end of the vector.
|
|
fNextIOSocketIdx++;
|
|
|
|
if (fNextIOSocketIdx >= static_cast<int>(fIOSockets.size()))
|
|
fNextIOSocketIdx = 0;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Convert network byte ordered IP address to a string.
|
|
// return - char* is returned with the IP address string.
|
|
//------------------------------------------------------------------------------
|
|
/* static */ char* UmIPSocketConns::nwToString(in_addr_t addr, char* ipString)
|
|
{
|
|
in_addr addrStruct = {addr};
|
|
|
|
if (!inet_ntop(AF_INET, &addrStruct, ipString, INET_ADDRSTRLEN))
|
|
strcpy(ipString, "unknown");
|
|
|
|
return ipString;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Convert contents to string for logging, debugging, etc.
|
|
//------------------------------------------------------------------------------
|
|
const std::string UmIPSocketConns::toString() const
|
|
{
|
|
std::ostringstream oss;
|
|
|
|
char ipString[INET_ADDRSTRLEN];
|
|
oss << " IPAddress: " << UmIPSocketConns::nwToString(fIpAddress, ipString)
|
|
<< "; nextIOSocketIdx: " << fNextIOSocketIdx << std::endl;
|
|
|
|
for (unsigned int i = 0; i < fIOSockets.size(); ++i)
|
|
{
|
|
sockaddr sa = fIOSockets[i].fSock->sa();
|
|
const sockaddr_in* sinp = reinterpret_cast<const sockaddr_in*>(&sa);
|
|
oss << " port: " << ntohs(sinp->sin_port) << std::endl;
|
|
}
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
} // namespace primitiveprocessor
|