You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-605 limit connections for I_S table
I_S.COLUMNSTORE_FILES now caches a connection per dbroot instead of using a connection per file.
This commit is contained in:
@ -49,49 +49,51 @@ ST_FIELD_INFO is_columnstore_files_fields[] =
|
|||||||
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
|
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
|
||||||
};
|
};
|
||||||
|
|
||||||
static bool get_file_sizes(int db_root, const char *fileName, off_t *fileSize, off_t *compressedFileSize)
|
static bool get_file_sizes(messageqcpp::MessageQueueClient *msgQueueClient, const char *fileName, off_t *fileSize, off_t *compressedFileSize)
|
||||||
{
|
{
|
||||||
oam::Oam oam_instance;
|
|
||||||
messageqcpp::MessageQueueClient *msgQueueClient;
|
|
||||||
std::ostringstream oss;
|
|
||||||
messageqcpp::ByteStream bs;
|
messageqcpp::ByteStream bs;
|
||||||
messageqcpp::ByteStream::byte rc;
|
messageqcpp::ByteStream::byte rc;
|
||||||
std::string errMsg;
|
std::string errMsg;
|
||||||
int pmId = 0;
|
|
||||||
|
|
||||||
oam_instance.getDbrootPmConfig(db_root, pmId);
|
|
||||||
oss << "pm" << pmId << "_WriteEngineServer";
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
msgQueueClient = new messageqcpp::MessageQueueClient(oss.str());
|
bs << (messageqcpp::ByteStream::byte) WriteEngine::WE_SVR_GET_FILESIZE;
|
||||||
|
// header??
|
||||||
|
bs << fileName;
|
||||||
|
msgQueueClient->write(bs);
|
||||||
|
// namespace??
|
||||||
|
messageqcpp::SBS sbs;
|
||||||
|
sbs = msgQueueClient->read();
|
||||||
|
if (sbs->length() == 0)
|
||||||
|
{
|
||||||
|
delete msgQueueClient;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*sbs >> rc;
|
||||||
|
*sbs >> errMsg;
|
||||||
|
*sbs >> *fileSize;
|
||||||
|
*sbs >> *compressedFileSize;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
delete msgQueueClient;
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
bs << (messageqcpp::ByteStream::byte) WriteEngine::WE_SVR_GET_FILESIZE;
|
|
||||||
// header??
|
|
||||||
bs << fileName;
|
|
||||||
msgQueueClient->write(bs);
|
|
||||||
// namespace??
|
|
||||||
messageqcpp::SBS sbs;
|
|
||||||
sbs = msgQueueClient->read();
|
|
||||||
if (sbs->length() == 0)
|
|
||||||
{
|
|
||||||
delete msgQueueClient;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
*sbs >> rc;
|
|
||||||
*sbs >> errMsg;
|
|
||||||
*sbs >> *fileSize;
|
|
||||||
*sbs >> *compressedFileSize;
|
|
||||||
delete msgQueueClient;
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::OID_t oid, std::vector<struct BRM::EMEntry> &entries)
|
static void cleanup(std::map<int, messageqcpp::MessageQueueClient*> &clients)
|
||||||
{
|
{
|
||||||
|
for(std::map<int, messageqcpp::MessageQueueClient*>::iterator itr = clients.begin(); itr != clients.end(); itr++)
|
||||||
|
{
|
||||||
|
delete itr->second;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
||||||
|
{
|
||||||
|
BRM::DBRM *emp = new BRM::DBRM();
|
||||||
|
std::vector<struct BRM::EMEntry> entries;
|
||||||
CHARSET_INFO *cs = system_charset_info;
|
CHARSET_INFO *cs = system_charset_info;
|
||||||
TABLE *table = tables->table;
|
TABLE *table = tables->table;
|
||||||
|
|
||||||
@ -103,64 +105,11 @@ static bool is_columnstore_files_get_entries(THD *thd, TABLE_LIST *tables, BRM::
|
|||||||
off_t fileSize = 0;
|
off_t fileSize = 0;
|
||||||
off_t compressedFileSize = 0;
|
off_t compressedFileSize = 0;
|
||||||
we_config.initConfigCache();
|
we_config.initConfigCache();
|
||||||
|
std::map<int, messageqcpp::MessageQueueClient*> clients;
|
||||||
|
messageqcpp::MessageQueueClient *msgQueueClient;
|
||||||
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
|
oam::Oam oam_instance;
|
||||||
while ( iter != entries.end() ) //organize extents into files
|
int pmId = 0;
|
||||||
{
|
std::ostringstream oss;
|
||||||
// Don't include files more than once at different block offsets
|
|
||||||
if (iter->blockOffset > 0)
|
|
||||||
{
|
|
||||||
iter++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
table->field[0]->store(oid);
|
|
||||||
table->field[1]->store(iter->segmentNum);
|
|
||||||
table->field[2]->store(iter->partitionNum);
|
|
||||||
|
|
||||||
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
|
|
||||||
std::stringstream DbRootName;
|
|
||||||
DbRootName << "DBRoot" << iter->dbRoot;
|
|
||||||
std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str());
|
|
||||||
fileSize = compressedFileSize = 0;
|
|
||||||
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName);
|
|
||||||
if (!get_file_sizes(iter->dbRoot, fullFileName, &fileSize, &compressedFileSize))
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
|
|
||||||
|
|
||||||
if (fileSize > 0)
|
|
||||||
{
|
|
||||||
table->field[4]->set_notnull();
|
|
||||||
table->field[4]->store(fileSize);
|
|
||||||
if (compressedFileSize > 0)
|
|
||||||
{
|
|
||||||
table->field[5]->set_notnull();
|
|
||||||
table->field[5]->store(compressedFileSize);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
table->field[5]->set_null();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
table->field[4]->set_null();
|
|
||||||
table->field[5]->set_null();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (schema_table_store_record(thd, table))
|
|
||||||
return 1;
|
|
||||||
iter++;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
|
||||||
{
|
|
||||||
BRM::DBRM *emp = new BRM::DBRM();
|
|
||||||
std::vector<struct BRM::EMEntry> entries;
|
|
||||||
|
|
||||||
if (!emp || !emp->isDBRMReady())
|
if (!emp || !emp->isDBRMReady())
|
||||||
{
|
{
|
||||||
@ -176,14 +125,89 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond)
|
|||||||
if (entries.size() == 0)
|
if (entries.size() == 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (is_columnstore_files_get_entries(thd, tables, oid, entries))
|
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
|
||||||
|
while ( iter != entries.end() ) //organize extents into files
|
||||||
{
|
{
|
||||||
delete emp;
|
// Don't include files more than once at different block offsets
|
||||||
return 1;
|
if (iter->blockOffset > 0)
|
||||||
|
{
|
||||||
|
iter++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
table->field[0]->store(oid);
|
||||||
|
table->field[1]->store(iter->segmentNum);
|
||||||
|
table->field[2]->store(iter->partitionNum);
|
||||||
|
|
||||||
|
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
|
||||||
|
std::stringstream DbRootName;
|
||||||
|
DbRootName << "DBRoot" << iter->dbRoot;
|
||||||
|
std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str());
|
||||||
|
fileSize = compressedFileSize = 0;
|
||||||
|
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
msgQueueClient = clients.at(iter->dbRoot);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
msgQueueClient = NULL;
|
||||||
|
}
|
||||||
|
if (!msgQueueClient)
|
||||||
|
{
|
||||||
|
oam_instance.getDbrootPmConfig(iter->dbRoot, pmId);
|
||||||
|
oss << "pm" << pmId << "_WriteEngineServer";
|
||||||
|
try
|
||||||
|
{
|
||||||
|
msgQueueClient = new messageqcpp::MessageQueueClient(oss.str());
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
delete msgQueueClient;
|
||||||
|
cleanup(clients);
|
||||||
|
delete emp;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
clients[iter->dbRoot] = msgQueueClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize))
|
||||||
|
{
|
||||||
|
cleanup(clients);
|
||||||
|
delete emp;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
|
||||||
|
|
||||||
|
if (fileSize > 0)
|
||||||
|
{
|
||||||
|
table->field[4]->set_notnull();
|
||||||
|
table->field[4]->store(fileSize);
|
||||||
|
if (compressedFileSize > 0)
|
||||||
|
{
|
||||||
|
table->field[5]->set_notnull();
|
||||||
|
table->field[5]->store(compressedFileSize);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
table->field[5]->set_null();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
table->field[4]->set_null();
|
||||||
|
table->field[5]->set_null();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (schema_table_store_record(thd, table))
|
||||||
|
{
|
||||||
|
cleanup(clients);
|
||||||
|
delete emp;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
iter++;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
delete emp;
|
delete emp;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user