diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index 2ddb393fe..e9479e254 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -33,6 +33,7 @@ #include "bytestream.h" #include "liboamcpp.h" #include "messagequeue.h" +#include "messagequeuepool.h" #include "we_messages.h" // Required declaration as it isn't in a MairaDB include @@ -81,15 +82,6 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient *msgQueueClient, cons } } -static void cleanup(std::map &clients) -{ - for(std::map::iterator itr = clients.begin(); itr != clients.end(); itr++) - { - delete itr->second; - } -} - - static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) { BRM::DBRM *emp = new BRM::DBRM(); @@ -105,7 +97,6 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) off_t fileSize = 0; off_t compressedFileSize = 0; we_config.initConfigCache(); - std::map clients; messageqcpp::MessageQueueClient *msgQueueClient; oam::Oam oam_instance; int pmId = 0; @@ -143,37 +134,15 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); fileSize = compressedFileSize = 0; snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); - try - { - msgQueueClient = clients.at(iter->dbRoot); - } - catch (...) - { - msgQueueClient = NULL; - } - if (!msgQueueClient) - { - oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); - std::ostringstream oss; - oss << "pm" << pmId << "_WriteEngineServer"; - try - { - msgQueueClient = new messageqcpp::MessageQueueClient(oss.str()); - } - catch (...) - { - delete msgQueueClient; - cleanup(clients); - delete emp; - return 1; - } - clients[iter->dbRoot] = msgQueueClient; - } - - + oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + std::ostringstream oss; + oss << "pm" << pmId << "_WriteEngineServer"; + std::string client = oss.str(); + msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); + if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) { - cleanup(clients); + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); delete emp; return 1; } @@ -201,11 +170,13 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) if (schema_table_store_record(thd, table)) { - cleanup(clients); + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); delete emp; return 1; } iter++; + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + msgQueueClient = NULL; } } delete emp; diff --git a/utils/messageqcpp/CMakeLists.txt b/utils/messageqcpp/CMakeLists.txt index 25c3fa251..5fc8a39ab 100644 --- a/utils/messageqcpp/CMakeLists.txt +++ b/utils/messageqcpp/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) set(messageqcpp_LIB_SRCS messagequeue.cpp + messagequeuepool.cpp bytestream.cpp socketparms.cpp inetstreamsocket.cpp diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index bfbb291b3..e03cfe109 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -70,6 +70,7 @@ either expressed or implied, of the FreeBSD Project. #include #include #include +#include #endif #include #include @@ -1070,5 +1071,44 @@ int InetStreamSocket::ping(const std::string& ipaddr, const struct timespec* tim return 0; } +bool InetStreamSocket::isConnected() const +{ + int error = 0; + socklen_t len = sizeof(error); + int retval = getsockopt(fSocketParms.sd(), SOL_SOCKET, SO_ERROR, &error, &len); + + if (error || retval) + return false; + + struct pollfd pfd[1]; + pfd[0].fd = fSocketParms.sd(); + pfd[0].events = POLLIN; + pfd[0].revents = 0; + + error = poll(pfd, 1, 0); + if ((error < 0) || (pfd[0].revents & (POLLHUP | POLLNVAL | POLLERR))) { + return false; + } + + return true; +} + +bool InetStreamSocket::hasData() const +{ + int count; + char buf[1]; + ssize_t retval; + ioctl(fSocketParms.sd(), FIONREAD, &count); + if (count) + return true; + + // EAGAIN | EWOULDBLOCK means the socket is clear. Anything else is data or error + retval = recv(fSocketParms.sd(), buf, 1, MSG_DONTWAIT); + if (retval & (EAGAIN | EWOULDBLOCK)) + return false; + + return true; +} + } //namespace messageqcpp diff --git a/utils/messageqcpp/inetstreamsocket.h b/utils/messageqcpp/inetstreamsocket.h index 1080bde46..4029a1fb1 100644 --- a/utils/messageqcpp/inetstreamsocket.h +++ b/utils/messageqcpp/inetstreamsocket.h @@ -196,6 +196,13 @@ public: */ EXPORT static int ping(const std::string& ipaddr, const struct timespec* timeout=0); + // Check if we are still connected + virtual bool isConnected() const; + + // Check if the socket still has data pending + + virtual bool hasData() const; + /* * allow test suite access to private data for OOB test */ diff --git a/utils/messageqcpp/iosocket.h b/utils/messageqcpp/iosocket.h index a2a8a85d8..0e937e404 100644 --- a/utils/messageqcpp/iosocket.h +++ b/utils/messageqcpp/iosocket.h @@ -174,7 +174,9 @@ public: */ virtual void connectionTimeout(const struct timespec* timeout) { fSocket->connectionTimeout(timeout); } - + inline virtual bool isConnected() const; + inline virtual bool hasData() const; + friend class ::MessageQTestSuite; protected: @@ -208,6 +210,8 @@ inline const SocketParms IOSocket::socketParms() const { idbassert(fSocket); ret inline void IOSocket::socketParms(const SocketParms& socketParms) { idbassert(fSocket); fSocket->socketParms(socketParms); } inline void IOSocket::setSocketImpl(Socket* socket) { delete fSocket; fSocket = socket; } inline const int IOSocket::getConnectionNum() const { return fSocket->getConnectionNum(); } +inline bool IOSocket::isConnected() const { return fSocket->isConnected(); } +inline bool IOSocket::hasData() const { return fSocket->hasData(); } /** * stream an IOSocket rep to any ostream diff --git a/utils/messageqcpp/messagequeue.h b/utils/messageqcpp/messagequeue.h index a52d69883..8de4df398 100644 --- a/utils/messageqcpp/messagequeue.h +++ b/utils/messageqcpp/messagequeue.h @@ -249,6 +249,9 @@ public: */ inline const bool isSameAddr(const MessageQueueClient& rhs) const; + bool isConnected() { return fClientSock.isConnected(); } + + bool hasData() { return fClientSock.hasData(); } /* * allow test suite access to private data for OOB test */ diff --git a/utils/messageqcpp/messagequeuepool.cpp b/utils/messageqcpp/messagequeuepool.cpp new file mode 100644 index 000000000..5b8c9862c --- /dev/null +++ b/utils/messageqcpp/messagequeuepool.cpp @@ -0,0 +1,205 @@ +/* Copyright (C) 2017 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +#include "messagequeuepool.h" +#include "messagequeue.h" + +namespace messageqcpp { + +boost::mutex queueMutex; +// Make linker happy +std::multimap MessageQueueClientPool::clientMap; + +// 300 seconds idle until cleanup +#define MAX_IDLE_TIME 300 + +static uint64_t TimeSpecToSeconds(struct timespec* ts) +{ + return (uint64_t)ts->tv_sec + (uint64_t)ts->tv_nsec / 1000000000; +} + +MessageQueueClient *MessageQueueClientPool::getInstance(const std::string &ip, uint64_t port) +{ + boost::mutex::scoped_lock lock(queueMutex); + + std::ostringstream oss; + oss << ip << "_" << port; + std::string searchString = oss.str(); + + MessageQueueClient *returnClient = MessageQueueClientPool::findInPool(searchString); + + // We found one, return it + if (returnClient != NULL) + { + return returnClient; + } + + // We didn't find one, create new one + ClientObject *newClientObject = new ClientObject(); + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + uint64_t nowSeconds = TimeSpecToSeconds(&now); + + newClientObject->client = new MessageQueueClient(ip, port); + newClientObject->inUse = true; + newClientObject->lastUsed = nowSeconds; + clientMap.insert(std::pair(searchString, newClientObject)); + return newClientObject->client; +} + +MessageQueueClient *MessageQueueClientPool::getInstance(const std::string &module) +{ + boost::mutex::scoped_lock lock(queueMutex); + + MessageQueueClient *returnClient = MessageQueueClientPool::findInPool(module); + + // We found one, return it + if (returnClient != NULL) + { + return returnClient; + } + + // We didn't find one, create new one + ClientObject *newClientObject = new ClientObject(); + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + uint64_t nowSeconds = TimeSpecToSeconds(&now); + + newClientObject->client = new MessageQueueClient(module); + newClientObject->inUse = true; + newClientObject->lastUsed = nowSeconds; + clientMap.insert(std::pair(module, newClientObject)); + return newClientObject->client; +} + +MessageQueueClient *MessageQueueClientPool::findInPool(const std::string &search) +{ + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + uint64_t nowSeconds = TimeSpecToSeconds(&now); + MessageQueueClient *returnClient = NULL; + + std::multimap::iterator it=clientMap.begin(); + + // Scan pool + while (it!=clientMap.end()) + { + ClientObject *clientObject = it->second; + uint64_t elapsedTime = nowSeconds - clientObject->lastUsed; + + // If connection hasn't been used for MAX_IDLE_TIME we probably don't need it so drop it + // Don't drop in use connections that have been in use a long time + if ((elapsedTime >= MAX_IDLE_TIME) && (!clientObject->inUse)) + { + delete clientObject->client; + delete clientObject; + // Do this so we don't invalidate current interator + std::multimap::iterator toDelete = it; + it++; + clientMap.erase(toDelete); + continue; + } + + if (!clientObject->inUse) + { + MessageQueueClient *client = clientObject->client; + + // If the unused socket isn't connected or has data pending read, destroy it + if (!client->isConnected() || client->hasData()) + { + delete client; + delete clientObject; + // Do this so we don't invalidate current interator + std::multimap::iterator toDelete = it; + it++; + clientMap.erase(toDelete); + continue; + } + + } + + // If connection matches store it for later, but keep scanning the pool for more timeout prunes + if (it->first.compare(search) == 0) + { + if ((returnClient == NULL) && (!clientObject->inUse)) + { + returnClient = clientObject->client; + clientObject->inUse = true; + return returnClient; + } + } + it++; + } + return NULL; +} + +void MessageQueueClientPool::releaseInstance(MessageQueueClient * client) +{ + // Scan pool for pointer and release + // Set the last used and mark as not in use + + if (client == NULL) + return; + + boost::mutex::scoped_lock lock(queueMutex); + std::multimap::iterator it=clientMap.begin(); + + while (it!=clientMap.end()) + { + if (it->second->client == client) + { + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + uint64_t nowSeconds = TimeSpecToSeconds(&now); + it->second->inUse = false; + it->second->lastUsed = nowSeconds; + return; + } + it++; + } +} + +// WriteEngine needs this as it forces connections closed and can't reuse. Also good for connection errors +void MessageQueueClientPool::deleteInstance(MessageQueueClient * client) +{ + // Scan pool for pointer and delete + // Set the last used and mark as not in use + + if (client == NULL) + return; + + boost::mutex::scoped_lock lock(queueMutex); + std::multimap::iterator it=clientMap.begin(); + + while (it!=clientMap.end()) + { + if (it->second->client == client) + { + delete it->second->client; + delete it->second; + clientMap.erase(it); + return; + } + it++; + } + +} +} diff --git a/utils/messageqcpp/messagequeuepool.h b/utils/messageqcpp/messagequeuepool.h new file mode 100644 index 000000000..fc5576203 --- /dev/null +++ b/utils/messageqcpp/messagequeuepool.h @@ -0,0 +1,57 @@ +/* Copyright (C) 2017 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#ifndef MESSAGEQCPP_MESSAGEQUEUECLIENT_H +#define MESSAGEQCPP_MESSAGEQUEUECLIENT_H + +#include +#include "messagequeue.h" + +namespace messageqcpp { + + +struct ClientObject +{ + MessageQueueClient *client; + uint64_t lastUsed; + bool inUse; + + ClientObject() : + client(NULL), + lastUsed(0), + inUse(false) + {} +}; + +class MessageQueueClientPool +{ + public: + static MessageQueueClient *getInstance(const std::string &module); + static MessageQueueClient *getInstance(const std::string &ip, uint64_t port); + static void releaseInstance(MessageQueueClient * client); + static void deleteInstance(MessageQueueClient * client); + static MessageQueueClient *findInPool(const std::string &search); + + private: + MessageQueueClientPool() { }; + ~MessageQueueClientPool() { }; + + static std::multimap clientMap; +}; + +} +#endif //MESSAGEQCPP_MESSAGEQUEUECLIENT_H diff --git a/utils/messageqcpp/socket.h b/utils/messageqcpp/socket.h index f77304fc9..2c553c05b 100644 --- a/utils/messageqcpp/socket.h +++ b/utils/messageqcpp/socket.h @@ -158,6 +158,9 @@ public: */ virtual const bool isSameAddr(const Socket* rhs) const = 0; + virtual bool isConnected() const = 0; + virtual bool hasData() const = 0; + /* * allow test suite access to private data for OOB test */ diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index 47f02dd62..aa452d39f 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -40,6 +40,7 @@ #include "socketclosed.h" #include "configcpp.h" #include "sessionmanagerserver.h" +#include "messagequeuepool.h" #define DBRM_DLLEXPORT #include "dbrm.h" #undef DBRM_DLLEXPORT @@ -53,7 +54,7 @@ #endif #define DO_ERR_NETWORK \ - delete msgClient; \ + MessageQueueClientPool::releaseInstance(msgClient); \ msgClient = NULL; \ mutex.unlock(); \ return ERR_NETWORK; @@ -97,8 +98,7 @@ DBRM::DBRM(const DBRM& brm) DBRM::~DBRM() throw() { if (msgClient != NULL) - msgClient->shutdown(); - delete msgClient; + MessageQueueClientPool::releaseInstance(msgClient); } DBRM& DBRM::operator=(const DBRM& brm) @@ -742,7 +742,7 @@ reconnect: if (msgClient == NULL) try { - msgClient = new MessageQueueClient(masterName); + msgClient = MessageQueueClientPool::getInstance(masterName); } catch(exception &e) { cerr << "class DBRM failed to create a MessageQueueClient: " << @@ -766,7 +766,7 @@ reconnect: cerr << "DBRM::send_recv caught: " << e.what() << endl; if (firstAttempt) { firstAttempt = false; - delete msgClient; + MessageQueueClientPool::releaseInstance(msgClient); msgClient = NULL; goto reconnect; } @@ -776,7 +776,7 @@ reconnect: cerr << "DBRM::send_recv: controller node closed the connection" << endl; if (firstAttempt) { firstAttempt = false; - delete msgClient; + MessageQueueClientPool::releaseInstance(msgClient); msgClient = NULL; sleep(10); goto reconnect; @@ -3222,7 +3222,7 @@ bool DBRM::isDBRMReady() throw() { if (msgClient == NULL) { - msgClient = new MessageQueueClient(masterName); + msgClient = MessageQueueClientPool::getInstance(masterName); } if (msgClient->connect()) { @@ -3232,7 +3232,7 @@ bool DBRM::isDBRMReady() throw() catch (...) { } - delete msgClient; + MessageQueueClientPool::releaseInstance(msgClient); msgClient = NULL; sleep(1); } diff --git a/versioning/BRM/masterdbrmnode.cpp b/versioning/BRM/masterdbrmnode.cpp index 72900ee0a..46f49906f 100644 --- a/versioning/BRM/masterdbrmnode.cpp +++ b/versioning/BRM/masterdbrmnode.cpp @@ -32,7 +32,7 @@ #include "liboamcpp.h" #include "stopwatch.h" #include "masterdbrmnode.h" - +#include "messagequeuepool.h" // #define BRM_VERBOSE // minor improvement to code clarity... @@ -163,7 +163,8 @@ void MasterDBRMNode::initMsgQueues(config::Config *config) serverLock.unlock(); for (i = 1; i <= NumWorkers; i++) { snprintf(ctmp, 50, "DBRM_Worker%d", i); - slaves.push_back(new MessageQueueClient(ctmp, config)); + std::string module(ctmp); + slaves.push_back(MessageQueueClientPool::getInstance(module)); } } @@ -882,8 +883,8 @@ void MasterDBRMNode::finalCleanup() cerr << "Closing connections" << endl; #endif for (sIt = slaves.begin(); sIt != slaves.end(); sIt++) { - (*sIt)->shutdown(); - delete *sIt; + MessageQueueClientPool::releaseInstance(*sIt); + *sIt = NULL; } slaves.clear(); @@ -1036,8 +1037,8 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket *sock) } for (i = 0; i < (int) slaves.size(); i++) { - slaves[i]->shutdown(); - delete slaves[i]; + MessageQueueClientPool::releaseInstance(slaves[i]); + slaves[i] = NULL; } slaves.clear(); @@ -1045,7 +1046,8 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket *sock) for (i = 1; i <= NumWorkers; i++) { snprintf(ctmp, 50, "DBRM_Worker%d", i); - slaves.push_back(new MessageQueueClient(ctmp, config)); + std::string module(ctmp); + slaves.push_back(MessageQueueClientPool::getInstance(module)); } iSlave = slaves.end();