diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index 85c13f2a1..eafba7e5e 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -2,7 +2,8 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/install_mcs_mysql.sh.in" "${CMAKE_CU configure_file("${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h.in" "${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h") include_directories( ${ENGINE_COMMON_INCLUDES} - /usr/include/libxml2 ) + /usr/include/libxml2 + ${SERVER_SOURCE_ROOT_DIR}/storage/maria ) SET ( libcalmysql_SRCS @@ -45,6 +46,7 @@ else () install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine) endif () + install(FILES syscatalog_mysql.sql dumpcat_mysql.sql calsetuserpriority.sql diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 1c771063d..e1d7adec3 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -16,8 +16,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -#include #include "ha_mcs.h" +#include "maria_def.h" +#include #include "columnstoreversion.h" #include "ha_mcs_pushdown.h" @@ -30,6 +31,8 @@ #define COLUMNSTORE_MATURITY MariaDB_PLUGIN_MATURITY_STABLE #endif +#define CACHE_PREFIX "#cache#" + static handler* mcs_create_handler(handlerton* hton, TABLE_SHARE* table, MEM_ROOT* mem_root); @@ -38,7 +41,9 @@ static int mcs_commit(handlerton* hton, THD* thd, bool all); static int mcs_rollback(handlerton* hton, THD* thd, bool all); static int mcs_close_connection(handlerton* hton, THD* thd ); -handlerton* mcs_hton; +handlerton* mcs_hton = NULL; +// This is the maria handlerton that we need for the cache +static handlerton *mcs_maria_hton = NULL; char cs_version[25]; char cs_commit_hash[41]; // a commit hash is 40 characters @@ -377,6 +382,20 @@ void ha_mcs::start_bulk_insert(ha_rows rows, uint flags) DBUG_VOID_RETURN; } +void ha_mcs::start_bulk_insert_from_cache(ha_rows rows, uint flags) +{ + DBUG_ENTER("ha_mcs::start_bulk_insert_from_cache"); + try + { + ha_mcs_impl_start_bulk_insert(rows, table, true); + } + catch (std::runtime_error& e) + { + current_thd->raise_error_printf(ER_INTERNAL_ERROR, e.what()); + } + DBUG_VOID_RETURN; +} + int ha_mcs::end_bulk_insert() { DBUG_ENTER("ha_mcs::end_bulk_insert"); @@ -1137,6 +1156,514 @@ void ha_mcs::cond_pop() DBUG_VOID_RETURN; } +int ha_mcs::repair(THD* thd, HA_CHECK_OPT* check_opt) +{ + DBUG_ENTER("ha_mcs::repair"); + DBUG_ASSERT(!(int_table_flags & HA_CAN_REPAIR)); + DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED); +} + +bool ha_mcs::is_crashed() const +{ + DBUG_ENTER("ha_mcs::is_crashed"); + DBUG_RETURN(0); +} + +/* + Implementation of ha_mcs_cache, ha_mcs_cache is an insert cache for ColumnStore to + speed up inserts. + + The idea is that inserts are first stored in storage engine that is + fast for inserts, like MyISAM or Aria, and in case of select, + update, delete then the rows are first flushed to ColumnStore before + the original requests is made. + + The table used for the cache is the original table name prefixed with #cache# +*/ + +/* + TODO: + - Add create flag to myisam_open that will ensure that on open and recovery + it restores only as many rows as stored in the header. + (Can be fast as we have number of rows and file size stored in the header) + - Commit to the cache is now done per statement. Should be changed to be per + transaction. Should not be impossible to do as we only have to update + key_file_length and data_file_length on commit. + - On recovery, check all #cache# tables to see if the last stored commit + is already in ColumnStore. If yes, truncate the cache. + + Things to consider: + + Current implementation is doing a syncronization tables as part of + thd->get_status(), which is the function to be called when server + has got a lock of all used tables. This ensure that the row + visibility is the same for all tables. + The disadvantage of this is that we have to always take a write lock + for the cache table. In case of read transactions, this lock is released + in free_locks() as soon as we get the table lock. + Another alternative would be to assume that if there are no cached rows + during the call to 'store_lock', then we can ignore any new rows added. + This would allows us to avoid write locks for the cached table, except + for inserts or if there are rows in the cache. The disadvanteg would be + that we would not see any rows inserted while we are trying to get the + lock. +*/ + +static my_bool (*original_get_status)(void*, my_bool); + +my_bool get_status_and_flush_cache(void *param, + my_bool concurrent_insert); + +/* + Create a name for the cache table +*/ + +static void create_cache_name(char *to, const char *name) +{ + uint dir_length= dirname_length(name); + to= strnmov(to, name, dir_length); + strxmov(to, CACHE_PREFIX, name+ dir_length, NullS); +} + +/***************************************************************************** + THR_LOCK wrapper functions + + The idea of these is to highjack 'THR_LOCK->get_status() so that if this + is called in a non-insert context then we will flush the cache +*****************************************************************************/ + +/* + First call to get_status() will flush the cache if the command is not an + insert +*/ + +my_bool get_status_and_flush_cache(void *param, + my_bool concurrent_insert) +{ + ha_mcs_cache *cache= (ha_mcs_cache*) param; + int error; + enum_sql_command sql_command= cache->table->in_use->lex->sql_command; + + cache->insert_command= (sql_command == SQLCOM_INSERT || + sql_command == SQLCOM_LOAD); + /* + Call first the original Aria get_status function + All Aria get_status functions takes Maria handler as the parameter + */ + if (original_get_status) + (*original_get_status)(&cache->cache_handler->file, concurrent_insert); + + /* If first get_status() call for this table, flush cache if needed */ + if (!cache->lock_counter++) + { + if (!cache->insert_command && cache->rows_cached()) + { + if ((error= cache->flush_insert_cache())) + { + my_error(error, MYF(MY_WME | ME_FATAL), + "Got error while trying to flush insert cache: %d", + my_errno); + return(1); + } + } + else if (!cache->insert_command) + cache->free_locks(); + } + else if (!cache->insert_command) + cache->free_locks(); + + return (0); +} + +/* Pass through functions for all the THR_LOCK virtual functions */ + +static my_bool cache_start_trans(void* param) +{ + ha_mcs_cache *cache= (ha_mcs_cache*) param; + if (cache->org_lock.start_trans) + return (*cache->org_lock.start_trans)(cache->cache_handler->file); + return 0; +} + +static void cache_copy_status(void* to, void *from) +{ + ha_mcs_cache *to_cache= (ha_mcs_cache*) to, *from_cache= (ha_mcs_cache*) from; + if (to_cache->org_lock.copy_status) + (*to_cache->org_lock.copy_status)(to_cache->cache_handler->file, + from_cache->cache_handler->file); +} + +static void cache_update_status(void* param) +{ + ha_mcs_cache *cache= (ha_mcs_cache*) param; + if (cache->org_lock.update_status) + (*cache->org_lock.update_status)(cache->cache_handler->file); +} + +static void cache_restore_status(void *param) +{ + ha_mcs_cache *cache= (ha_mcs_cache*) param; + if (cache->org_lock.restore_status) + (*cache->org_lock.restore_status)(cache->cache_handler->file); +} + +static my_bool cache_check_status(void *param) +{ + ha_mcs_cache *cache= (ha_mcs_cache*) param; + if (cache->org_lock.check_status) + return (*cache->org_lock.check_status)(cache->cache_handler->file); + return 0; +} + +/***************************************************************************** + ha_mcs_cache handler functions +*****************************************************************************/ + +ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root) + :ha_mcs(mcs_hton, table_arg) +{ + cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); + lock_counter= 0; +} + + +ha_mcs_cache::~ha_mcs_cache() +{ + if (cache_handler) + delete cache_handler; +} + +/* + The following functions duplicates calls to derived handler and + cache handler +*/ + +int ha_mcs_cache::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *ha_create_info) +{ + int error; + char cache_name[FN_REFLEN+8]; + DBUG_ENTER("ha_mcs_cache::create"); + + create_cache_name(cache_name, name); + { + /* Create a cached table */ + ha_choice save_transactional= ha_create_info->transactional; + row_type save_row_type= ha_create_info->row_type; + ha_create_info->transactional= HA_CHOICE_NO; + ha_create_info->row_type= ROW_TYPE_DYNAMIC; + + if ((error= cache_handler->create(cache_name, table_arg, ha_create_info))) + DBUG_RETURN(error); + ha_create_info->transactional= save_transactional; + ha_create_info->row_type= save_row_type; + } + + /* Create the real table in ColumnStore */ + if ((error= parent::create(name, table_arg, ha_create_info))) + { + cache_handler->delete_table(cache_name); + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +int ha_mcs_cache::open(const char *name, int mode, uint open_flags) +{ + int error; + char cache_name[FN_REFLEN+8]; + DBUG_ENTER("ha_mcs_cache::open"); + + /* Copy table object to cache_handler */ + cache_handler->change_table_ptr(table, table->s); + + create_cache_name(cache_name, name); + if ((error= cache_handler->open(cache_name, mode, open_flags))) + DBUG_RETURN(error); + + /* Fix lock so that it goes through get_status_and_flush() */ + THR_LOCK *lock= &cache_handler->file->s->lock; + mysql_mutex_lock(&cache_handler->file->s->intern_lock); + org_lock= lock[0]; + lock->get_status= &get_status_and_flush_cache; + lock->start_trans= &cache_start_trans; + lock->copy_status= &cache_copy_status; + lock->update_status= &cache_update_status; + lock->restore_status= &cache_restore_status; + lock->check_status= &cache_check_status; + lock->restore_status= &cache_restore_status; + cache_handler->file->lock.status_param= (void*) this; + mysql_mutex_unlock(&cache_handler->file->s->intern_lock); + + if ((error= parent::open(name, mode, open_flags))) + { + cache_handler->close(); + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +int ha_mcs_cache::close() +{ + int error, error2; + DBUG_ENTER("ha_mcs_cache::close()"); + error= cache_handler->close(); + if ((error2= parent::close())) + error= error2; + DBUG_RETURN(error); +} + + +/* + Handling locking of the tables. In case of INSERT we have to lock both + the cache handler and main table. If not, we only lock the main table +*/ + +uint ha_mcs_cache::lock_count(void) const +{ + /* + If we are doing an insert or if we want to flush the cache, we have to lock + both MyISAM table and normal table. + */ + return 2; +} + +/** + Store locks for the Aria table and ColumnStore table +*/ + +THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + to= cache_handler->store_lock(thd, to, TL_WRITE); + return parent::store_lock(thd, to, lock_type); +} + + +/** + Do external locking of the tables +*/ + +int ha_mcs_cache::external_lock(THD *thd, int lock_type) +{ + int error; + DBUG_ENTER("ha_mcs_cache::external_lock"); + + /* + Reset lock_counter. This is ok as external_lock() is guaranteed to be + called before first get_status() + */ + lock_counter= 0; + + if (lock_type == F_UNLCK) + { + int error2; + error= cache_handler->external_lock(thd, lock_type); + if ((error2= parent::external_lock(thd, lock_type))) + error= error2; + DBUG_RETURN(error); + } + + /* Lock first with write lock to be able to do insert or flush table */ + original_lock_type= lock_type; + lock_type= F_WRLCK; + if ((error= cache_handler->external_lock(thd, lock_type))) + DBUG_RETURN(error); + if ((error= parent::external_lock(thd, lock_type))) + { + error= cache_handler->external_lock(thd, F_UNLCK); + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +int ha_mcs_cache::delete_table(const char *name) +{ + int error, error2; + char cache_name[FN_REFLEN+8]; + DBUG_ENTER("ha_mcs_cache::delete_table"); + + create_cache_name(cache_name, name); + error= cache_handler->delete_table(cache_name); + if ((error2= parent::delete_table(name))) + error= error2; + DBUG_RETURN(error); +} + + +int ha_mcs_cache::rename_table(const char *from, const char *to) +{ + int error; + char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8]; + DBUG_ENTER("ha_mcs_cache::rename_table"); + + create_cache_name(cache_from, from); + create_cache_name(cache_to, to); + if ((error= cache_handler->rename_table(cache_from, cache_to))) + DBUG_RETURN(error); + + if ((error= parent::rename_table(from, to))) + { + cache_handler->rename_table(cache_to, cache_from); + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +int ha_mcs_cache::delete_all_rows(void) +{ + int error,error2; + DBUG_ENTER("ha_mcs_cache::delete_all_rows"); + + error= cache_handler->delete_all_rows(); + if ((error2= parent::delete_all_rows())) + error= error2; + DBUG_RETURN(error); +} + +bool ha_mcs_cache::is_crashed() const +{ + return (cache_handler->is_crashed() || + parent::is_crashed()); +} + +/** + After a crash, repair will be run on next open. + + There are two cases when repair is run: + 1) Automatically on open if the table is crashed + 2) When the user explicitely runs repair + + In the case of 1) we don't want to run repair on both tables as + the repair can be a slow process. Instead we only run repair + on the crashed tables. If not tables are marked crashed, we + run repair on both tables. + + Repair on the cache table will delete the part of the cache that was + not committed. + + key_file_length and data_file_length are updated last for a statement. + When these are updated, we threat the cache as committed +*/ + +int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt) +{ + int error= 0, error2; + int something_crashed= is_crashed(); + DBUG_ENTER("ha_mcs_cache::repair"); + + if (cache_handler->is_crashed() || !something_crashed) + { + /* Delete everything that was not already committed */ + mysql_file_chsize(cache_handler->file->dfile.file, + cache_handler->file->s->state.state.key_file_length, + 0, MYF(MY_WME)); + mysql_file_chsize(cache_handler->file->s->kfile.file, + cache_handler->file->s->state.state.data_file_length, + 0, MYF(MY_WME)); + error= cache_handler->repair(thd, check_opt); + } + if (parent::is_crashed() || !something_crashed) + if ((error2= parent::repair(thd, check_opt))) + error= error2; + + DBUG_RETURN(error); +} + + +/** + Write to cache handler or main table +*/ +int ha_mcs_cache::write_row(const uchar *buf) +{ + if (insert_command) + return cache_handler->write_row(buf); + return parent::write_row(buf); +} + + +void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) +{ + if (insert_command) + { + bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); + return cache_handler->start_bulk_insert(rows, flags); + } + return parent::start_bulk_insert(rows, flags); +} + + +int ha_mcs_cache::end_bulk_insert() +{ + if (insert_command) + return cache_handler->end_bulk_insert(); + return parent::end_bulk_insert(); +} + +/****************************************************************************** + ha_mcs_cache Plugin code +******************************************************************************/ + +static handler *ha_mcs_cache_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_mcs_cache(hton, table, mem_root); +} + +static plugin_ref plugin_maria; + +static int ha_mcs_cache_init(void *p) +{ + handlerton *cache_hton; + int error; + + cache_hton= (handlerton *) p; + cache_hton->create= ha_mcs_cache_create_handler; + cache_hton->panic= 0; + cache_hton->flags= HTON_NO_PARTITION; + + error= mcs_hton == NULL; // Engine must exists! + + if (error) + { + my_error(HA_ERR_INITIALIZATION, MYF(0), + "Could not find storage engine %s", "Columnstore"); + return error; + } + + { + LEX_CSTRING name= { STRING_WITH_LEN("Aria") }; + plugin_maria= ha_resolve_by_name(0, &name, 0); + mcs_maria_hton= plugin_hton(plugin_maria); + error= mcs_maria_hton == NULL; // Engine must exists! + if (error) + my_error(HA_ERR_INITIALIZATION, MYF(0), + "Could not find storage engine %s", name.str); + } + + return error; +} + +static int ha_mcs_cache_deinit(void *p) +{ + if (plugin_maria) + { + plugin_unlock(0, plugin_maria); + plugin_maria= NULL; + } + return 0; +} + + +struct st_mysql_storage_engine ha_mcs_cache_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + struct st_mysql_storage_engine columnstore_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -1155,11 +1682,26 @@ maria_declare_plugin(columnstore) columnstore_init_func, columnstore_done_func, MCSVERSIONHEX, - mcs_status_variables, /* status variables */ - mcs_system_variables, /* system variables */ + mcs_status_variables, /* status variables */ + mcs_system_variables, /* system variables */ MCSVERSION, /* string version */ COLUMNSTORE_MATURITY /* maturity */ }, +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &ha_mcs_cache_storage_engine, + "Columnstore_cache", + "MariaDB Corporation AB", + "Insert cache for ColumnStore", + PLUGIN_LICENSE_GPL, + ha_mcs_cache_init, /* Plugin Init */ + ha_mcs_cache_deinit, /* Plugin Deinit */ + MCSVERSIONHEX, + NULL, /* status variables */ + NULL, /* system variables */ + MCSVERSION, /* string version */ + MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */ +}, { MYSQL_INFORMATION_SCHEMA_PLUGIN, &is_columnstore_plugin_version, @@ -1226,3 +1768,89 @@ maria_declare_plugin(columnstore) } maria_declare_plugin_end; +/****************************************************************************** +Implementation of write cache +******************************************************************************/ + +bool ha_mcs_cache::rows_cached() +{ + return cache_handler->file->state->records != 0; +} + + +/* Free write locks if this was not an insert */ + +void ha_mcs_cache::free_locks() +{ + /* We don't need to lock cache_handler anymore as it's already flushed */ + + mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex); + thr_unlock(&cache_handler->file->lock, 0); + mysql_mutex_lock(&cache_handler->file->lock.lock->mutex); + + /* Restart transaction for columnstore table */ + if (original_lock_type != F_WRLCK) + { + parent::external_lock(table->in_use, F_UNLCK); + parent::external_lock(table->in_use, original_lock_type); + } +} + +/** + Copy data from cache to ColumnStore + + Both tables are locked. The from table has also an exclusive lock to + ensure that no one is inserting data to it while we are reading it. +*/ + +int ha_mcs_cache::flush_insert_cache() +{ + int error, error2; + ha_maria *from= cache_handler; + uchar *record= table->record[0]; + DBUG_ENTER("flush_insert_cache"); + + parent::start_bulk_insert_from_cache(from->file->state->records, 0); + from->rnd_init(1); + while (!(error= from->rnd_next(record))) + { + if ((error= parent::write_row(record))) + goto end; + rows_changed++; + } + if (error == HA_ERR_END_OF_FILE) + error= 0; + +end: + from->rnd_end(); + if ((error2= parent::end_bulk_insert()) && !error) + error= error2; + + if (!error) + { + if (parent::ht->commit) + error= parent::ht->commit(parent::ht, table->in_use, 1); + } + else + { + /* We can ignore the rollback error as we already have some other errors */ + if (parent::ht->rollback) + parent::ht->rollback(parent::ht, table->in_use, 1); + } + + if (!error) + { + /* + Everything when fine, delete all rows from the cache and allow others + to use it. + */ + from->delete_all_rows(); + + /* + This was not an insert command, so we can delete the thr lock + (We are not going to use the insert cache for this statement anymore) + */ + free_locks(); + } + DBUG_RETURN(error); +} diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index a82ea07c5..fde032a22 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -22,6 +22,7 @@ #include #include "idb_mysql.h" #include "ha_mcs_sysvars.h" +#include "ha_maria.h" extern handlerton* mcs_hton; @@ -53,7 +54,7 @@ class ha_mcs: public handler public: ha_mcs(handlerton* hton, TABLE_SHARE* table_arg); - ~ha_mcs() + virtual ~ha_mcs() { } @@ -149,6 +150,7 @@ public: skip it and and MySQL will treat it as not implemented. */ void start_bulk_insert(ha_rows rows, uint flags = 0) ; + void start_bulk_insert_from_cache(ha_rows rows, uint flags = 0) ; /**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */ int end_bulk_insert() ; @@ -234,5 +236,60 @@ public: return HA_CACHE_TBL_NOCACHE; } + int repair(THD* thd, HA_CHECK_OPT* check_opt); + bool is_crashed() const; }; + + +class ha_mcs_cache :public ha_mcs +{ + typedef ha_mcs parent; + int original_lock_type; + bool insert_command; + +public: + THR_LOCK org_lock; + uint lock_counter; + ha_maria *cache_handler; + + ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root); + ~ha_mcs_cache(); + + /* + The following functions duplicates calls to derived handler and + cache handler + */ + + int create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *ha_create_info); + int open(const char *name, int mode, uint open_flags); + int delete_table(const char *name); + int rename_table(const char *from, const char *to); + int delete_all_rows(void); + int close(void); + + uint lock_count(void) const; + THR_LOCK_DATA **store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type); + int external_lock(THD *thd, int lock_type); + int repair(THD *thd, HA_CHECK_OPT *check_opt); + bool is_crashed() const; + + /* + Write row uses cache_handler, for normal inserts, otherwise derived + handler + */ + int write_row(const uchar *buf); + void start_bulk_insert(ha_rows rows, uint flags); + int end_bulk_insert(); + + /* Cache functions */ + void free_locks(); + bool rows_cached(); + int flush_insert_cache(); + friend my_bool get_status_and_flush_cache(void *param, + my_bool concurrent_insert); +}; + #endif //HA_MCS_H__ diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 8007485d2..1da9d8175 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -6105,7 +6105,8 @@ bool isMCSTable(TABLE* table_ptr) string engineName = table_ptr->s->db_plugin->name.str; #endif - if (engineName == "Columnstore" || engineName == "InfiniDB") + if (engineName == "Columnstore" || + engineName == "Columnstore_cache") return true; else return false; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index bfd7a6846..489393801 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3051,7 +3051,7 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed) (!ci->singleInsert) && ((ci->isLoaddataInfile) || ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) ) { rc = ha_mcs_impl_write_batch_row_(buf, table, *ci); } @@ -3104,7 +3104,7 @@ int ha_mcs_impl_delete_row() return ( rc ); } -void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) +void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert) { THD* thd = current_thd; @@ -3168,17 +3168,30 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) ci->isLoaddataInfile = true; } + if (is_cache_insert) + { + ci->isCacheInsert = true; + + if (rows > 1) + ci->singleInsert = false; + } + ci->bulkInsertRows = rows; if ((((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) + (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || + is_cache_insert) && !ci->singleInsert ) { ci->useCpimport = get_use_import_for_batchinsert(thd); if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = 0; + // For now, disable cpimport for cache inserts + if (is_cache_insert) + ci->useCpimport = 0; + // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a // transaction or not. User should use this option very carefully since // cpimport currently does not support rollbacks @@ -3535,7 +3548,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) } //Save table oid for commit to use - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert) { // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); @@ -3643,14 +3656,14 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // @bug 2378. do not enter for select, reset singleInsert flag after multiple insert. // @bug 2515. Check command intead of vtable state - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert ) { if (((ci->useCpimport == 2) || ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) || ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) ) { #ifdef _MSC_VER @@ -3824,7 +3837,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // populate query stats for insert and load data infile. insert select has // stats entered in sm already - if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD)) + if (((thd->lex)->sql_command == SQLCOM_INSERT) || + ((thd->lex)->sql_command == SQLCOM_LOAD) || + ci->isCacheInsert) { ci->stats.setEndTime(); ci->stats.fErrorNo = rc; @@ -3851,6 +3866,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // However, we should be resetting these members anyways. ci->singleInsert = true; // reset the flag ci->isLoaddataInfile = false; + ci->isCacheInsert = false; ci->tableOid = 0; ci->rowsHaveInserted = 0; ci->useCpimport = 1; @@ -3885,6 +3901,7 @@ int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all) thd->server_status &= ~SERVER_STATUS_IN_TRANS; ci->singleInsert = true; // reset the flag ci->isLoaddataInfile = false; + ci->isCacheInsert = false; ci->tableOid = 0; ci->rowsHaveInserted = 0; return rc; @@ -3906,6 +3923,7 @@ int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all) int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci); ci->singleInsert = true; // reset the flag ci->isLoaddataInfile = false; + ci->isCacheInsert = false; ci->tableOid = 0; ci->rowsHaveInserted = 0; thd->server_status &= ~SERVER_STATUS_IN_TRANS; diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 9024130ba..51949e3a2 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -33,7 +33,7 @@ extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector& condStac extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed); -extern void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table); +extern void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert = false); extern int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table); extern int ha_mcs_impl_rename_table(const char* from, const char* to); extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all); diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 3d2ae4cfe..e9a433710 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -245,7 +245,8 @@ struct cal_connection_info isAlter(false), bulkInsertRows(0), singleInsert(true), - isLoaddataInfile( false ), + isLoaddataInfile(false), + isCacheInsert(false), dmlProc(0), rowsHaveInserted(0), rc(0), @@ -297,6 +298,7 @@ struct cal_connection_info ha_rows bulkInsertRows; bool singleInsert; bool isLoaddataInfile; + bool isCacheInsert; std::string extendedStats; std::string miniStats; messageqcpp::MessageQueueClient* dmlProc;