From 86fb66365ca6d1dd3b948ef6874d87e07eb6c039 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Wed, 1 Jul 2020 20:13:04 -0400 Subject: [PATCH] 1. Set 1M as the threshold on the number of records to flush the cache. 2. Set 100k as the batch size when flushing records into ColumnStore, i.e., a flush of 1M records will be performed in 10 batches, each being 100k. 3. For INSERT ... SELECT on the cache, use the default insertion method of cpimport. --- dbcon/mysql/ha_mcs.cpp | 11 +++++++---- dbcon/mysql/ha_mcs.h | 2 +- dbcon/mysql/ha_mcs_dml.cpp | 11 ++++++++--- dbcon/mysql/ha_mcs_impl.cpp | 13 +++++++++---- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 5c419e814..edb22eb6d 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -32,6 +32,7 @@ #endif #define CACHE_PREFIX "#cache#" +#define CACHE_FLUSH_THRESHOLD 1000000 static handler* mcs_create_handler(handlerton* hton, TABLE_SHARE* table, @@ -1272,7 +1273,9 @@ my_bool get_status_and_flush_cache(void *param, /* If first get_status() call for this table, flush cache if needed */ if (!cache->lock_counter++) { - if (!cache->insert_command && cache->rows_cached()) + ha_rows num_rows = cache->num_rows_cached(); + if ((!cache->insert_command && num_rows != 0) || + num_rows == CACHE_FLUSH_THRESHOLD) { if ((error= cache->flush_insert_cache())) { @@ -1703,7 +1706,7 @@ void ha_mcs_cache::start_bulk_insert(ha_rows rows, uint flags) bzero(&cache_handler->copy_info, sizeof(cache_handler->copy_info)); return cache_handler->start_bulk_insert(rows, flags); } - return parent::start_bulk_insert(rows, flags); + return parent::start_bulk_insert_from_cache(rows, flags); } @@ -1887,9 +1890,9 @@ maria_declare_plugin_end; Implementation of write cache ******************************************************************************/ -bool ha_mcs_cache::rows_cached() +ha_rows ha_mcs_cache::num_rows_cached() { - return cache_handler->file->state->records != 0; + return cache_handler->file->state->records; } diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index c3751fa47..2c0dcca7a 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -300,7 +300,7 @@ public: /* Cache functions */ void free_locks(); - bool rows_cached(); + ha_rows num_rows_cached(); int flush_insert_cache(); friend my_bool get_status_and_flush_cache(void *param, my_bool concurrent_insert); diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index d79b1c6cb..733506ac1 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -77,6 +77,7 @@ using namespace joblist; namespace { +#define BATCH_INSERT_GROUP_ROWS_FOR_CACHE 100000 uint64_t fBatchInsertGroupRows = 0; // ResourceManager::instance()->getRowsPerBatch(); // HDFS is never used nowadays, so don't bother bool useHdfs = false; // ResourceManager::instance()->useHdfs(); @@ -594,13 +595,17 @@ int ha_mcs_impl_write_row_(const uchar* buf, TABLE* table, cal_connection_info& } if (fBatchInsertGroupRows == 0) + { fBatchInsertGroupRows = ResourceManager::instance()->getRowsPerBatch(); + } //timer.stop( "buildValueList"); if ( ci.singleInsert // Single insert - || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) || ( size >= fBatchInsertGroupRows )) ) - //Insert with mutilple value case: processed batch by batch. Last batch is sent also. - || (( ci.bulkInsertRows == 0 ) && ( size >= fBatchInsertGroupRows ) ) ) // Load data in file is processed batch by batch + || (( ci.bulkInsertRows > 0 ) && (( ( ci.rowsHaveInserted + size) >= ci.bulkInsertRows ) + || ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) )) ) + //Insert with mutilple value case: processed batch by batch. Last batch is sent also. + || (( ci.bulkInsertRows == 0 ) && ( (!ci.isCacheInsert && size >= fBatchInsertGroupRows) + || (ci.isCacheInsert && size >= BATCH_INSERT_GROUP_ROWS_FOR_CACHE) ) ) ) // Load data in file is processed batch by batch { //timer.start( "DMLProc takes"); //cout <<" sending a batch to DMLProc ... The size is " << size << " the current bulkInsertRows = " << ci.bulkInsertRows << endl; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index ad83918d8..29884004e 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3161,7 +3161,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->isLoaddataInfile = true; } - if (is_cache_insert) + if (is_cache_insert && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT) { ci->isCacheInsert = true; @@ -3174,7 +3174,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if ((((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || - is_cache_insert) && !ci->singleInsert ) + ci->isCacheInsert) && !ci->singleInsert ) { ci->useCpimport = get_use_import_for_batchinsert(thd); @@ -3182,7 +3182,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->useCpimport = 0; // For now, disable cpimport for cache inserts - if (is_cache_insert) + if (ci->isCacheInsert) ci->useCpimport = 0; // ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a @@ -3215,6 +3215,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } ci->useXbit = table->s->db_options_in_use & HA_OPTION_PACK_RECORD; + + // TODO: This needs a proper fix. + if (is_cache_insert) + ci->useXbit = false; + //@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not show up in header. unsigned int numberNotNull = 0; @@ -3541,7 +3546,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } //Save table oid for commit to use - if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert) + if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) { // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset();