1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/mysql/is_columnstore_files.cpp
Serguey Zefirov 5aa2a824c2 feat(MCOL-6082): Multiple readers of dbroots using OamCache logic
This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.
2025-07-21 14:32:39 +03:00

321 lines
8.8 KiB
C++

/* c-basic-offset: 4; tab-width: 4; indent-tabs-mode: nil
* vi: set shiftwidth=4 tabstop=4 expandtab:
* :indentSize=4:tabSize=4:noTabs=true:
*
* Copyright (C) 2016 MariaDB Corporation
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; version 2 of
* the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
#define PREFER_MY_CONFIG_H
#include "idb_mysql.h"
#include "dbrm.h"
#include "objectidmanager.h"
#include "we_convertor.h"
#include "we_define.h"
#include "IDBPolicy.h"
#include "configcpp.h"
#include "we_config.h"
#include "we_brm.h"
#include "bytestream.h"
#include "liboamcpp.h"
#include "oamcache.h"
#include "messagequeue.h"
#include "messagequeuepool.h"
#include "we_messages.h"
#include "is_columnstore.h"
#include "ha_mcs_logging.h"
// Required declaration as it isn't in a MariaDB include
bool schema_table_store_record(THD* thd, TABLE* table);
// Column layout of the table filled by is_columnstore_files_fill().
// generate_result() addresses these columns by position via table->field[N]:
//   0 OBJECT_ID, 1 SEGMENT_ID, 2 PARTITION_ID, 3 FILENAME,
//   4 FILE_SIZE, 5 COMPRESSED_DATA_SIZE
// so the order here must stay in sync with the indexes used there.
// Only the two size columns are declared NULLABLE.
ST_FIELD_INFO is_columnstore_files_fields[] = {
Show::Column("OBJECT_ID", Show::ULong(0), NOT_NULL),
Show::Column("SEGMENT_ID", Show::ULong(0), NOT_NULL),
Show::Column("PARTITION_ID", Show::ULong(0), NOT_NULL),
Show::Column("FILENAME", Show::Varchar(1024), NOT_NULL),
Show::Column("FILE_SIZE", Show::ULonglong(0), NULLABLE),
Show::Column("COMPRESSED_DATA_SIZE", Show::ULonglong(0), NULLABLE),
Show::CEnd()};
/**
 * Ask a WriteEngineServer for the sizes of one segment file.
 *
 * Sends a WE_SVR_GET_FILESIZE request carrying fileName over msgQueueClient
 * and decodes the reply: status byte, error message, file size, compressed
 * file size.
 *
 * @param thd                 session used when logging a server-side error
 * @param msgQueueClient      connected client; NOT owned by this function. The
 *                            caller obtained it from MessageQueueClientPool and
 *                            is responsible for releasing it, so it must never
 *                            be deleted here.
 * @param fileName            path of the file to query
 * @param fileSize            [out] filled in when the function returns true
 * @param compressedFileSize  [out] filled in when the function returns true
 * @return true on success; false on an empty reply, a non-zero status from the
 *         server, or any exception raised while talking to it
 */
static bool get_file_sizes(THD* thd, messageqcpp::MessageQueueClient* msgQueueClient, const char* fileName,
                           off_t* fileSize, off_t* compressedFileSize)
{
  try
  {
    messageqcpp::ByteStream bs;
    bs << (messageqcpp::ByteStream::byte)WriteEngine::WE_SVR_GET_FILESIZE;
    bs << fileName;
    msgQueueClient->write(bs);

    messageqcpp::SBS sbs = msgQueueClient->read();

    if (sbs->length() == 0)
    {
      // Empty reply. The client belongs to the pool and the caller still
      // hands it back through releaseInstance(); deleting it here would leave
      // a dangling pointer behind.
      return false;
    }

    messageqcpp::ByteStream::byte rc;
    std::string errMsg;
    *sbs >> rc;
    *sbs >> errMsg;

    if (rc)
    {
      ha_mcs_impl::log_this(thd,
                            "I_S::COLUMNSTORE_FILE::get_file_sizes(): WriteEngineServer returns an error().",
                            logging::LOG_TYPE_ERROR, thd->thread_id);
      return false;
    }

    *sbs >> *fileSize;
    *sbs >> *compressedFileSize;
    return true;
  }
  catch (...)
  {
    // Best effort: any messaging failure is reported to the caller as
    // "sizes unavailable" rather than propagated into the server.
    return false;
  }
}
/**
 * Emit one information_schema row per segment file of the given OID.
 *
 * Fetches the extents for oid from the extent map, keeps only the extent at
 * block offset 0 of each file (so a file is reported once), builds the file
 * path from the DBRoot path in the configuration, asks the WriteEngineServer
 * of the owning PM for the file sizes and stores the row.
 *
 * @param oid    object id whose files are listed
 * @param emp    extent map accessor; owned by the caller, never freed here
 * @param table  destination table, columns laid out as in
 *               is_columnstore_files_fields
 * @param thd    current session
 * @return 0 on success (including "no extents" and "owning PM unknown"),
 *         1 when the path does not fit, the size query fails or the row
 *         cannot be stored, ER_INTERNAL_ERROR when the configuration cannot
 *         be loaded
 */
