You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
Merge pull request #1254 from mariadb-corporation/columnstore_cache
Columnstore cache
This commit is contained in:
@ -2,7 +2,8 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/install_mcs_mysql.sh.in" "${CMAKE_CU
|
|||||||
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h.in" "${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h")
|
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h.in" "${CMAKE_CURRENT_SOURCE_DIR}/ha_mcs_version.h")
|
||||||
|
|
||||||
include_directories( ${ENGINE_COMMON_INCLUDES}
|
include_directories( ${ENGINE_COMMON_INCLUDES}
|
||||||
/usr/include/libxml2 )
|
/usr/include/libxml2
|
||||||
|
${SERVER_SOURCE_ROOT_DIR}/storage/maria )
|
||||||
|
|
||||||
|
|
||||||
SET ( libcalmysql_SRCS
|
SET ( libcalmysql_SRCS
|
||||||
@ -45,6 +46,7 @@ else ()
|
|||||||
|
|
||||||
install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine)
|
install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine)
|
||||||
endif ()
|
endif ()
|
||||||
|
|
||||||
install(FILES syscatalog_mysql.sql
|
install(FILES syscatalog_mysql.sql
|
||||||
dumpcat_mysql.sql
|
dumpcat_mysql.sql
|
||||||
calsetuserpriority.sql
|
calsetuserpriority.sql
|
||||||
|
@ -16,8 +16,9 @@
|
|||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||||
MA 02110-1301, USA. */
|
MA 02110-1301, USA. */
|
||||||
|
|
||||||
#include <typeinfo>
|
|
||||||
#include "ha_mcs.h"
|
#include "ha_mcs.h"
|
||||||
|
#include "maria_def.h"
|
||||||
|
#include <typeinfo>
|
||||||
#include "columnstoreversion.h"
|
#include "columnstoreversion.h"
|
||||||
|
|
||||||
#include "ha_mcs_pushdown.h"
|
#include "ha_mcs_pushdown.h"
|
||||||
@ -30,6 +31,8 @@
|
|||||||
#define COLUMNSTORE_MATURITY MariaDB_PLUGIN_MATURITY_STABLE
|
#define COLUMNSTORE_MATURITY MariaDB_PLUGIN_MATURITY_STABLE
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define CACHE_PREFIX "#cache#"
|
||||||
|
|
||||||
static handler* mcs_create_handler(handlerton* hton,
|
static handler* mcs_create_handler(handlerton* hton,
|
||||||
TABLE_SHARE* table,
|
TABLE_SHARE* table,
|
||||||
MEM_ROOT* mem_root);
|
MEM_ROOT* mem_root);
|
||||||
@ -38,7 +41,9 @@ static int mcs_commit(handlerton* hton, THD* thd, bool all);
|
|||||||
|
|
||||||
static int mcs_rollback(handlerton* hton, THD* thd, bool all);
|
static int mcs_rollback(handlerton* hton, THD* thd, bool all);
|
||||||
static int mcs_close_connection(handlerton* hton, THD* thd );
|
static int mcs_close_connection(handlerton* hton, THD* thd );
|
||||||
handlerton* mcs_hton;
|
handlerton* mcs_hton = NULL;
|
||||||
|
// This is the maria handlerton that we need for the cache
|
||||||
|
static handlerton *mcs_maria_hton = NULL;
|
||||||
char cs_version[25];
|
char cs_version[25];
|
||||||
char cs_commit_hash[41]; // a commit hash is 40 characters
|
char cs_commit_hash[41]; // a commit hash is 40 characters
|
||||||
|
|
||||||
@ -377,6 +382,20 @@ void ha_mcs::start_bulk_insert(ha_rows rows, uint flags)
|
|||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ha_mcs::start_bulk_insert_from_cache(ha_rows rows, uint flags)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ha_mcs::start_bulk_insert_from_cache");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ha_mcs_impl_start_bulk_insert(rows, table, true);
|
||||||
|
}
|
||||||
|
catch (std::runtime_error& e)
|
||||||
|
{
|
||||||
|
current_thd->raise_error_printf(ER_INTERNAL_ERROR, e.what());
|
||||||
|
}
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
int ha_mcs::end_bulk_insert()
|
int ha_mcs::end_bulk_insert()
|
||||||
{
|
{
|
||||||
DBUG_ENTER("ha_mcs::end_bulk_insert");
|
DBUG_ENTER("ha_mcs::end_bulk_insert");
|
||||||
@ -1137,6 +1156,514 @@ void ha_mcs::cond_pop()
|
|||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ha_mcs::repair(THD* thd, HA_CHECK_OPT* check_opt)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ha_mcs::repair");
|
||||||
|
DBUG_ASSERT(!(int_table_flags & HA_CAN_REPAIR));
|
||||||
|
DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ha_mcs::is_crashed() const
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ha_mcs::is_crashed");
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Implementation of ha_mcs_cache, ha_mcs_cache is an insert cache for ColumnStore to
|
||||||
|
speed up inserts.
|
||||||
|
|
||||||
|
The idea is that inserts are first stored in storage engine that is
|
||||||
|
fast for inserts, like MyISAM or Aria, and in case of select,
|
||||||
|
update, delete then the rows are first flushed to ColumnStore before
|
||||||
|
the original requests is made.
|
||||||
|
|
||||||
|
The table used for the cache is the original table name prefixed with #cache#
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
TODO:
|
||||||
|
- Add create flag to myisam_open that will ensure that on open and recovery
|
||||||
|
it restores only as many rows as stored in the header.
|
||||||
|
(Can be fast as we have number of rows and file size stored in the header)
|
||||||
|
- Commit to the cache is now done per statement. Should be changed to be per
|
||||||
|
transaction. Should not be impossible to do as we only have to update
|
||||||
|
key_file_length and data_file_length on commit.
|
||||||
|
- On recovery, check all #cache# tables to see if the last stored commit
|
||||||
|
is already in ColumnStore. If yes, truncate the cache.
|
||||||
|
|
||||||
|
Things to consider:
|
||||||
|
|
||||||
|
Current implementation is doing a syncronization tables as part of
|
||||||
|
thd->get_status(), which is the function to be called when server
|
||||||
|
has got a lock of all used tables. This ensure that the row
|
||||||
|
visibility is the same for all tables.
|
||||||
|
The disadvantage of this is that we have to always take a write lock
|
||||||
|
for the cache table. In case of read transactions, this lock is released
|
||||||
|
in free_locks() as soon as we get the table lock.
|
||||||
|
Another alternative would be to assume that if there are no cached rows
|
||||||
|
during the call to 'store_lock', then we can ignore any new rows added.
|
||||||
|
This would allows us to avoid write locks for the cached table, except
|
||||||
|
for inserts or if there are rows in the cache. The disadvanteg would be
|
||||||
|
that we would not see any rows inserted while we are trying to get the
|
||||||
|
lock.
|
||||||
|
*/
|
||||||
|
|
||||||
|
static my_bool (*original_get_status)(void*, my_bool);
|
||||||
|
|
||||||
|
my_bool get_status_and_flush_cache(void *param,
|
||||||
|
my_bool concurrent_insert);
|
||||||
|
|
||||||
|
/*
|
||||||
|
Create a name for the cache table
|
||||||
|
*/
|
||||||
|
|
||||||
|
static void create_cache_name(char *to, const char *name)
|
||||||
|
{
|
||||||
|
uint dir_length= dirname_length(name);
|
||||||
|
to= strnmov(to, name, dir_length);
|
||||||
|
strxmov(to, CACHE_PREFIX, name+ dir_length, NullS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*****************************************************************************
|
||||||
|
THR_LOCK wrapper functions
|
||||||
|
|
||||||
|
The idea of these is to highjack 'THR_LOCK->get_status() so that if this
|
||||||
|
is called in a non-insert context then we will flush the cache
|
||||||
|
*****************************************************************************/
|
||||||
|
|
||||||
|
/*
|
||||||
|
First call to get_status() will flush the cache if the command is not an
|
||||||
|
insert
|
||||||
|
*/
|
||||||
|
|
||||||
|
my_bool get_status_and_flush_cache(void *param,
|
||||||
|
my_bool concurrent_insert)
|
||||||
|
{
|
||||||
|
ha_mcs_cache *cache= (ha_mcs_cache*) param;
|
||||||
|
int error;
|
||||||
|
enum_sql_command sql_command= cache->table->in_use->lex->sql_command;
|
||||||
|
|
||||||
|
cache->insert_command= (sql_command == SQLCOM_INSERT ||
|
||||||
|
sql_command == SQLCOM_LOAD);
|
||||||
|
/*
|
||||||
|
Call first the original Aria get_status function
|
||||||
|
All Aria get_status functions takes Maria handler as the parameter
|
||||||
|
*/
|
||||||
|
if (original_get_status)
|
||||||
|
(*original_get_status)(&cache->cache_handler->file, concurrent_insert);
|
||||||
|
|
||||||
|
/* If first get_status() call for this table, flush cache if needed */
|
||||||
|
if (!cache->lock_counter++)
|
||||||
|
{
|
||||||
|
if (!cache->insert_command && cache->rows_cached())
|
||||||
|
{
|
||||||
|
if ((error= cache->flush_insert_cache()))
|
||||||
|
{
|
||||||
|
my_error(error, MYF(MY_WME | ME_FATAL),
|
||||||
|
"Got error while trying to flush insert cache: %d",
|
||||||
|
my_errno);
|
||||||
|
return(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (!cache->insert_command)
|
||||||
|
cache->free_locks();
|
||||||
|
}
|
||||||
|
else if (!cache->insert_command)
|
||||||
|
cache->free_locks();
|
||||||
|
|
||||||
|
return (0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Pass through functions for all the THR_LOCK virtual functions */
|
||||||
|
|
||||||
|
static my_bool cache_start_trans(void* param)
|
||||||
|
{
|
||||||
|
ha_mcs_cache *cache= (ha_mcs_cache*) param;
|
||||||
|
if (cache->org_lock.start_trans)
|
||||||
|
return (*cache->org_lock.start_trans)(cache->cache_handler->file);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cache_copy_status(void* to, void *from)
|
||||||
|
{
|
||||||
|
ha_mcs_cache *to_cache= (ha_mcs_cache*) to, *from_cache= (ha_mcs_cache*) from;
|
||||||
|
if (to_cache->org_lock.copy_status)
|
||||||
|
(*to_cache->org_lock.copy_status)(to_cache->cache_handler->file,
|
||||||
|
from_cache->cache_handler->file);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cache_update_status(void* param)
|
||||||
|
{
|
||||||
|
ha_mcs_cache *cache= (ha_mcs_cache*) param;
|
||||||
|
if (cache->org_lock.update_status)
|
||||||
|
(*cache->org_lock.update_status)(cache->cache_handler->file);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cache_restore_status(void *param)
|
||||||
|
{
|
||||||
|
ha_mcs_cache *cache= (ha_mcs_cache*) param;
|
||||||
|
if (cache->org_lock.restore_status)
|
||||||
|
(*cache->org_lock.restore_status)(cache->cache_handler->file);
|
||||||
|
}
|
||||||
|
|
||||||
|
static my_bool cache_check_status(void *param)
|
||||||
|
{
|
||||||
|
ha_mcs_cache *cache= (ha_mcs_cache*) param;
|
||||||
|
if (cache->org_lock.check_status)
|
||||||
|
return (*cache->org_lock.check_status)(cache->cache_handler->file);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*****************************************************************************
|
||||||
|
ha_mcs_cache handler functions
|
||||||
|
*****************************************************************************/
|
||||||
|
|
||||||
|
ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root)
|
||||||
|
:ha_mcs(mcs_hton, table_arg)
|
||||||
|
{
|
||||||
|
cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root);
|
||||||
|
lock_counter= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ha_mcs_cache::~ha_mcs_cache()
|
||||||
|
{
|
||||||
|
if (cache_handler)
|
||||||
|
delete cache_handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
The following functions duplicates calls to derived handler and
|
||||||
|
cache handler
|
||||||
|
*/
|
||||||
|
|
||||||
|
int ha_mcs_cache::create(const char *name, TABLE *table_arg,
|
||||||
|
HA_CREATE_INFO *ha_create_info)
|
||||||
|
{
|
||||||
|
int error;
|
||||||
|
char cache_name[FN_REFLEN+8];
|
||||||
|
DBUG_ENTER("ha_mcs_cache::create");
|
||||||
|
|
||||||
|
create_cache_name(cache_name, name);
|
||||||
|
{
|
||||||
|
/* Create a cached table */
|
||||||
|
ha_choice save_transactional= ha_create_info->transactional;
|
||||||
|
row_type save_row_type= ha_create_info->row_type;
|
||||||
|
ha_create_info->transactional= HA_CHOICE_NO;
|
||||||
|
ha_create_info->row_type= ROW_TYPE_DYNAMIC;
|
||||||
|
|
||||||
|
if ((error= cache_handler->create(cache_name, table_arg, ha_create_info)))
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
ha_create_info->transactional= save_transactional;
|
||||||
|
ha_create_info->row_type= save_row_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the real table in ColumnStore */
|
||||||
|
if ((error= parent::create(name, table_arg, ha_create_info)))
|
||||||
|
{
|
||||||
|
cache_handler->delete_table(cache_name);
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ha_mcs_cache::open(const char *name, int mode, uint open_flags)
|
||||||
|
{
|
||||||
|
int error;
|
||||||
|
char cache_name[FN_REFLEN+8];
|
||||||
|
DBUG_ENTER("ha_mcs_cache::open");
|
||||||
|
|
||||||
|
/* Copy table object to cache_handler */
|
||||||
|
cache_handler->change_table_ptr(table, table->s);
|
||||||
|
|
||||||
|
create_cache_name(cache_name, name);
|
||||||
|
if ((error= cache_handler->open(cache_name, mode, open_flags)))
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
|
||||||
|
/* Fix lock so that it goes through get_status_and_flush() */
|
||||||
|
THR_LOCK *lock= &cache_handler->file->s->lock;
|
||||||
|
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
|
||||||
|
org_lock= lock[0];
|
||||||
|
lock->get_status= &get_status_and_flush_cache;
|
||||||
|
lock->start_trans= &cache_start_trans;
|
||||||
|
lock->copy_status= &cache_copy_status;
|
||||||
|
lock->update_status= &cache_update_status;
|
||||||
|
lock->restore_status= &cache_restore_status;
|
||||||
|
lock->check_status= &cache_check_status;
|
||||||
|
lock->restore_status= &cache_restore_status;
|
||||||
|
cache_handler->file->lock.status_param= (void*) this;
|
||||||
|
mysql_mutex_unlock(&cache_handler->file->s->intern_lock);
|
||||||
|
|
||||||
|
if ((error= parent::open(name, mode, open_flags)))
|
||||||
|
{
|
||||||
|
cache_handler->close();
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ha_mcs_cache::close()
|
||||||
|
{
|
||||||
|
int error, error2;
|
||||||
|
DBUG_ENTER("ha_mcs_cache::close()");
|
||||||
|
error= cache_handler->close();
|
||||||
|
if ((error2= parent::close()))
|
||||||
|
error= error2;
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Handling locking of the tables. In case of INSERT we have to lock both
|
||||||
|
the cache handler and main table. If not, we only lock the main table
|
||||||
|
*/
|
||||||
|
|
||||||
|
uint ha_mcs_cache::lock_count(void) const
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
If we are doing an insert or if we want to flush the cache, we have to lock
|
||||||
|
both MyISAM table and normal table.
|
||||||
|
*/
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
Store locks for the Aria table and ColumnStore table
|
||||||
|
*/
|
||||||
|
|
||||||
|
THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd,
|
||||||
|
THR_LOCK_DATA **to,
|
||||||
|
enum thr_lock_type lock_type)
|
||||||
|
{
|
||||||
|
to= cache_handler->store_lock(thd, to, TL_WRITE);
|
||||||
|
return parent::store_lock(thd, to, lock_type);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
Do external locking of the tables
|
||||||
|
*/
|
||||||
|
|
||||||
|
int ha_mcs_cache::external_lock(THD *thd, int lock_type)
|
||||||
|
{
|
||||||
|
int error;
|
||||||
|
DBUG_ENTER("ha_mcs_cache::external_lock");
|
||||||
|
|
||||||
|
/*
|
||||||
|
Reset lock_counter. This is ok as external_lock() is guaranteed to be
|
||||||
|
called before first get_status()
|
||||||
|
*/
|
||||||
|
lock_counter= 0;
|
||||||
|
|
||||||
|
if (lock_type == F_UNLCK)
|
||||||
|
{
|
||||||
|
int error2;
|
||||||
|
error= cache_handler->external_lock(thd, lock_type);
|
||||||
|
if ((error2= parent::external_lock(thd, lock_type)))
|
||||||
|
error= error2;
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Lock first with write lock to be able to do insert or flush table */
|
||||||
|
original_lock_type= lock_type;
|
||||||
|
lock_type= F_WRLCK;
|
||||||
|
if ((error= cache_handler->external_lock(thd, lock_type)))
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
if ((error= parent::external_lock(thd, lock_type)))
|
||||||
|
{
|
||||||
|
error= cache_handler->external_lock(thd, F_UNLCK);
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ha_mcs_cache::delete_table(const char *name)
|
||||||
|
{
|
||||||
|
int error, error2;
|
||||||
|
char cache_name[FN_REFLEN+8];
|
||||||
|
DBUG_ENTER("ha_mcs_cache::delete_table");
|
||||||
|
|
||||||
|
create_cache_name(cache_name, name);
|
||||||
|
error= cache_handler->delete_table(cache_name);
|
||||||
|
if ((error2= parent::delete_table(name)))
|
||||||
|
error= error2;
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ha_mcs_cache::rename_table(const char *from, const char *to)
|
||||||
|
{
|
||||||
|
int error;
|
||||||
|
char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8];
|
||||||
|
DBUG_ENTER("ha_mcs_cache::rename_table");
|
||||||
|
|
||||||
|
create_cache_name(cache_from, from);
|
||||||
|
create_cache_name(cache_to, to);
|
||||||
|
if ((error= cache_handler->rename_table(cache_from, cache_to)))
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
|
||||||
|
if ((error= parent::rename_table(from, to)))
|
||||||
|
{
|
||||||
|
cache_handler->rename_table(cache_to, cache_from);
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ha_mcs_cache::delete_all_rows(void)
|
||||||
|
{
|
||||||
|
int error,error2;
|
||||||
|
DBUG_ENTER("ha_mcs_cache::delete_all_rows");
|
||||||
|
|
||||||
|
error= cache_handler->delete_all_rows();
|
||||||
|
if ((error2= parent::delete_all_rows()))
|
||||||
|
error= error2;
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ha_mcs_cache::is_crashed() const
|
||||||
|
{
|
||||||
|
return (cache_handler->is_crashed() ||
|
||||||
|
parent::is_crashed());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
After a crash, repair will be run on next open.
|
||||||
|
|
||||||
|
There are two cases when repair is run:
|
||||||
|
1) Automatically on open if the table is crashed
|
||||||
|
2) When the user explicitely runs repair
|
||||||
|
|
||||||
|
In the case of 1) we don't want to run repair on both tables as
|
||||||
|
the repair can be a slow process. Instead we only run repair
|
||||||
|
on the crashed tables. If not tables are marked crashed, we
|
||||||
|
run repair on both tables.
|
||||||
|
|
||||||
|
Repair on the cache table will delete the part of the cache that was
|
||||||
|
not committed.
|
||||||
|
|
||||||
|
key_file_length and data_file_length are updated last for a statement.
|
||||||
|
When these are updated, we threat the cache as committed
|
||||||
|
*/
|
||||||
|
|
||||||
|
int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
|
||||||
|
{
|
||||||
|
int error= 0, error2;
|
||||||
|
int something_crashed= is_crashed();
|
||||||
|
DBUG_ENTER("ha_mcs_cache::repair");
|
||||||
|
|
||||||
|
if (cache_handler->is_crashed() || !something_crashed)
|
||||||
|
{
|
||||||
|
/* Delete everything that was not already committed */
|
||||||
|
mysql_file_chsize(cache_handler->file->dfile.file,
|
||||||
|
cache_handler->file->s->state.state.key_file_length,
|
||||||
|
0, MYF(MY_WME));
|
||||||
|
mysql_file_chsize(cache_handler->file->s->kfile.file,
|
||||||
|
cache_handler->file->s->state.state.data_file_length,
|
||||||
|
0, MYF(MY_WME));
|
||||||
|
error= cache_handler->repair(thd, check_opt);
|
||||||
|
}
|
||||||
|
if (parent::is_crashed() || !something_crashed)
|
||||||
|
if ((error2= parent::repair(thd, check_opt)))
|
||||||
|
error= error2;
|
||||||
|
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
Write to cache handler or main table
|
||||||
|
*/
|
||||||
|
int ha_mcs_cache::write_row(const uchar *buf)
|
||||||
|
{
|
||||||
|
if (insert_command)
|
||||||
|
return cache_handler->write_row(buf);
|
||||||
|
return parent::write_row(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags)
|
||||||
|
{
|
||||||
|
if (insert_command)
|
||||||
|
{
|
||||||
|
bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info));
|
||||||
|
return cache_handler->start_bulk_insert(rows, flags);
|
||||||
|
}
|
||||||
|
return parent::start_bulk_insert(rows, flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ha_mcs_cache::end_bulk_insert()
|
||||||
|
{
|
||||||
|
if (insert_command)
|
||||||
|
return cache_handler->end_bulk_insert();
|
||||||
|
return parent::end_bulk_insert();
|
||||||
|
}
|
||||||
|
|
||||||
|
/******************************************************************************
|
||||||
|
ha_mcs_cache Plugin code
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
|
static handler *ha_mcs_cache_create_handler(handlerton *hton,
|
||||||
|
TABLE_SHARE *table,
|
||||||
|
MEM_ROOT *mem_root)
|
||||||
|
{
|
||||||
|
return new (mem_root) ha_mcs_cache(hton, table, mem_root);
|
||||||
|
}
|
||||||
|
|
||||||
|
static plugin_ref plugin_maria;
|
||||||
|
|
||||||
|
static int ha_mcs_cache_init(void *p)
|
||||||
|
{
|
||||||
|
handlerton *cache_hton;
|
||||||
|
int error;
|
||||||
|
|
||||||
|
cache_hton= (handlerton *) p;
|
||||||
|
cache_hton->create= ha_mcs_cache_create_handler;
|
||||||
|
cache_hton->panic= 0;
|
||||||
|
cache_hton->flags= HTON_NO_PARTITION;
|
||||||
|
|
||||||
|
error= mcs_hton == NULL; // Engine must exists!
|
||||||
|
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
my_error(HA_ERR_INITIALIZATION, MYF(0),
|
||||||
|
"Could not find storage engine %s", "Columnstore");
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
LEX_CSTRING name= { STRING_WITH_LEN("Aria") };
|
||||||
|
plugin_maria= ha_resolve_by_name(0, &name, 0);
|
||||||
|
mcs_maria_hton= plugin_hton(plugin_maria);
|
||||||
|
error= mcs_maria_hton == NULL; // Engine must exists!
|
||||||
|
if (error)
|
||||||
|
my_error(HA_ERR_INITIALIZATION, MYF(0),
|
||||||
|
"Could not find storage engine %s", name.str);
|
||||||
|
}
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ha_mcs_cache_deinit(void *p)
|
||||||
|
{
|
||||||
|
if (plugin_maria)
|
||||||
|
{
|
||||||
|
plugin_unlock(0, plugin_maria);
|
||||||
|
plugin_maria= NULL;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct st_mysql_storage_engine ha_mcs_cache_storage_engine=
|
||||||
|
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
|
||||||
|
|
||||||
struct st_mysql_storage_engine columnstore_storage_engine =
|
struct st_mysql_storage_engine columnstore_storage_engine =
|
||||||
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
|
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
|
||||||
|
|
||||||
@ -1155,11 +1682,26 @@ maria_declare_plugin(columnstore)
|
|||||||
columnstore_init_func,
|
columnstore_init_func,
|
||||||
columnstore_done_func,
|
columnstore_done_func,
|
||||||
MCSVERSIONHEX,
|
MCSVERSIONHEX,
|
||||||
mcs_status_variables, /* status variables */
|
mcs_status_variables, /* status variables */
|
||||||
mcs_system_variables, /* system variables */
|
mcs_system_variables, /* system variables */
|
||||||
MCSVERSION, /* string version */
|
MCSVERSION, /* string version */
|
||||||
COLUMNSTORE_MATURITY /* maturity */
|
COLUMNSTORE_MATURITY /* maturity */
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MYSQL_STORAGE_ENGINE_PLUGIN,
|
||||||
|
&ha_mcs_cache_storage_engine,
|
||||||
|
"Columnstore_cache",
|
||||||
|
"MariaDB Corporation AB",
|
||||||
|
"Insert cache for ColumnStore",
|
||||||
|
PLUGIN_LICENSE_GPL,
|
||||||
|
ha_mcs_cache_init, /* Plugin Init */
|
||||||
|
ha_mcs_cache_deinit, /* Plugin Deinit */
|
||||||
|
MCSVERSIONHEX,
|
||||||
|
NULL, /* status variables */
|
||||||
|
NULL, /* system variables */
|
||||||
|
MCSVERSION, /* string version */
|
||||||
|
MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */
|
||||||
|
},
|
||||||
{
|
{
|
||||||
MYSQL_INFORMATION_SCHEMA_PLUGIN,
|
MYSQL_INFORMATION_SCHEMA_PLUGIN,
|
||||||
&is_columnstore_plugin_version,
|
&is_columnstore_plugin_version,
|
||||||
@ -1226,3 +1768,89 @@ maria_declare_plugin(columnstore)
|
|||||||
}
|
}
|
||||||
maria_declare_plugin_end;
|
maria_declare_plugin_end;
|
||||||
|
|
||||||
|
/******************************************************************************
|
||||||
|
Implementation of write cache
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
|
bool ha_mcs_cache::rows_cached()
|
||||||
|
{
|
||||||
|
return cache_handler->file->state->records != 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Free write locks if this was not an insert */
|
||||||
|
|
||||||
|
void ha_mcs_cache::free_locks()
|
||||||
|
{
|
||||||
|
/* We don't need to lock cache_handler anymore as it's already flushed */
|
||||||
|
|
||||||
|
mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex);
|
||||||
|
thr_unlock(&cache_handler->file->lock, 0);
|
||||||
|
mysql_mutex_lock(&cache_handler->file->lock.lock->mutex);
|
||||||
|
|
||||||
|
/* Restart transaction for columnstore table */
|
||||||
|
if (original_lock_type != F_WRLCK)
|
||||||
|
{
|
||||||
|
parent::external_lock(table->in_use, F_UNLCK);
|
||||||
|
parent::external_lock(table->in_use, original_lock_type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
Copy data from cache to ColumnStore
|
||||||
|
|
||||||
|
Both tables are locked. The from table has also an exclusive lock to
|
||||||
|
ensure that no one is inserting data to it while we are reading it.
|
||||||
|
*/
|
||||||
|
|
||||||
|
int ha_mcs_cache::flush_insert_cache()
|
||||||
|
{
|
||||||
|
int error, error2;
|
||||||
|
ha_maria *from= cache_handler;
|
||||||
|
uchar *record= table->record[0];
|
||||||
|
DBUG_ENTER("flush_insert_cache");
|
||||||
|
|
||||||
|
parent::start_bulk_insert_from_cache(from->file->state->records, 0);
|
||||||
|
from->rnd_init(1);
|
||||||
|
while (!(error= from->rnd_next(record)))
|
||||||
|
{
|
||||||
|
if ((error= parent::write_row(record)))
|
||||||
|
goto end;
|
||||||
|
rows_changed++;
|
||||||
|
}
|
||||||
|
if (error == HA_ERR_END_OF_FILE)
|
||||||
|
error= 0;
|
||||||
|
|
||||||
|
end:
|
||||||
|
from->rnd_end();
|
||||||
|
if ((error2= parent::end_bulk_insert()) && !error)
|
||||||
|
error= error2;
|
||||||
|
|
||||||
|
if (!error)
|
||||||
|
{
|
||||||
|
if (parent::ht->commit)
|
||||||
|
error= parent::ht->commit(parent::ht, table->in_use, 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* We can ignore the rollback error as we already have some other errors */
|
||||||
|
if (parent::ht->rollback)
|
||||||
|
parent::ht->rollback(parent::ht, table->in_use, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!error)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Everything when fine, delete all rows from the cache and allow others
|
||||||
|
to use it.
|
||||||
|
*/
|
||||||
|
from->delete_all_rows();
|
||||||
|
|
||||||
|
/*
|
||||||
|
This was not an insert command, so we can delete the thr lock
|
||||||
|
(We are not going to use the insert cache for this statement anymore)
|
||||||
|
*/
|
||||||
|
free_locks();
|
||||||
|
}
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include <my_config.h>
|
#include <my_config.h>
|
||||||
#include "idb_mysql.h"
|
#include "idb_mysql.h"
|
||||||
#include "ha_mcs_sysvars.h"
|
#include "ha_mcs_sysvars.h"
|
||||||
|
#include "ha_maria.h"
|
||||||
|
|
||||||
extern handlerton* mcs_hton;
|
extern handlerton* mcs_hton;
|
||||||
|
|
||||||
@ -53,7 +54,7 @@ class ha_mcs: public handler
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
ha_mcs(handlerton* hton, TABLE_SHARE* table_arg);
|
ha_mcs(handlerton* hton, TABLE_SHARE* table_arg);
|
||||||
~ha_mcs()
|
virtual ~ha_mcs()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,6 +150,7 @@ public:
|
|||||||
skip it and and MySQL will treat it as not implemented.
|
skip it and and MySQL will treat it as not implemented.
|
||||||
*/
|
*/
|
||||||
void start_bulk_insert(ha_rows rows, uint flags = 0) ;
|
void start_bulk_insert(ha_rows rows, uint flags = 0) ;
|
||||||
|
void start_bulk_insert_from_cache(ha_rows rows, uint flags = 0) ;
|
||||||
|
|
||||||
/**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */
|
/**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */
|
||||||
int end_bulk_insert() ;
|
int end_bulk_insert() ;
|
||||||
@ -234,5 +236,60 @@ public:
|
|||||||
return HA_CACHE_TBL_NOCACHE;
|
return HA_CACHE_TBL_NOCACHE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int repair(THD* thd, HA_CHECK_OPT* check_opt);
|
||||||
|
bool is_crashed() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class ha_mcs_cache :public ha_mcs
|
||||||
|
{
|
||||||
|
typedef ha_mcs parent;
|
||||||
|
int original_lock_type;
|
||||||
|
bool insert_command;
|
||||||
|
|
||||||
|
public:
|
||||||
|
THR_LOCK org_lock;
|
||||||
|
uint lock_counter;
|
||||||
|
ha_maria *cache_handler;
|
||||||
|
|
||||||
|
ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root);
|
||||||
|
~ha_mcs_cache();
|
||||||
|
|
||||||
|
/*
|
||||||
|
The following functions duplicates calls to derived handler and
|
||||||
|
cache handler
|
||||||
|
*/
|
||||||
|
|
||||||
|
int create(const char *name, TABLE *table_arg,
|
||||||
|
HA_CREATE_INFO *ha_create_info);
|
||||||
|
int open(const char *name, int mode, uint open_flags);
|
||||||
|
int delete_table(const char *name);
|
||||||
|
int rename_table(const char *from, const char *to);
|
||||||
|
int delete_all_rows(void);
|
||||||
|
int close(void);
|
||||||
|
|
||||||
|
uint lock_count(void) const;
|
||||||
|
THR_LOCK_DATA **store_lock(THD *thd,
|
||||||
|
THR_LOCK_DATA **to,
|
||||||
|
enum thr_lock_type lock_type);
|
||||||
|
int external_lock(THD *thd, int lock_type);
|
||||||
|
int repair(THD *thd, HA_CHECK_OPT *check_opt);
|
||||||
|
bool is_crashed() const;
|
||||||
|
|
||||||
|
/*
|
||||||
|
Write row uses cache_handler, for normal inserts, otherwise derived
|
||||||
|
handler
|
||||||
|
*/
|
||||||
|
int write_row(const uchar *buf);
|
||||||
|
void start_bulk_insert(ha_rows rows, uint flags);
|
||||||
|
int end_bulk_insert();
|
||||||
|
|
||||||
|
/* Cache functions */
|
||||||
|
void free_locks();
|
||||||
|
bool rows_cached();
|
||||||
|
int flush_insert_cache();
|
||||||
|
friend my_bool get_status_and_flush_cache(void *param,
|
||||||
|
my_bool concurrent_insert);
|
||||||
|
};
|
||||||
|
|
||||||
#endif //HA_MCS_H__
|
#endif //HA_MCS_H__
|
||||||
|
@ -6105,7 +6105,8 @@ bool isMCSTable(TABLE* table_ptr)
|
|||||||
string engineName = table_ptr->s->db_plugin->name.str;
|
string engineName = table_ptr->s->db_plugin->name.str;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (engineName == "Columnstore" || engineName == "InfiniDB")
|
if (engineName == "Columnstore" ||
|
||||||
|
engineName == "Columnstore_cache")
|
||||||
return true;
|
return true;
|
||||||
else
|
else
|
||||||
return false;
|
return false;
|
||||||
|
@ -3051,7 +3051,7 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
|
|||||||
(!ci->singleInsert) &&
|
(!ci->singleInsert) &&
|
||||||
((ci->isLoaddataInfile) ||
|
((ci->isLoaddataInfile) ||
|
||||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) )
|
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
|
||||||
{
|
{
|
||||||
rc = ha_mcs_impl_write_batch_row_(buf, table, *ci);
|
rc = ha_mcs_impl_write_batch_row_(buf, table, *ci);
|
||||||
}
|
}
|
||||||
@ -3104,7 +3104,7 @@ int ha_mcs_impl_delete_row()
|
|||||||
return ( rc );
|
return ( rc );
|
||||||
}
|
}
|
||||||
|
|
||||||
void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert)
|
||||||
{
|
{
|
||||||
THD* thd = current_thd;
|
THD* thd = current_thd;
|
||||||
|
|
||||||
@ -3168,17 +3168,30 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
|||||||
ci->isLoaddataInfile = true;
|
ci->isLoaddataInfile = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (is_cache_insert)
|
||||||
|
{
|
||||||
|
ci->isCacheInsert = true;
|
||||||
|
|
||||||
|
if (rows > 1)
|
||||||
|
ci->singleInsert = false;
|
||||||
|
}
|
||||||
|
|
||||||
ci->bulkInsertRows = rows;
|
ci->bulkInsertRows = rows;
|
||||||
|
|
||||||
if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
|
if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
|
||||||
((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||||
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
|
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
|
||||||
|
is_cache_insert) && !ci->singleInsert )
|
||||||
{
|
{
|
||||||
ci->useCpimport = get_use_import_for_batchinsert(thd);
|
ci->useCpimport = get_use_import_for_batchinsert(thd);
|
||||||
|
|
||||||
if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
|
if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
|
||||||
ci->useCpimport = 0;
|
ci->useCpimport = 0;
|
||||||
|
|
||||||
|
// For now, disable cpimport for cache inserts
|
||||||
|
if (is_cache_insert)
|
||||||
|
ci->useCpimport = 0;
|
||||||
|
|
||||||
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
|
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
|
||||||
// transaction or not. User should use this option very carefully since
|
// transaction or not. User should use this option very carefully since
|
||||||
// cpimport currently does not support rollbacks
|
// cpimport currently does not support rollbacks
|
||||||
@ -3535,7 +3548,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Save table oid for commit to use
|
//Save table oid for commit to use
|
||||||
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
|
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert)
|
||||||
{
|
{
|
||||||
// query stats. only collect execution time and rows inserted for insert/load_data_infile
|
// query stats. only collect execution time and rows inserted for insert/load_data_infile
|
||||||
ci->stats.reset();
|
ci->stats.reset();
|
||||||
@ -3643,14 +3656,14 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
|||||||
|
|
||||||
// @bug 2378. do not enter for select, reset singleInsert flag after multiple insert.
|
// @bug 2378. do not enter for select, reset singleInsert flag after multiple insert.
|
||||||
// @bug 2515. Check command intead of vtable state
|
// @bug 2515. Check command intead of vtable state
|
||||||
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
|
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert )
|
||||||
{
|
{
|
||||||
if (((ci->useCpimport == 2) ||
|
if (((ci->useCpimport == 2) ||
|
||||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||||
(!ci->singleInsert) &&
|
(!ci->singleInsert) &&
|
||||||
((ci->isLoaddataInfile) ||
|
((ci->isLoaddataInfile) ||
|
||||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) )
|
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
|
||||||
{
|
{
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
|
|
||||||
@ -3824,7 +3837,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
|||||||
|
|
||||||
// populate query stats for insert and load data infile. insert select has
|
// populate query stats for insert and load data infile. insert select has
|
||||||
// stats entered in sm already
|
// stats entered in sm already
|
||||||
if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD))
|
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
|
||||||
|
((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||||
|
ci->isCacheInsert)
|
||||||
{
|
{
|
||||||
ci->stats.setEndTime();
|
ci->stats.setEndTime();
|
||||||
ci->stats.fErrorNo = rc;
|
ci->stats.fErrorNo = rc;
|
||||||
@ -3851,6 +3866,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
|||||||
// However, we should be resetting these members anyways.
|
// However, we should be resetting these members anyways.
|
||||||
ci->singleInsert = true; // reset the flag
|
ci->singleInsert = true; // reset the flag
|
||||||
ci->isLoaddataInfile = false;
|
ci->isLoaddataInfile = false;
|
||||||
|
ci->isCacheInsert = false;
|
||||||
ci->tableOid = 0;
|
ci->tableOid = 0;
|
||||||
ci->rowsHaveInserted = 0;
|
ci->rowsHaveInserted = 0;
|
||||||
ci->useCpimport = 1;
|
ci->useCpimport = 1;
|
||||||
@ -3885,6 +3901,7 @@ int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all)
|
|||||||
thd->server_status &= ~SERVER_STATUS_IN_TRANS;
|
thd->server_status &= ~SERVER_STATUS_IN_TRANS;
|
||||||
ci->singleInsert = true; // reset the flag
|
ci->singleInsert = true; // reset the flag
|
||||||
ci->isLoaddataInfile = false;
|
ci->isLoaddataInfile = false;
|
||||||
|
ci->isCacheInsert = false;
|
||||||
ci->tableOid = 0;
|
ci->tableOid = 0;
|
||||||
ci->rowsHaveInserted = 0;
|
ci->rowsHaveInserted = 0;
|
||||||
return rc;
|
return rc;
|
||||||
@ -3906,6 +3923,7 @@ int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all)
|
|||||||
int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci);
|
int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci);
|
||||||
ci->singleInsert = true; // reset the flag
|
ci->singleInsert = true; // reset the flag
|
||||||
ci->isLoaddataInfile = false;
|
ci->isLoaddataInfile = false;
|
||||||
|
ci->isCacheInsert = false;
|
||||||
ci->tableOid = 0;
|
ci->tableOid = 0;
|
||||||
ci->rowsHaveInserted = 0;
|
ci->rowsHaveInserted = 0;
|
||||||
thd->server_status &= ~SERVER_STATUS_IN_TRANS;
|
thd->server_status &= ~SERVER_STATUS_IN_TRANS;
|
||||||
|
@ -33,7 +33,7 @@ extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStac
|
|||||||
extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table);
|
extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table);
|
||||||
extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false);
|
extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false);
|
||||||
extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed);
|
extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed);
|
||||||
extern void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table);
|
extern void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert = false);
|
||||||
extern int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table);
|
extern int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table);
|
||||||
extern int ha_mcs_impl_rename_table(const char* from, const char* to);
|
extern int ha_mcs_impl_rename_table(const char* from, const char* to);
|
||||||
extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all);
|
extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all);
|
||||||
|
@ -245,7 +245,8 @@ struct cal_connection_info
|
|||||||
isAlter(false),
|
isAlter(false),
|
||||||
bulkInsertRows(0),
|
bulkInsertRows(0),
|
||||||
singleInsert(true),
|
singleInsert(true),
|
||||||
isLoaddataInfile( false ),
|
isLoaddataInfile(false),
|
||||||
|
isCacheInsert(false),
|
||||||
dmlProc(0),
|
dmlProc(0),
|
||||||
rowsHaveInserted(0),
|
rowsHaveInserted(0),
|
||||||
rc(0),
|
rc(0),
|
||||||
@ -297,6 +298,7 @@ struct cal_connection_info
|
|||||||
ha_rows bulkInsertRows;
|
ha_rows bulkInsertRows;
|
||||||
bool singleInsert;
|
bool singleInsert;
|
||||||
bool isLoaddataInfile;
|
bool isLoaddataInfile;
|
||||||
|
bool isCacheInsert;
|
||||||
std::string extendedStats;
|
std::string extendedStats;
|
||||||
std::string miniStats;
|
std::string miniStats;
|
||||||
messageqcpp::MessageQueueClient* dmlProc;
|
messageqcpp::MessageQueueClient* dmlProc;
|
||||||
|
Reference in New Issue
Block a user