diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 50b14dd26..5d9c2fbf8 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -33,14 +33,6 @@ #define CACHE_PREFIX "#cache#" -static handler* mcs_create_handler(handlerton* hton, - TABLE_SHARE* table, - MEM_ROOT* mem_root); - -static int mcs_commit(handlerton* hton, THD* thd, bool all); - -static int mcs_rollback(handlerton* hton, THD* thd, bool all); -static int mcs_close_connection(handlerton* hton, THD* thd ); handlerton* mcs_hton = NULL; // This is the maria handlerton that we need for the cache static handlerton *mcs_maria_hton = NULL; @@ -125,68 +117,6 @@ int mcs_discover_existence(handlerton* hton, const char* db, return ha_mcs_impl_discover_existence(db, table_name); } -static int columnstore_init_func(void* p) -{ - DBUG_ENTER("columnstore_init_func"); - - struct tm tm; - time_t t; - - - time(&t); - localtime_r(&t, &tm); - fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ", - tm.tm_year % 100, tm.tm_mon + 1, tm.tm_mday, - tm.tm_hour, tm.tm_min, tm.tm_sec); - - fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str()); - - strncpy(cs_version, columnstore_version.c_str(), sizeof(cs_version)); - cs_version[sizeof(cs_version) - 1] = 0; - - strncpy(cs_commit_hash, columnstore_commit_hash.c_str(), sizeof(cs_commit_hash)); - cs_commit_hash[sizeof(cs_commit_hash) - 1] = 0; - - mcs_hton = (handlerton*)p; -#ifndef _MSC_VER - (void) pthread_mutex_init(&mcs_mutex, MY_MUTEX_INIT_FAST); -#endif - (void) my_hash_init(PSI_NOT_INSTRUMENTED, &mcs_open_tables, system_charset_info, 32, 0, 0, - (my_hash_get_key) mcs_get_key, 0, 0); - - mcs_hton->create = mcs_create_handler; - mcs_hton->flags = HTON_CAN_RECREATE; -// mcs_hton->discover_table = mcs_discover; -// mcs_hton->discover_table_existence = mcs_discover_existence; - mcs_hton->commit = mcs_commit; - mcs_hton->rollback = mcs_rollback; - mcs_hton->close_connection = mcs_close_connection; - mcs_hton->create_group_by = create_columnstore_group_by_handler; - mcs_hton->create_derived = create_columnstore_derived_handler; - mcs_hton->create_select = create_columnstore_select_handler; - mcs_hton->db_type = DB_TYPE_AUTOASSIGN; - DBUG_RETURN(0); -} - -static int columnstore_done_func(void* p) -{ - DBUG_ENTER("columnstore_done_func"); - - config::Config::deleteInstanceMap(); - my_hash_free(&mcs_open_tables); -#ifndef _MSC_VER - pthread_mutex_destroy(&mcs_mutex); -#endif - DBUG_RETURN(0); -} - -static handler* mcs_create_handler(handlerton* hton, - TABLE_SHARE* table, - MEM_ROOT* mem_root) -{ - return new (mem_root) ha_mcs(hton, table); -} - static int mcs_commit(handlerton* hton, THD* thd, bool all) { int rc; @@ -1223,11 +1153,10 @@ bool ha_mcs::is_crashed() const lock. */ -static my_bool (*original_get_status)(void*, my_bool); - my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); + /* Create a name for the cache table */ @@ -1243,7 +1172,9 @@ static void create_cache_name(char *to, const char *name) THR_LOCK wrapper functions The idea of these is to highjack 'THR_LOCK->get_status() so that if this - is called in a non-insert context then we will flush the cache + is called in a non-insert context then we will flush the cache. + We also hijack THR_LOCK->start_trans() to free any locks on the cache + if the command was not an insert command. *****************************************************************************/ /* @@ -1264,13 +1195,16 @@ my_bool get_status_and_flush_cache(void *param, Call first the original Aria get_status function All Aria get_status functions takes Maria handler as the parameter */ - if (original_get_status) - (*original_get_status)(&cache->cache_handler->file, concurrent_insert); + if (cache->share->org_lock.get_status) + (*cache->share->org_lock.get_status)(cache->cache_handler->file, + concurrent_insert); /* If first get_status() call for this table, flush cache if needed */ if (!cache->lock_counter++) { - if (!cache->insert_command && cache->rows_cached()) + ha_rows num_rows = cache->num_rows_cached(); + if ((!cache->insert_command && num_rows != 0) || + num_rows >= get_cache_flush_threshold(current_thd)) { if ((error= cache->flush_insert_cache())) { @@ -1280,71 +1214,165 @@ my_bool get_status_and_flush_cache(void *param, return(1); } } - else if (!cache->insert_command) - cache->free_locks(); } - else if (!cache->insert_command) - cache->free_locks(); return (0); } -/* Pass through functions for all the THR_LOCK virtual functions */ +/* + start_trans() is called when all locks has been given + If this was not an insert command then we can free the write lock on + the cache table and also downgrade external lock for the cached table + to F_READ +*/ -static my_bool cache_start_trans(void* param) +my_bool cache_start_trans(void* param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.start_trans) - return (*cache->org_lock.start_trans)(cache->cache_handler->file); - return 0; + + if (!cache->insert_command) + { + cache->free_locks(); + return 0; + } + + return (*cache->share->org_lock.start_trans)(cache->cache_handler->file); } +/* Pass through functions for all the THR_LOCK virtual functions */ + static void cache_copy_status(void* to, void *from) { ha_mcs_cache *to_cache= (ha_mcs_cache*) to, *from_cache= (ha_mcs_cache*) from; - if (to_cache->org_lock.copy_status) - (*to_cache->org_lock.copy_status)(to_cache->cache_handler->file, - from_cache->cache_handler->file); + (*to_cache->share->org_lock.copy_status)(to_cache->cache_handler->file, + from_cache->cache_handler->file); } static void cache_update_status(void* param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.update_status) - (*cache->org_lock.update_status)(cache->cache_handler->file); + (*cache->share->org_lock.update_status)(cache->cache_handler->file); } static void cache_restore_status(void *param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.restore_status) - (*cache->org_lock.restore_status)(cache->cache_handler->file); + (*cache->share->org_lock.restore_status)(cache->cache_handler->file); } static my_bool cache_check_status(void *param) { ha_mcs_cache *cache= (ha_mcs_cache*) param; - if (cache->org_lock.check_status) - return (*cache->org_lock.check_status)(cache->cache_handler->file); - return 0; + return (*cache->share->org_lock.check_status)(cache->cache_handler->file); } + +/***************************************************************************** + ha_mcs_cache_share functions (Common storage for an open cache file) +*****************************************************************************/ + +static ha_mcs_cache_share *cache_share_list= 0; +static PSI_mutex_key key_LOCK_cache_share; +static PSI_mutex_info all_mutexes[]= +{ + { &key_LOCK_cache_share, "LOCK_cache_share", PSI_FLAG_GLOBAL}, +}; +static mysql_mutex_t LOCK_cache_share; + +/* + Find or create a share +*/ + +ha_mcs_cache_share *find_cache_share(const char *name, ulonglong cached_rows) +{ + ha_mcs_cache_share *pos, *share; + mysql_mutex_lock(&LOCK_cache_share); + for (pos= cache_share_list; pos; pos= pos->next) + { + if (!strcmp(pos->name, name)) + { + pos->open_count++; + mysql_mutex_unlock(&LOCK_cache_share); + return(pos); + } + } + if (!(share= (ha_mcs_cache_share*) my_malloc(PSI_NOT_INSTRUMENTED, + sizeof(*share) + strlen(name)+1, + MYF(MY_FAE)))) + { + mysql_mutex_unlock(&LOCK_cache_share); + return 0; + } + share->name= (char*) (share+1); + share->open_count= 1; + share->cached_rows= cached_rows; + strmov((char*) share->name, name); + share->next= cache_share_list; + cache_share_list= share; + mysql_mutex_unlock(&LOCK_cache_share); + return share; +} + + +/* + Decrement open counter and free share if there is no more users +*/ + +void ha_mcs_cache_share::close() +{ + ha_mcs_cache_share *pos; + mysql_mutex_lock(&LOCK_cache_share); + if (!--open_count) + { + ha_mcs_cache_share **prev= &cache_share_list; + for ( ; (pos= *prev) != this; prev= &pos->next) + ; + *prev= next; + my_free(this); + } + mysql_mutex_unlock(&LOCK_cache_share); +} + + /***************************************************************************** ha_mcs_cache handler functions *****************************************************************************/ +static plugin_ref plugin_maria = NULL; + ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root) :ha_mcs(mcs_hton, table_arg) { - cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); - lock_counter= 0; + if (get_cache_inserts(current_thd)) + { + if (!plugin_maria) + { + LEX_CSTRING name = { STRING_WITH_LEN("Aria") }; + plugin_maria = ha_resolve_by_name(0, &name, 0); + mcs_maria_hton = plugin_hton(plugin_maria); + int error = mcs_maria_hton == NULL; // Engine must exists! + if (error) + my_error(HA_ERR_INITIALIZATION, MYF(0), + "Could not find storage engine %s", name.str); + } + + assert(mcs_maria_hton); + + cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); + share= 0; + lock_counter= 0; + cache_locked= 0; + } } ha_mcs_cache::~ha_mcs_cache() { - if (cache_handler) + if (get_cache_inserts(current_thd) && cache_handler) + { delete cache_handler; + cache_handler= NULL; + } } /* @@ -1359,26 +1387,31 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg, char cache_name[FN_REFLEN+8]; DBUG_ENTER("ha_mcs_cache::create"); - create_cache_name(cache_name, name); + if (get_cache_inserts(current_thd)) { - /* Create a cached table */ - ha_choice save_transactional= ha_create_info->transactional; - row_type save_row_type= ha_create_info->row_type; - ha_create_info->transactional= HA_CHOICE_NO; - ha_create_info->row_type= ROW_TYPE_DYNAMIC; + create_cache_name(cache_name, name); + { + /* Create a cached table */ + ha_choice save_transactional= ha_create_info->transactional; + row_type save_row_type= ha_create_info->row_type; + ha_create_info->transactional= HA_CHOICE_NO; + ha_create_info->row_type= ROW_TYPE_DYNAMIC; - if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) - DBUG_RETURN(error); - ha_create_info->transactional= save_transactional; - ha_create_info->row_type= save_row_type; + if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) + DBUG_RETURN(error); + ha_create_info->transactional= save_transactional; + ha_create_info->row_type= save_row_type; + } } /* Create the real table in ColumnStore */ if ((error= parent::create(name, table_arg, ha_create_info))) { - cache_handler->delete_table(cache_name); + if (get_cache_inserts(current_thd)) + cache_handler->delete_table(cache_name); DBUG_RETURN(error); } + DBUG_RETURN(0); } @@ -1386,35 +1419,64 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg, int ha_mcs_cache::open(const char *name, int mode, uint open_flags) { int error; - char cache_name[FN_REFLEN+8]; DBUG_ENTER("ha_mcs_cache::open"); - /* Copy table object to cache_handler */ - cache_handler->change_table_ptr(table, table->s); + if (get_cache_inserts(current_thd)) + { + /* Copy table object to cache_handler */ + cache_handler->change_table_ptr(table, table->s); - create_cache_name(cache_name, name); - if ((error= cache_handler->open(cache_name, mode, open_flags))) - DBUG_RETURN(error); + char cache_name[FN_REFLEN+8]; + create_cache_name(cache_name, name); + if ((error= cache_handler->open(cache_name, mode, open_flags))) + DBUG_RETURN(error); - /* Fix lock so that it goes through get_status_and_flush() */ - THR_LOCK *lock= &cache_handler->file->s->lock; - mysql_mutex_lock(&cache_handler->file->s->intern_lock); - org_lock= lock[0]; - lock->get_status= &get_status_and_flush_cache; - lock->start_trans= &cache_start_trans; - lock->copy_status= &cache_copy_status; - lock->update_status= &cache_update_status; - lock->restore_status= &cache_restore_status; - lock->check_status= &cache_check_status; - lock->restore_status= &cache_restore_status; - cache_handler->file->lock.status_param= (void*) this; - mysql_mutex_unlock(&cache_handler->file->s->intern_lock); + if (!(share= find_cache_share(name, cache_handler->file->state->records))) + { + cache_handler->close(); + DBUG_RETURN(ER_OUTOFMEMORY); + } + + /* Fix lock so that it goes through get_status_and_flush() */ + THR_LOCK *lock= &cache_handler->file->s->lock; + if (lock->get_status != &get_status_and_flush_cache) + { + mysql_mutex_lock(&cache_handler->file->s->intern_lock); + + /* The following lock is here just to establish mutex locking order */ + mysql_mutex_lock(&lock->mutex); + mysql_mutex_unlock(&lock->mutex); + + if (lock->get_status != &get_status_and_flush_cache) + { + /* Remember original lock. Used by the THR_lock cache functions */ + share->org_lock= lock[0]; + if (lock->start_trans) + lock->start_trans= &cache_start_trans; + if (lock->copy_status) + lock->copy_status= &cache_copy_status; + if (lock->update_status) + lock->update_status= &cache_update_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + if (lock->check_status) + lock->check_status= &cache_check_status; + if (lock->restore_status) + lock->restore_status= &cache_restore_status; + lock->get_status= &get_status_and_flush_cache; + } + mysql_mutex_unlock(&cache_handler->file->s->intern_lock); + } + cache_handler->file->lock.status_param= (void*) this; + } if ((error= parent::open(name, mode, open_flags))) { - cache_handler->close(); + if (get_cache_inserts(current_thd)) + cache_handler->close(); DBUG_RETURN(error); } + DBUG_RETURN(0); } @@ -1422,10 +1484,23 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags) int ha_mcs_cache::close() { int error, error2; + DBUG_ENTER("ha_mcs_cache::close()"); - error= cache_handler->close(); - if ((error2= parent::close())) - error= error2; + + if (get_cache_inserts(current_thd)) + { + error= cache_handler->close(); + if ((error2= parent::close())) + error= error2; + ha_mcs_cache_share *org_share= share; + if (org_share) + org_share->close(); + } + else + { + error= parent::close(); + } + DBUG_RETURN(error); } @@ -1439,9 +1514,12 @@ uint ha_mcs_cache::lock_count(void) const { /* If we are doing an insert or if we want to flush the cache, we have to lock - both MyISAM table and normal table. + both the Aria table and normal table. */ - return 2; + if (get_cache_inserts(current_thd)) + return 2; + else + return 1; } /** @@ -1452,7 +1530,8 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type) { - to= cache_handler->store_lock(thd, to, TL_WRITE); + if (get_cache_inserts(current_thd)) + to= cache_handler->store_lock(thd, to, TL_WRITE); return parent::store_lock(thd, to, lock_type); } @@ -1463,78 +1542,113 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd, int ha_mcs_cache::external_lock(THD *thd, int lock_type) { - int error; + int error= 0; DBUG_ENTER("ha_mcs_cache::external_lock"); - /* - Reset lock_counter. This is ok as external_lock() is guaranteed to be - called before first get_status() - */ - lock_counter= 0; - - if (lock_type == F_UNLCK) + if (get_cache_inserts(current_thd)) { - int error2; - error= cache_handler->external_lock(thd, lock_type); - if ((error2= parent::external_lock(thd, lock_type))) - error= error2; - DBUG_RETURN(error); + /* + Reset lock_counter. This is ok as external_lock() is guaranteed to be + called before first get_status() + */ + lock_counter= 0; + + if (lock_type == F_UNLCK) + { + int error2; + error= 0; + if (cache_locked) + { + error= cache_handler->external_lock(thd, lock_type); + cache_locked= 0; + } + if ((error2= parent::external_lock(thd, lock_type))) + error= error2; + DBUG_RETURN(error); + } + + /* Lock first with write lock to be able to do insert or flush table */ + original_lock_type= lock_type; + lock_type= F_WRLCK; + if ((error= cache_handler->external_lock(thd, lock_type))) + DBUG_RETURN(error); + if ((error= parent::external_lock(thd, lock_type))) + { + error= cache_handler->external_lock(thd, F_UNLCK); + DBUG_RETURN(error); + } + + cache_locked= 1; + } + else + { + error= parent::external_lock(thd, lock_type); } - /* Lock first with write lock to be able to do insert or flush table */ - original_lock_type= lock_type; - lock_type= F_WRLCK; - if ((error= cache_handler->external_lock(thd, lock_type))) - DBUG_RETURN(error); - if ((error= parent::external_lock(thd, lock_type))) - { - error= cache_handler->external_lock(thd, F_UNLCK); - DBUG_RETURN(error); - } - DBUG_RETURN(0); + DBUG_RETURN(error); } int ha_mcs_cache::delete_table(const char *name) { - int error, error2; - char cache_name[FN_REFLEN+8]; + int error= 0, error2; + DBUG_ENTER("ha_mcs_cache::delete_table"); - create_cache_name(cache_name, name); - error= cache_handler->delete_table(cache_name); + if (get_cache_inserts(current_thd)) + { + char cache_name[FN_REFLEN+8]; + create_cache_name(cache_name, name); + error= cache_handler->delete_table(cache_name); + } + if ((error2= parent::delete_table(name))) error= error2; + DBUG_RETURN(error); } int ha_mcs_cache::rename_table(const char *from, const char *to) { - int error; - char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8]; + int error= 0; + DBUG_ENTER("ha_mcs_cache::rename_table"); - create_cache_name(cache_from, from); - create_cache_name(cache_to, to); - if ((error= cache_handler->rename_table(cache_from, cache_to))) - DBUG_RETURN(error); - - if ((error= parent::rename_table(from, to))) + if (get_cache_inserts(current_thd)) { - cache_handler->rename_table(cache_to, cache_from); - DBUG_RETURN(error); + char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8]; + create_cache_name(cache_from, from); + create_cache_name(cache_to, to); + if ((error= cache_handler->rename_table(cache_from, cache_to))) + DBUG_RETURN(error); + + if ((error= parent::rename_table(from, to))) + { + cache_handler->rename_table(cache_to, cache_from); + DBUG_RETURN(error); + } } - DBUG_RETURN(0); + else + { + error= parent::rename_table(from, to); + } + + DBUG_RETURN(error); } int ha_mcs_cache::delete_all_rows(void) { - int error,error2; + int error= 0, error2; + DBUG_ENTER("ha_mcs_cache::delete_all_rows"); - error= cache_handler->delete_all_rows(); + if (get_cache_inserts(current_thd)) + { + error= cache_handler->delete_all_rows(); + share->cached_rows= 0; + } if ((error2= parent::delete_all_rows())) error= error2; DBUG_RETURN(error); @@ -1542,8 +1656,11 @@ int ha_mcs_cache::delete_all_rows(void) bool ha_mcs_cache::is_crashed() const { - return (cache_handler->is_crashed() || - parent::is_crashed()); + if (get_cache_inserts(current_thd)) + return (cache_handler->is_crashed() || + parent::is_crashed()); + else + return parent::is_crashed(); } /** @@ -1555,7 +1672,7 @@ bool ha_mcs_cache::is_crashed() const In the case of 1) we don't want to run repair on both tables as the repair can be a slow process. Instead we only run repair - on the crashed tables. If not tables are marked crashed, we + on the crashed tables. If no tables are marked crashed, we run repair on both tables. Repair on the cache table will delete the part of the cache that was @@ -1571,17 +1688,23 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) int something_crashed= is_crashed(); DBUG_ENTER("ha_mcs_cache::repair"); - if (cache_handler->is_crashed() || !something_crashed) + if (get_cache_inserts(current_thd)) { - /* Delete everything that was not already committed */ - mysql_file_chsize(cache_handler->file->dfile.file, - cache_handler->file->s->state.state.key_file_length, - 0, MYF(MY_WME)); - mysql_file_chsize(cache_handler->file->s->kfile.file, - cache_handler->file->s->state.state.data_file_length, - 0, MYF(MY_WME)); - error= cache_handler->repair(thd, check_opt); + if (cache_handler->is_crashed() || !something_crashed) + { + /* Delete everything that was not already committed */ + mysql_file_chsize(cache_handler->file->dfile.file, + cache_handler->file->s->state.state.data_file_length, + 0, MYF(MY_WME)); + mysql_file_chsize(cache_handler->file->s->kfile.file, + cache_handler->file->s->state.state.key_file_length, + 0, MYF(MY_WME)); + check_opt->flags|= T_AUTO_REPAIR; + error= cache_handler->repair(thd, check_opt); + share->cached_rows= cache_handler->file->state->records; + } } + if (parent::is_crashed() || !something_crashed) if ((error2= parent::repair(thd, check_opt))) error= error2; @@ -1595,26 +1718,37 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) */ int ha_mcs_cache::write_row(const uchar *buf) { - if (insert_command) + if (get_cache_inserts(current_thd) && insert_command) + { + DBUG_ASSERT(share->cached_rows == cache_handler->file->state->records); + share->cached_rows++; return cache_handler->write_row(buf); + } return parent::write_row(buf); } void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) { - if (insert_command) + if (get_cache_inserts(current_thd)) { - bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); - return cache_handler->start_bulk_insert(rows, flags); + if (insert_command) + { + bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); + return cache_handler->start_bulk_insert(rows, flags); + } + return parent::start_bulk_insert_from_cache(rows, flags); + } + else + { + return parent::start_bulk_insert(rows, flags); } - return parent::start_bulk_insert(rows, flags); } int ha_mcs_cache::end_bulk_insert() { - if (insert_command) + if (get_cache_inserts(current_thd) && insert_command) return cache_handler->end_bulk_insert(); return parent::end_bulk_insert(); } @@ -1623,7 +1757,6 @@ int ha_mcs_cache::end_bulk_insert() ha_mcs_cache Plugin code ******************************************************************************/ -#if 0 static handler *ha_mcs_cache_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root) @@ -1631,54 +1764,82 @@ static handler *ha_mcs_cache_create_handler(handlerton *hton, return new (mem_root) ha_mcs_cache(hton, table, mem_root); } -static plugin_ref plugin_maria; -static int ha_mcs_cache_init(void *p) +/****************************************************************************** + ha_mcs Plugin code +******************************************************************************/ + +static int columnstore_init_func(void* p) { - handlerton *cache_hton; - int error; + DBUG_ENTER("columnstore_init_func"); - cache_hton= (handlerton *) p; - cache_hton->create= ha_mcs_cache_create_handler; - cache_hton->panic= 0; - cache_hton->flags= HTON_NO_PARTITION; + struct tm tm; + time_t t; + time(&t); + localtime_r(&t, &tm); + fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ", + tm.tm_year % 100, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec); - error= mcs_hton == NULL; // Engine must exists! + fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str()); - if (error) - { - my_error(HA_ERR_INITIALIZATION, MYF(0), - "Could not find storage engine %s", "Columnstore"); - return error; - } + strncpy(cs_version, columnstore_version.c_str(), sizeof(cs_version)); + cs_version[sizeof(cs_version) - 1] = 0; - { - LEX_CSTRING name= { STRING_WITH_LEN("Aria") }; - plugin_maria= ha_resolve_by_name(0, &name, 0); - mcs_maria_hton= plugin_hton(plugin_maria); - error= mcs_maria_hton == NULL; // Engine must exists! - if (error) - my_error(HA_ERR_INITIALIZATION, MYF(0), - "Could not find storage engine %s", name.str); - } + strncpy(cs_commit_hash, columnstore_commit_hash.c_str(), sizeof(cs_commit_hash)); + cs_commit_hash[sizeof(cs_commit_hash) - 1] = 0; - return error; + mcs_hton = (handlerton*)p; + +#ifndef _MSC_VER + (void) pthread_mutex_init(&mcs_mutex, MY_MUTEX_INIT_FAST); +#endif + (void) my_hash_init(PSI_NOT_INSTRUMENTED, &mcs_open_tables, system_charset_info, 32, 0, 0, + (my_hash_get_key) mcs_get_key, 0, 0); + + mcs_hton->create = ha_mcs_cache_create_handler; + mcs_hton->panic = 0; + mcs_hton->flags = HTON_CAN_RECREATE | HTON_NO_PARTITION; +// mcs_hton->discover_table = mcs_discover; +// mcs_hton->discover_table_existence = mcs_discover_existence; + mcs_hton->commit = mcs_commit; + mcs_hton->rollback = mcs_rollback; + mcs_hton->close_connection = mcs_close_connection; + mcs_hton->create_group_by = create_columnstore_group_by_handler; + mcs_hton->create_derived = create_columnstore_derived_handler; + mcs_hton->create_select = create_columnstore_select_handler; + mcs_hton->db_type = DB_TYPE_AUTOASSIGN; + + uint count = sizeof(all_mutexes)/sizeof(all_mutexes[0]); + mysql_mutex_register("ha_mcs_cache", all_mutexes, count); + mysql_mutex_init(key_LOCK_cache_share, &LOCK_cache_share, MY_MUTEX_INIT_FAST); + + DBUG_RETURN(0); } -static int ha_mcs_cache_deinit(void *p) +static int columnstore_done_func(void* p) { - if (plugin_maria) - { - plugin_unlock(0, plugin_maria); - plugin_maria= NULL; - } - return 0; -} + DBUG_ENTER("columnstore_done_func"); + config::Config::deleteInstanceMap(); + my_hash_free(&mcs_open_tables); +#ifndef _MSC_VER + pthread_mutex_destroy(&mcs_mutex); +#endif + + if (plugin_maria) + { + plugin_unlock(0, plugin_maria); + plugin_maria = NULL; + } + + mysql_mutex_destroy(&LOCK_cache_share); + + DBUG_RETURN(0); +} struct st_mysql_storage_engine ha_mcs_cache_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; -#endif struct st_mysql_storage_engine columnstore_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -1703,23 +1864,6 @@ maria_declare_plugin(columnstore) MCSVERSION, /* string version */ COLUMNSTORE_MATURITY /* maturity */ }, -#if 0 -{ - MYSQL_STORAGE_ENGINE_PLUGIN, - &ha_mcs_cache_storage_engine, - "Columnstore_cache", - "MariaDB Corporation AB", - "Insert cache for ColumnStore", - PLUGIN_LICENSE_GPL, - ha_mcs_cache_init, /* Plugin Init */ - ha_mcs_cache_deinit, /* Plugin Deinit */ - MCSVERSIONHEX, - NULL, /* status variables */ - NULL, /* system variables */ - MCSVERSION, /* string version */ - MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */ -}, -#endif { MYSQL_INFORMATION_SCHEMA_PLUGIN, &is_columnstore_plugin_version, @@ -1790,9 +1934,9 @@ maria_declare_plugin_end; Implementation of write cache ******************************************************************************/ -bool ha_mcs_cache::rows_cached() +ha_rows ha_mcs_cache::num_rows_cached() { - return cache_handler->file->state->records != 0; + return cache_handler->file->state->records; } @@ -1800,20 +1944,20 @@ bool ha_mcs_cache::rows_cached() void ha_mcs_cache::free_locks() { - /* We don't need to lock cache_handler anymore as it's already flushed */ - - mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex); - thr_unlock(&cache_handler->file->lock, 0); - mysql_mutex_lock(&cache_handler->file->lock.lock->mutex); - /* Restart transaction for columnstore table */ if (original_lock_type != F_WRLCK) { parent::external_lock(table->in_use, F_UNLCK); parent::external_lock(table->in_use, original_lock_type); } + + /* We don't need to lock cache_handler anymore as it's already flushed */ + cache_handler->external_lock(table->in_use, F_UNLCK); + thr_unlock(&cache_handler->file->lock, 0); + cache_locked= false; } + /** Copy data from cache to ColumnStore @@ -1826,18 +1970,23 @@ int ha_mcs_cache::flush_insert_cache() int error, error2; ha_maria *from= cache_handler; uchar *record= table->record[0]; + ulonglong copied_rows= 0; DBUG_ENTER("flush_insert_cache"); + DBUG_ASSERT(from->file->state->records == share->cached_rows); + parent::start_bulk_insert_from_cache(from->file->state->records, 0); from->rnd_init(1); while (!(error= from->rnd_next(record))) { + copied_rows++; if ((error= parent::write_row(record))) goto end; rows_changed++; } if (error == HA_ERR_END_OF_FILE) error= 0; + DBUG_ASSERT(copied_rows == share->cached_rows); end: from->rnd_end(); @@ -1856,19 +2005,24 @@ end: parent::ht->rollback(parent::ht, table->in_use, 1); } + DBUG_ASSERT(error == 0); if (!error) { /* - Everything when fine, delete all rows from the cache and allow others + Everything went fine, delete all rows from the cache and allow others to use it. */ - from->delete_all_rows(); - - /* - This was not an insert command, so we can delete the thr lock - (We are not going to use the insert cache for this statement anymore) - */ - free_locks(); + { + /* + We have to unlock the lock mutex as otherwise we get a conflict in + mutex order. This is fine as we have a write lock on the mutex, + so we will be able to get it again + */ + mysql_mutex_unlock(&from->file->s->lock.mutex); + from->delete_all_rows(); + share->cached_rows= 0; + mysql_mutex_lock(&from->file->s->lock.mutex); + } } DBUG_RETURN(error); } diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 04db8d6c0..eff57eb5f 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -242,16 +242,31 @@ public: }; +class ha_mcs_cache_share +{ + ha_mcs_cache_share *next; /* Next open share */ + const char *name; + uint open_count; +public: + ulonglong cached_rows; + THR_LOCK org_lock; + friend ha_mcs_cache_share *find_cache_share(const char *name, + ulonglong cached_rows); + void close(); +}; + + + class ha_mcs_cache :public ha_mcs { typedef ha_mcs parent; int original_lock_type; - bool insert_command; + bool insert_command, cache_locked; public: - THR_LOCK org_lock; uint lock_counter; ha_maria *cache_handler; + ha_mcs_cache_share *share; ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root); ~ha_mcs_cache(); @@ -287,10 +302,11 @@ public: /* Cache functions */ void free_locks(); - bool rows_cached(); + ha_rows num_rows_cached(); int flush_insert_cache(); friend my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); + friend my_bool cache_start_trans(void *param); }; #endif //HA_MCS_H__ diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index d79b1c6cb..733506ac1 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -77,6 +77,7 @@ using namespace joblist; namespace { +#define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000 uint64_t fBatchInsertGroupRows = 0; // ResourceManager::instance()->getRowsPerBatch(); // HDFS is never used nowadays, so don't bother bool useHdfs = false; // ResourceManager::instance()->useHdfs(); @@ -594,13 +595,17 @@ int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info& } if (fBatchInsertGroupRows == 0) + { fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch(); + } //timer.stop( "buildValueList"); if ( ci.singleInsert // Single insert - || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) || ( size >= fBatchInsertGroupRows )) ) - //Insert with mutilple value case: processed batch by batch. Last batch is sent also. - || (( ci.bulkInsertRows == 0 ) && ( size >= fBatchInsertGroupRows ) ) ) // Load data in file is processed batch by batch + || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) + || ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) ) + //Insert with mutilple value case: processed batch by batch. Last batch is sent also. + || (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) + || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) ) ) // Load data in file is processed batch by batch { //timer.start( "DMLProc takes"); //cout <<" sending a batch to DMLProc ... The size is " << size << " the current bulkInsertRows = " << ci.bulkInsertRows << endl; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index ad83918d8..29884004e 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3161,7 +3161,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->isLoaddataInfile = true; } - if (is_cache_insert) + if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT) { ci->isCacheInsert = true; @@ -3174,7 +3174,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if ((((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || - is_cache_insert) && !ci->singleInsert ) + ci->isCacheInsert) && !ci->singleInsert ) { ci->useCpimport = get_use_import_for_batchinsert(thd); @@ -3182,7 +3182,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->useCpimport = 0; // For now, disable cpimport for cache inserts - if (is_cache_insert) + if (ci->isCacheInsert) ci->useCpimport = 0; // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a @@ -3215,6 +3215,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD; + + // TODO: This needs a proper fix. + if (is_cache_insert) + ci->useXbit = false; + //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not show up in header. unsigned int numberNotNull = 0; @@ -3541,7 +3546,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } //Save table oid for commit to use - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) { // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 66856f26e..eb1c1c531 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -303,6 +303,27 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_BOOL( + cache_inserts, + PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY, + "Perform cache-based inserts to ColumnStore", + NULL, + NULL, + 0 +); + +static MYSQL_THDVAR_ULONGLONG( + cache_flush_threshold, + PLUGIN_VAR_RQCMDARG, + "Threshold on the number of rows in the cache to trigger a flush", + NULL, + NULL, + 500000, + 1, + 1000000000, + 1 +); + st_mysql_sys_var* mcs_system_variables[] = { MYSQL_SYSVAR(compression_type), @@ -328,6 +349,8 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(import_for_batchinsert_enclosed_by), MYSQL_SYSVAR(varbin_always_hex), MYSQL_SYSVAR(replication_slave), + MYSQL_SYSVAR(cache_inserts), + MYSQL_SYSVAR(cache_flush_threshold), NULL }; @@ -559,3 +582,22 @@ void set_replication_slave(THD* thd, bool value) { THDVAR(thd, replication_slave) = value; } + +bool get_cache_inserts(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, cache_inserts); +} +void set_cache_inserts(THD* thd, bool value) +{ + THDVAR(thd, cache_inserts) = value; +} + +ulonglong get_cache_flush_threshold(THD* thd) +{ + return ( thd == NULL ) ? 500000 : THDVAR(thd, cache_flush_threshold); +} + +void set_cache_flush_threshold(THD* thd, ulonglong value) +{ + THDVAR(thd, cache_flush_threshold) = value; +} diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 90d3b99a5..ffb9f2fae 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -113,4 +113,10 @@ void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value); bool get_replication_slave(THD* thd); void set_replication_slave(THD* thd, bool value); +bool get_cache_inserts(THD* thd); +void set_cache_inserts(THD* thd, bool value); + +ulonglong get_cache_flush_threshold(THD* thd); +void set_cache_flush_threshold(THD* thd, ulonglong value); + #endif