1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4769 Fix cache bugs. (#2151)

* MCOL-4769 Do not replay INSERTs and LDIs on the replica nodes when
the write cache is enabled.

* MCOL-4769 If a table is created with the write cache disabled
(i.e. when columnstore_cache_inserts=OFF), make it accessible when
the cache feature is enabled (columnstore_cache_inserts=ON).
This commit is contained in:
Gagan Goel
2021-11-22 15:20:50 -05:00
committed by GitHub
parent 3c65499820
commit affb2ae770
2 changed files with 110 additions and 65 deletions

View File

@ -1233,6 +1233,9 @@ static void create_cache_name(char *to, const char *name)
my_bool get_status_and_flush_cache(void *param,
my_bool concurrent_insert)
{
if (current_thd->slave_thread && !get_replication_slave(current_thd))
return (0);
ha_mcs_cache *cache= (ha_mcs_cache*) param;
int error;
enum_sql_command sql_command= cache->table->in_use->lex->sql_command;
@ -1398,7 +1401,7 @@ void ha_mcs_cache_share::close()
static plugin_ref plugin_maria = NULL;
ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root)
:ha_mcs(mcs_hton, table_arg), isSysCatTable(false)
:ha_mcs(mcs_hton, table_arg), isSysCatTable(false), isCacheDisabled(false)
{
if (table_arg && table_arg->db.str &&
!strcasecmp(table_arg->db.str, "calpontsys") &&
@ -1451,21 +1454,19 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg,
char cache_name[FN_REFLEN+8];
DBUG_ENTER("ha_mcs_cache::create");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
create_cache_name(cache_name, name);
{
/* Create a cached table */
ha_choice save_transactional= ha_create_info->transactional;
row_type save_row_type= ha_create_info->row_type;
ha_create_info->transactional= HA_CHOICE_NO;
ha_create_info->row_type= ROW_TYPE_DYNAMIC;
/* Create a cached table */
ha_choice save_transactional= ha_create_info->transactional;
row_type save_row_type= ha_create_info->row_type;
ha_create_info->transactional= HA_CHOICE_NO;
ha_create_info->row_type= ROW_TYPE_DYNAMIC;
if ((error= cache_handler->create(cache_name, table_arg, ha_create_info)))
DBUG_RETURN(error);
ha_create_info->transactional= save_transactional;
ha_create_info->row_type= save_row_type;
}
if ((error= cache_handler->create(cache_name, table_arg, ha_create_info)))
DBUG_RETURN(error);
ha_create_info->transactional= save_transactional;
ha_create_info->row_type= save_row_type;
}
/* Create the real table in ColumnStore */
@ -1482,7 +1483,7 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg,
int ha_mcs_cache::open(const char *name, int mode, uint open_flags)
{
int error;
int error, cache_error;
DBUG_ENTER("ha_mcs_cache::open");
if (get_cache_inserts(current_thd) && !isSysCatTable)
@ -1492,46 +1493,63 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags)
char cache_name[FN_REFLEN+8];
create_cache_name(cache_name, name);
if ((error= cache_handler->open(cache_name, mode, open_flags)))
DBUG_RETURN(error);
if (!(share= find_cache_share(name, cache_handler->file->state->records)))
if (!(cache_error= cache_handler->open(cache_name, mode, open_flags)))
{
cache_handler->close();
DBUG_RETURN(ER_OUTOFMEMORY);
}
/* Fix lock so that it goes through get_status_and_flush() */
THR_LOCK *lock= &cache_handler->file->s->lock;
if (lock->get_status != &get_status_and_flush_cache)
{
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
/* The following lock is here just to establish mutex locking order */
mysql_mutex_lock(&lock->mutex);
mysql_mutex_unlock(&lock->mutex);
if (!(share= find_cache_share(name, cache_handler->file->state->records)))
{
cache_handler->close();
DBUG_RETURN(ER_OUTOFMEMORY);
}
/* Fix lock so that it goes through get_status_and_flush() */
THR_LOCK *lock= &cache_handler->file->s->lock;
if (lock->get_status != &get_status_and_flush_cache)
{
/* Remember original lock. Used by the THR_lock cache functions */
share->org_lock= lock[0];
if (lock->start_trans)
lock->start_trans= &cache_start_trans;
if (lock->copy_status)
lock->copy_status= &cache_copy_status;
if (lock->update_status)
lock->update_status= &cache_update_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
if (lock->check_status)
lock->check_status= &cache_check_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
lock->get_status= &get_status_and_flush_cache;
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
/* The following lock is here just to establish mutex locking order */
mysql_mutex_lock(&lock->mutex);
mysql_mutex_unlock(&lock->mutex);
if (lock->get_status != &get_status_and_flush_cache)
{
/* Remember original lock. Used by the THR_lock cache functions */
share->org_lock= lock[0];
if (lock->start_trans)
lock->start_trans= &cache_start_trans;
if (lock->copy_status)
lock->copy_status= &cache_copy_status;
if (lock->update_status)
lock->update_status= &cache_update_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
if (lock->check_status)
lock->check_status= &cache_check_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
lock->get_status= &get_status_and_flush_cache;
}
mysql_mutex_unlock(&cache_handler->file->s->intern_lock);
}
mysql_mutex_unlock(&cache_handler->file->s->intern_lock);
cache_handler->file->lock.status_param= (void*) this;
}
else if (cache_error == ENOENT)
{
if (!(error= parent::open(name, mode, open_flags)))
{
isCacheDisabled = true;
DBUG_RETURN(0);
}
else
{
DBUG_RETURN(error);
}
}
else
{
DBUG_RETURN(cache_error);
}
cache_handler->file->lock.status_param= (void*) this;
}
if ((error= parent::open(name, mode, open_flags)))
@ -1551,7 +1569,7 @@ int ha_mcs_cache::close()
DBUG_ENTER("ha_mcs_cache::close()");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
error= cache_handler->close();
if ((error2= parent::close()))
@ -1580,7 +1598,7 @@ uint ha_mcs_cache::lock_count(void) const
If we are doing an insert or if we want to flush the cache, we have to lock
both the Aria table and normal table.
*/
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
return 2;
else
return 1;
@ -1594,7 +1612,7 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type)
{
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
to= cache_handler->store_lock(thd, to, TL_WRITE);
return parent::store_lock(thd, to, lock_type);
}
@ -1609,7 +1627,7 @@ int ha_mcs_cache::external_lock(THD *thd, int lock_type)
int error= 0;
DBUG_ENTER("ha_mcs_cache::external_lock");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
/*
Reset lock_counter. This is ok as external_lock() is guaranteed to be
@ -1659,7 +1677,7 @@ int ha_mcs_cache::delete_table(const char *name)
DBUG_ENTER("ha_mcs_cache::delete_table");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
char cache_name[FN_REFLEN+8];
create_cache_name(cache_name, name);
@ -1679,18 +1697,25 @@ int ha_mcs_cache::rename_table(const char *from, const char *to)
DBUG_ENTER("ha_mcs_cache::rename_table");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8];
create_cache_name(cache_from, from);
create_cache_name(cache_to, to);
if ((error= cache_handler->rename_table(cache_from, cache_to)))
DBUG_RETURN(error);
if ((error= parent::rename_table(from, to)))
if (!(error= cache_handler->rename_table(cache_from, cache_to)))
{
cache_handler->rename_table(cache_to, cache_from);
DBUG_RETURN(error);
if ((error= parent::rename_table(from, to)))
{
cache_handler->rename_table(cache_to, cache_from);
DBUG_RETURN(error);
}
}
else if (error == ENOENT)
{
if ((error= parent::rename_table(from, to)))
{
DBUG_RETURN(error);
}
}
}
else
@ -1708,7 +1733,7 @@ int ha_mcs_cache::delete_all_rows(void)
DBUG_ENTER("ha_mcs_cache::delete_all_rows");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
error= cache_handler->delete_all_rows();
share->cached_rows= 0;
@ -1720,7 +1745,7 @@ int ha_mcs_cache::delete_all_rows(void)
bool ha_mcs_cache::is_crashed() const
{
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
return (cache_handler->is_crashed() ||
parent::is_crashed());
else
@ -1752,7 +1777,7 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
int something_crashed= is_crashed();
DBUG_ENTER("ha_mcs_cache::repair");
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (isCacheEnabled())
{
if (cache_handler->is_crashed() || !something_crashed)
{
@ -1782,19 +1807,26 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
*/
int ha_mcs_cache::write_row(const uchar *buf)
{
if (get_cache_inserts(current_thd) && !isSysCatTable && insert_command)
if (current_thd->slave_thread && !get_replication_slave(current_thd))
return (0);
if (isCacheEnabled() && insert_command)
{
DBUG_ASSERT(share->cached_rows == cache_handler->file->state->records);
share->cached_rows++;
return cache_handler->write_row(buf);
}
return parent::write_row(buf);
}
void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags)
{
if (get_cache_inserts(current_thd) && !isSysCatTable)
if (current_thd->slave_thread && !get_replication_slave(current_thd))
return;
if (isCacheEnabled())
{
if (insert_command)
{
@ -1821,7 +1853,10 @@ void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags)
int ha_mcs_cache::end_bulk_insert()
{
if (get_cache_inserts(current_thd) && !isSysCatTable && insert_command)
if (current_thd->slave_thread && !get_replication_slave(current_thd))
return (0);
if (isCacheEnabled() && insert_command)
return cache_handler->end_bulk_insert();
return parent::end_bulk_insert();
}