1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Use batch inserts for the cache flush.

This commit is contained in:
Gagan Goel
2020-05-28 13:52:26 -04:00
parent e671b1d1e2
commit c30d105c30
6 changed files with 76 additions and 29 deletions

View File

@ -3051,7 +3051,7 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
(!ci->singleInsert) &&
((ci->isLoaddataInfile) ||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) )
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
{
rc = ha_mcs_impl_write_batch_row_(buf, table, *ci);
}
@ -3104,7 +3104,7 @@ int ha_mcs_impl_delete_row()
return ( rc );
}
void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_insert)
{
THD* thd = current_thd;
@ -3168,17 +3168,30 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
ci->isLoaddataInfile = true;
}
if (is_cache_insert)
{
ci->isCacheInsert = true;
if (rows > 1)
ci->singleInsert = false;
}
ci->bulkInsertRows = rows;
if ((((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_LOAD) ||
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
is_cache_insert) && !ci->singleInsert )
{
ci->useCpimport = get_use_import_for_batchinsert(thd);
if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
ci->useCpimport = 0;
// For now, disable cpimport for cache inserts
if (is_cache_insert)
ci->useCpimport = 0;
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
// transaction or not. User should use this option very carefully since
// cpimport currently does not support rollbacks
@ -3535,7 +3548,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
}
//Save table oid for commit to use
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || is_cache_insert)
{
// query stats. only collect execution time and rows inserted for insert/load_data_infile
ci->stats.reset();
@ -3643,14 +3656,14 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
// @bug 2378. do not enter for select, reset singleInsert flag after multiple insert.
// @bug 2515. Check command intead of vtable state
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert )
{
if (((ci->useCpimport == 2) ||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
(!ci->singleInsert) &&
((ci->isLoaddataInfile) ||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) )
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
{
#ifdef _MSC_VER
@ -3824,7 +3837,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
// populate query stats for insert and load data infile. insert select has
// stats entered in sm already
if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD))
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_LOAD) ||
ci->isCacheInsert)
{
ci->stats.setEndTime();
ci->stats.fErrorNo = rc;
@ -3851,6 +3866,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
// However, we should be resetting these members anyways.
ci->singleInsert = true; // reset the flag
ci->isLoaddataInfile = false;
ci->isCacheInsert = false;
ci->tableOid = 0;
ci->rowsHaveInserted = 0;
ci->useCpimport = 1;
@ -3885,6 +3901,7 @@ int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all)
thd->server_status &= ~SERVER_STATUS_IN_TRANS;
ci->singleInsert = true; // reset the flag
ci->isLoaddataInfile = false;
ci->isCacheInsert = false;
ci->tableOid = 0;
ci->rowsHaveInserted = 0;
return rc;
@ -3906,6 +3923,7 @@ int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all)
int rc = ha_mcs_impl_rollback_(hton, thd, all, *ci);
ci->singleInsert = true; // reset the flag
ci->isLoaddataInfile = false;
ci->isCacheInsert = false;
ci->tableOid = 0;
ci->rowsHaveInserted = 0;
thd->server_status &= ~SERVER_STATUS_IN_TRANS;