From 025838629b619760f8405986a101ac0c56f33869 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Wed, 14 Dec 2016 16:55:03 +0000 Subject: [PATCH] MCOL-454 I_S.COLUMNSTORE_FILES multi-node I_S.COLUMNSTORE_FILES returned bad filenames and NULL file sizes when there are multiple nodes in a ColumnStore cluster It adds an extra message call to the WriteEngine to get the file size for that file. The I_S function will figure out which WriteEngine to communicate with and get the file size details from it. --- dbcon/mysql/is_columnstore_files.cpp | 72 +++++++++++++++++++++++--- writeengine/server/we_getfilesizes.cpp | 32 ++++++++++++ writeengine/server/we_getfilesizes.h | 2 +- writeengine/server/we_messages.h | 1 + writeengine/server/we_readthread.cpp | 6 +++ 5 files changed, 104 insertions(+), 9 deletions(-) diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index b39be0ee6..664f06e38 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -27,8 +27,13 @@ #include "we_convertor.h" #include "we_define.h" #include "IDBPolicy.h" +#include "configcpp.h" #include "we_config.h" #include "we_brm.h" +#include "bytestream.h" +#include "liboamcpp.h" +#include "messagequeue.h" +#include "we_messages.h" // Required declaration as it isn't in a MairaDB include bool schema_table_store_record(THD *thd, TABLE *table); @@ -44,6 +49,46 @@ ST_FIELD_INFO is_columnstore_files_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; +static bool get_file_sizes(int db_root, const char *fileName, off_t *fileSize, off_t *compressedFileSize) +{ + oam::Oam oam_instance; + messageqcpp::MessageQueueClient *msgQueueClient; + std::ostringstream oss; + messageqcpp::ByteStream bs; + messageqcpp::ByteStream::byte rc; + std::string errMsg; + int pmId = 0; + + oam_instance.getDbrootPmConfig(db_root, pmId); + oss << "pm" << pmId << "_WriteEngineServer"; + try + { + msgQueueClient = new messageqcpp::MessageQueueClient(oss.str()); + } + catch (...) + { + delete msgQueueClient; + return false; + } + bs << (messageqcpp::ByteStream::byte) WriteEngine::WE_SVR_GET_FILESIZE; + // header?? + bs << fileName; + msgQueueClient->write(bs); + // namespace?? + messageqcpp::SBS sbs; + sbs = msgQueueClient->read(); + if (sbs->length() == 0) + { + delete msgQueueClient; + return false; + } + *sbs >> rc; + *sbs >> errMsg; + *sbs >> *fileSize; + *sbs >> *compressedFileSize; + delete msgQueueClient; + return true; +} static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector &entries) { @@ -53,8 +98,12 @@ static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM:: char oidDirName[WriteEngine::FILE_NAME_SIZE]; char fullFileName[WriteEngine::FILE_NAME_SIZE]; char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE]; - WriteEngine::Config config; - config.initConfigCache(); + config::Config* config = config::Config::makeConfig(); + WriteEngine::Config we_config; + off_t fileSize = 0; + off_t compressedFileSize = 0; + we_config.initConfigCache(); + std::vector::const_iterator iter = entries.begin(); while ( iter != entries.end() ) //organize extents into files @@ -70,18 +119,25 @@ static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM:: table->field[2]->store(iter->partitionNum); WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); - snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", WriteEngine::Config::getDBRootByNum(iter->dbRoot).c_str(), oidDirName); + std::stringstream DbRootName; + DbRootName << "DBRoot" << iter->dbRoot; + std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); + fileSize = compressedFileSize = 0; + snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); + if (!get_file_sizes(iter->dbRoot, fullFileName, &fileSize, &compressedFileSize)) + { + return 1; + } table->field[3]->store(fullFileName, strlen(fullFileName), cs); - if (idbdatafile::IDBPolicy::exists(fullFileName)) + if (fileSize > 0) { table->field[4]->set_notnull(); - table->field[4]->store(idbdatafile::IDBPolicy::size(fullFileName)); - off64_t comp_size; - if ((comp_size = idbdatafile::IDBPolicy::compressedSize(fullFileName)) != -1) + table->field[4]->store(fileSize); + if (compressedFileSize > 0) { table->field[5]->set_notnull(); - table->field[5]->store(comp_size); + table->field[5]->store(compressedFileSize); } else { diff --git a/writeengine/server/we_getfilesizes.cpp b/writeengine/server/we_getfilesizes.cpp index a9f92f46c..e2bcc87e5 100644 --- a/writeengine/server/we_getfilesizes.cpp +++ b/writeengine/server/we_getfilesizes.cpp @@ -245,6 +245,38 @@ struct ColumnThread bool fReportRealUse; int fKey; }; + +//------------------------------------------------------------------------------ +// Get file size from file name in bytestream object +//------------------------------------------------------------------------------ +int WE_GetFileSizes::processFileName( + messageqcpp::ByteStream& bs, + std::string& errMsg, int key) +{ + uint8_t rc = 0; + off_t fileSize; + off_t compressedFileSize; + errMsg.clear(); + + try + { + std::string fileName; + + bs >> fileName; + fileSize = IDBPolicy::size(fileName.c_str()); + compressedFileSize = IDBPolicy::compressedSize(fileName.c_str()); + } + catch(std::exception& ex) + { + errMsg = ex.what(); + rc = 1; + } + bs.reset(); + bs << fileSize; + bs << compressedFileSize; + return rc; +} + //------------------------------------------------------------------------------ // Process a table size based on input from the // bytestream object. diff --git a/writeengine/server/we_getfilesizes.h b/writeengine/server/we_getfilesizes.h index 4b2a2407c..037395b9f 100644 --- a/writeengine/server/we_getfilesizes.h +++ b/writeengine/server/we_getfilesizes.h @@ -38,7 +38,7 @@ class WE_GetFileSizes public: static int processTable(messageqcpp::ByteStream& bs, std::string& errMsg, int key); - + static int processFileName(messageqcpp::ByteStream& bs, std::string& errMsg, int key); }; class ActiveThreadCounter diff --git a/writeengine/server/we_messages.h b/writeengine/server/we_messages.h index bda39e981..afd769049 100644 --- a/writeengine/server/we_messages.h +++ b/writeengine/server/we_messages.h @@ -76,6 +76,7 @@ enum ServerMessages WE_SVR_REDISTRIBUTE, WE_SVR_CLOSE_CONNECTION, WE_SVR_GET_FILESIZES, + WE_SVR_GET_FILESIZE, WE_SVR_PURGEFD, WE_END_TRANSACTION, WE_SRV_FIX_ROWS, diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index dd3db617a..67d765511 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -733,6 +733,11 @@ void GetFileSizeThread::operator()() rc = fWeGetFileSizes->processTable(fIbs, errMsg, key); break; } + case WE_SVR_GET_FILESIZE: + { + rc = fWeGetFileSizes->processFileName(fIbs, errMsg, key); + break; + } default: { break; @@ -845,6 +850,7 @@ void ReadThreadFactory::CreateReadThread(ThreadPool& Tp, IOSocket& Ios, BRM::DBR } break; case WE_SVR_GET_FILESIZES: + case WE_SVR_GET_FILESIZE: { GetFileSizeThread getFileSizeThread(Ios, aBs, dbrm); Tp.invoke(getFileSizeThread);