mariadb-columnstore-engine
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
1. Set 1M as the threshold on the number of records to flush the cache.
2. Set 100k as the batch size when flushing records into ColumnStore, i.e., a flush of 1M records will be performed in 10 batches, each being 100k.
3. For INSERT ... SELECT on the cache, use the default insertion method of cpimport.
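For reference, the arithmetic behind (1) and (2) as a minimal standalone C++ sketch; this is not ColumnStore code, only the constant names mirror the defines added in the diff below:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    #define CACHE_FLUSH_THRESHOLD 1000000             // rows cached before a flush (1)
    #define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000  // rows per flush batch (2)

    int main()
    {
        uint64_t remaining = CACHE_FLUSH_THRESHOLD;
        uint64_t batches = 0;
        while (remaining > 0)
        {
            // A real flush would hand this many rows to the batch-insert path.
            uint64_t batch = std::min<uint64_t>(remaining, BATCH_INSERT_GROUP_ROWS_FOR_CACHE);
            remaining -= batch;
            ++batches;
        }
        std::cout << batches << " batches\n";  // prints "10 batches"
    }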
@@ -32,6 +32,7 @@
 #endif
 
 #define CACHE_PREFIX "#cache#"
+#define CACHE_FLUSH_THRESHOLD 1000000
 
 static handler* mcs_create_handler(handlerton* hton,
                                    TABLE_SHARE* table,
@@ -1272,7 +1273,9 @@ my_bool get_status_and_flush_cache(void *param,
   /* If first get_status() call for this table, flush cache if needed */
   if (!cache->lock_counter++)
   {
-    if (!cache->insert_command && cache->rows_cached())
+    ha_rows num_rows = cache->num_rows_cached();
+    if ((!cache->insert_command && num_rows != 0) ||
+        num_rows == CACHE_FLUSH_THRESHOLD)
     {
       if ((error= cache->flush_insert_cache()))
       {
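In words: any non-INSERT statement flushes whatever the cache holds, while INSERTs let rows accumulate until the count reaches CACHE_FLUSH_THRESHOLD. A hypothetical restatement of the new predicate (shouldFlush is not in the source; uint64_t stands in for MariaDB's ha_rows typedef):

    #include <cstdint>

    #define CACHE_FLUSH_THRESHOLD 1000000

    // Hypothetical restatement of the flush predicate above.
    static bool shouldFlush(bool insert_command, uint64_t num_rows)
    {
        // Non-INSERT statements flush any cached rows so they operate on
        // current data; INSERTs flush once the cache reaches the threshold.
        return (!insert_command && num_rows != 0)
            || (num_rows == CACHE_FLUSH_THRESHOLD);
    }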
@@ -1703,7 +1706,7 @@ void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags)
     bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info));
     return cache_handler->start_bulk_insert(rows, flags);
   }
-  return parent::start_bulk_insert(rows, flags);
+  return parent::start_bulk_insert_from_cache(rows, flags);
 }
 
 
@@ -1887,9 +1890,9 @@ maria_declare_plugin_end;
   Implementation of write cache
 ******************************************************************************/
 
-bool ha_mcs_cache::rows_cached()
+ha_rows ha_mcs_cache::num_rows_cached()
 {
-  return cache_handler->file->state->records != 0;
+  return cache_handler->file->state->records;
 }
 
 
@@ -300,7 +300,7 @@ public:
 
   /* Cache functions */
   void free_locks();
-  bool rows_cached();
+  ha_rows num_rows_cached();
   int flush_insert_cache();
   friend my_bool get_status_and_flush_cache(void *param,
                                             my_bool concurrent_insert);
@@ -77,6 +77,7 @@ using namespace joblist;
 
 namespace
 {
+#define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000
 uint64_t fBatchInsertGroupRows = 0; // ResourceManager::instance()->getRowsPerBatch();
 // HDFS is never used nowadays, so don't bother
 bool useHdfs = false; // ResourceManager::instance()->useHdfs();
@@ -594,13 +595,17 @@ int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info&
   }
 
   if (fBatchInsertGroupRows == 0)
   {
     fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch();
   }
 
   //timer.stop( "buildValueList");
   if ( ci.singleInsert // Single insert
-       || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) || ( size >= fBatchInsertGroupRows )) )
+       || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows )
+            || ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) )
        //Insert with mutilple value case: processed batch by batch. Last batch is sent also.
-       || (( ci.bulkInsertRows == 0 ) && ( size >= fBatchInsertGroupRows ) ) ) // Load data in file is processed batch by batch
+       || (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows)
+            || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) ) ) // Load data in file is processed batch by batch
   {
     //timer.start( "DMLProc takes");
     //cout <<" sending a batch to DMLProc ... The size is " << size << " the current bulkInsertRows = " << ci.bulkInsertRows << endl;
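The two edited disjuncts differ only in which batch size caps `size`; a hypothetical helper restating that selection (batchSizeReached is not in the source):

    #include <cstdint>

    #define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000

    // Simplified restatement of the new size test: cache inserts ship a
    // batch at the fixed 100k cache batch size; everything else keeps
    // using the session's rows-per-batch setting.
    static bool batchSizeReached(bool isCacheInsert, uint64_t size,
                                 uint64_t fBatchInsertGroupRows)
    {
        return isCacheInsert ? (size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE)
                             : (size >= fBatchInsertGroupRows);
    }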
@@ -3161,7 +3161,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
     ci->isLoaddataInfile = true;
   }
 
-  if (is_cache_insert)
+  if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT)
   {
     ci->isCacheInsert = true;
 
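Read together with the hunks that follow, this appears to implement item (3) of the commit message: since isCacheInsert is no longer set for INSERT ... SELECT, the branches below that key on ci->isCacheInsert no longer disable cpimport for that case.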
@@ -3174,7 +3174,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
   if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
       ((thd->lex)->sql_command == SQLCOM_LOAD) ||
       (thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
-      is_cache_insert) && !ci->singleInsert )
+      ci->isCacheInsert) && !ci->singleInsert )
   {
     ci->useCpimport = get_use_import_for_batchinsert(thd);
 
@@ -3182,7 +3182,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
     ci->useCpimport = 0;
 
   // For now, disable cpimport for cache inserts
-  if (is_cache_insert)
+  if (ci->isCacheInsert)
     ci->useCpimport = 0;
 
   // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
@@ -3215,6 +3215,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
   }
 
   ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD;
 
+  // TODO: This needs a proper fix.
+  if (is_cache_insert)
+    ci->useXbit = false;
+
   //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not show up in header.
   unsigned int numberNotNull = 0;
 
@@ -3541,7 +3546,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
   }
 
   //Save table oid for commit to use
-  if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert)
+  if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert)
   {
     // query stats. only collect execution time and rows inserted for insert/load_data_infile
     ci->stats.reset();
 