From affb2ae77052dd348cb8494d20a7a8c52b57afb8 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Mon, 22 Nov 2021 15:20:50 -0500 Subject: [PATCH] MCOL-4769 Fix cache bugs. (#2151) * MCOL-4769 Do not replay INSERTs and LDIs on the replica nodes when the write cache is enabled. * MCOL-4769 If a table is created with the write cache disabled (i.e. when columnstore_cache_inserts=OFF), make it accessible when the cache feature is enabled (columnstore_cache_inserts=ON). --- dbcon/mysql/ha_mcs.cpp | 165 +++++++++++++++++++++++++---------------- dbcon/mysql/ha_mcs.h | 10 +++ 2 files changed, 110 insertions(+), 65 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 5ae8a3a0c..cd1275a8c 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -1233,6 +1233,9 @@ static void create_cache_name(char *to, const char *name) my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert) { + if (current_thd->slave_thread && !get_replication_slave(current_thd)) + return (0); + ha_mcs_cache *cache= (ha_mcs_cache*) param; int error; enum_sql_command sql_command= cache->table->in_use->lex->sql_command; @@ -1398,7 +1401,7 @@ void ha_mcs_cache_share::close() static plugin_ref plugin_maria = NULL; ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root) - :ha_mcs(mcs_hton, table_arg), isSysCatTable(false) + :ha_mcs(mcs_hton, table_arg), isSysCatTable(false), isCacheDisabled(false) { if (table_arg && table_arg->db.str && !strcasecmp(table_arg->db.str, "calpontsys") && @@ -1451,21 +1454,19 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg, char cache_name[FN_REFLEN+8]; DBUG_ENTER("ha_mcs_cache::create"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { create_cache_name(cache_name, name); - { - /* Create a cached table */ - ha_choice save_transactional= ha_create_info->transactional; - row_type save_row_type= ha_create_info->row_type; - ha_create_info->transactional= HA_CHOICE_NO; - ha_create_info->row_type= ROW_TYPE_DYNAMIC; + /* Create a cached table */ + ha_choice save_transactional= ha_create_info->transactional; + row_type save_row_type= ha_create_info->row_type; + ha_create_info->transactional= HA_CHOICE_NO; + ha_create_info->row_type= ROW_TYPE_DYNAMIC; - if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) - DBUG_RETURN(error); - ha_create_info->transactional= save_transactional; - ha_create_info->row_type= save_row_type; - } + if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) + DBUG_RETURN(error); + ha_create_info->transactional= save_transactional; + ha_create_info->row_type= save_row_type; } /* Create the real table in ColumnStore */ @@ -1482,7 +1483,7 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg, int ha_mcs_cache::open(const char *name, int mode, uint open_flags) { - int error; + int error, cache_error; DBUG_ENTER("ha_mcs_cache::open"); if (get_cache_inserts(current_thd) && !isSysCatTable) @@ -1492,46 +1493,63 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) char cache_name[FN_REFLEN+8]; create_cache_name(cache_name, name); - if ((error= cache_handler->open(cache_name, mode, open_flags))) - DBUG_RETURN(error); - if (!(share= find_cache_share(name, cache_handler->file->state->records))) + if (!(cache_error= cache_handler->open(cache_name, mode, open_flags))) { - cache_handler->close(); - DBUG_RETURN(ER_OUTOFMEMORY); - } - - /* Fix lock so that it goes through get_status_and_flush() */ - THR_LOCK *lock= &cache_handler->file->s->lock; - if (lock->get_status != &get_status_and_flush_cache) - { - mysql_mutex_lock(&cache_handler->file->s->intern_lock); - - /* The following lock is here just to establish mutex locking order */ - mysql_mutex_lock(&lock->mutex); - mysql_mutex_unlock(&lock->mutex); + if (!(share= find_cache_share(name, cache_handler->file->state->records))) + { + cache_handler->close(); + DBUG_RETURN(ER_OUTOFMEMORY); + } + /* Fix lock so that it goes through get_status_and_flush() */ + THR_LOCK *lock= &cache_handler->file->s->lock; if (lock->get_status != &get_status_and_flush_cache) { - /* Remember original lock. Used by the THR_lock cache functions */ - share->org_lock= lock[0]; - if (lock->start_trans) - lock->start_trans= &cache_start_trans; - if (lock->copy_status) - lock->copy_status= &cache_copy_status; - if (lock->update_status) - lock->update_status= &cache_update_status; - if (lock->restore_status) - lock->restore_status= &cache_restore_status; - if (lock->check_status) - lock->check_status= &cache_check_status; - if (lock->restore_status) - lock->restore_status= &cache_restore_status; - lock->get_status= &get_status_and_flush_cache; + mysql_mutex_lock(&cache_handler->file->s->intern_lock); + + /* The following lock is here just to establish mutex locking order */ + mysql_mutex_lock(&lock->mutex); + mysql_mutex_unlock(&lock->mutex); + + if (lock->get_status != &get_status_and_flush_cache) + { + /* Remember original lock. Used by the THR_lock cache functions */ + share->org_lock= lock[0]; + if (lock->start_trans) + lock->start_trans= &cache_start_trans; + if (lock->copy_status) + lock->copy_status= &cache_copy_status; + if (lock->update_status) + lock->update_status= &cache_update_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + if (lock->check_status) + lock->check_status= &cache_check_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + lock->get_status= &get_status_and_flush_cache; + } + mysql_mutex_unlock(&cache_handler->file->s->intern_lock); } - mysql_mutex_unlock(&cache_handler->file->s->intern_lock); + cache_handler->file->lock.status_param= (void*) this; + } + else if (cache_error == ENOENT) + { + if (!(error= parent::open(name, mode, open_flags))) + { + isCacheDisabled = true; + DBUG_RETURN(0); + } + else + { + DBUG_RETURN(error); + } + } + else + { + DBUG_RETURN(cache_error); } - cache_handler->file->lock.status_param= (void*) this; } if ((error= parent::open(name, mode, open_flags))) @@ -1551,7 +1569,7 @@ int ha_mcs_cache::close() DBUG_ENTER("ha_mcs_cache::close()"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { error= cache_handler->close(); if ((error2= parent::close())) @@ -1580,7 +1598,7 @@ uint ha_mcs_cache::lock_count(void) const If we are doing an insert or if we want to flush the cache, we have to lock both the Aria table and normal table. */ - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) return 2; else return 1; @@ -1594,7 +1612,7 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type) { - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) to= cache_handler->store_lock(thd, to, TL_WRITE); return parent::store_lock(thd, to, lock_type); } @@ -1609,7 +1627,7 @@ int ha_mcs_cache::external_lock(THD *thd, int lock_type) int error= 0; DBUG_ENTER("ha_mcs_cache::external_lock"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { /* Reset lock_counter. This is ok as external_lock() is guaranteed to be @@ -1659,7 +1677,7 @@ int ha_mcs_cache::delete_table(const char *name) DBUG_ENTER("ha_mcs_cache::delete_table"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { char cache_name[FN_REFLEN+8]; create_cache_name(cache_name, name); @@ -1679,18 +1697,25 @@ int ha_mcs_cache::rename_table(const char *from, const char *to) DBUG_ENTER("ha_mcs_cache::rename_table"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8]; create_cache_name(cache_from, from); create_cache_name(cache_to, to); - if ((error= cache_handler->rename_table(cache_from, cache_to))) - DBUG_RETURN(error); - - if ((error= parent::rename_table(from, to))) + if (!(error= cache_handler->rename_table(cache_from, cache_to))) { - cache_handler->rename_table(cache_to, cache_from); - DBUG_RETURN(error); + if ((error= parent::rename_table(from, to))) + { + cache_handler->rename_table(cache_to, cache_from); + DBUG_RETURN(error); + } + } + else if (error == ENOENT) + { + if ((error= parent::rename_table(from, to))) + { + DBUG_RETURN(error); + } } } else @@ -1708,7 +1733,7 @@ int ha_mcs_cache::delete_all_rows(void) DBUG_ENTER("ha_mcs_cache::delete_all_rows"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { error= cache_handler->delete_all_rows(); share->cached_rows= 0; @@ -1720,7 +1745,7 @@ int ha_mcs_cache::delete_all_rows(void) bool ha_mcs_cache::is_crashed() const { - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) return (cache_handler->is_crashed() || parent::is_crashed()); else @@ -1752,7 +1777,7 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) int something_crashed= is_crashed(); DBUG_ENTER("ha_mcs_cache::repair"); - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (isCacheEnabled()) { if (cache_handler->is_crashed() || !something_crashed) { @@ -1782,19 +1807,26 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) */ int ha_mcs_cache::write_row(const uchar *buf) { - if (get_cache_inserts(current_thd) && !isSysCatTable && insert_command) + if (current_thd->slave_thread && !get_replication_slave(current_thd)) + return (0); + + if (isCacheEnabled() && insert_command) { DBUG_ASSERT(share->cached_rows == cache_handler->file->state->records); share->cached_rows++; return cache_handler->write_row(buf); } + return parent::write_row(buf); } void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) { - if (get_cache_inserts(current_thd) && !isSysCatTable) + if (current_thd->slave_thread && !get_replication_slave(current_thd)) + return; + + if (isCacheEnabled()) { if (insert_command) { @@ -1821,7 +1853,10 @@ void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) int ha_mcs_cache::end_bulk_insert() { - if (get_cache_inserts(current_thd) && !isSysCatTable && insert_command) + if (current_thd->slave_thread && !get_replication_slave(current_thd)) + return (0); + + if (isCacheEnabled() && insert_command) return cache_handler->end_bulk_insert(); return parent::end_bulk_insert(); } diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 0ca719536..609acb056 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -296,6 +296,16 @@ class ha_mcs_cache :public ha_mcs // calpontsys.syscolumn system catalog tables bool isSysCatTable; + // True if the ColumnStore table is not cached (i.e. when the table + // was created with columnstore_cache_inserts=OFF). + bool isCacheDisabled; + + bool isCacheEnabled() const + { + return (get_cache_inserts(current_thd) && !isSysCatTable && + !isCacheDisabled); + } + public: uint lock_counter; ha_maria *cache_handler;