From a0bd7900051c4d100410ffb214a13dec07cfd35e Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Wed, 7 Jul 2021 14:01:47 +0000 Subject: [PATCH] ColumnStore Cache changes. 1. Add a new system variable, columnstore_use_cpimport_for_cache_inserts, that when set to ON, uses cpimport for the cache flush into ColumnStore. This variable is set to OFF by default. By default, we perform batch inserts for the cache flush. 2. Disable DMLProc logging of the SQL statement text for the cache flush operation in case of batch inserts. Under certain heavy loads involving INSERT statements, this logging becomes a bottleneck for the cache flush, causing subsequent inserts into the cache table to hang. --- dbcon/dmlpackage/calpontdmlpackage.cpp | 6 ++++-- dbcon/dmlpackage/calpontdmlpackage.h | 10 ++++++++++ dbcon/dmlpackage/insertdmlpackage.cpp | 3 +++ dbcon/mysql/ha_mcs_dml.cpp | 2 ++ dbcon/mysql/ha_mcs_impl.cpp | 8 ++++++-- dbcon/mysql/ha_mcs_sysvars.cpp | 19 +++++++++++++++++++ dbcon/mysql/ha_mcs_sysvars.h | 3 +++ dmlproc/dmlprocessor.cpp | 11 ++++++++--- 8 files changed, 55 insertions(+), 7 deletions(-) diff --git a/dbcon/dmlpackage/calpontdmlpackage.cpp b/dbcon/dmlpackage/calpontdmlpackage.cpp index 1916016ea..51554362c 100644 --- a/dbcon/dmlpackage/calpontdmlpackage.cpp +++ b/dbcon/dmlpackage/calpontdmlpackage.cpp @@ -31,7 +31,9 @@ namespace dmlpackage */ CalpontDMLPackage::CalpontDMLPackage() - : fPlan(new messageqcpp::ByteStream()), fTable(0), fHasFilter(0), fLogging(true), fIsInsertSelect(false), fIsBatchInsert(false), fIsAutocommitOn(false), fTableOid(0) + : fPlan(new messageqcpp::ByteStream()), + fTable(0), fHasFilter(0), fLogging(true), fIsInsertSelect(false), + fIsBatchInsert(false), fIsCacheInsert(false), fIsAutocommitOn(false), fIsWarnToError(false), fTableOid(0) { } @@ -40,7 +42,7 @@ CalpontDMLPackage::CalpontDMLPackage( std::string schemaName, std::string tableN std::string dmlStatement, int sessionID ) : fSchemaName(schemaName), fTableName( tableName ), fDMLStatement( dmlStatement ), fSessionID(sessionID), fPlan(new messageqcpp::ByteStream()), fTable(0), fHasFilter(false), fLogging(true), fIsInsertSelect(false), - fIsBatchInsert(false), fIsAutocommitOn(false), fIsWarnToError(false), fTableOid(0) + fIsBatchInsert(false), fIsCacheInsert(false), fIsAutocommitOn(false), fIsWarnToError(false), fTableOid(0) { } diff --git a/dbcon/dmlpackage/calpontdmlpackage.h b/dbcon/dmlpackage/calpontdmlpackage.h index 34f67c947..af25e2524 100644 --- a/dbcon/dmlpackage/calpontdmlpackage.h +++ b/dbcon/dmlpackage/calpontdmlpackage.h @@ -320,6 +320,15 @@ public: fIsBatchInsert = isBatchInsert; } + bool get_isCacheInsert() + { + return fIsCacheInsert; + } + void set_isCacheInsert( const bool isCacheInsert ) + { + fIsCacheInsert = isCacheInsert; + } + bool get_isAutocommitOn() { return fIsAutocommitOn; @@ -378,6 +387,7 @@ protected: std::string StripLeadingWhitespace( std::string value ); bool fIsInsertSelect; bool fIsBatchInsert; + bool fIsCacheInsert; bool fIsAutocommitOn; bool fIsWarnToError; uint32_t fTableOid; diff --git a/dbcon/dmlpackage/insertdmlpackage.cpp b/dbcon/dmlpackage/insertdmlpackage.cpp index b91848e29..bdc81a454 100644 --- a/dbcon/dmlpackage/insertdmlpackage.cpp +++ b/dbcon/dmlpackage/insertdmlpackage.cpp @@ -69,6 +69,7 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream) bytestream << fTableOid; bytestream << static_cast(fIsInsertSelect); bytestream << static_cast(fIsBatchInsert); + bytestream << static_cast(fIsCacheInsert); bytestream << static_cast(fIsAutocommitOn); if (fTable != 0) @@ -103,6 +104,7 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream) bytestream >> fTableOid; bytestream >> reinterpret_cast(fIsInsertSelect); bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsCacheInsert); bytestream >> reinterpret_cast(fIsAutocommitOn); fTable = new DMLTable(); @@ -132,6 +134,7 @@ void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream) bytestream >> fTableOid; bytestream >> reinterpret_cast(fIsInsertSelect); bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsCacheInsert); bytestream >> reinterpret_cast(fIsAutocommitOn); fTable = new DMLTable(); diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index 742488592..fafdf3f79 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -284,6 +284,8 @@ int doProcessInsertValues ( TABLE* table, uint32_t size, cal_connection_info& ci ci.tableValuesMap.clear(); ci.colNameList.clear(); + pDMLPackage->set_isCacheInsert(ci.isCacheInsert); + if (!pDMLPackage) { rc = -1; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 4dff702ab..6acc214ea 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3040,9 +3040,13 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF; - // For now, disable cpimport for cache inserts if (ci->isCacheInsert) - ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF; + { + if (get_use_cpimport_for_cache_inserts(thd)) + ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS; + else + ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF; + } // ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS means ALWAYS use // cpimport, whether it's in a transaction or not. User should use this option diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 234fe69ae..55e4b14d7 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -348,6 +348,15 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_BOOL( + use_cpimport_for_cache_inserts, + PLUGIN_VAR_RQCMDARG, + "Use cpimport for the cache flush into ColumnStore", + NULL, + NULL, + 0 +); + static MYSQL_THDVAR_ULONGLONG( cache_flush_threshold, PLUGIN_VAR_RQCMDARG, @@ -388,6 +397,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(varbin_always_hex), MYSQL_SYSVAR(replication_slave), MYSQL_SYSVAR(cache_inserts), + MYSQL_SYSVAR(use_cpimport_for_cache_inserts), MYSQL_SYSVAR(cache_flush_threshold), NULL }; @@ -650,6 +660,15 @@ void set_cache_inserts(THD* thd, bool value) THDVAR(thd, cache_inserts) = value; } +bool get_use_cpimport_for_cache_inserts(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, use_cpimport_for_cache_inserts); +} +void set_use_cpimport_for_cache_inserts(THD* thd, bool value) +{ + THDVAR(thd, use_cpimport_for_cache_inserts) = value; +} + ulonglong get_cache_flush_threshold(THD* thd) { return ( thd == NULL ) ? 500000 : THDVAR(thd, cache_flush_threshold); diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index a1c9afe9f..e74585862 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -130,6 +130,9 @@ void set_replication_slave(THD* thd, bool value); bool get_cache_inserts(THD* thd); void set_cache_inserts(THD* thd, bool value); +bool get_use_cpimport_for_cache_inserts(THD* thd); +void set_use_cpimport_for_cache_inserts(THD* thd, bool value); + ulonglong get_cache_flush_threshold(THD* thd); void set_cache_flush_threshold(THD* thd, ulonglong value); diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index abd9c5600..0465672c2 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -620,9 +620,14 @@ void PackageHandler::run() logging::Message::Args args1; logging::Message msg(1); args1.add("Start SQL statement: "); - ostringstream oss; - oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|"; - args1.add(oss.str()); + + if (!insertPkg.get_isCacheInsert()) + { + ostringstream oss; + oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|"; + args1.add(oss.str()); + } + msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid);