From 816139d06d21004d05c40e99aaaa31e46dece398 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Tue, 12 May 2020 19:42:15 -0400 Subject: [PATCH] MCOL-4000 Allow columnstore_use_import_for_batchinsert to use a new value, ALWAYS, which invokes cpimport for LDI and INSERT..SELECT from within and outside a transaction. Default value of the session variable, ON, remains unchanged. --- dbcon/mysql/ha_mcs_impl.cpp | 27 ++++++++++++++++++++------- dbcon/mysql/ha_mcs_sysvars.cpp | 28 ++++++++++++++++++++++------ dbcon/mysql/ha_mcs_sysvars.h | 11 +++++++++-- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 0fbfb0ca1..f933b3bae 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3043,9 +3043,15 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed) ha_rows rowsInserted = 0; int rc = 0; - if ( (ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) || - ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) + // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a + // transaction or not. User should use this option very carefully since + // cpimport currently does not support rollbacks + if (((ci->useCpimport == 2) || + ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) && + (!ci->singleInsert) && + ((ci->isLoaddataInfile) || + ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) { rc = ha_mcs_impl_write_batch_row_(buf, table, *ci); } @@ -3173,7 +3179,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = 0; - if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))) //If autocommit on batch insert will use cpimport to load data + // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a + // transaction or not. User should use this option very carefully since + // cpimport currently does not support rollbacks + if ((ci->useCpimport == 2) || + ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data { //store table info to connection info CalpontSystemCatalog::TableName tableName; @@ -3635,9 +3645,12 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // @bug 2515. Check command intead of vtable state if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) { - if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) || - ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) + if (((ci->useCpimport == 2) || + ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) && + (!ci->singleInsert) && + ((ci->isLoaddataInfile) || + ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) { #ifdef _MSC_VER diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 597d50efa..66856f26e 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -270,13 +270,28 @@ static MYSQL_THDVAR_ULONG( 1 // block size ); -static MYSQL_THDVAR_BOOL( +const char* mcs_use_import_for_batchinsert_values[] = { + "OFF", + "ON", + "ALWAYS", + NullS +}; + +static TYPELIB mcs_use_import_for_batchinsert_values_lib = { + array_elements(mcs_use_import_for_batchinsert_values) - 1, + "mcs_use_import_for_batchinsert_values", + mcs_use_import_for_batchinsert_values, + NULL +}; + +static MYSQL_THDVAR_ENUM( use_import_for_batchinsert, - PLUGIN_VAR_NOCMDARG, + PLUGIN_VAR_RQCMDARG, "LOAD DATA INFILE and INSERT..SELECT will use cpimport internally", NULL, // check NULL, // update - 1 // default + 1, // default + &mcs_use_import_for_batchinsert_values_lib // values lib ); static MYSQL_THDVAR_BOOL( @@ -508,11 +523,12 @@ void set_local_query(THD* thd, ulong value) THDVAR(thd, local_query) = value; } -bool get_use_import_for_batchinsert(THD* thd) +mcs_use_import_for_batchinsert_t get_use_import_for_batchinsert(THD* thd) { - return ( thd == NULL ) ? false : THDVAR(thd, use_import_for_batchinsert); + return ( thd == NULL ) ? mcs_use_import_for_batchinsert_t::ON : + (mcs_use_import_for_batchinsert_t) THDVAR(thd, use_import_for_batchinsert); } -void set_use_import_for_batchinsert(THD* thd, bool value) +void set_use_import_for_batchinsert(THD* thd, ulong value) { THDVAR(thd, use_import_for_batchinsert) = value; } diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 1efe29fcc..90d3b99a5 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -33,6 +33,13 @@ enum mcs_compression_type_t { SNAPPY = 2 }; +// use_import_for_batchinsert +enum mcs_use_import_for_batchinsert_t { + OFF = 0, + ON = 1, + ALWAYS = 2 +}; + // simple setters/getters const char* get_original_query(THD* thd); void set_original_query(THD* thd, char* query); @@ -94,8 +101,8 @@ void set_double_for_decimal_math(THD* thd, bool value); ulong get_local_query(THD* thd); void set_local_query(THD* thd, ulong value); -bool get_use_import_for_batchinsert(THD* thd); -void set_use_import_for_batchinsert(THD* thd, bool value); +mcs_use_import_for_batchinsert_t get_use_import_for_batchinsert(THD* thd); +void set_use_import_for_batchinsert(THD* thd, ulong value); ulong get_import_for_batchinsert_delimiter(THD* thd); void set_import_for_batchinsert_delimiter(THD* thd, ulong value);