You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
feat(plugin): All InnoDB queries are pushed down to Columnstore if columnstore_innodb_queries_uses_mcs = ON in the configuration before server start
This commit is contained in:
@ -1808,6 +1808,8 @@ static handler* ha_mcs_cache_create_handler(handlerton* hton, TABLE_SHARE* table
|
||||
return new (mem_root) ha_mcs_cache(hton, table, mem_root);
|
||||
}
|
||||
|
||||
bool get_innodb_queries_uses_mcs(THD* thd);
|
||||
|
||||
/******************************************************************************
|
||||
ha_mcs Plugin code
|
||||
******************************************************************************/
|
||||
@ -1858,16 +1860,17 @@ static int columnstore_init_func(void* p)
|
||||
mcs_hton->create_unit = create_columnstore_unit_handler;
|
||||
mcs_hton->db_type = DB_TYPE_AUTOASSIGN;
|
||||
|
||||
{
|
||||
auto* innodb_hton = plugin_hton(plugin_innodb);
|
||||
int error = innodb_hton == nullptr; // Engine must exists!
|
||||
if (error)
|
||||
if (get_innodb_queries_uses_mcs(current_thd))
|
||||
{
|
||||
my_error(HA_ERR_INITIALIZATION, MYF(0), "Could not find storage engine %s", name.str);
|
||||
auto* innodb_hton = plugin_hton(plugin_innodb);
|
||||
int error = innodb_hton == nullptr; // Engine must exists!
|
||||
if (error)
|
||||
{
|
||||
my_error(HA_ERR_INITIALIZATION, MYF(0), "Could not find storage engine %s", name.str);
|
||||
}
|
||||
innodb_hton->create_select = create_columnstore_select_handler;
|
||||
innodb_hton->create_unit = create_columnstore_unit_handler;
|
||||
}
|
||||
innodb_hton->create_select = create_columnstore_select_handler;
|
||||
innodb_hton->create_unit = create_columnstore_unit_handler;
|
||||
}
|
||||
|
||||
#ifdef HAVE_PSI_INTERFACE
|
||||
uint count = sizeof(all_mutexes) / sizeof(all_mutexes[0]);
|
||||
|
@ -4077,9 +4077,9 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool
|
||||
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
csc->identity(CalpontSystemCatalog::FE);
|
||||
|
||||
if (!get_fe_conn_info_ptr())
|
||||
if (!get_fe_conn_info_ptr())
|
||||
{
|
||||
set_fe_conn_info_ptr(reinterpret_cast<void*>(new cal_connection_info(), thd));
|
||||
set_fe_conn_info_ptr((void*)new cal_connection_info());
|
||||
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
|
||||
}
|
||||
|
||||
@ -4127,33 +4127,33 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool
|
||||
bool localQuery = (get_local_query(thd) > 0 ? true : false);
|
||||
|
||||
{
|
||||
// ci->stats.reset(); // reset query stats
|
||||
// ci->stats.setStartTime();
|
||||
// if (thd->main_security_ctx.user)
|
||||
// {
|
||||
// ci->stats.fUser = thd->main_security_ctx.user;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// ci->stats.fUser = "";
|
||||
// }
|
||||
ci->stats.reset(); // reset query stats
|
||||
ci->stats.setStartTime();
|
||||
if (thd->main_security_ctx.user)
|
||||
{
|
||||
ci->stats.fUser = thd->main_security_ctx.user;
|
||||
}
|
||||
else
|
||||
{
|
||||
ci->stats.fUser = "";
|
||||
}
|
||||
|
||||
// if (thd->main_security_ctx.host)
|
||||
// ci->stats.fHost = thd->main_security_ctx.host;
|
||||
// else if (thd->main_security_ctx.host_or_ip)
|
||||
// ci->stats.fHost = thd->main_security_ctx.host_or_ip;
|
||||
// else
|
||||
// ci->stats.fHost = "unknown";
|
||||
if (thd->main_security_ctx.host)
|
||||
ci->stats.fHost = thd->main_security_ctx.host;
|
||||
else if (thd->main_security_ctx.host_or_ip)
|
||||
ci->stats.fHost = thd->main_security_ctx.host_or_ip;
|
||||
else
|
||||
ci->stats.fHost = "unknown";
|
||||
|
||||
// try
|
||||
// {
|
||||
// ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
|
||||
// }
|
||||
// catch (std::exception& e)
|
||||
// {
|
||||
// string msg = string("Columnstore User Priority - ") + e.what();
|
||||
// ci->warningMsg = msg;
|
||||
// }
|
||||
try
|
||||
{
|
||||
ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
string msg = string("Columnstore User Priority - ") + e.what();
|
||||
ci->warningMsg = msg;
|
||||
}
|
||||
|
||||
// if the previous query has error, re-establish the connection
|
||||
if (ci->queryState != 0)
|
||||
|
@ -220,6 +220,9 @@ static MYSQL_THDVAR_ULONG(max_allowed_in_values, PLUGIN_VAR_RQCMDARG,
|
||||
"The maximum length of the entries in the IN query clause.", NULL, NULL, 6000, 1,
|
||||
~0U, 1);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(innodb_queries_uses_mcs, PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
|
||||
"Direct all InnoDB-only queries into MCS via Select Handler.", NULL, NULL, 0);
|
||||
|
||||
st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
|
||||
MYSQL_SYSVAR(fe_conn_info_ptr),
|
||||
MYSQL_SYSVAR(original_optimizer_flags),
|
||||
@ -260,6 +263,7 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
|
||||
MYSQL_SYSVAR(s3_region),
|
||||
MYSQL_SYSVAR(pron),
|
||||
MYSQL_SYSVAR(max_allowed_in_values),
|
||||
MYSQL_SYSVAR(innodb_queries_uses_mcs),
|
||||
NULL};
|
||||
|
||||
st_mysql_show_var mcs_status_variables[] = {{"columnstore_version", (char*)&cs_version, SHOW_CHAR},
|
||||
@ -654,4 +658,14 @@ ulong get_max_allowed_in_values(THD* thd)
|
||||
void set_max_allowed_in_values(THD* thd, ulong value)
|
||||
{
|
||||
THDVAR(thd, max_allowed_in_values) = value;
|
||||
}
|
||||
}
|
||||
|
||||
bool get_innodb_queries_uses_mcs(THD* thd)
|
||||
{
|
||||
return THDVAR(thd, innodb_queries_uses_mcs);
|
||||
}
|
||||
|
||||
void set_innodb_queries_uses_mcs(THD* thd, bool value)
|
||||
{
|
||||
THDVAR(thd, innodb_queries_uses_mcs) = value;
|
||||
}
|
||||
|
Reference in New Issue
Block a user