
Merge pull request #1371 from mariadb-corporation/columnstore_cache

ColumnStore Cache fixes
David.Hall, 2020-08-24 15:48:56 -05:00 (committed by GitHub)
6 changed files with 500 additions and 272 deletions


@ -33,14 +33,6 @@
#define CACHE_PREFIX "#cache#"
static handler* mcs_create_handler(handlerton* hton,
TABLE_SHARE* table,
MEM_ROOT* mem_root);
static int mcs_commit(handlerton* hton, THD* thd, bool all);
static int mcs_rollback(handlerton* hton, THD* thd, bool all);
static int mcs_close_connection(handlerton* hton, THD* thd );
handlerton* mcs_hton = NULL;
// This is the maria handlerton that we need for the cache
static handlerton *mcs_maria_hton = NULL;
@ -125,68 +117,6 @@ int mcs_discover_existence(handlerton* hton, const char* db,
return ha_mcs_impl_discover_existence(db, table_name);
}
static int columnstore_init_func(void* p)
{
DBUG_ENTER("columnstore_init_func");
struct tm tm;
time_t t;
time(&t);
localtime_r(&t, &tm);
fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ",
tm.tm_year % 100, tm.tm_mon + 1, tm.tm_mday,
tm.tm_hour, tm.tm_min, tm.tm_sec);
fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str());
strncpy(cs_version, columnstore_version.c_str(), sizeof(cs_version));
cs_version[sizeof(cs_version) - 1] = 0;
strncpy(cs_commit_hash, columnstore_commit_hash.c_str(), sizeof(cs_commit_hash));
cs_commit_hash[sizeof(cs_commit_hash) - 1] = 0;
mcs_hton = (handlerton*)p;
#ifndef _MSC_VER
(void) pthread_mutex_init(&mcs_mutex, MY_MUTEX_INIT_FAST);
#endif
(void) my_hash_init(PSI_NOT_INSTRUMENTED, &mcs_open_tables, system_charset_info, 32, 0, 0,
(my_hash_get_key) mcs_get_key, 0, 0);
mcs_hton->create = mcs_create_handler;
mcs_hton->flags = HTON_CAN_RECREATE;
// mcs_hton->discover_table = mcs_discover;
// mcs_hton->discover_table_existence = mcs_discover_existence;
mcs_hton->commit = mcs_commit;
mcs_hton->rollback = mcs_rollback;
mcs_hton->close_connection = mcs_close_connection;
mcs_hton->create_group_by = create_columnstore_group_by_handler;
mcs_hton->create_derived = create_columnstore_derived_handler;
mcs_hton->create_select = create_columnstore_select_handler;
mcs_hton->db_type = DB_TYPE_AUTOASSIGN;
DBUG_RETURN(0);
}
static int columnstore_done_func(void* p)
{
DBUG_ENTER("columnstore_done_func");
config::Config::deleteInstanceMap();
my_hash_free(&mcs_open_tables);
#ifndef _MSC_VER
pthread_mutex_destroy(&mcs_mutex);
#endif
DBUG_RETURN(0);
}
static handler* mcs_create_handler(handlerton* hton,
TABLE_SHARE* table,
MEM_ROOT* mem_root)
{
return new (mem_root) ha_mcs(hton, table);
}
static int mcs_commit(handlerton* hton, THD* thd, bool all)
{
int rc;
@ -1223,11 +1153,10 @@ bool ha_mcs::is_crashed() const
lock.
*/
static my_bool (*original_get_status)(void*, my_bool);
my_bool get_status_and_flush_cache(void *param,
my_bool concurrent_insert);
/*
Create a name for the cache table
*/
@ -1243,7 +1172,9 @@ static void create_cache_name(char *to, const char *name)
THR_LOCK wrapper functions
The idea of these is to hijack THR_LOCK->get_status() so that if this
is called in a non-insert context then we will flush the cache
is called in a non-insert context then we will flush the cache.
We also hijack THR_LOCK->start_trans() to free any locks on the cache
if the command was not an insert command.
*****************************************************************************/
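For context on the wrappers that follow: the hijack is plain function-pointer interposition. The original THR_LOCK callback is saved (in this patch, into ha_mcs_cache_share::org_lock), a wrapper is installed in its place, and the wrapper delegates to the saved original before doing the cache-specific work. Below is a minimal, self-contained sketch of that pattern; the types and names are generic stand-ins, not the real MariaDB structures, and the sketch is not part of the patch:

/* Illustrative sketch only: generic stand-ins, not the real THR_LOCK layout. */
struct lock_hooks
{
    bool (*get_status)(void* param, bool concurrent_insert);
};
struct cache_state
{
    lock_hooks  org;            /* saved original callbacks                    */
    lock_hooks* live;           /* hooks actually consulted by the lock code   */
    void*       engine_param;   /* argument the original callback expects      */
};
static bool wrapped_get_status(void* param, bool concurrent_insert)
{
    cache_state* cache = static_cast<cache_state*>(param);
    /* Delegate to the original engine callback first ... */
    if (cache->org.get_status)
        (*cache->org.get_status)(cache->engine_param, concurrent_insert);
    /* ... then do the cache-specific work (flush the cache or free locks).
       Report success here; the real wrapper returns 1 if the flush failed. */
    return false;
}
static void install_hooks(cache_state* cache)
{
    cache->org = *cache->live;                    /* remember the originals once */
    cache->live->get_status = &wrapped_get_status;
}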
/*
@ -1264,13 +1195,16 @@ my_bool get_status_and_flush_cache(void *param,
Call first the original Aria get_status function
All Aria get_status functions take the Maria handler as the parameter
*/
if (original_get_status)
(*original_get_status)(&cache->cache_handler->file, concurrent_insert);
if (cache->share->org_lock.get_status)
(*cache->share->org_lock.get_status)(cache->cache_handler->file,
concurrent_insert);
/* If first get_status() call for this table, flush cache if needed */
if (!cache->lock_counter++)
{
if (!cache->insert_command && cache->rows_cached())
ha_rows num_rows = cache->num_rows_cached();
if ((!cache->insert_command && num_rows != 0) ||
num_rows >= get_cache_flush_threshold(current_thd))
{
if ((error= cache->flush_insert_cache()))
{
@ -1280,71 +1214,165 @@ my_bool get_status_and_flush_cache(void *param,
return(1);
}
}
else if (!cache->insert_command)
cache->free_locks();
}
else if (!cache->insert_command)
cache->free_locks();
return (0);
}
/* Pass through functions for all the THR_LOCK virtual functions */
/*
start_trans() is called when all locks have been given
If this was not an insert command then we can free the write lock on
the cache table and also downgrade external lock for the cached table
to F_READ
*/
static my_bool cache_start_trans(void* param)
my_bool cache_start_trans(void* param)
{
ha_mcs_cache *cache= (ha_mcs_cache*) param;
if (cache->org_lock.start_trans)
return (*cache->org_lock.start_trans)(cache->cache_handler->file);
return 0;
if (!cache->insert_command)
{
cache->free_locks();
return 0;
}
return (*cache->share->org_lock.start_trans)(cache->cache_handler->file);
}
/* Pass through functions for all the THR_LOCK virtual functions */
static void cache_copy_status(void* to, void *from)
{
ha_mcs_cache *to_cache= (ha_mcs_cache*) to, *from_cache= (ha_mcs_cache*) from;
if (to_cache->org_lock.copy_status)
(*to_cache->org_lock.copy_status)(to_cache->cache_handler->file,
from_cache->cache_handler->file);
(*to_cache->share->org_lock.copy_status)(to_cache->cache_handler->file,
from_cache->cache_handler->file);
}
static void cache_update_status(void* param)
{
ha_mcs_cache *cache= (ha_mcs_cache*) param;
if (cache->org_lock.update_status)
(*cache->org_lock.update_status)(cache->cache_handler->file);
(*cache->share->org_lock.update_status)(cache->cache_handler->file);
}
static void cache_restore_status(void *param)
{
ha_mcs_cache *cache= (ha_mcs_cache*) param;
if (cache->org_lock.restore_status)
(*cache->org_lock.restore_status)(cache->cache_handler->file);
(*cache->share->org_lock.restore_status)(cache->cache_handler->file);
}
static my_bool cache_check_status(void *param)
{
ha_mcs_cache *cache= (ha_mcs_cache*) param;
if (cache->org_lock.check_status)
return (*cache->org_lock.check_status)(cache->cache_handler->file);
return 0;
return (*cache->share->org_lock.check_status)(cache->cache_handler->file);
}
/*****************************************************************************
ha_mcs_cache_share functions (Common storage for an open cache file)
*****************************************************************************/
static ha_mcs_cache_share *cache_share_list= 0;
static PSI_mutex_key key_LOCK_cache_share;
static PSI_mutex_info all_mutexes[]=
{
{ &key_LOCK_cache_share, "LOCK_cache_share", PSI_FLAG_GLOBAL},
};
static mysql_mutex_t LOCK_cache_share;
/*
Find or create a share
*/
ha_mcs_cache_share *find_cache_share(const char *name, ulonglong cached_rows)
{
ha_mcs_cache_share *pos, *share;
mysql_mutex_lock(&LOCK_cache_share);
for (pos= cache_share_list; pos; pos= pos->next)
{
if (!strcmp(pos->name, name))
{
pos->open_count++;
mysql_mutex_unlock(&LOCK_cache_share);
return(pos);
}
}
if (!(share= (ha_mcs_cache_share*) my_malloc(PSI_NOT_INSTRUMENTED,
sizeof(*share) + strlen(name)+1,
MYF(MY_FAE))))
{
mysql_mutex_unlock(&LOCK_cache_share);
return 0;
}
share->name= (char*) (share+1);
share->open_count= 1;
share->cached_rows= cached_rows;
strmov((char*) share->name, name);
share->next= cache_share_list;
cache_share_list= share;
mysql_mutex_unlock(&LOCK_cache_share);
return share;
}
/*
Decrement the open counter and free the share if there are no more users
*/
void ha_mcs_cache_share::close()
{
ha_mcs_cache_share *pos;
mysql_mutex_lock(&LOCK_cache_share);
if (!--open_count)
{
ha_mcs_cache_share **prev= &cache_share_list;
for ( ; (pos= *prev) != this; prev= &pos->next)
;
*prev= next;
my_free(this);
}
mysql_mutex_unlock(&LOCK_cache_share);
}
/*****************************************************************************
ha_mcs_cache handler functions
*****************************************************************************/
static plugin_ref plugin_maria = NULL;
ha_mcs_cache::ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root)
:ha_mcs(mcs_hton, table_arg)
{
cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root);
lock_counter= 0;
if (get_cache_inserts(current_thd))
{
if (!plugin_maria)
{
LEX_CSTRING name = { STRING_WITH_LEN("Aria") };
plugin_maria = ha_resolve_by_name(0, &name, 0);
mcs_maria_hton = plugin_hton(plugin_maria);
int error = mcs_maria_hton == NULL; // Engine must exist!
if (error)
my_error(HA_ERR_INITIALIZATION, MYF(0),
"Could not find storage engine %s", name.str);
}
assert(mcs_maria_hton);
cache_handler= (ha_maria*) mcs_maria_hton->create(mcs_maria_hton, table_arg, mem_root);
share= 0;
lock_counter= 0;
cache_locked= 0;
}
}
ha_mcs_cache::~ha_mcs_cache()
{
if (cache_handler)
if (get_cache_inserts(current_thd) && cache_handler)
{
delete cache_handler;
cache_handler= NULL;
}
}
/*
@ -1359,26 +1387,31 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg,
char cache_name[FN_REFLEN+8];
DBUG_ENTER("ha_mcs_cache::create");
create_cache_name(cache_name, name);
if (get_cache_inserts(current_thd))
{
/* Create a cached table */
ha_choice save_transactional= ha_create_info->transactional;
row_type save_row_type= ha_create_info->row_type;
ha_create_info->transactional= HA_CHOICE_NO;
ha_create_info->row_type= ROW_TYPE_DYNAMIC;
create_cache_name(cache_name, name);
{
/* Create a cached table */
ha_choice save_transactional= ha_create_info->transactional;
row_type save_row_type= ha_create_info->row_type;
ha_create_info->transactional= HA_CHOICE_NO;
ha_create_info->row_type= ROW_TYPE_DYNAMIC;
if ((error= cache_handler->create(cache_name, table_arg, ha_create_info)))
DBUG_RETURN(error);
ha_create_info->transactional= save_transactional;
ha_create_info->row_type= save_row_type;
if ((error= cache_handler->create(cache_name, table_arg, ha_create_info)))
DBUG_RETURN(error);
ha_create_info->transactional= save_transactional;
ha_create_info->row_type= save_row_type;
}
}
/* Create the real table in ColumnStore */
if ((error= parent::create(name, table_arg, ha_create_info)))
{
cache_handler->delete_table(cache_name);
if (get_cache_inserts(current_thd))
cache_handler->delete_table(cache_name);
DBUG_RETURN(error);
}
DBUG_RETURN(0);
}
@ -1386,35 +1419,64 @@ int ha_mcs_cache::create(const char *name, TABLE *table_arg,
int ha_mcs_cache::open(const char *name, int mode, uint open_flags)
{
int error;
char cache_name[FN_REFLEN+8];
DBUG_ENTER("ha_mcs_cache::open");
/* Copy table object to cache_handler */
cache_handler->change_table_ptr(table, table->s);
if (get_cache_inserts(current_thd))
{
/* Copy table object to cache_handler */
cache_handler->change_table_ptr(table, table->s);
create_cache_name(cache_name, name);
if ((error= cache_handler->open(cache_name, mode, open_flags)))
DBUG_RETURN(error);
char cache_name[FN_REFLEN+8];
create_cache_name(cache_name, name);
if ((error= cache_handler->open(cache_name, mode, open_flags)))
DBUG_RETURN(error);
/* Fix lock so that it goes through get_status_and_flush() */
THR_LOCK *lock= &cache_handler->file->s->lock;
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
org_lock= lock[0];
lock->get_status= &get_status_and_flush_cache;
lock->start_trans= &cache_start_trans;
lock->copy_status= &cache_copy_status;
lock->update_status= &cache_update_status;
lock->restore_status= &cache_restore_status;
lock->check_status= &cache_check_status;
lock->restore_status= &cache_restore_status;
cache_handler->file->lock.status_param= (void*) this;
mysql_mutex_unlock(&cache_handler->file->s->intern_lock);
if (!(share= find_cache_share(name, cache_handler->file->state->records)))
{
cache_handler->close();
DBUG_RETURN(ER_OUTOFMEMORY);
}
/* Fix lock so that it goes through get_status_and_flush() */
THR_LOCK *lock= &cache_handler->file->s->lock;
if (lock->get_status != &get_status_and_flush_cache)
{
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
/* The following lock is here just to establish mutex locking order */
mysql_mutex_lock(&lock->mutex);
mysql_mutex_unlock(&lock->mutex);
if (lock->get_status != &get_status_and_flush_cache)
{
/* Remember original lock. Used by the THR_LOCK cache functions */
share->org_lock= lock[0];
if (lock->start_trans)
lock->start_trans= &cache_start_trans;
if (lock->copy_status)
lock->copy_status= &cache_copy_status;
if (lock->update_status)
lock->update_status= &cache_update_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
if (lock->check_status)
lock->check_status= &cache_check_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
lock->get_status= &get_status_and_flush_cache;
}
mysql_mutex_unlock(&cache_handler->file->s->intern_lock);
}
cache_handler->file->lock.status_param= (void*) this;
}
if ((error= parent::open(name, mode, open_flags)))
{
cache_handler->close();
if (get_cache_inserts(current_thd))
cache_handler->close();
DBUG_RETURN(error);
}
DBUG_RETURN(0);
}
@ -1422,10 +1484,23 @@ int ha_mcs_cache::open(const char *name, int mode, uint open_flags)
int ha_mcs_cache::close()
{
int error, error2;
DBUG_ENTER("ha_mcs_cache::close()");
error= cache_handler->close();
if ((error2= parent::close()))
error= error2;
if (get_cache_inserts(current_thd))
{
error= cache_handler->close();
if ((error2= parent::close()))
error= error2;
ha_mcs_cache_share *org_share= share;
if (org_share)
org_share->close();
}
else
{
error= parent::close();
}
DBUG_RETURN(error);
}
@ -1439,9 +1514,12 @@ uint ha_mcs_cache::lock_count(void) const
{
/*
If we are doing an insert or if we want to flush the cache, we have to lock
both MyISAM table and normal table.
both the Aria table and normal table.
*/
return 2;
if (get_cache_inserts(current_thd))
return 2;
else
return 1;
}
/**
@ -1452,7 +1530,8 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type)
{
to= cache_handler->store_lock(thd, to, TL_WRITE);
if (get_cache_inserts(current_thd))
to= cache_handler->store_lock(thd, to, TL_WRITE);
return parent::store_lock(thd, to, lock_type);
}
@ -1463,78 +1542,113 @@ THR_LOCK_DATA **ha_mcs_cache::store_lock(THD *thd,
int ha_mcs_cache::external_lock(THD *thd, int lock_type)
{
int error;
int error= 0;
DBUG_ENTER("ha_mcs_cache::external_lock");
/*
Reset lock_counter. This is ok as external_lock() is guaranteed to be
called before first get_status()
*/
lock_counter= 0;
if (lock_type == F_UNLCK)
if (get_cache_inserts(current_thd))
{
int error2;
error= cache_handler->external_lock(thd, lock_type);
if ((error2= parent::external_lock(thd, lock_type)))
error= error2;
DBUG_RETURN(error);
/*
Reset lock_counter. This is ok as external_lock() is guaranteed to be
called before first get_status()
*/
lock_counter= 0;
if (lock_type == F_UNLCK)
{
int error2;
error= 0;
if (cache_locked)
{
error= cache_handler->external_lock(thd, lock_type);
cache_locked= 0;
}
if ((error2= parent::external_lock(thd, lock_type)))
error= error2;
DBUG_RETURN(error);
}
/* Lock first with write lock to be able to do insert or flush table */
original_lock_type= lock_type;
lock_type= F_WRLCK;
if ((error= cache_handler->external_lock(thd, lock_type)))
DBUG_RETURN(error);
if ((error= parent::external_lock(thd, lock_type)))
{
error= cache_handler->external_lock(thd, F_UNLCK);
DBUG_RETURN(error);
}
cache_locked= 1;
}
else
{
error= parent::external_lock(thd, lock_type);
}
/* Lock first with write lock to be able to do insert or flush table */
original_lock_type= lock_type;
lock_type= F_WRLCK;
if ((error= cache_handler->external_lock(thd, lock_type)))
DBUG_RETURN(error);
if ((error= parent::external_lock(thd, lock_type)))
{
error= cache_handler->external_lock(thd, F_UNLCK);
DBUG_RETURN(error);
}
DBUG_RETURN(0);
DBUG_RETURN(error);
}
int ha_mcs_cache::delete_table(const char *name)
{
int error, error2;
char cache_name[FN_REFLEN+8];
int error= 0, error2;
DBUG_ENTER("ha_mcs_cache::delete_table");
create_cache_name(cache_name, name);
error= cache_handler->delete_table(cache_name);
if (get_cache_inserts(current_thd))
{
char cache_name[FN_REFLEN+8];
create_cache_name(cache_name, name);
error= cache_handler->delete_table(cache_name);
}
if ((error2= parent::delete_table(name)))
error= error2;
DBUG_RETURN(error);
}
int ha_mcs_cache::rename_table(const char *from, const char *to)
{
int error;
char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8];
int error= 0;
DBUG_ENTER("ha_mcs_cache::rename_table");
create_cache_name(cache_from, from);
create_cache_name(cache_to, to);
if ((error= cache_handler->rename_table(cache_from, cache_to)))
DBUG_RETURN(error);
if ((error= parent::rename_table(from, to)))
if (get_cache_inserts(current_thd))
{
cache_handler->rename_table(cache_to, cache_from);
DBUG_RETURN(error);
char cache_from[FN_REFLEN+8], cache_to[FN_REFLEN+8];
create_cache_name(cache_from, from);
create_cache_name(cache_to, to);
if ((error= cache_handler->rename_table(cache_from, cache_to)))
DBUG_RETURN(error);
if ((error= parent::rename_table(from, to)))
{
cache_handler->rename_table(cache_to, cache_from);
DBUG_RETURN(error);
}
}
DBUG_RETURN(0);
else
{
error= parent::rename_table(from, to);
}
DBUG_RETURN(error);
}
int ha_mcs_cache::delete_all_rows(void)
{
int error,error2;
int error= 0, error2;
DBUG_ENTER("ha_mcs_cache::delete_all_rows");
error= cache_handler->delete_all_rows();
if (get_cache_inserts(current_thd))
{
error= cache_handler->delete_all_rows();
share->cached_rows= 0;
}
if ((error2= parent::delete_all_rows()))
error= error2;
DBUG_RETURN(error);
@ -1542,8 +1656,11 @@ int ha_mcs_cache::delete_all_rows(void)
bool ha_mcs_cache::is_crashed() const
{
return (cache_handler->is_crashed() ||
parent::is_crashed());
if (get_cache_inserts(current_thd))
return (cache_handler->is_crashed() ||
parent::is_crashed());
else
return parent::is_crashed();
}
/**
@ -1555,7 +1672,7 @@ bool ha_mcs_cache::is_crashed() const
In the case of 1) we don't want to run repair on both tables as
the repair can be a slow process. Instead we only run repair
on the crashed tables. If not tables are marked crashed, we
on the crashed tables. If no tables are marked crashed, we
run repair on both tables.
Repair on the cache table will delete the part of the cache that was
@ -1571,17 +1688,23 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
int something_crashed= is_crashed();
DBUG_ENTER("ha_mcs_cache::repair");
if (cache_handler->is_crashed() || !something_crashed)
if (get_cache_inserts(current_thd))
{
/* Delete everything that was not already committed */
mysql_file_chsize(cache_handler->file->dfile.file,
cache_handler->file->s->state.state.key_file_length,
0, MYF(MY_WME));
mysql_file_chsize(cache_handler->file->s->kfile.file,
cache_handler->file->s->state.state.data_file_length,
0, MYF(MY_WME));
error= cache_handler->repair(thd, check_opt);
if (cache_handler->is_crashed() || !something_crashed)
{
/* Delete everything that was not already committed */
mysql_file_chsize(cache_handler->file->dfile.file,
cache_handler->file->s->state.state.data_file_length,
0, MYF(MY_WME));
mysql_file_chsize(cache_handler->file->s->kfile.file,
cache_handler->file->s->state.state.key_file_length,
0, MYF(MY_WME));
check_opt->flags|= T_AUTO_REPAIR;
error= cache_handler->repair(thd, check_opt);
share->cached_rows= cache_handler->file->state->records;
}
}
if (parent::is_crashed() || !something_crashed)
if ((error2= parent::repair(thd, check_opt)))
error= error2;
@ -1595,26 +1718,37 @@ int ha_mcs_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
*/
int ha_mcs_cache::write_row(const uchar *buf)
{
if (insert_command)
if (get_cache_inserts(current_thd) && insert_command)
{
DBUG_ASSERT(share->cached_rows == cache_handler->file->state->records);
share->cached_rows++;
return cache_handler->write_row(buf);
}
return parent::write_row(buf);
}
void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags)
{
if (insert_command)
if (get_cache_inserts(current_thd))
{
bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info));
return cache_handler->start_bulk_insert(rows, flags);
if (insert_command)
{
bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info));
return cache_handler->start_bulk_insert(rows, flags);
}
return parent::start_bulk_insert_from_cache(rows, flags);
}
else
{
return parent::start_bulk_insert(rows, flags);
}
return parent::start_bulk_insert(rows, flags);
}
int ha_mcs_cache::end_bulk_insert()
{
if (insert_command)
if (get_cache_inserts(current_thd) && insert_command)
return cache_handler->end_bulk_insert();
return parent::end_bulk_insert();
}
@ -1623,7 +1757,6 @@ int ha_mcs_cache::end_bulk_insert()
ha_mcs_cache Plugin code
******************************************************************************/
#if 0
static handler *ha_mcs_cache_create_handler(handlerton *hton,
TABLE_SHARE *table,
MEM_ROOT *mem_root)
@ -1631,54 +1764,82 @@ static handler *ha_mcs_cache_create_handler(handlerton *hton,
return new (mem_root) ha_mcs_cache(hton, table, mem_root);
}
static plugin_ref plugin_maria;
static int ha_mcs_cache_init(void *p)
/******************************************************************************
ha_mcs Plugin code
******************************************************************************/
static int columnstore_init_func(void* p)
{
handlerton *cache_hton;
int error;
DBUG_ENTER("columnstore_init_func");
cache_hton= (handlerton *) p;
cache_hton->create= ha_mcs_cache_create_handler;
cache_hton->panic= 0;
cache_hton->flags= HTON_NO_PARTITION;
struct tm tm;
time_t t;
time(&t);
localtime_r(&t, &tm);
fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ",
tm.tm_year % 100, tm.tm_mon + 1, tm.tm_mday,
tm.tm_hour, tm.tm_min, tm.tm_sec);
error= mcs_hton == NULL; // Engine must exist!
fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str());
if (error)
{
my_error(HA_ERR_INITIALIZATION, MYF(0),
"Could not find storage engine %s", "Columnstore");
return error;
}
strncpy(cs_version, columnstore_version.c_str(), sizeof(cs_version));
cs_version[sizeof(cs_version) - 1] = 0;
{
LEX_CSTRING name= { STRING_WITH_LEN("Aria") };
plugin_maria= ha_resolve_by_name(0, &name, 0);
mcs_maria_hton= plugin_hton(plugin_maria);
error= mcs_maria_hton == NULL; // Engine must exist!
if (error)
my_error(HA_ERR_INITIALIZATION, MYF(0),
"Could not find storage engine %s", name.str);
}
strncpy(cs_commit_hash, columnstore_commit_hash.c_str(), sizeof(cs_commit_hash));
cs_commit_hash[sizeof(cs_commit_hash) - 1] = 0;
return error;
mcs_hton = (handlerton*)p;
#ifndef _MSC_VER
(void) pthread_mutex_init(&mcs_mutex, MY_MUTEX_INIT_FAST);
#endif
(void) my_hash_init(PSI_NOT_INSTRUMENTED, &mcs_open_tables, system_charset_info, 32, 0, 0,
(my_hash_get_key) mcs_get_key, 0, 0);
mcs_hton->create = ha_mcs_cache_create_handler;
mcs_hton->panic = 0;
mcs_hton->flags = HTON_CAN_RECREATE | HTON_NO_PARTITION;
// mcs_hton->discover_table = mcs_discover;
// mcs_hton->discover_table_existence = mcs_discover_existence;
mcs_hton->commit = mcs_commit;
mcs_hton->rollback = mcs_rollback;
mcs_hton->close_connection = mcs_close_connection;
mcs_hton->create_group_by = create_columnstore_group_by_handler;
mcs_hton->create_derived = create_columnstore_derived_handler;
mcs_hton->create_select = create_columnstore_select_handler;
mcs_hton->db_type = DB_TYPE_AUTOASSIGN;
uint count = sizeof(all_mutexes)/sizeof(all_mutexes[0]);
mysql_mutex_register("ha_mcs_cache", all_mutexes, count);
mysql_mutex_init(key_LOCK_cache_share, &LOCK_cache_share, MY_MUTEX_INIT_FAST);
DBUG_RETURN(0);
}
static int ha_mcs_cache_deinit(void *p)
static int columnstore_done_func(void* p)
{
if (plugin_maria)
{
plugin_unlock(0, plugin_maria);
plugin_maria= NULL;
}
return 0;
}
DBUG_ENTER("columnstore_done_func");
config::Config::deleteInstanceMap();
my_hash_free(&mcs_open_tables);
#ifndef _MSC_VER
pthread_mutex_destroy(&mcs_mutex);
#endif
if (plugin_maria)
{
plugin_unlock(0, plugin_maria);
plugin_maria = NULL;
}
mysql_mutex_destroy(&LOCK_cache_share);
DBUG_RETURN(0);
}
struct st_mysql_storage_engine ha_mcs_cache_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
#endif
struct st_mysql_storage_engine columnstore_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
@ -1703,23 +1864,6 @@ maria_declare_plugin(columnstore)
MCSVERSION, /* string version */
COLUMNSTORE_MATURITY /* maturity */
},
#if 0
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&ha_mcs_cache_storage_engine,
"Columnstore_cache",
"MariaDB Corporation AB",
"Insert cache for ColumnStore",
PLUGIN_LICENSE_GPL,
ha_mcs_cache_init, /* Plugin Init */
ha_mcs_cache_deinit, /* Plugin Deinit */
MCSVERSIONHEX,
NULL, /* status variables */
NULL, /* system variables */
MCSVERSION, /* string version */
MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */
},
#endif
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_plugin_version,
@ -1790,9 +1934,9 @@ maria_declare_plugin_end;
Implementation of write cache
******************************************************************************/
bool ha_mcs_cache::rows_cached()
ha_rows ha_mcs_cache::num_rows_cached()
{
return cache_handler->file->state->records != 0;
return cache_handler->file->state->records;
}
@ -1800,20 +1944,20 @@ bool ha_mcs_cache::rows_cached()
void ha_mcs_cache::free_locks()
{
/* We don't need to lock cache_handler anymore as it's already flushed */
mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex);
thr_unlock(&cache_handler->file->lock, 0);
mysql_mutex_lock(&cache_handler->file->lock.lock->mutex);
/* Restart transaction for columnstore table */
if (original_lock_type != F_WRLCK)
{
parent::external_lock(table->in_use, F_UNLCK);
parent::external_lock(table->in_use, original_lock_type);
}
/* We don't need to lock cache_handler anymore as it's already flushed */
cache_handler->external_lock(table->in_use, F_UNLCK);
thr_unlock(&cache_handler->file->lock, 0);
cache_locked= false;
}
/**
Copy data from cache to ColumnStore
@ -1826,18 +1970,23 @@ int ha_mcs_cache::flush_insert_cache()
int error, error2;
ha_maria *from= cache_handler;
uchar *record= table->record[0];
ulonglong copied_rows= 0;
DBUG_ENTER("flush_insert_cache");
DBUG_ASSERT(from->file->state->records == share->cached_rows);
parent::start_bulk_insert_from_cache(from->file->state->records, 0);
from->rnd_init(1);
while (!(error= from->rnd_next(record)))
{
copied_rows++;
if ((error= parent::write_row(record)))
goto end;
rows_changed++;
}
if (error == HA_ERR_END_OF_FILE)
error= 0;
DBUG_ASSERT(copied_rows == share->cached_rows);
end:
from->rnd_end();
@ -1856,19 +2005,24 @@ end:
parent::ht->rollback(parent::ht, table->in_use, 1);
}
DBUG_ASSERT(error == 0);
if (!error)
{
/*
Everything when fine, delete all rows from the cache and allow others
Everything went fine, delete all rows from the cache and allow others
to use it.
*/
from->delete_all_rows();
/*
This was not an insert command, so we can delete the thr lock
(We are not going to use the insert cache for this statement anymore)
*/
free_locks();
{
/*
We have to unlock the lock mutex as otherwise we get a conflict in
mutex order. This is fine as we have a write lock on the mutex,
so we will be able to get it again
*/
mysql_mutex_unlock(&from->file->s->lock.mutex);
from->delete_all_rows();
share->cached_rows= 0;
mysql_mutex_lock(&from->file->s->lock.mutex);
}
}
DBUG_RETURN(error);
}


@ -242,16 +242,31 @@ public:
};
class ha_mcs_cache_share
{
ha_mcs_cache_share *next; /* Next open share */
const char *name;
uint open_count;
public:
ulonglong cached_rows;
THR_LOCK org_lock;
friend ha_mcs_cache_share *find_cache_share(const char *name,
ulonglong cached_rows);
void close();
};
class ha_mcs_cache :public ha_mcs
{
typedef ha_mcs parent;
int original_lock_type;
bool insert_command;
bool insert_command, cache_locked;
public:
THR_LOCK org_lock;
uint lock_counter;
ha_maria *cache_handler;
ha_mcs_cache_share *share;
ha_mcs_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root);
~ha_mcs_cache();
@ -287,10 +302,11 @@ public:
/* Cache functions */
void free_locks();
bool rows_cached();
ha_rows num_rows_cached();
int flush_insert_cache();
friend my_bool get_status_and_flush_cache(void *param,
my_bool concurrent_insert);
friend my_bool cache_start_trans(void *param);
};
#endif //HA_MCS_H__


@ -77,6 +77,7 @@ using namespace joblist;
namespace
{
#define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000
uint64_t fBatchInsertGroupRows = 0; // ResourceManager::instance()->getRowsPerBatch();
// HDFS is never used nowadays, so don't bother
bool useHdfs = false; // ResourceManager::instance()->useHdfs();
@ -594,13 +595,17 @@ int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info&
}
if (fBatchInsertGroupRows == 0)
{
fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch();
}
//timer.stop( "buildValueList");
if ( ci.singleInsert // Single insert
|| (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) || ( size >= fBatchInsertGroupRows )) )
//Insert with multiple value case: processed batch by batch. Last batch is sent also.
|| (( ci.bulkInsertRows == 0 ) && ( size >= fBatchInsertGroupRows ) ) ) // Load data in file is processed batch by batch
|| (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows )
|| ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) )
//Insert with multiple value case: processed batch by batch. Last batch is sent also.
|| (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows)
|| (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) ) ) // Load data in file is processed batch by batch
{
//timer.start( "DMLProc takes");
//cout <<" sending a batch to DMLProc ... The size is " << size << " the current bulkInsertRows = " << ci.bulkInsertRows << endl;
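Read on its own, the widened dispatch condition above is dense. The same decision can be restated as a standalone predicate; the sketch below is only an editorial restatement with hypothetical names (batch_info, should_send_batch, the explicit limit parameters) and is not part of the patch:

/* Illustrative restatement of the batch-dispatch test above, not patch code.
   batch_info is a stand-in for the few cal_connection_info fields involved;
   regular_limit stands for fBatchInsertGroupRows and cache_limit for
   BATCH_INSERT_GROUP_ROWS_FOR_CACHE (100000). */
struct batch_info
{
    bool               singleInsert;
    bool               isCacheInsert;
    unsigned long long bulkInsertRows;
    unsigned long long rowsHaveInserted;
};
static bool should_send_batch(const batch_info& ci, unsigned long long size,
                              unsigned long long regular_limit,
                              unsigned long long cache_limit)
{
    const unsigned long long limit = ci.isCacheInsert ? cache_limit : regular_limit;
    if (ci.singleInsert)                 /* single insert: send immediately   */
        return true;
    if (ci.bulkInsertRows > 0)           /* total number of rows is known     */
        return (ci.rowsHaveInserted + size) >= ci.bulkInsertRows
               || size >= limit;
    return size >= limit;                /* multi-value INSERT / LOAD DATA    */
}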


@ -3161,7 +3161,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
ci->isLoaddataInfile = true;
}
if (is_cache_insert)
if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT)
{
ci->isCacheInsert = true;
@ -3174,7 +3174,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_LOAD) ||
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
is_cache_insert) && !ci->singleInsert )
ci->isCacheInsert) && !ci->singleInsert )
{
ci->useCpimport = get_use_import_for_batchinsert(thd);
@ -3182,7 +3182,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
ci->useCpimport = 0;
// For now, disable cpimport for cache inserts
if (is_cache_insert)
if (ci->isCacheInsert)
ci->useCpimport = 0;
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
@ -3215,6 +3215,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
}
ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD;
// TODO: This needs a proper fix.
if (is_cache_insert)
ci->useXbit = false;
//@bug 6122 Check how many columns have a not null constraint. Columns with a not null constraint will not show up in the header.
unsigned int numberNotNull = 0;
@ -3541,7 +3546,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
}
//Save table oid for commit to use
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert)
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert)
{
// query stats. only collect execution time and rows inserted for insert/load_data_infile
ci->stats.reset();


@ -303,6 +303,27 @@ static MYSQL_THDVAR_BOOL(
0
);
static MYSQL_THDVAR_BOOL(
cache_inserts,
PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
"Perform cache-based inserts to ColumnStore",
NULL,
NULL,
0
);
static MYSQL_THDVAR_ULONGLONG(
cache_flush_threshold,
PLUGIN_VAR_RQCMDARG,
"Threshold on the number of rows in the cache to trigger a flush",
NULL,
NULL,
500000,
1,
1000000000,
1
);
st_mysql_sys_var* mcs_system_variables[] =
{
MYSQL_SYSVAR(compression_type),
@ -328,6 +349,8 @@ st_mysql_sys_var* mcs_system_variables[] =
MYSQL_SYSVAR(import_for_batchinsert_enclosed_by),
MYSQL_SYSVAR(varbin_always_hex),
MYSQL_SYSVAR(replication_slave),
MYSQL_SYSVAR(cache_inserts),
MYSQL_SYSVAR(cache_flush_threshold),
NULL
};
@ -559,3 +582,22 @@ void set_replication_slave(THD* thd, bool value)
{
THDVAR(thd, replication_slave) = value;
}
bool get_cache_inserts(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, cache_inserts);
}
void set_cache_inserts(THD* thd, bool value)
{
THDVAR(thd, cache_inserts) = value;
}
ulonglong get_cache_flush_threshold(THD* thd)
{
return ( thd == NULL ) ? 500000 : THDVAR(thd, cache_flush_threshold);
}
void set_cache_flush_threshold(THD* thd, ulonglong value)
{
THDVAR(thd, cache_flush_threshold) = value;
}


@ -113,4 +113,10 @@ void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value);
bool get_replication_slave(THD* thd);
void set_replication_slave(THD* thd, bool value);
bool get_cache_inserts(THD* thd);
void set_cache_inserts(THD* thd, bool value);
ulonglong get_cache_flush_threshold(THD* thd);
void set_cache_flush_threshold(THD* thd, ulonglong value);
#endif