From cb2ddad308903638ce3335aba7e33b093c1f6072 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Mon, 22 Jun 2020 19:16:18 -0400 Subject: [PATCH 1/8] Port of commit 0463e1f722d4d32526760c923e0092a380a9e634 from server/columnstore_cache. Commit message: Fixed bug in cache: - The THR_LOCK org_lock must be stored in a shared structure so that all instances of a table can use it. Fixed by adding a ha_cache_share object that keeps track of this one. - Fixed wrong test in get_status_and_flush_cache to detect in insert command - Fixed in get_status_and_flush_cache that we always free the insert lock if we don't need it. --- dbcon/mysql/ha_mcs.cpp | 155 ++++++++++++++++++++++++++++++++--------- dbcon/mysql/ha_mcs.h | 15 +++- 2 files changed, 137 insertions(+), 33 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 50b14dd26..af74b8dff 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -1223,8 +1223,6 @@ bool ha_mcs::is_crashed() const lock. */ -static my_bool (*original_get_status)(void*, my_bool); - my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); @@ -1264,8 +1262,9 @@ my_bool get_status_and_flush_cache(void *param, Call first the original Aria get_status function All Aria get_status functions takes Maria handler as the parameter */ - if (original_get_status) - (*original_get_status)(&cache->cache_handler->file, concurrent_insert); + if (cache->share->org_lock.get_status) + (*cache->share->org_lock.get_status)(&cache->cache_handler->file, + concurrent_insert); /* If first get_status() call for this table, flush cache if needed */ if (!cache->lock_counter++) @@ -1280,10 +1279,9 @@ my_bool get_status_and_flush_cache(void *param, return(1); } } - else if (!cache->insert_command) - cache->free_locks(); } - else if (!cache->insert_command) + + if (!cache->insert_command) cache->free_locks(); return (0); @@ -1294,41 +1292,100 @@ my_bool get_status_and_flush_cache(void *param, static my_bool cache_start_trans(void* param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.start_trans) - return (*cache->org_lock.start_trans)(cache->cache_handler->file); - return 0; + return (*cache->share->org_lock.start_trans)(cache->cache_handler->file); } static void cache_copy_status(void* to, void *from) { ha_mcs_cache *to_cache= (ha_mcs_cache*) to, *from_cache= (ha_mcs_cache*) from; - if (to_cache->org_lock.copy_status) - (*to_cache->org_lock.copy_status)(to_cache->cache_handler->file, - from_cache->cache_handler->file); + (*to_cache->share->org_lock.copy_status)(to_cache->cache_handler->file, + from_cache->cache_handler->file); } static void cache_update_status(void* param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.update_status) - (*cache->org_lock.update_status)(cache->cache_handler->file); + (*cache->share->org_lock.update_status)(cache->cache_handler->file); } static void cache_restore_status(void *param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.restore_status) - (*cache->org_lock.restore_status)(cache->cache_handler->file); + (*cache->share->org_lock.restore_status)(cache->cache_handler->file); } static my_bool cache_check_status(void *param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.check_status) - return (*cache->org_lock.check_status)(cache->cache_handler->file); - return 0; + return (*cache->share->org_lock.check_status)(cache->cache_handler->file); } + +/***************************************************************************** + ha_mcs_cache_share functions (Common storage for an open cache file) +*****************************************************************************/ + +static ha_mcs_cache_share *cache_share_list= 0; +static PSI_mutex_key key_LOCK_cache_share; +static PSI_mutex_info all_mutexes[]= +{ + { &key_LOCK_cache_share, "LOCK_cache_share", PSI_FLAG_GLOBAL}, +}; +static mysql_mutex_t LOCK_cache_share; + +/* + Find or create a share +*/ + +ha_mcs_cache_share *find_cache_share(const char *name) +{ + ha_mcs_cache_share *pos, *share; + mysql_mutex_lock(&LOCK_cache_share); + for (pos= cache_share_list; pos; pos= pos->next) + { + if (!strcmp(pos->name, name)) + { + mysql_mutex_unlock(&LOCK_cache_share); + return(pos); + } + } + if (!(share= (ha_mcs_cache_share*) my_malloc(PSI_NOT_INSTRUMENTED, + sizeof(*share) + strlen(name)+1, + MYF(MY_FAE)))) + { + mysql_mutex_unlock(&LOCK_cache_share); + return 0; + } + share->name= (char*) (share+1); + share->open_count= 1; + strmov((char*) share->name, name); + share->next= cache_share_list; + cache_share_list= share; + mysql_mutex_unlock(&LOCK_cache_share); + return share; +} + + +/* + Decrement open counter and free share if there is no more users +*/ + +void ha_mcs_cache_share::close() +{ + ha_mcs_cache_share *pos; + mysql_mutex_lock(&LOCK_cache_share); + if (!--open_count) + { + ha_mcs_cache_share **prev= &cache_share_list; + for ( ; (pos= *prev) != this; prev= &pos->next) + ; + *prev= next; + my_free(this); + } + mysql_mutex_unlock(&LOCK_cache_share); +} + + /***************************************************************************** ha_mcs_cache handler functions *****************************************************************************/ @@ -1337,6 +1394,7 @@ ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *m :ha_mcs(mcs_hton, table_arg) { cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); + share= 0; lock_counter= 0; } @@ -1344,7 +1402,10 @@ ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *m ha_mcs_cache::~ha_mcs_cache() { if (cache_handler) + { delete cache_handler; + cache_handler= NULL; + } } /* @@ -1396,19 +1457,38 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) if ((error= cache_handler->open(cache_name, mode, open_flags))) DBUG_RETURN(error); + if (!(share= find_cache_share(name))) + { + cache_handler->close(); + DBUG_RETURN(ER_OUTOFMEMORY); + } + /* Fix lock so that it goes through get_status_and_flush() */ THR_LOCK *lock= &cache_handler->file->s->lock; - mysql_mutex_lock(&cache_handler->file->s->intern_lock); - org_lock= lock[0]; - lock->get_status= &get_status_and_flush_cache; - lock->start_trans= &cache_start_trans; - lock->copy_status= &cache_copy_status; - lock->update_status= &cache_update_status; - lock->restore_status= &cache_restore_status; - lock->check_status= &cache_check_status; - lock->restore_status= &cache_restore_status; + if (lock->get_status != &get_status_and_flush_cache) + { + mysql_mutex_lock(&cache_handler->file->s->intern_lock); + if (lock->get_status != &get_status_and_flush_cache) + { + /* Remember original lock. Used by the THR_lock cache functions */ + share->org_lock= lock[0]; + if (lock->start_trans) + lock->start_trans= &cache_start_trans; + if (lock->copy_status) + lock->copy_status= &cache_copy_status; + if (lock->update_status) + lock->update_status= &cache_update_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + if (lock->check_status) + lock->check_status= &cache_check_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + lock->get_status= &get_status_and_flush_cache; + } + mysql_mutex_unlock(&cache_handler->file->s->intern_lock); + } cache_handler->file->lock.status_param= (void*) this; - mysql_mutex_unlock(&cache_handler->file->s->intern_lock); if ((error= parent::open(name, mode, open_flags))) { @@ -1422,10 +1502,13 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) int ha_mcs_cache::close() { int error, error2; + ha_mcs_cache_share *org_share= share; DBUG_ENTER("ha_mcs_cache::close()"); error= cache_handler->close(); if ((error2= parent::close())) error= error2; + if (org_share) + org_share->close(); DBUG_RETURN(error); } @@ -1637,12 +1720,17 @@ static int ha_mcs_cache_init(void *p) { handlerton *cache_hton; int error; + uint count; cache_hton= (handlerton *) p; cache_hton->create= ha_mcs_cache_create_handler; cache_hton->panic= 0; cache_hton->flags= HTON_NO_PARTITION; + count= sizeof(all_mutexes)/sizeof(all_mutexes[0]); + mysql_mutex_register("ha_mcs_cache", all_mutexes, count); + mysql_mutex_init(key_LOCK_cache_share, &LOCK_cache_share, MY_MUTEX_INIT_FAST); + error= mcs_hton == NULL; // Engine must exists! if (error) @@ -1672,6 +1760,7 @@ static int ha_mcs_cache_deinit(void *p) plugin_unlock(0, plugin_maria); plugin_maria= NULL; } + mysql_mutex_destroy(&LOCK_cache_share); return 0; } @@ -1717,7 +1806,7 @@ maria_declare_plugin(columnstore) NULL, /* status variables */ NULL, /* system variables */ MCSVERSION, /* string version */ - MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */ + MariaDB_PLUGIN_MATURITY_GAMMA /* maturity */ }, #endif { @@ -1804,7 +1893,6 @@ void ha_mcs_cache::free_locks() mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex); thr_unlock(&cache_handler->file->lock, 0); - mysql_mutex_lock(&cache_handler->file->lock.lock->mutex); /* Restart transaction for columnstore table */ if (original_lock_type != F_WRLCK) @@ -1812,6 +1900,9 @@ void ha_mcs_cache::free_locks() parent::external_lock(table->in_use, F_UNLCK); parent::external_lock(table->in_use, original_lock_type); } + + /* Needed as we are going back to end of thr_lock() */ + mysql_mutex_lock(&cache_handler->file->lock.lock->mutex); } /** diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 04db8d6c0..6b51c201a 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -242,6 +242,19 @@ public: }; +class ha_mcs_cache_share +{ + ha_mcs_cache_share *next; /* Next open share */ + const char *name; + uint open_count; +public: + THR_LOCK org_lock; + friend ha_mcs_cache_share *find_cache_share(const char *name); + void close(); +}; + + + class ha_mcs_cache :public ha_mcs { typedef ha_mcs parent; @@ -249,9 +262,9 @@ class ha_mcs_cache :public ha_mcs bool insert_command; public: - THR_LOCK org_lock; uint lock_counter; ha_maria *cache_handler; + ha_mcs_cache_share *share; ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root); ~ha_mcs_cache(); From 4ff4e9eb892902b70a64bcd551a7981d6d89ee0a Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Thu, 25 Jun 2020 11:03:06 -0400 Subject: [PATCH 2/8] Re-enable the ColumnStore_Cache plugin. --- dbcon/mysql/ha_mcs.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index af74b8dff..ffe91d0a5 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -1706,7 +1706,6 @@ int ha_mcs_cache::end_bulk_insert() ha_mcs_cache Plugin code ******************************************************************************/ -#if 0 static handler *ha_mcs_cache_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root) @@ -1767,7 +1766,6 @@ static int ha_mcs_cache_deinit(void *p) struct st_mysql_storage_engine ha_mcs_cache_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; -#endif struct st_mysql_storage_engine columnstore_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -1792,7 +1790,6 @@ maria_declare_plugin(columnstore) MCSVERSION, /* string version */ COLUMNSTORE_MATURITY /* maturity */ }, -#if 0 { MYSQL_STORAGE_ENGINE_PLUGIN, &ha_mcs_cache_storage_engine, @@ -1808,7 +1805,6 @@ maria_declare_plugin(columnstore) MCSVERSION, /* string version */ MariaDB_PLUGIN_MATURITY_GAMMA /* maturity */ }, -#endif { MYSQL_INFORMATION_SCHEMA_PLUGIN, &is_columnstore_plugin_version, From f5a8d228a00a6eaf77ad61a0d2915d85e2cbb989 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Wed, 1 Jul 2020 17:39:22 -0400 Subject: [PATCH 3/8] Port of commit ba731bdc6a80e88d32e7440044b548c3e3edc591 from server/columnstore_cache Commit message: Fixed crashed bug on simple insert Other things: - Added test from columnstore team - Fixed two reported bugs from columnstore team - Call free_locks as part of start_trans() instead of get_status() to ensure that we have locks both for cached table and cache table before we try to free any. - Store pointers to lock->get_status and lock->update_status for the cached table. Was needed by ha_tina in flush_insert_cache to make new insert rows visible for the SELECT that caused the flush --- dbcon/mysql/ha_mcs.cpp | 40 +++++++++++++++++++++------------------- dbcon/mysql/ha_mcs.h | 1 + 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index ffe91d0a5..5c419e814 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -1226,6 +1226,7 @@ bool ha_mcs::is_crashed() const my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); + /* Create a name for the cache table */ @@ -1241,7 +1242,9 @@ static void create_cache_name(char *to, const char *name) THR_LOCK wrapper functions The idea of these is to highjack 'THR_LOCK->get_status() so that if this - is called in a non-insert context then we will flush the cache + is called in a non-insert context then we will flush the cache. + We also hijack THR_LOCK->start_trans() to free any locks on the cache + if the command was not an insert command. *****************************************************************************/ /* @@ -1263,7 +1266,7 @@ my_bool get_status_and_flush_cache(void *param, All Aria get_status functions takes Maria handler as the parameter */ if (cache->share->org_lock.get_status) - (*cache->share->org_lock.get_status)(&cache->cache_handler->file, + (*cache->share->org_lock.get_status)(cache->cache_handler->file, concurrent_insert); /* If first get_status() call for this table, flush cache if needed */ @@ -1281,20 +1284,28 @@ my_bool get_status_and_flush_cache(void *param, } } - if (!cache->insert_command) - cache->free_locks(); - return (0); } -/* Pass through functions for all the THR_LOCK virtual functions */ +/* + start_trans() is called when all locks has been given + If this was not an insert command then we can free the write lock on + the cache table and also downgrade external lock for the cached table + to F_READ +*/ -static my_bool cache_start_trans(void* param) +my_bool cache_start_trans(void* param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; + + if (!cache->insert_command) + cache->free_locks(); + return (*cache->share->org_lock.start_trans)(cache->cache_handler->file); } +/* Pass through functions for all the THR_LOCK virtual functions */ + static void cache_copy_status(void* to, void *from) { ha_mcs_cache *to_cache= (ha_mcs_cache*) to, *from_cache= (ha_mcs_cache*) from; @@ -1345,6 +1356,7 @@ ha_mcs_cache_share *find_cache_share(const char *name) { if (!strcmp(pos->name, name)) { + pos->open_count++; mysql_mutex_unlock(&LOCK_cache_share); return(pos); } @@ -1522,7 +1534,7 @@ uint ha_mcs_cache::lock_count(void) const { /* If we are doing an insert or if we want to flush the cache, we have to lock - both MyISAM table and normal table. + both the Aria table and normal table. */ return 2; } @@ -1886,8 +1898,6 @@ bool ha_mcs_cache::rows_cached() void ha_mcs_cache::free_locks() { /* We don't need to lock cache_handler anymore as it's already flushed */ - - mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex); thr_unlock(&cache_handler->file->lock, 0); /* Restart transaction for columnstore table */ @@ -1896,11 +1906,9 @@ void ha_mcs_cache::free_locks() parent::external_lock(table->in_use, F_UNLCK); parent::external_lock(table->in_use, original_lock_type); } - - /* Needed as we are going back to end of thr_lock() */ - mysql_mutex_lock(&cache_handler->file->lock.lock->mutex); } + /** Copy data from cache to ColumnStore @@ -1950,12 +1958,6 @@ end: to use it. */ from->delete_all_rows(); - - /* - This was not an insert command, so we can delete the thr lock - (We are not going to use the insert cache for this statement anymore) - */ - free_locks(); } DBUG_RETURN(error); } diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 6b51c201a..c3751fa47 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -304,6 +304,7 @@ public: int flush_insert_cache(); friend my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); + friend my_bool cache_start_trans(void *param); }; #endif //HA_MCS_H__ From 86fb66365ca6d1dd3b948ef6874d87e07eb6c039 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Wed, 1 Jul 2020 20:13:04 -0400 Subject: [PATCH 4/8] 1. Set 1M as the threshold on the number of records to flush the cache. 2. Set 100k as the batch size when flushing records into ColumnStore, i.e., a flush of 1M records will be performed in 10 batches, each being 100k. 3. For INSERT ... SELECT on the cache, use the default insertion method of cpimport. --- dbcon/mysql/ha_mcs.cpp | 11 +++++++---- dbcon/mysql/ha_mcs.h | 2 +- dbcon/mysql/ha_mcs_dml.cpp | 11 ++++++++--- dbcon/mysql/ha_mcs_impl.cpp | 13 +++++++++---- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 5c419e814..edb22eb6d 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -32,6 +32,7 @@ #endif #define CACHE_PREFIX "#cache#" +#define CACHE_FLUSH_THRESHOLD 1000000 static handler* mcs_create_handler(handlerton* hton, TABLE_SHARE* table, @@ -1272,7 +1273,9 @@ my_bool get_status_and_flush_cache(void *param, /* If first get_status() call for this table, flush cache if needed */ if (!cache->lock_counter++) { - if (!cache->insert_command && cache->rows_cached()) + ha_rows num_rows = cache->num_rows_cached(); + if ((!cache->insert_command && num_rows != 0) || + num_rows == CACHE_FLUSH_THRESHOLD) { if ((error= cache->flush_insert_cache())) { @@ -1703,7 +1706,7 @@ void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); return cache_handler->start_bulk_insert(rows, flags); } - return parent::start_bulk_insert(rows, flags); + return parent::start_bulk_insert_from_cache(rows, flags); } @@ -1887,9 +1890,9 @@ maria_declare_plugin_end; Implementation of write cache ******************************************************************************/ -bool ha_mcs_cache::rows_cached() +ha_rows ha_mcs_cache::num_rows_cached() { - return cache_handler->file->state->records != 0; + return cache_handler->file->state->records; } diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index c3751fa47..2c0dcca7a 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -300,7 +300,7 @@ public: /* Cache functions */ void free_locks(); - bool rows_cached(); + ha_rows num_rows_cached(); int flush_insert_cache(); friend my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index d79b1c6cb..733506ac1 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -77,6 +77,7 @@ using namespace joblist; namespace { +#define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000 uint64_t fBatchInsertGroupRows = 0; // ResourceManager::instance()->getRowsPerBatch(); // HDFS is never used nowadays, so don't bother bool useHdfs = false; // ResourceManager::instance()->useHdfs(); @@ -594,13 +595,17 @@ int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info& } if (fBatchInsertGroupRows == 0) + { fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch(); + } //timer.stop( "buildValueList"); if ( ci.singleInsert // Single insert - || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) || ( size >= fBatchInsertGroupRows )) ) - //Insert with mutilple value case: processed batch by batch. Last batch is sent also. - || (( ci.bulkInsertRows == 0 ) && ( size >= fBatchInsertGroupRows ) ) ) // Load data in file is processed batch by batch + || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) + || ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) ) + //Insert with mutilple value case: processed batch by batch. Last batch is sent also. + || (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) + || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) ) ) // Load data in file is processed batch by batch { //timer.start( "DMLProc takes"); //cout <<" sending a batch to DMLProc ... The size is " << size << " the current bulkInsertRows = " << ci.bulkInsertRows << endl; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index ad83918d8..29884004e 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3161,7 +3161,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->isLoaddataInfile = true; } - if (is_cache_insert) + if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT) { ci->isCacheInsert = true; @@ -3174,7 +3174,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if ((((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || - is_cache_insert) && !ci->singleInsert ) + ci->isCacheInsert) && !ci->singleInsert ) { ci->useCpimport = get_use_import_for_batchinsert(thd); @@ -3182,7 +3182,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->useCpimport = 0; // For now, disable cpimport for cache inserts - if (is_cache_insert) + if (ci->isCacheInsert) ci->useCpimport = 0; // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a @@ -3215,6 +3215,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD; + + // TODO: This needs a proper fix. + if (is_cache_insert) + ci->useXbit = false; + //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not show up in header. unsigned int numberNotNull = 0; @@ -3541,7 +3546,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } //Save table oid for commit to use - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) { // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); From 4afcba95208954158bf18e74d2e2cb214893504e Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Tue, 11 Aug 2020 13:12:44 -0400 Subject: [PATCH 5/8] Do not build the cache as a separate user-visible engine. We are creating a new read-only system variable, columnstore_cache_inserts, to enable/disable the cache. When this variable is set at server start up, any table created with engine=columnstore will also create the corresponding cache table in Aria engine for performing inserts. It is important to note that a ColumnStore table created with this option unset should not be queried when the server is restarted with the option set, as this will most likely result in query failures. --- dbcon/mysql/ha_mcs.cpp | 497 +++++++++++++++++---------------- dbcon/mysql/ha_mcs_sysvars.cpp | 19 ++ dbcon/mysql/ha_mcs_sysvars.h | 3 + 3 files changed, 280 insertions(+), 239 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index edb22eb6d..45d2e9383 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -34,14 +34,6 @@ #define CACHE_PREFIX "#cache#" #define CACHE_FLUSH_THRESHOLD 1000000 -static handler* mcs_create_handler(handlerton* hton, - TABLE_SHARE* table, - MEM_ROOT* mem_root); - -static int mcs_commit(handlerton* hton, THD* thd, bool all); - -static int mcs_rollback(handlerton* hton, THD* thd, bool all); -static int mcs_close_connection(handlerton* hton, THD* thd ); handlerton* mcs_hton = NULL; // This is the maria handlerton that we need for the cache static handlerton *mcs_maria_hton = NULL; @@ -126,68 +118,6 @@ int mcs_discover_existence(handlerton* hton, const char* db, return ha_mcs_impl_discover_existence(db, table_name); } -static int columnstore_init_func(void* p) -{ - DBUG_ENTER("columnstore_init_func"); - - struct tm tm; - time_t t; - - - time(&t); - localtime_r(&t, &tm); - fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ", - tm.tm_year % 100, tm.tm_mon + 1, tm.tm_mday, - tm.tm_hour, tm.tm_min, tm.tm_sec); - - fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str()); - - strncpy(cs_version, columnstore_version.c_str(), sizeof(cs_version)); - cs_version[sizeof(cs_version) - 1] = 0; - - strncpy(cs_commit_hash, columnstore_commit_hash.c_str(), sizeof(cs_commit_hash)); - cs_commit_hash[sizeof(cs_commit_hash) - 1] = 0; - - mcs_hton = (handlerton*)p; -#ifndef _MSC_VER - (void) pthread_mutex_init(&mcs_mutex, MY_MUTEX_INIT_FAST); -#endif - (void) my_hash_init(PSI_NOT_INSTRUMENTED, &mcs_open_tables, system_charset_info, 32, 0, 0, - (my_hash_get_key) mcs_get_key, 0, 0); - - mcs_hton->create = mcs_create_handler; - mcs_hton->flags = HTON_CAN_RECREATE; -// mcs_hton->discover_table = mcs_discover; -// mcs_hton->discover_table_existence = mcs_discover_existence; - mcs_hton->commit = mcs_commit; - mcs_hton->rollback = mcs_rollback; - mcs_hton->close_connection = mcs_close_connection; - mcs_hton->create_group_by = create_columnstore_group_by_handler; - mcs_hton->create_derived = create_columnstore_derived_handler; - mcs_hton->create_select = create_columnstore_select_handler; - mcs_hton->db_type = DB_TYPE_AUTOASSIGN; - DBUG_RETURN(0); -} - -static int columnstore_done_func(void* p) -{ - DBUG_ENTER("columnstore_done_func"); - - config::Config::deleteInstanceMap(); - my_hash_free(&mcs_open_tables); -#ifndef _MSC_VER - pthread_mutex_destroy(&mcs_mutex); -#endif - DBUG_RETURN(0); -} - -static handler* mcs_create_handler(handlerton* hton, - TABLE_SHARE* table, - MEM_ROOT* mem_root) -{ - return new (mem_root) ha_mcs(hton, table); -} - static int mcs_commit(handlerton* hton, THD* thd, bool all) { int rc; @@ -1405,18 +1335,36 @@ void ha_mcs_cache_share::close() ha_mcs_cache handler functions *****************************************************************************/ +static plugin_ref plugin_maria = NULL; + ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root) :ha_mcs(mcs_hton, table_arg) { - cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); - share= 0; - lock_counter= 0; + if (get_cache_inserts(current_thd)) + { + if (!plugin_maria) + { + LEX_CSTRING name = { STRING_WITH_LEN("Aria") }; + plugin_maria = ha_resolve_by_name(0, &name, 0); + mcs_maria_hton = plugin_hton(plugin_maria); + int error = mcs_maria_hton == NULL; // Engine must exists! + if (error) + my_error(HA_ERR_INITIALIZATION, MYF(0), + "Could not find storage engine %s", name.str); + } + + assert(mcs_maria_hton); + + cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); + share= 0; + lock_counter= 0; + } } ha_mcs_cache::~ha_mcs_cache() { - if (cache_handler) + if (get_cache_inserts(current_thd) && cache_handler) { delete cache_handler; cache_handler= NULL; @@ -1435,26 +1383,31 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg, char cache_name[FN_REFLEN+8]; DBUG_ENTER("ha_mcs_cache::create"); - create_cache_name(cache_name, name); + if (get_cache_inserts(current_thd)) { - /* Create a cached table */ - ha_choice save_transactional= ha_create_info->transactional; - row_type save_row_type= ha_create_info->row_type; - ha_create_info->transactional= HA_CHOICE_NO; - ha_create_info->row_type= ROW_TYPE_DYNAMIC; + create_cache_name(cache_name, name); + { + /* Create a cached table */ + ha_choice save_transactional= ha_create_info->transactional; + row_type save_row_type= ha_create_info->row_type; + ha_create_info->transactional= HA_CHOICE_NO; + ha_create_info->row_type= ROW_TYPE_DYNAMIC; - if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) - DBUG_RETURN(error); - ha_create_info->transactional= save_transactional; - ha_create_info->row_type= save_row_type; + if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) + DBUG_RETURN(error); + ha_create_info->transactional= save_transactional; + ha_create_info->row_type= save_row_type; + } } /* Create the real table in ColumnStore */ if ((error= parent::create(name, table_arg, ha_create_info))) { - cache_handler->delete_table(cache_name); + if (get_cache_inserts(current_thd)) + cache_handler->delete_table(cache_name); DBUG_RETURN(error); } + DBUG_RETURN(0); } @@ -1462,54 +1415,59 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg, int ha_mcs_cache::open(const char *name, int mode, uint open_flags) { int error; - char cache_name[FN_REFLEN+8]; DBUG_ENTER("ha_mcs_cache::open"); - /* Copy table object to cache_handler */ - cache_handler->change_table_ptr(table, table->s); - - create_cache_name(cache_name, name); - if ((error= cache_handler->open(cache_name, mode, open_flags))) - DBUG_RETURN(error); - - if (!(share= find_cache_share(name))) + if (get_cache_inserts(current_thd)) { - cache_handler->close(); - DBUG_RETURN(ER_OUTOFMEMORY); - } + /* Copy table object to cache_handler */ + cache_handler->change_table_ptr(table, table->s); - /* Fix lock so that it goes through get_status_and_flush() */ - THR_LOCK *lock= &cache_handler->file->s->lock; - if (lock->get_status != &get_status_and_flush_cache) - { - mysql_mutex_lock(&cache_handler->file->s->intern_lock); + char cache_name[FN_REFLEN+8]; + create_cache_name(cache_name, name); + if ((error= cache_handler->open(cache_name, mode, open_flags))) + DBUG_RETURN(error); + + if (!(share= find_cache_share(name))) + { + cache_handler->close(); + DBUG_RETURN(ER_OUTOFMEMORY); + } + + /* Fix lock so that it goes through get_status_and_flush() */ + THR_LOCK *lock= &cache_handler->file->s->lock; if (lock->get_status != &get_status_and_flush_cache) { - /* Remember original lock. Used by the THR_lock cache functions */ - share->org_lock= lock[0]; - if (lock->start_trans) - lock->start_trans= &cache_start_trans; - if (lock->copy_status) - lock->copy_status= &cache_copy_status; - if (lock->update_status) - lock->update_status= &cache_update_status; - if (lock->restore_status) - lock->restore_status= &cache_restore_status; - if (lock->check_status) - lock->check_status= &cache_check_status; - if (lock->restore_status) - lock->restore_status= &cache_restore_status; - lock->get_status= &get_status_and_flush_cache; + mysql_mutex_lock(&cache_handler->file->s->intern_lock); + if (lock->get_status != &get_status_and_flush_cache) + { + /* Remember original lock. Used by the THR_lock cache functions */ + share->org_lock= lock[0]; + if (lock->start_trans) + lock->start_trans= &cache_start_trans; + if (lock->copy_status) + lock->copy_status= &cache_copy_status; + if (lock->update_status) + lock->update_status= &cache_update_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + if (lock->check_status) + lock->check_status= &cache_check_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + lock->get_status= &get_status_and_flush_cache; + } + mysql_mutex_unlock(&cache_handler->file->s->intern_lock); } - mysql_mutex_unlock(&cache_handler->file->s->intern_lock); + cache_handler->file->lock.status_param= (void*) this; } - cache_handler->file->lock.status_param= (void*) this; if ((error= parent::open(name, mode, open_flags))) { - cache_handler->close(); + if (get_cache_inserts(current_thd)) + cache_handler->close(); DBUG_RETURN(error); } + DBUG_RETURN(0); } @@ -1517,13 +1475,23 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) int ha_mcs_cache::close() { int error, error2; - ha_mcs_cache_share *org_share= share; + DBUG_ENTER("ha_mcs_cache::close()"); - error= cache_handler->close(); - if ((error2= parent::close())) - error= error2; - if (org_share) - org_share->close(); + + if (get_cache_inserts(current_thd)) + { + error= cache_handler->close(); + if ((error2= parent::close())) + error= error2; + ha_mcs_cache_share *org_share= share; + if (org_share) + org_share->close(); + } + else + { + error= parent::close(); + } + DBUG_RETURN(error); } @@ -1539,7 +1507,10 @@ uint ha_mcs_cache::lock_count(void) const If we are doing an insert or if we want to flush the cache, we have to lock both the Aria table and normal table. */ - return 2; + if (get_cache_inserts(current_thd)) + return 2; + else + return 1; } /** @@ -1550,7 +1521,8 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type) { - to= cache_handler->store_lock(thd, to, TL_WRITE); + if (get_cache_inserts(current_thd)) + to= cache_handler->store_lock(thd, to, TL_WRITE); return parent::store_lock(thd, to, lock_type); } @@ -1561,78 +1533,103 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd, int ha_mcs_cache::external_lock(THD *thd, int lock_type) { - int error; + int error= 0; DBUG_ENTER("ha_mcs_cache::external_lock"); - /* - Reset lock_counter. This is ok as external_lock() is guaranteed to be - called before first get_status() - */ - lock_counter= 0; - - if (lock_type == F_UNLCK) + if (get_cache_inserts(current_thd)) { - int error2; - error= cache_handler->external_lock(thd, lock_type); - if ((error2= parent::external_lock(thd, lock_type))) - error= error2; - DBUG_RETURN(error); + /* + Reset lock_counter. This is ok as external_lock() is guaranteed to be + called before first get_status() + */ + lock_counter= 0; + + if (lock_type == F_UNLCK) + { + int error2; + error= cache_handler->external_lock(thd, lock_type); + if ((error2= parent::external_lock(thd, lock_type))) + error= error2; + DBUG_RETURN(error); + } + + /* Lock first with write lock to be able to do insert or flush table */ + original_lock_type= lock_type; + lock_type= F_WRLCK; + if ((error= cache_handler->external_lock(thd, lock_type))) + DBUG_RETURN(error); + if ((error= parent::external_lock(thd, lock_type))) + { + error= cache_handler->external_lock(thd, F_UNLCK); + DBUG_RETURN(error); + } + } + else + { + error= parent::external_lock(thd, lock_type); } - /* Lock first with write lock to be able to do insert or flush table */ - original_lock_type= lock_type; - lock_type= F_WRLCK; - if ((error= cache_handler->external_lock(thd, lock_type))) - DBUG_RETURN(error); - if ((error= parent::external_lock(thd, lock_type))) - { - error= cache_handler->external_lock(thd, F_UNLCK); - DBUG_RETURN(error); - } - DBUG_RETURN(0); + DBUG_RETURN(error); } int ha_mcs_cache::delete_table(const char *name) { - int error, error2; - char cache_name[FN_REFLEN+8]; + int error= 0, error2; + DBUG_ENTER("ha_mcs_cache::delete_table"); - create_cache_name(cache_name, name); - error= cache_handler->delete_table(cache_name); + if (get_cache_inserts(current_thd)) + { + char cache_name[FN_REFLEN+8]; + create_cache_name(cache_name, name); + error= cache_handler->delete_table(cache_name); + } + if ((error2= parent::delete_table(name))) error= error2; + DBUG_RETURN(error); } int ha_mcs_cache::rename_table(const char *from, const char *to) { - int error; - char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8]; + int error= 0; + DBUG_ENTER("ha_mcs_cache::rename_table"); - create_cache_name(cache_from, from); - create_cache_name(cache_to, to); - if ((error= cache_handler->rename_table(cache_from, cache_to))) - DBUG_RETURN(error); - - if ((error= parent::rename_table(from, to))) + if (get_cache_inserts(current_thd)) { - cache_handler->rename_table(cache_to, cache_from); - DBUG_RETURN(error); + char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8]; + create_cache_name(cache_from, from); + create_cache_name(cache_to, to); + if ((error= cache_handler->rename_table(cache_from, cache_to))) + DBUG_RETURN(error); + + if ((error= parent::rename_table(from, to))) + { + cache_handler->rename_table(cache_to, cache_from); + DBUG_RETURN(error); + } } - DBUG_RETURN(0); + else + { + error= parent::rename_table(from, to); + } + + DBUG_RETURN(error); } int ha_mcs_cache::delete_all_rows(void) { - int error,error2; + int error= 0, error2; + DBUG_ENTER("ha_mcs_cache::delete_all_rows"); - error= cache_handler->delete_all_rows(); + if (get_cache_inserts(current_thd)) + error= cache_handler->delete_all_rows(); if ((error2= parent::delete_all_rows())) error= error2; DBUG_RETURN(error); @@ -1640,8 +1637,11 @@ int ha_mcs_cache::delete_all_rows(void) bool ha_mcs_cache::is_crashed() const { - return (cache_handler->is_crashed() || - parent::is_crashed()); + if (get_cache_inserts(current_thd)) + return (cache_handler->is_crashed() || + parent::is_crashed()); + else + return parent::is_crashed(); } /** @@ -1669,17 +1669,21 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) int something_crashed= is_crashed(); DBUG_ENTER("ha_mcs_cache::repair"); - if (cache_handler->is_crashed() || !something_crashed) + if (get_cache_inserts(current_thd)) { - /* Delete everything that was not already committed */ - mysql_file_chsize(cache_handler->file->dfile.file, - cache_handler->file->s->state.state.key_file_length, - 0, MYF(MY_WME)); - mysql_file_chsize(cache_handler->file->s->kfile.file, - cache_handler->file->s->state.state.data_file_length, - 0, MYF(MY_WME)); - error= cache_handler->repair(thd, check_opt); + if (cache_handler->is_crashed() || !something_crashed) + { + /* Delete everything that was not already committed */ + mysql_file_chsize(cache_handler->file->dfile.file, + cache_handler->file->s->state.state.key_file_length, + 0, MYF(MY_WME)); + mysql_file_chsize(cache_handler->file->s->kfile.file, + cache_handler->file->s->state.state.data_file_length, + 0, MYF(MY_WME)); + error= cache_handler->repair(thd, check_opt); + } } + if (parent::is_crashed() || !something_crashed) if ((error2= parent::repair(thd, check_opt))) error= error2; @@ -1693,7 +1697,7 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) */ int ha_mcs_cache::write_row(const uchar *buf) { - if (insert_command) + if (get_cache_inserts(current_thd) && insert_command) return cache_handler->write_row(buf); return parent::write_row(buf); } @@ -1701,18 +1705,25 @@ int ha_mcs_cache::write_row(const uchar *buf) void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) { - if (insert_command) + if (get_cache_inserts(current_thd)) { - bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); - return cache_handler->start_bulk_insert(rows, flags); + if (insert_command) + { + bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); + return cache_handler->start_bulk_insert(rows, flags); + } + return parent::start_bulk_insert_from_cache(rows, flags); + } + else + { + return parent::start_bulk_insert(rows, flags); } - return parent::start_bulk_insert_from_cache(rows, flags); } int ha_mcs_cache::end_bulk_insert() { - if (insert_command) + if (get_cache_inserts(current_thd) && insert_command) return cache_handler->end_bulk_insert(); return parent::end_bulk_insert(); } @@ -1728,56 +1739,79 @@ static handler *ha_mcs_cache_create_handler(handlerton *hton, return new (mem_root) ha_mcs_cache(hton, table, mem_root); } -static plugin_ref plugin_maria; -static int ha_mcs_cache_init(void *p) +/****************************************************************************** + ha_mcs Plugin code +******************************************************************************/ + +static int columnstore_init_func(void* p) { - handlerton *cache_hton; - int error; - uint count; + DBUG_ENTER("columnstore_init_func"); - cache_hton= (handlerton *) p; - cache_hton->create= ha_mcs_cache_create_handler; - cache_hton->panic= 0; - cache_hton->flags= HTON_NO_PARTITION; + struct tm tm; + time_t t; + time(&t); + localtime_r(&t, &tm); + fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ", + tm.tm_year % 100, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec); - count= sizeof(all_mutexes)/sizeof(all_mutexes[0]); - mysql_mutex_register("ha_mcs_cache", all_mutexes, count); - mysql_mutex_init(key_LOCK_cache_share, &LOCK_cache_share, MY_MUTEX_INIT_FAST); + fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str()); - error= mcs_hton == NULL; // Engine must exists! + strncpy(cs_version, columnstore_version.c_str(), sizeof(cs_version)); + cs_version[sizeof(cs_version) - 1] = 0; - if (error) - { - my_error(HA_ERR_INITIALIZATION, MYF(0), - "Could not find storage engine %s", "Columnstore"); - return error; - } + strncpy(cs_commit_hash, columnstore_commit_hash.c_str(), sizeof(cs_commit_hash)); + cs_commit_hash[sizeof(cs_commit_hash) - 1] = 0; - { - LEX_CSTRING name= { STRING_WITH_LEN("Aria") }; - plugin_maria= ha_resolve_by_name(0, &name, 0); - mcs_maria_hton= plugin_hton(plugin_maria); - error= mcs_maria_hton == NULL; // Engine must exists! - if (error) - my_error(HA_ERR_INITIALIZATION, MYF(0), - "Could not find storage engine %s", name.str); - } + mcs_hton = (handlerton*)p; - return error; +#ifndef _MSC_VER + (void) pthread_mutex_init(&mcs_mutex, MY_MUTEX_INIT_FAST); +#endif + (void) my_hash_init(PSI_NOT_INSTRUMENTED, &mcs_open_tables, system_charset_info, 32, 0, 0, + (my_hash_get_key) mcs_get_key, 0, 0); + + mcs_hton->create = ha_mcs_cache_create_handler; + mcs_hton->panic = 0; + mcs_hton->flags = HTON_CAN_RECREATE | HTON_NO_PARTITION; +// mcs_hton->discover_table = mcs_discover; +// mcs_hton->discover_table_existence = mcs_discover_existence; + mcs_hton->commit = mcs_commit; + mcs_hton->rollback = mcs_rollback; + mcs_hton->close_connection = mcs_close_connection; + mcs_hton->create_group_by = create_columnstore_group_by_handler; + mcs_hton->create_derived = create_columnstore_derived_handler; + mcs_hton->create_select = create_columnstore_select_handler; + mcs_hton->db_type = DB_TYPE_AUTOASSIGN; + + uint count = sizeof(all_mutexes)/sizeof(all_mutexes[0]); + mysql_mutex_register("ha_mcs_cache", all_mutexes, count); + mysql_mutex_init(key_LOCK_cache_share, &LOCK_cache_share, MY_MUTEX_INIT_FAST); + + DBUG_RETURN(0); } -static int ha_mcs_cache_deinit(void *p) +static int columnstore_done_func(void* p) { - if (plugin_maria) - { - plugin_unlock(0, plugin_maria); - plugin_maria= NULL; - } - mysql_mutex_destroy(&LOCK_cache_share); - return 0; -} + DBUG_ENTER("columnstore_done_func"); + config::Config::deleteInstanceMap(); + my_hash_free(&mcs_open_tables); +#ifndef _MSC_VER + pthread_mutex_destroy(&mcs_mutex); +#endif + + if (plugin_maria) + { + plugin_unlock(0, plugin_maria); + plugin_maria = NULL; + } + + mysql_mutex_destroy(&LOCK_cache_share); + + DBUG_RETURN(0); +} struct st_mysql_storage_engine ha_mcs_cache_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -1805,21 +1839,6 @@ maria_declare_plugin(columnstore) MCSVERSION, /* string version */ COLUMNSTORE_MATURITY /* maturity */ }, -{ - MYSQL_STORAGE_ENGINE_PLUGIN, - &ha_mcs_cache_storage_engine, - "Columnstore_cache", - "MariaDB Corporation AB", - "Insert cache for ColumnStore", - PLUGIN_LICENSE_GPL, - ha_mcs_cache_init, /* Plugin Init */ - ha_mcs_cache_deinit, /* Plugin Deinit */ - MCSVERSIONHEX, - NULL, /* status variables */ - NULL, /* system variables */ - MCSVERSION, /* string version */ - MariaDB_PLUGIN_MATURITY_GAMMA /* maturity */ -}, { MYSQL_INFORMATION_SCHEMA_PLUGIN, &is_columnstore_plugin_version, diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 66856f26e..9d57fce51 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -303,6 +303,15 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_BOOL( + cache_inserts, + PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY, + "Perform cache-based inserts to ColumnStore", + NULL, + NULL, + 0 +); + st_mysql_sys_var* mcs_system_variables[] = { MYSQL_SYSVAR(compression_type), @@ -328,6 +337,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(import_for_batchinsert_enclosed_by), MYSQL_SYSVAR(varbin_always_hex), MYSQL_SYSVAR(replication_slave), + MYSQL_SYSVAR(cache_inserts), NULL }; @@ -559,3 +569,12 @@ void set_replication_slave(THD* thd, bool value) { THDVAR(thd, replication_slave) = value; } + +bool get_cache_inserts(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, cache_inserts); +} +void set_cache_inserts(THD* thd, bool value) +{ + THDVAR(thd, cache_inserts) = value; +} diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 90d3b99a5..105660e79 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -113,4 +113,7 @@ void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value); bool get_replication_slave(THD* thd); void set_replication_slave(THD* thd, bool value); +bool get_cache_inserts(THD* thd); +void set_cache_inserts(THD* thd, bool value); + #endif From 709290cc3d784dd842a4bd28bb5855a649de1e9f Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Tue, 11 Aug 2020 18:44:08 -0400 Subject: [PATCH 6/8] Port of commit: 1ff23d0cd70d576a0f4e512ce332cff348591d36 from server/columnstore_cache. Commit message: Fixed bug in free locks that caused rows in cache to not be properly flushed Fixed by doing adding external_lock(F_UNLCK) in free_locks. I also moved unlock of cache_handler to be after changing lock type of cached tables to ensure that no one can use cached table while this is happening (as cache table is locked with write lock). I also fixed a wrong mutex order bug in ha_cache::flush_insert_cache() Other things: - Addded share::cached_rows to track inserted rows. This is only used for asserts to check the number of rows in the cache. - Fixed wrong mysql_file_chsize() in case of repair --- dbcon/mysql/ha_mcs.cpp | 68 ++++++++++++++++++++++++++++++++++-------- dbcon/mysql/ha_mcs.h | 6 ++-- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 45d2e9383..6879f4995 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -1232,7 +1232,10 @@ my_bool cache_start_trans(void* param) ha_mcs_cache *cache= (ha_mcs_cache*) param; if (!cache->insert_command) + { cache->free_locks(); + return 0; + } return (*cache->share->org_lock.start_trans)(cache->cache_handler->file); } @@ -1281,7 +1284,7 @@ static mysql_mutex_t LOCK_cache_share; Find or create a share */ -ha_mcs_cache_share *find_cache_share(const char *name) +ha_mcs_cache_share *find_cache_share(const char *name, ulonglong cached_rows) { ha_mcs_cache_share *pos, *share; mysql_mutex_lock(&LOCK_cache_share); @@ -1303,6 +1306,7 @@ ha_mcs_cache_share *find_cache_share(const char *name) } share->name= (char*) (share+1); share->open_count= 1; + share->cached_rows= cached_rows; strmov((char*) share->name, name); share->next= cache_share_list; cache_share_list= share; @@ -1358,6 +1362,7 @@ ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *m cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); share= 0; lock_counter= 0; + cache_locked= 0; } } @@ -1427,7 +1432,7 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) if ((error= cache_handler->open(cache_name, mode, open_flags))) DBUG_RETURN(error); - if (!(share= find_cache_share(name))) + if (!(share= find_cache_share(name, cache_handler->file->state->records))) { cache_handler->close(); DBUG_RETURN(ER_OUTOFMEMORY); @@ -1438,6 +1443,11 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) if (lock->get_status != &get_status_and_flush_cache) { mysql_mutex_lock(&cache_handler->file->s->intern_lock); + + /* The following lock is here just to establish mutex locking order */ + mysql_mutex_lock(&lock->mutex); + mysql_mutex_unlock(&lock->mutex); + if (lock->get_status != &get_status_and_flush_cache) { /* Remember original lock. Used by the THR_lock cache functions */ @@ -1547,7 +1557,12 @@ int ha_mcs_cache::external_lock(THD *thd, int lock_type) if (lock_type == F_UNLCK) { int error2; - error= cache_handler->external_lock(thd, lock_type); + error= 0; + if (cache_locked) + { + error= cache_handler->external_lock(thd, lock_type); + cache_locked= 0; + } if ((error2= parent::external_lock(thd, lock_type))) error= error2; DBUG_RETURN(error); @@ -1563,6 +1578,8 @@ int ha_mcs_cache::external_lock(THD *thd, int lock_type) error= cache_handler->external_lock(thd, F_UNLCK); DBUG_RETURN(error); } + + cache_locked= 1; } else { @@ -1629,7 +1646,10 @@ int ha_mcs_cache::delete_all_rows(void) DBUG_ENTER("ha_mcs_cache::delete_all_rows"); if (get_cache_inserts(current_thd)) + { error= cache_handler->delete_all_rows(); + share->cached_rows= 0; + } if ((error2= parent::delete_all_rows())) error= error2; DBUG_RETURN(error); @@ -1653,7 +1673,7 @@ bool ha_mcs_cache::is_crashed() const In the case of 1) we don't want to run repair on both tables as the repair can be a slow process. Instead we only run repair - on the crashed tables. If not tables are marked crashed, we + on the crashed tables. If no tables are marked crashed, we run repair on both tables. Repair on the cache table will delete the part of the cache that was @@ -1675,12 +1695,14 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) { /* Delete everything that was not already committed */ mysql_file_chsize(cache_handler->file->dfile.file, - cache_handler->file->s->state.state.key_file_length, - 0, MYF(MY_WME)); - mysql_file_chsize(cache_handler->file->s->kfile.file, cache_handler->file->s->state.state.data_file_length, 0, MYF(MY_WME)); + mysql_file_chsize(cache_handler->file->s->kfile.file, + cache_handler->file->s->state.state.key_file_length, + 0, MYF(MY_WME)); + check_opt->flags|= T_AUTO_REPAIR; error= cache_handler->repair(thd, check_opt); + share->cached_rows= cache_handler->file->state->records; } } @@ -1698,7 +1720,11 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) int ha_mcs_cache::write_row(const uchar *buf) { if (get_cache_inserts(current_thd) && insert_command) + { + DBUG_ASSERT(share->cached_rows == cache_handler->file->state->records); + share->cached_rows++; return cache_handler->write_row(buf); + } return parent::write_row(buf); } @@ -1919,15 +1945,17 @@ ha_rows ha_mcs_cache::num_rows_cached() void ha_mcs_cache::free_locks() { - /* We don't need to lock cache_handler anymore as it's already flushed */ - thr_unlock(&cache_handler->file->lock, 0); - /* Restart transaction for columnstore table */ if (original_lock_type != F_WRLCK) { parent::external_lock(table->in_use, F_UNLCK); parent::external_lock(table->in_use, original_lock_type); } + + /* We don't need to lock cache_handler anymore as it's already flushed */ + cache_handler->external_lock(table->in_use, F_UNLCK); + thr_unlock(&cache_handler->file->lock, 0); + cache_locked= false; } @@ -1943,18 +1971,23 @@ int ha_mcs_cache::flush_insert_cache() int error, error2; ha_maria *from= cache_handler; uchar *record= table->record[0]; + ulonglong copied_rows= 0; DBUG_ENTER("flush_insert_cache"); + DBUG_ASSERT(from->file->state->records == share->cached_rows); + parent::start_bulk_insert_from_cache(from->file->state->records, 0); from->rnd_init(1); while (!(error= from->rnd_next(record))) { + copied_rows++; if ((error= parent::write_row(record))) goto end; rows_changed++; } if (error == HA_ERR_END_OF_FILE) error= 0; + DBUG_ASSERT(copied_rows == share->cached_rows); end: from->rnd_end(); @@ -1973,13 +2006,24 @@ end: parent::ht->rollback(parent::ht, table->in_use, 1); } + DBUG_ASSERT(error == 0); if (!error) { /* - Everything when fine, delete all rows from the cache and allow others + Everything went fine, delete all rows from the cache and allow others to use it. */ - from->delete_all_rows(); + { + /* + We have to unlock the lock mutex as otherwise we get a conflict in + mutex order. This is fine as we have a write lock on the mutex, + so we will be able to get it again + */ + mysql_mutex_unlock(&from->file->s->lock.mutex); + from->delete_all_rows(); + share->cached_rows= 0; + mysql_mutex_lock(&from->file->s->lock.mutex); + } } DBUG_RETURN(error); } diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 2c0dcca7a..eff57eb5f 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -248,8 +248,10 @@ class ha_mcs_cache_share const char *name; uint open_count; public: + ulonglong cached_rows; THR_LOCK org_lock; - friend ha_mcs_cache_share *find_cache_share(const char *name); + friend ha_mcs_cache_share *find_cache_share(const char *name, + ulonglong cached_rows); void close(); }; @@ -259,7 +261,7 @@ class ha_mcs_cache :public ha_mcs { typedef ha_mcs parent; int original_lock_type; - bool insert_command; + bool insert_command, cache_locked; public: uint lock_counter; From 47f2291f9f51b6dc6e687c8c631f26a2de4d5475 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Mon, 17 Aug 2020 16:44:08 -0400 Subject: [PATCH 7/8] Number of cached rows can be > CACHE_FLUSH_THRESHOLD (in case of batch inserts). --- dbcon/mysql/ha_mcs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 6879f4995..598eb51af 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -1205,7 +1205,7 @@ my_bool get_status_and_flush_cache(void *param, { ha_rows num_rows = cache->num_rows_cached(); if ((!cache->insert_command && num_rows != 0) || - num_rows == CACHE_FLUSH_THRESHOLD) + num_rows >= CACHE_FLUSH_THRESHOLD) { if ((error= cache->flush_insert_cache())) { From b3ae9cf04e91642e760e62055be6e88213e63583 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Tue, 18 Aug 2020 16:39:07 -0400 Subject: [PATCH 8/8] Use a session variable, columnstore_cache_flush_threshold, to allow the user to set the threshold, instead of using a hard coded value. --- dbcon/mysql/ha_mcs.cpp | 3 +-- dbcon/mysql/ha_mcs_sysvars.cpp | 23 +++++++++++++++++++++++ dbcon/mysql/ha_mcs_sysvars.h | 3 +++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 598eb51af..5d9c2fbf8 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -32,7 +32,6 @@ #endif #define CACHE_PREFIX "#cache#" -#define CACHE_FLUSH_THRESHOLD 1000000 handlerton* mcs_hton = NULL; // This is the maria handlerton that we need for the cache @@ -1205,7 +1204,7 @@ my_bool get_status_and_flush_cache(void *param, { ha_rows num_rows = cache->num_rows_cached(); if ((!cache->insert_command && num_rows != 0) || - num_rows >= CACHE_FLUSH_THRESHOLD) + num_rows >= get_cache_flush_threshold(current_thd)) { if ((error= cache->flush_insert_cache())) { diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 9d57fce51..eb1c1c531 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -312,6 +312,18 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_ULONGLONG( + cache_flush_threshold, + PLUGIN_VAR_RQCMDARG, + "Threshold on the number of rows in the cache to trigger a flush", + NULL, + NULL, + 500000, + 1, + 1000000000, + 1 +); + st_mysql_sys_var* mcs_system_variables[] = { MYSQL_SYSVAR(compression_type), @@ -338,6 +350,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(varbin_always_hex), MYSQL_SYSVAR(replication_slave), MYSQL_SYSVAR(cache_inserts), + MYSQL_SYSVAR(cache_flush_threshold), NULL }; @@ -578,3 +591,13 @@ void set_cache_inserts(THD* thd, bool value) { THDVAR(thd, cache_inserts) = value; } + +ulonglong get_cache_flush_threshold(THD* thd) +{ + return ( thd == NULL ) ? 500000 : THDVAR(thd, cache_flush_threshold); +} + +void set_cache_flush_threshold(THD* thd, ulonglong value) +{ + THDVAR(thd, cache_flush_threshold) = value; +} diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 105660e79..ffb9f2fae 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -116,4 +116,7 @@ void set_replication_slave(THD* thd, bool value); bool get_cache_inserts(THD* thd); void set_cache_inserts(THD* thd, bool value); +ulonglong get_cache_flush_threshold(THD* thd); +void set_cache_flush_threshold(THD* thd, ulonglong value); + #endif