diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index 2ee052a44..512da1a85 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -2,7 +2,8 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/install_mcs_mysql.sh.in" "${CMAKE_CU configure_file("${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h.in" "${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h") include_directories( ${ENGINE_COMMON_INCLUDES} - /usr/include/libxml2 ) + /usr/include/libxml2 + ${SERVER_SOURCE_ROOT_DIR}/storage/maria ) SET ( libcalmysql_SRCS @@ -35,13 +36,13 @@ set_source_files_properties(ha_mcs.cpp PROPERTIES COMPILE_FLAGS "-fno-implicit-t if (COMMAND mysql_add_plugin) mysql_add_plugin(columnstore ${libcalmysql_SRCS} STORAGE_ENGINE MODULE_ONLY DEFAULT - LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} threadpool ${SERVER_BUILD_DIR}/storage/maria/libaria.a ${SERVER_BUILD_DIR}/storage/myisam/libmyisam.a ${SERVER_BUILD_DIR}/storage/perfschema/libperfschema.a + LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} threadpool aria myisam perfschema COMPONENT columnstore-engine) else () add_library(ha_columnstore SHARED ${libcalmysql_SRCS}) SET_TARGET_PROPERTIES(ha_columnstore PROPERTIES PREFIX "") - target_link_libraries(ha_columnstore ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool) + target_link_libraries(ha_columnstore ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool aria myisam perfschema) install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine) endif () diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index e9852b815..23557870a 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -17,7 +17,7 @@ MA 02110-1301, USA. */ #include "ha_mcs.h" -#include "../../../maria/maria_def.h" +#include "maria_def.h" #include #include "columnstoreversion.h" @@ -33,8 +33,6 @@ #define CACHE_PREFIX "#cache#" -static handlerton *derived_hton; - static handler* mcs_create_handler(handlerton* hton, TABLE_SHARE* table, MEM_ROOT* mem_root); @@ -43,7 +41,9 @@ static int mcs_commit(handlerton* hton, THD* thd, bool all); static int mcs_rollback(handlerton* hton, THD* thd, bool all); static int mcs_close_connection(handlerton* hton, THD* thd ); -handlerton* mcs_hton; +handlerton* mcs_hton = NULL; +// This is the maria handlerton that we need for the cache +static handlerton *mcs_maria_hton = NULL; char cs_version[25]; char cs_commit_hash[41]; // a commit hash is 40 characters @@ -382,6 +382,20 @@ void ha_mcs::start_bulk_insert(ha_rows rows, uint flags) DBUG_VOID_RETURN; } +void ha_mcs::start_bulk_insert_from_cache(ha_rows rows, uint flags) +{ + DBUG_ENTER("ha_mcs::start_bulk_insert_from_cache"); + try + { + ha_mcs_impl_start_bulk_insert(rows, table, true); + } + catch (std::runtime_error& e) + { + current_thd->raise_error_printf(ER_INTERNAL_ERROR, e.what()); + } + DBUG_VOID_RETURN; +} + int ha_mcs::end_bulk_insert() { DBUG_ENTER("ha_mcs::end_bulk_insert"); @@ -1299,9 +1313,9 @@ static my_bool cache_check_status(void *param) *****************************************************************************/ ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root) - :ha_mcs(derived_hton, table_arg) + :ha_mcs(mcs_hton, table_arg) { - cache_handler= new (mem_root) ha_maria(maria_hton, table_arg); + cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root); lock_counter= 0; } @@ -1584,7 +1598,7 @@ void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); return cache_handler->start_bulk_insert(rows, flags); } - return parent::start_bulk_insert(rows, flags); + return parent::start_bulk_insert_from_cache(rows, flags); } @@ -1606,7 +1620,7 @@ static handler *ha_mcs_cache_create_handler(handlerton *hton, return new (mem_root) ha_mcs_cache(hton, table, mem_root); } -static plugin_ref plugin; +static plugin_ref plugin_maria; static int ha_mcs_cache_init(void *p) { @@ -1618,24 +1632,34 @@ static int ha_mcs_cache_init(void *p) cache_hton->panic= 0; cache_hton->flags= HTON_NO_PARTITION; + error= mcs_hton == NULL; // Engine must exists! + + if (error) { - LEX_CSTRING name= { STRING_WITH_LEN("Columnstore") }; - plugin= ha_resolve_by_name(0, &name, 0); - derived_hton= plugin_hton(plugin); - error= derived_hton == 0; // Engine must exists! + my_error(HA_ERR_INITIALIZATION, MYF(0), + "Could not find storage engine %s", "Columnstore"); + return error; + } + + { + LEX_CSTRING name= { STRING_WITH_LEN("Aria") }; + plugin_maria= ha_resolve_by_name(0, &name, 0); + mcs_maria_hton= plugin_hton(plugin_maria); + error= mcs_maria_hton == NULL; // Engine must exists! if (error) my_error(HA_ERR_INITIALIZATION, MYF(0), "Could not find storage engine %s", name.str); } + return error; } static int ha_mcs_cache_deinit(void *p) { - if (plugin) + if (plugin_maria) { - plugin_unlock(0, plugin); - plugin= 0; + plugin_unlock(0, plugin_maria); + plugin_maria= 0; } return 0; } @@ -1790,12 +1814,13 @@ int ha_mcs_cache::flush_insert_cache() uchar *record= table->record[0]; DBUG_ENTER("flush_insert_cache"); - parent::start_bulk_insert(from->file->state->records, 0); + parent::start_bulk_insert_from_cache(from->file->state->records, 0); from->rnd_init(1); while (!(error= from->rnd_next(record))) { if ((error= parent::write_row(record))) goto end; + rows_changed++; } if (error == HA_ERR_END_OF_FILE) error= 0; diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 3c68053d0..fde032a22 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -22,7 +22,7 @@ #include #include "idb_mysql.h" #include "ha_mcs_sysvars.h" -#include "../../../maria/ha_maria.h" +#include "ha_maria.h" extern handlerton* mcs_hton; @@ -150,6 +150,7 @@ public: skip it and and MySQL will treat it as not implemented. */ void start_bulk_insert(ha_rows rows, uint flags = 0) ; + void start_bulk_insert_from_cache(ha_rows rows, uint flags = 0) ; /**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */ int end_bulk_insert() ; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index bfd7a6846..489393801 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3051,7 +3051,7 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed) (!ci->singleInsert) && ((ci->isLoaddataInfile) || ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) ) { rc = ha_mcs_impl_write_batch_row_(buf, table, *ci); } @@ -3104,7 +3104,7 @@ int ha_mcs_impl_delete_row() return ( rc ); } -void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) +void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert) { THD* thd = current_thd; @@ -3168,17 +3168,30 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) ci->isLoaddataInfile = true; } + if (is_cache_insert) + { + ci->isCacheInsert = true; + + if (rows > 1) + ci->singleInsert = false; + } + ci->bulkInsertRows = rows; if ((((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) + (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || + is_cache_insert) && !ci->singleInsert ) { ci->useCpimport = get_use_import_for_batchinsert(thd); if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = 0; + // For now, disable cpimport for cache inserts + if (is_cache_insert) + ci->useCpimport = 0; + // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a // transaction or not. User should use this option very carefully since // cpimport currently does not support rollbacks @@ -3535,7 +3548,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table) } //Save table oid for commit to use - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert) { // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); @@ -3643,14 +3656,14 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // @bug 2378. do not enter for select, reset singleInsert flag after multiple insert. // @bug 2515. Check command intead of vtable state - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert ) { if (((ci->useCpimport == 2) || ((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) || ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || - ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) ) + ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) ) { #ifdef _MSC_VER @@ -3824,7 +3837,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // populate query stats for insert and load data infile. insert select has // stats entered in sm already - if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD)) + if (((thd->lex)->sql_command == SQLCOM_INSERT) || + ((thd->lex)->sql_command == SQLCOM_LOAD) || + ci->isCacheInsert) { ci->stats.setEndTime(); ci->stats.fErrorNo = rc; @@ -3851,6 +3866,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) // However, we should be resetting these members anyways. ci->singleInsert = true; // reset the flag ci->isLoaddataInfile = false; + ci->isCacheInsert = false; ci->tableOid = 0; ci->rowsHaveInserted = 0; ci->useCpimport = 1; @@ -3885,6 +3901,7 @@ int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all) thd->server_status &= ~SERVER_STATUS_IN_TRANS; ci->singleInsert = true; // reset the flag ci->isLoaddataInfile = false; + ci->isCacheInsert = false; ci->tableOid = 0; ci->rowsHaveInserted = 0; return rc; @@ -3906,6 +3923,7 @@ int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all) int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci); ci->singleInsert = true; // reset the flag ci->isLoaddataInfile = false; + ci->isCacheInsert = false; ci->tableOid = 0; ci->rowsHaveInserted = 0; thd->server_status &= ~SERVER_STATUS_IN_TRANS; diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 9024130ba..51949e3a2 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -33,7 +33,7 @@ extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector& condStac extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed); -extern void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table); +extern void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert = false); extern int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table); extern int ha_mcs_impl_rename_table(const char* from, const char* to); extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all); diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 3d2ae4cfe..e9a433710 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -245,7 +245,8 @@ struct cal_connection_info isAlter(false), bulkInsertRows(0), singleInsert(true), - isLoaddataInfile( false ), + isLoaddataInfile(false), + isCacheInsert(false), dmlProc(0), rowsHaveInserted(0), rc(0), @@ -297,6 +298,7 @@ struct cal_connection_info ha_rows bulkInsertRows; bool singleInsert; bool isLoaddataInfile; + bool isCacheInsert; std::string extendedStats; std::string miniStats; messageqcpp::MessageQueueClient* dmlProc;