/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /****************************************************************************** * $Id$ * *****************************************************************************/ /** @file umsocketselector.cpp * Used to iterate through available socket/port connections for a given UM, * when sending response messages back to the UM. */ #include "umsocketselector.h" #include #include #include #include #include #include #include #include "liboamcpp.h" using namespace oam; //#define NDEBUG #include //#define LOAD_MODULE_DEBUG 1 //#define MOD_CONN_DEBUG 1 //#define SEL_CONN_DEBUG 1 namespace primitiveprocessor { /*static*/ const int32_t UmIPSocketConns::NEXT_IOSOCKET_UNASSIGNED = -1; /*static*/ const int32_t UmModuleIPs::NEXT_IP_SOCKET_UNASSIGNED = -1; /*static*/ UmSocketSelector* UmSocketSelector::fpUmSocketSelector = 0; //------------------------------------------------------------------------------ // UmSocketSelector methods //------------------------------------------------------------------------------ // UmSocketSelector Singleton accessor //------------------------------------------------------------------------------ /* static */ UmSocketSelector* UmSocketSelector::instance() { if (fpUmSocketSelector == 0) { fpUmSocketSelector = new UmSocketSelector(); } return fpUmSocketSelector; } //------------------------------------------------------------------------------ // UmSocketSelector constructor //------------------------------------------------------------------------------ UmSocketSelector::UmSocketSelector() { loadUMModuleInfo(); } //------------------------------------------------------------------------------ // Returns the number of IP addresses defined in the Columnstore.xml file // return - uint32_t; the total number of IP addresses in the Columnstore.xml file //------------------------------------------------------------------------------ uint32_t UmSocketSelector::ipAddressCount() const { uint32_t ipCount = 0; for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i) { ipCount += fUmModuleIPs[i]->ipAddressCount(); } return ipCount; } //------------------------------------------------------------------------------ // Loads the UM module information from the Columnstore.xml file, so that we have // the list of IP addresses that are valid for each UM. Note that this method // does not insure thread safeness, but that's okay because it is assumed that // it will only be called once from the first call to instance(). //------------------------------------------------------------------------------ void UmSocketSelector::loadUMModuleInfo() { Oam oam; ModuleTypeConfig moduleTypeConfig; const std::string UM_MODTYPE("um"); oam.getSystemConfig(UM_MODTYPE, moduleTypeConfig); int moduleCount = moduleTypeConfig.ModuleCount; std::string moduleType = moduleTypeConfig.ModuleType; #ifdef LOAD_MODULE_DEBUG std::cout << "ModuleConfig for type: " << UM_MODTYPE << std::endl; std::cout << "ModuleDesc = " << moduleTypeConfig.ModuleDesc << std::endl; std::cout << "ModuleCount = " << moduleCount << std::endl; #endif if (moduleCount > 0) { //..Loop through the list of UM modules for (DeviceNetworkList::iterator iter1 = moduleTypeConfig.ModuleNetworkList.begin(); (iter1 != moduleTypeConfig.ModuleNetworkList.end()); ++iter1) { std::string moduleName = iter1->DeviceName; #ifdef LOAD_MODULE_DEBUG std::cout << "ModuleName-" << moduleName << std::endl; #endif //..Assign the UM index based on whether it is a new UM or one // we have seen before unsigned int umIdx = findOrAddUm(moduleName); //..Get the list of IP addresses (NIC's) for this UM module for (HostConfigList::iterator iter2 = iter1->hostConfigList.begin(); (iter2 != iter1->hostConfigList.end()); ++iter2) { std::string ipAddr = iter2->IPAddr; #ifdef LOAD_MODULE_DEBUG std::cout << " NIC-" << iter2->NicID << "; host-" << iter2->HostName << "; IP-" << ipAddr << std::endl; #endif struct in_addr ip; if (inet_aton(ipAddr.c_str(), &ip)) { fIpAddressUmMap[ip.s_addr] = umIdx; fUmModuleIPs[umIdx]->addIP(ip.s_addr); } else { std::cerr << "Invalid IP address in SystemModuleConfig " "section: " << ipAddr << std::endl; } } // loop through the IP addresses for a UM module } // loop through the list of UM modules } // moduleCount > 0 } //------------------------------------------------------------------------------ // Search for the specified moduleName, and return the applicable index from // fUmModuleIPs, if it is found. Else add the new moduleName. // return - unsigned int for the index into fUmModuleIPs for moduleName. //------------------------------------------------------------------------------ unsigned int UmSocketSelector::findOrAddUm(const std::string& moduleName) { unsigned int umIdx = std::numeric_limits::max(); for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i) { if (fUmModuleIPs[i]->moduleName() == moduleName) { umIdx = i; return umIdx; } } //..We have encountered a new UM module we should add to the list fUmModuleIPs.push_back(SP_UM_MODIPS(new UmModuleIPs(moduleName))); umIdx = fUmModuleIPs.size() - 1; return umIdx; } //------------------------------------------------------------------------------ // Add a new socket/port connection. It will be grouped with other // socket/port connections belonging to the same UM. // ios (in) - socket/port connection to be added // writeLock (in) - mutex to use when writing to ios. // return - boolean indicating whether socket/port connection was added. //------------------------------------------------------------------------------ bool UmSocketSelector::addConnection(const SP_UM_IOSOCK& ios, const SP_UM_MUTEX& writeLock) { bool bConnAdded = false; sockaddr sa = ios->sa(); const sockaddr_in* sinp = reinterpret_cast(&sa); IpAddressUmMap_t::iterator mapIter = fIpAddressUmMap.find(sinp->sin_addr.s_addr); // Add this socket/port connection to the UM connection list it belongs to. if (mapIter != fIpAddressUmMap.end()) { unsigned int umIdx = mapIter->second; bConnAdded = fUmModuleIPs[umIdx]->addSocketConn(ios, writeLock); } if (!bConnAdded) { #ifdef SEL_CONN_DEBUG std::ostringstream oss; oss << "No UM/IP match found to add connection " << ios->toString() << std::endl; std::cout << oss.str(); #endif } return bConnAdded; } //------------------------------------------------------------------------------ // Delete a socket/port connection from the UM for which it belongs. // ioSock (in) - socket/port connection to be deleted //------------------------------------------------------------------------------ void UmSocketSelector::delConnection(const messageqcpp::IOSocket& ios) { sockaddr sa = ios.sa(); const sockaddr_in* sinp = reinterpret_cast(&sa); IpAddressUmMap_t::iterator mapIter = fIpAddressUmMap.find(sinp->sin_addr.s_addr); if (mapIter != fIpAddressUmMap.end()) { unsigned int umIdx = mapIter->second; fUmModuleIPs[umIdx]->delSocketConn(ios); } } //------------------------------------------------------------------------------ // Get the next socket/port connection belonging to the same UM as ios. // The selected socket/port connection will be returned in outIos. It can // then be used for sending the next response message back to the applicable // UM module. // ios (in) - socket/port connection where a UM request originated from // outIos (out) - socket/port connection to use in sending the // corresponding response // writelock (out) - mutex lock to be used when writing to outIos // return - bool indicating if socket/port connection was assigned to // outIos //------------------------------------------------------------------------------ bool UmSocketSelector::nextIOSocket(const messageqcpp::IOSocket& ios, SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock) { sockaddr sa = ios.sa(); const sockaddr_in* sinp = reinterpret_cast(&sa); IpAddressUmMap_t::iterator mapIter = fIpAddressUmMap.find(sinp->sin_addr.s_addr); if (mapIter != fIpAddressUmMap.end()) { unsigned int umIdx = mapIter->second; if (fUmModuleIPs[umIdx]->nextIOSocket(outIos, writeLock)) { #ifdef SEL_CONN_DEBUG std::ostringstream oss; oss << "UM " << fUmModuleIPs[umIdx]->moduleName() << "; in: " << ios.toString() << "; selected out: " << outIos->toString() << std::endl; std::cout << oss.str(); #endif return true; } } //..This should not happen. Application is asking for next socket/port for // a connection not in our UM module list. return false; } //------------------------------------------------------------------------------ // Convert contents to string for logging, debugging, etc. //------------------------------------------------------------------------------ const std::string UmSocketSelector::toString() const { std::ostringstream oss; oss << "IP Address to UM index map:" << std::endl; for (IpAddressUmMap_t::const_iterator mapIter = fIpAddressUmMap.begin(); (mapIter != fIpAddressUmMap.end()); ++mapIter) { char ipString[INET_ADDRSTRLEN]; oss << " IPAddress: " << UmIPSocketConns::nwToString(mapIter->first, ipString) << " maps to UM: " << mapIter->second << std::endl; } for (unsigned int i = 0; i < fUmModuleIPs.size(); ++i) { oss << std::endl << fUmModuleIPs[i]->toString(); } return oss.str(); } //------------------------------------------------------------------------------ // UmModuleIPs methods //------------------------------------------------------------------------------ // Add an IP address to be associated with this UM module. // ip (in) - IP address to associate with this UM module (in network byte order) //------------------------------------------------------------------------------ void UmModuleIPs::addIP(in_addr_t ip) { boost::mutex::scoped_lock lock(fUmModuleMutex); #ifdef MOD_CONN_DEBUG std::ostringstream oss; char ipString[INET_ADDRSTRLEN]; oss << " UM " << fUmModuleName << "; adding IP: " << UmIPSocketConns::nwToString(ip, ipString) << std::endl; std::cout << oss.str(); #endif fUmIPSocketConns.push_back(SP_UM_IPCONNS(new UmIPSocketConns(ip))); } //------------------------------------------------------------------------------ // Add a new socket/port connection to this UM. It will be grouped with other // socket/port connections having the same IP address for this UM. // ioSock (in) - socket/port connection to be added // writeLock (in) - mutex to use when writing to ioSock. // return - boolean indicating whether socket/port connection was added. //------------------------------------------------------------------------------ bool UmModuleIPs::addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock) { bool bConnAdded = false; boost::mutex::scoped_lock lock(fUmModuleMutex); for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i) { sockaddr sa = ioSock->sa(); const sockaddr_in* sinp = reinterpret_cast(&sa); if (fUmIPSocketConns[i]->ipAddress() == sinp->sin_addr.s_addr) { #ifdef MOD_CONN_DEBUG std::ostringstream oss; oss << "UM " << fUmModuleName << "; adding connection " << ioSock->toString() << std::endl; std::cout << oss.str(); #endif fUmIPSocketConns[i]->addSocketConn(ioSock, writeLock); bConnAdded = true; //..Initialize fNextUmIPSocketIdx if this is the first socket/port // connection for this UM. if (fNextUmIPSocketIdx == NEXT_IP_SOCKET_UNASSIGNED) fNextUmIPSocketIdx = i; break; } } return bConnAdded; } //------------------------------------------------------------------------------ // Delete a socket/port connection from this UM. // ioSock (in) - socket/port connection to be deleted //------------------------------------------------------------------------------ void UmModuleIPs::delSocketConn(const messageqcpp::IOSocket& ioSock) { boost::mutex::scoped_lock lock(fUmModuleMutex); for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i) { sockaddr sa = ioSock.sa(); const sockaddr_in* sinp = reinterpret_cast(&sa); if (fUmIPSocketConns[i]->ipAddress() == sinp->sin_addr.s_addr) { #ifdef MOD_CONN_DEBUG std::ostringstream oss; oss << "UM " << fUmModuleName << "; deleting connection " << ioSock.toString() << std::endl; std::cout << oss.str(); #endif fUmIPSocketConns[i]->delSocketConn(ioSock); //..If we just deleted the last connection for this IP, then ad- // vance fNextUmIPSocketIdx to an IP address having connections. if (fUmIPSocketConns[i]->count() == 0) { advanceToNextIP(); } break; } } } //------------------------------------------------------------------------------ // Get the next socket/port connection for this UM to use in sending a message // to the applicable UM module. // outIos (out) - socket/port connection to use in sending next msg // writelock (out) - mutex lock to be used when writing to outIos // return - bool indicating if socket/port connection was assigned to // outIos //------------------------------------------------------------------------------ bool UmModuleIPs::nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock) { bool found = false; boost::mutex::scoped_lock lock(fUmModuleMutex); if ((fUmIPSocketConns.size() > 0) && (fNextUmIPSocketIdx != NEXT_IP_SOCKET_UNASSIGNED)) { assert(fNextUmIPSocketIdx < static_cast(fUmIPSocketConns.size())); fUmIPSocketConns[fNextUmIPSocketIdx]->nextIOSocket(outIos, writeLock); advanceToNextIP(); found = true; } return found; } //------------------------------------------------------------------------------ // Advance to the "next" available IP (for this UM), skipping over // any IP's that have 0 socket/port connections. No mutex locking done, as // the calling methods will have already locked a mutex. //------------------------------------------------------------------------------ void UmModuleIPs::advanceToNextIP() { if (fUmIPSocketConns.size() > 1) { //..Search to end of IP list for an IP having 1 or more connections for (int i = fNextUmIPSocketIdx + 1; i < static_cast(fUmIPSocketConns.size()); ++i) { if (fUmIPSocketConns[i]->count() > 0) { fNextUmIPSocketIdx = i; return; } } //..Wrap back around to the start of the list, to continue the search for (int i = 0; i < fNextUmIPSocketIdx; ++i) { if (fUmIPSocketConns[i]->count() > 0) { fNextUmIPSocketIdx = i; return; } } fNextUmIPSocketIdx = NEXT_IP_SOCKET_UNASSIGNED; } else // special logic to handle 0 fUmIPSocketConns, or a single empty one { if ((fUmIPSocketConns.size() == 0) || (fUmIPSocketConns[0]->count() == 0)) { fNextUmIPSocketIdx = NEXT_IP_SOCKET_UNASSIGNED; } } } //------------------------------------------------------------------------------ // Convert contents to string for logging, debugging, etc. //------------------------------------------------------------------------------ const std::string UmModuleIPs::toString() { std::ostringstream oss; boost::mutex::scoped_lock lock(fUmModuleMutex); oss << "UM module name: " << fUmModuleName << "; nextUmIPIdx: " << fNextUmIPSocketIdx << std::endl; for (unsigned int i = 0; i < fUmIPSocketConns.size(); ++i) { oss << fUmIPSocketConns[i]->toString(); } return oss.str(); } //------------------------------------------------------------------------------ // UmIPSocketConns methods //------------------------------------------------------------------------------ // Add the specified socket/port to the connection list for the IP address // represented by this UmIPSocketConns object. // ioSock (in) - socket/port connection to be added // writeLock (in) - mutex to use when writing to ioSock //------------------------------------------------------------------------------ void UmIPSocketConns::addSocketConn(const SP_UM_IOSOCK& ioSock, const SP_UM_MUTEX& writeLock) { UmIOSocketData sockData = {SP_UM_IOSOCK(ioSock), SP_UM_MUTEX(writeLock)}; fIOSockets.push_back(sockData); //..Initialize fNextIOSocketIdx when we add first connection if (fIOSockets.size() == 1) fNextIOSocketIdx = 0; } //------------------------------------------------------------------------------ // Delete the specified socket/port from the connection list for the IP address // referenced by this UmIPSocketConns object. // ioSock (in) - socket/port connection to be deleted // // Not normally a good thing to be using a std::vector if we are going to be // deleting elements in the middle of the collection. Very inefficient. // But this method won't be called often, and we are only dealing with a small // collection. Plus we want to use a vector over a list, so that nextIOSocket() // can benefit from quick random access. //------------------------------------------------------------------------------ void UmIPSocketConns::delSocketConn(const messageqcpp::IOSocket& ioSock) { for (unsigned int i = 0; i < fIOSockets.size(); ++i) { sockaddr sa1 = fIOSockets[i].fSock->sa(); const sockaddr_in* sinp1 = reinterpret_cast(&sa1); sockaddr sa2 = ioSock.sa(); const sockaddr_in* sinp2 = reinterpret_cast(&sa2); if (sinp1->sin_port == sinp2->sin_port) { fIOSockets.erase(fIOSockets.begin() + i); //..Adjust fNextIOSocketIdx // 1a. decrement if fNextIOSocketIdx is after deleted connection // 1b. reset to start of vector if we deleted the last connection // 2. reset fNextIOSocketIdx to -1 if we have no more connections if (fIOSockets.size() > 0) { if (fNextIOSocketIdx > static_cast(i)) fNextIOSocketIdx--; if (fNextIOSocketIdx >= static_cast(fIOSockets.size())) fNextIOSocketIdx = 0; } else { fNextIOSocketIdx = NEXT_IOSOCKET_UNASSIGNED; } break; } } } //------------------------------------------------------------------------------ // Get the next socket/port connection for this IP to use in sending a message // to the applicable UM module. // outIos (out) - socket/port connection to use in sending next msg // writelock (out) - mutex lock to be used when writing to outIos //------------------------------------------------------------------------------ void UmIPSocketConns::nextIOSocket(SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock) { assert(fIOSockets.size() > 0); assert(fNextIOSocketIdx != NEXT_IOSOCKET_UNASSIGNED); assert(fNextIOSocketIdx < static_cast(fIOSockets.size())); outIos = fIOSockets[fNextIOSocketIdx].fSock; writeLock = fIOSockets[fNextIOSocketIdx].fMutex; //..Update "next" index, being sure to wrap around to the start // whenever we reach the end of the vector. fNextIOSocketIdx++; if (fNextIOSocketIdx >= static_cast(fIOSockets.size())) fNextIOSocketIdx = 0; } //------------------------------------------------------------------------------ // Convert network byte ordered IP address to a string. // return - char* is returned with the IP address string. //------------------------------------------------------------------------------ /* static */ char* UmIPSocketConns::nwToString(in_addr_t addr, char* ipString) { in_addr addrStruct = {addr}; if (!inet_ntop(AF_INET, &addrStruct, ipString, INET_ADDRSTRLEN)) strcpy(ipString, "unknown"); return ipString; } //------------------------------------------------------------------------------ // Convert contents to string for logging, debugging, etc. //------------------------------------------------------------------------------ const std::string UmIPSocketConns::toString() const { std::ostringstream oss; char ipString[INET_ADDRSTRLEN]; oss << " IPAddress: " << UmIPSocketConns::nwToString(fIpAddress, ipString) << "; nextIOSocketIdx: " << fNextIOSocketIdx << std::endl; for (unsigned int i = 0; i < fIOSockets.size(); ++i) { sockaddr sa = fIOSockets[i].fSock->sa(); const sockaddr_in* sinp = reinterpret_cast(&sa); oss << " port: " << ntohs(sinp->sin_port) << std::endl; } return oss.str(); } } // namespace primitiveprocessor