You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-454 I_S.COLUMNSTORE_FILES multi-node
I_S.COLUMNSTORE_FILES returned bad filenames and NULL file sizes when there are multiple nodes in a ColumnStore cluster It adds an extra message call to the WriteEngine to get the file size for that file. The I_S function will figure out which WriteEngine to communicate with and get the file size details from it.
This commit is contained in:
@ -27,8 +27,13 @@
|
|||||||
#include "we_convertor.h"
|
#include "we_convertor.h"
|
||||||
#include "we_define.h"
|
#include "we_define.h"
|
||||||
#include "IDBPolicy.h"
|
#include "IDBPolicy.h"
|
||||||
|
#include "configcpp.h"
|
||||||
#include "we_config.h"
|
#include "we_config.h"
|
||||||
#include "we_brm.h"
|
#include "we_brm.h"
|
||||||
|
#include "bytestream.h"
|
||||||
|
#include "liboamcpp.h"
|
||||||
|
#include "messagequeue.h"
|
||||||
|
#include "we_messages.h"
|
||||||
|
|
||||||
// Required declaration as it isn't in a MairaDB include
|
// Required declaration as it isn't in a MairaDB include
|
||||||
bool schema_table_store_record(THD *thd, TABLE *table);
|
bool schema_table_store_record(THD *thd, TABLE *table);
|
||||||
@ -44,6 +49,46 @@ ST_FIELD_INFO is_columnstore_files_fields[] =
|
|||||||
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
|
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static bool get_file_sizes(int db_root, const char *fileName, off_t *fileSize, off_t *compressedFileSize)
|
||||||
|
{
|
||||||
|
oam::Oam oam_instance;
|
||||||
|
messageqcpp::MessageQueueClient *msgQueueClient;
|
||||||
|
std::ostringstream oss;
|
||||||
|
messageqcpp::ByteStream bs;
|
||||||
|
messageqcpp::ByteStream::byte rc;
|
||||||
|
std::string errMsg;
|
||||||
|
int pmId = 0;
|
||||||
|
|
||||||
|
oam_instance.getDbrootPmConfig(db_root, pmId);
|
||||||
|
oss << "pm" << pmId << "_WriteEngineServer";
|
||||||
|
try
|
||||||
|
{
|
||||||
|
msgQueueClient = new messageqcpp::MessageQueueClient(oss.str());
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
delete msgQueueClient;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
bs << (messageqcpp::ByteStream::byte) WriteEngine::WE_SVR_GET_FILESIZE;
|
||||||
|
// header??
|
||||||
|
bs << fileName;
|
||||||
|
msgQueueClient->write(bs);
|
||||||
|
// namespace??
|
||||||
|
messageqcpp::SBS sbs;
|
||||||
|
sbs = msgQueueClient->read();
|
||||||
|
if (sbs->length() == 0)
|
||||||
|
{
|
||||||
|
delete msgQueueClient;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*sbs >> rc;
|
||||||
|
*sbs >> errMsg;
|
||||||
|
*sbs >> *fileSize;
|
||||||
|
*sbs >> *compressedFileSize;
|
||||||
|
delete msgQueueClient;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector<struct BRM::EMEntry> &entries)
|
static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector<struct BRM::EMEntry> &entries)
|
||||||
{
|
{
|
||||||
@ -53,8 +98,12 @@ static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::
|
|||||||
char oidDirName[WriteEngine::FILE_NAME_SIZE];
|
char oidDirName[WriteEngine::FILE_NAME_SIZE];
|
||||||
char fullFileName[WriteEngine::FILE_NAME_SIZE];
|
char fullFileName[WriteEngine::FILE_NAME_SIZE];
|
||||||
char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
|
char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
|
||||||
WriteEngine::Config config;
|
config::Config* config = config::Config::makeConfig();
|
||||||
config.initConfigCache();
|
WriteEngine::Config we_config;
|
||||||
|
off_t fileSize = 0;
|
||||||
|
off_t compressedFileSize = 0;
|
||||||
|
we_config.initConfigCache();
|
||||||
|
|
||||||
|
|
||||||
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
|
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
|
||||||
while ( iter != entries.end() ) //organize extents into files
|
while ( iter != entries.end() ) //organize extents into files
|
||||||
@ -70,18 +119,25 @@ static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::
|
|||||||
table->field[2]->store(iter->partitionNum);
|
table->field[2]->store(iter->partitionNum);
|
||||||
|
|
||||||
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
|
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
|
||||||
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", WriteEngine::Config::getDBRootByNum(iter->dbRoot).c_str(), oidDirName);
|
std::stringstream DbRootName;
|
||||||
|
DbRootName << "DBRoot" << iter->dbRoot;
|
||||||
|
std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str());
|
||||||
|
fileSize = compressedFileSize = 0;
|
||||||
|
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName);
|
||||||
|
if (!get_file_sizes(iter->dbRoot, fullFileName, &fileSize, &compressedFileSize))
|
||||||
|
{
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
|
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
|
||||||
|
|
||||||
if (idbdatafile::IDBPolicy::exists(fullFileName))
|
if (fileSize > 0)
|
||||||
{
|
{
|
||||||
table->field[4]->set_notnull();
|
table->field[4]->set_notnull();
|
||||||
table->field[4]->store(idbdatafile::IDBPolicy::size(fullFileName));
|
table->field[4]->store(fileSize);
|
||||||
off64_t comp_size;
|
if (compressedFileSize > 0)
|
||||||
if ((comp_size = idbdatafile::IDBPolicy::compressedSize(fullFileName)) != -1)
|
|
||||||
{
|
{
|
||||||
table->field[5]->set_notnull();
|
table->field[5]->set_notnull();
|
||||||
table->field[5]->store(comp_size);
|
table->field[5]->store(compressedFileSize);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -245,6 +245,38 @@ struct ColumnThread
|
|||||||
bool fReportRealUse;
|
bool fReportRealUse;
|
||||||
int fKey;
|
int fKey;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
// Get file size from file name in bytestream object
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
int WE_GetFileSizes::processFileName(
|
||||||
|
messageqcpp::ByteStream& bs,
|
||||||
|
std::string& errMsg, int key)
|
||||||
|
{
|
||||||
|
uint8_t rc = 0;
|
||||||
|
off_t fileSize;
|
||||||
|
off_t compressedFileSize;
|
||||||
|
errMsg.clear();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
std::string fileName;
|
||||||
|
|
||||||
|
bs >> fileName;
|
||||||
|
fileSize = IDBPolicy::size(fileName.c_str());
|
||||||
|
compressedFileSize = IDBPolicy::compressedSize(fileName.c_str());
|
||||||
|
}
|
||||||
|
catch(std::exception& ex)
|
||||||
|
{
|
||||||
|
errMsg = ex.what();
|
||||||
|
rc = 1;
|
||||||
|
}
|
||||||
|
bs.reset();
|
||||||
|
bs << fileSize;
|
||||||
|
bs << compressedFileSize;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
// Process a table size based on input from the
|
// Process a table size based on input from the
|
||||||
// bytestream object.
|
// bytestream object.
|
||||||
|
@ -38,7 +38,7 @@ class WE_GetFileSizes
|
|||||||
public:
|
public:
|
||||||
|
|
||||||
static int processTable(messageqcpp::ByteStream& bs, std::string& errMsg, int key);
|
static int processTable(messageqcpp::ByteStream& bs, std::string& errMsg, int key);
|
||||||
|
static int processFileName(messageqcpp::ByteStream& bs, std::string& errMsg, int key);
|
||||||
};
|
};
|
||||||
|
|
||||||
class ActiveThreadCounter
|
class ActiveThreadCounter
|
||||||
|
@ -76,6 +76,7 @@ enum ServerMessages
|
|||||||
WE_SVR_REDISTRIBUTE,
|
WE_SVR_REDISTRIBUTE,
|
||||||
WE_SVR_CLOSE_CONNECTION,
|
WE_SVR_CLOSE_CONNECTION,
|
||||||
WE_SVR_GET_FILESIZES,
|
WE_SVR_GET_FILESIZES,
|
||||||
|
WE_SVR_GET_FILESIZE,
|
||||||
WE_SVR_PURGEFD,
|
WE_SVR_PURGEFD,
|
||||||
WE_END_TRANSACTION,
|
WE_END_TRANSACTION,
|
||||||
WE_SRV_FIX_ROWS,
|
WE_SRV_FIX_ROWS,
|
||||||
|
@ -733,6 +733,11 @@ void GetFileSizeThread::operator()()
|
|||||||
rc = fWeGetFileSizes->processTable(fIbs, errMsg, key);
|
rc = fWeGetFileSizes->processTable(fIbs, errMsg, key);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case WE_SVR_GET_FILESIZE:
|
||||||
|
{
|
||||||
|
rc = fWeGetFileSizes->processFileName(fIbs, errMsg, key);
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
@ -845,6 +850,7 @@ void ReadThreadFactory::CreateReadThread(ThreadPool& Tp, IOSocket& Ios, BRM::DBR
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case WE_SVR_GET_FILESIZES:
|
case WE_SVR_GET_FILESIZES:
|
||||||
|
case WE_SVR_GET_FILESIZE:
|
||||||
{
|
{
|
||||||
GetFileSizeThread getFileSizeThread(Ios, aBs, dbrm);
|
GetFileSizeThread getFileSizeThread(Ios, aBs, dbrm);
|
||||||
Tp.invoke(getFileSizeThread);
|
Tp.invoke(getFileSizeThread);
|
||||||
|
Reference in New Issue
Block a user