From 468fc8c260e9621cf931736dece10851d44e954d Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Sun, 5 Mar 2017 09:58:20 +0000 Subject: [PATCH] MCOL-605 limit connections for I_S table I_S.COLUMNSTORE_FILES now caches a connection per dbroot instead of using a connection per file. --- dbcon/mysql/is_columnstore_files.cpp | 206 +++++++++++++++------------ 1 file changed, 115 insertions(+), 91 deletions(-) diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index 664f06e38..f70e646e0 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -49,49 +49,51 @@ ST_FIELD_INFO is_columnstore_files_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; -static bool get_file_sizes(int db_root, const char *fileName, off_t *fileSize, off_t *compressedFileSize) +static bool get_file_sizes(messageqcpp::MessageQueueClient *msgQueueClient, const char *fileName, off_t *fileSize, off_t *compressedFileSize) { - oam::Oam oam_instance; - messageqcpp::MessageQueueClient *msgQueueClient; - std::ostringstream oss; messageqcpp::ByteStream bs; messageqcpp::ByteStream::byte rc; std::string errMsg; - int pmId = 0; - oam_instance.getDbrootPmConfig(db_root, pmId); - oss << "pm" << pmId << "_WriteEngineServer"; try { - msgQueueClient = new messageqcpp::MessageQueueClient(oss.str()); + bs << (messageqcpp::ByteStream::byte) WriteEngine::WE_SVR_GET_FILESIZE; + // header?? + bs << fileName; + msgQueueClient->write(bs); + // namespace?? + messageqcpp::SBS sbs; + sbs = msgQueueClient->read(); + if (sbs->length() == 0) + { + delete msgQueueClient; + return false; + } + *sbs >> rc; + *sbs >> errMsg; + *sbs >> *fileSize; + *sbs >> *compressedFileSize; + return true; } catch (...) { - delete msgQueueClient; return false; } - bs << (messageqcpp::ByteStream::byte) WriteEngine::WE_SVR_GET_FILESIZE; - // header?? - bs << fileName; - msgQueueClient->write(bs); - // namespace?? - messageqcpp::SBS sbs; - sbs = msgQueueClient->read(); - if (sbs->length() == 0) - { - delete msgQueueClient; - return false; - } - *sbs >> rc; - *sbs >> errMsg; - *sbs >> *fileSize; - *sbs >> *compressedFileSize; - delete msgQueueClient; - return true; } -static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector &entries) +static void cleanup(std::map &clients) { + for(std::map::iterator itr = clients.begin(); itr != clients.end(); itr++) + { + delete itr->second; + } +} + + +static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::DBRM *emp = new BRM::DBRM(); + std::vector entries; CHARSET_INFO *cs = system_charset_info; TABLE *table = tables->table; @@ -103,64 +105,11 @@ static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM:: off_t fileSize = 0; off_t compressedFileSize = 0; we_config.initConfigCache(); - - - std::vector::const_iterator iter = entries.begin(); - while ( iter != entries.end() ) //organize extents into files - { - // Don't include files more than once at different block offsets - if (iter->blockOffset > 0) - { - iter++; - continue; - } - table->field[0]->store(oid); - table->field[1]->store(iter->segmentNum); - table->field[2]->store(iter->partitionNum); - - WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); - std::stringstream DbRootName; - DbRootName << "DBRoot" << iter->dbRoot; - std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); - fileSize = compressedFileSize = 0; - snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); - if (!get_file_sizes(iter->dbRoot, fullFileName, &fileSize, &compressedFileSize)) - { - return 1; - } - table->field[3]->store(fullFileName, strlen(fullFileName), cs); - - if (fileSize > 0) - { - table->field[4]->set_notnull(); - table->field[4]->store(fileSize); - if (compressedFileSize > 0) - { - table->field[5]->set_notnull(); - table->field[5]->store(compressedFileSize); - } - else - { - table->field[5]->set_null(); - } - } - else - { - table->field[4]->set_null(); - table->field[5]->set_null(); - } - - if (schema_table_store_record(thd, table)) - return 1; - iter++; - } - return 0; -} - -static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) -{ - BRM::DBRM *emp = new BRM::DBRM(); - std::vector entries; + std::map clients; + messageqcpp::MessageQueueClient *msgQueueClient; + oam::Oam oam_instance; + int pmId = 0; + std::ostringstream oss; if (!emp || !emp->isDBRMReady()) { @@ -176,14 +125,89 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) if (entries.size() == 0) continue; - if (is_columnstore_files_get_entries(thd, tables, oid, entries)) + std::vector::const_iterator iter = entries.begin(); + while ( iter != entries.end() ) //organize extents into files { - delete emp; - return 1; + // Don't include files more than once at different block offsets + if (iter->blockOffset > 0) + { + iter++; + continue; + } + table->field[0]->store(oid); + table->field[1]->store(iter->segmentNum); + table->field[2]->store(iter->partitionNum); + + WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); + std::stringstream DbRootName; + DbRootName << "DBRoot" << iter->dbRoot; + std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); + fileSize = compressedFileSize = 0; + snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); + try + { + msgQueueClient = clients.at(iter->dbRoot); + } + catch (...) + { + msgQueueClient = NULL; + } + if (!msgQueueClient) + { + oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + oss << "pm" << pmId << "_WriteEngineServer"; + try + { + msgQueueClient = new messageqcpp::MessageQueueClient(oss.str()); + } + catch (...) + { + delete msgQueueClient; + cleanup(clients); + delete emp; + return 1; + } + clients[iter->dbRoot] = msgQueueClient; + } + + + if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) + { + cleanup(clients); + delete emp; + return 1; + } + table->field[3]->store(fullFileName, strlen(fullFileName), cs); + + if (fileSize > 0) + { + table->field[4]->set_notnull(); + table->field[4]->store(fileSize); + if (compressedFileSize > 0) + { + table->field[5]->set_notnull(); + table->field[5]->store(compressedFileSize); + } + else + { + table->field[5]->set_null(); + } + } + else + { + table->field[4]->set_null(); + table->field[5]->set_null(); + } + + if (schema_table_store_record(thd, table)) + { + cleanup(clients); + delete emp; + return 1; + } + iter++; } - } - delete emp; return 0; }