static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* thd)
{
  config::Config* cfg = nullptr;

  try
  {
    cfg = config::Config::makeConfig();
  }
  catch (std::runtime_error& e)
  {
    thd->raise_error_printf(ER_INTERNAL_ERROR, e.what());
    return ER_INTERNAL_ERROR;
  }

  WriteEngine::Config we_config;
  we_config.initConfigCache();
  oam::OamCache* oamcache = oam::OamCache::makeOamCache();

  std::vector<struct BRM::EMEntry> entries;
  emp->getExtents(oid, entries, false, false, true);

  if (entries.empty())
    return 0;

  CHARSET_INFO* cs = system_charset_info;
  char oidDirName[WriteEngine::FILE_NAME_SIZE];
  char fullFileName[WriteEngine::FILE_NAME_SIZE];
  char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];

  for (const auto& entry : entries)  // organize extents into files
  {
    // Don't include files more than once at different block offsets
    if (entry.blockOffset > 0)
      continue;

    int pmId = 0;

    try
    {
      pmId = oamcache->getOwnerPM(entry.dbRoot);
    }
    catch (std::runtime_error&)
    {
      // MCOL-1116: If we are here a DBRoot is offline/missing.
      // NOTE(review): this stops listing the remaining extents of this OID
      // instead of skipping just this one — confirm that is intended.
      return 0;
    }

    table->field[0]->store(oid);
    table->field[1]->store(entry.segmentNum);
    table->field[2]->store(entry.partitionNum);

    WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, entry.partitionNum, entry.segmentNum);

    std::stringstream dbRootName;
    dbRootName << "DBRoot" << entry.dbRoot;
    const std::string dbRootPath = cfg->getConfig("SystemConfig", dbRootName.str());

    const int written = snprintf(fullFileName, sizeof(fullFileName), "%s/%s", dbRootPath.c_str(), oidDirName);

    // snprintf reports the length it *wanted* to write: anything at or above
    // the buffer size means the path was truncated, negative means an error.
    if (written < 0 || static_cast<size_t>(written) >= sizeof(fullFileName))
      return 1;

    std::ostringstream oss;
    oss << "pm" << pmId << "_WriteEngineServer";
    messageqcpp::MessageQueueClient* msgQueueClient =
        messageqcpp::MessageQueueClientPool::getInstance(oss.str());

    off_t fileSize = 0;
    off_t compressedFileSize = 0;

    if (!get_file_sizes(thd, msgQueueClient, fullFileName, &fileSize, &compressedFileSize))
    {
      // emp is owned by the caller; only the pooled client is ours to return.
      messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
      return 1;
    }

    table->field[3]->store(fullFileName, strlen(fullFileName), cs);

    if (fileSize > 0)
    {
      table->field[4]->set_notnull();
      table->field[4]->store(fileSize);

      if (compressedFileSize > 0)
      {
        table->field[5]->set_notnull();
        table->field[5]->store(compressedFileSize);
      }
      else
      {
        table->field[5]->set_null();
      }
    }
    else
    {
      table->field[4]->set_null();
      table->field[5]->set_null();
    }

    const bool storeFailed = schema_table_store_record(thd, table);
    messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);

    if (storeFailed)
      return 1;
  }

  return 0;
}
static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond)
{
BRM::DBRM::refreshShmWithLock();
std::unique_ptr<BRM::DBRM> emp(new BRM::DBRM());
BRM::OID_t cond_oid = 0;
TABLE* table = tables->table;
if (!emp || !emp->isDBRMReady())
{
return 1;
}
if (cond && cond->type() == Item::FUNC_ITEM)
{
Item_func* fitem = (Item_func*)cond;
if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2))
{
if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[1]->const_item())
{
// WHERE object_id = value
Item_field* item_field = (Item_field*)fitem->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
cond_oid = fitem->arguments()[1]->val_int();
return generate_result(cond_oid, emp.get(), table, thd);
}
}
else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[0]->const_item())
{
// WHERE value = object_id
Item_field* item_field = (Item_field*)fitem->arguments()[1]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
cond_oid = fitem->arguments()[0]->val_int();
return generate_result(cond_oid, emp.get(), table, thd);
}
}
}
else if (fitem->functype() == Item_func::IN_FUNC)
{
// WHERE object_id in (value1, value2)
Item_field* item_field = (Item_field*)fitem->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
for (unsigned int i = 1; i < fitem->argument_count(); i++)
{
cond_oid = fitem->arguments()[i]->val_int();
int result = generate_result(cond_oid, emp.get(), table, thd);
if (result)
return 1;
}
}
}
else if (fitem->functype() == Item_func::UNKNOWN_FUNC &&
strcasecmp(fitem->func_name(), "find_in_set") == 0)
{
// WHERE FIND_IN_SET(object_id, values)
String* tmp_var = fitem->arguments()[1]->val_str();
std::stringstream ss(tmp_var->ptr());
while (ss >> cond_oid)
{
int ret = generate_result(cond_oid, emp.get(), table, thd);
if (ret)
return 1;
if (ss.peek() == ',')
ss.ignore();
}
}
}
execplan::ObjectIDManager oidm;
BRM::OID_t MaxOID = oidm.size();
if (!cond_oid)
{
for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
{
int result = generate_result(oid, emp.get(), table, thd);
if (result)
return 1;
}
}
return 0;
}
int is_columnstore_files_plugin_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
schema->fields_info = is_columnstore_files_fields;
schema->fill_table = is_columnstore_files_fill;
return 0;
}