diff --git a/dbcon/dmlpackage/calpontdmlpackage.cpp b/dbcon/dmlpackage/calpontdmlpackage.cpp index 1916016ea..51554362c 100644 --- a/dbcon/dmlpackage/calpontdmlpackage.cpp +++ b/dbcon/dmlpackage/calpontdmlpackage.cpp @@ -31,7 +31,9 @@ namespace dmlpackage */ CalpontDMLPackage::CalpontDMLPackage() - : fPlan(new messageqcpp::ByteStream()), fTable(0), fHasFilter(0), fLogging(true), fIsInsertSelect(false), fIsBatchInsert(false), fIsAutocommitOn(false), fTableOid(0) + : fPlan(new messageqcpp::ByteStream()), + fTable(0), fHasFilter(0), fLogging(true), fIsInsertSelect(false), + fIsBatchInsert(false), fIsCacheInsert(false), fIsAutocommitOn(false), fIsWarnToError(false), fTableOid(0) { } @@ -40,7 +42,7 @@ CalpontDMLPackage::CalpontDMLPackage( std::string schemaName, std::string tableN std::string dmlStatement, int sessionID ) : fSchemaName(schemaName), fTableName( tableName ), fDMLStatement( dmlStatement ), fSessionID(sessionID), fPlan(new messageqcpp::ByteStream()), fTable(0), fHasFilter(false), fLogging(true), fIsInsertSelect(false), - fIsBatchInsert(false), fIsAutocommitOn(false), fIsWarnToError(false), fTableOid(0) + fIsBatchInsert(false), fIsCacheInsert(false), fIsAutocommitOn(false), fIsWarnToError(false), fTableOid(0) { } diff --git a/dbcon/dmlpackage/calpontdmlpackage.h b/dbcon/dmlpackage/calpontdmlpackage.h index 34f67c947..af25e2524 100644 --- a/dbcon/dmlpackage/calpontdmlpackage.h +++ b/dbcon/dmlpackage/calpontdmlpackage.h @@ -320,6 +320,15 @@ public: fIsBatchInsert = isBatchInsert; } + bool get_isCacheInsert() + { + return fIsCacheInsert; + } + void set_isCacheInsert( const bool isCacheInsert ) + { + fIsCacheInsert = isCacheInsert; + } + bool get_isAutocommitOn() { return fIsAutocommitOn; @@ -378,6 +387,7 @@ protected: std::string StripLeadingWhitespace( std::string value ); bool fIsInsertSelect; bool fIsBatchInsert; + bool fIsCacheInsert; bool fIsAutocommitOn; bool fIsWarnToError; uint32_t fTableOid; diff --git a/dbcon/dmlpackage/insertdmlpackage.cpp b/dbcon/dmlpackage/insertdmlpackage.cpp index b91848e29..bdc81a454 100644 --- a/dbcon/dmlpackage/insertdmlpackage.cpp +++ b/dbcon/dmlpackage/insertdmlpackage.cpp @@ -69,6 +69,7 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream) bytestream << fTableOid; bytestream << static_cast(fIsInsertSelect); bytestream << static_cast(fIsBatchInsert); + bytestream << static_cast(fIsCacheInsert); bytestream << static_cast(fIsAutocommitOn); if (fTable != 0) @@ -103,6 +104,7 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream) bytestream >> fTableOid; bytestream >> reinterpret_cast(fIsInsertSelect); bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsCacheInsert); bytestream >> reinterpret_cast(fIsAutocommitOn); fTable = new DMLTable(); @@ -132,6 +134,7 @@ void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream) bytestream >> fTableOid; bytestream >> reinterpret_cast(fIsInsertSelect); bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsCacheInsert); bytestream >> reinterpret_cast(fIsAutocommitOn); fTable = new DMLTable(); diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index 742488592..fafdf3f79 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -284,6 +284,8 @@ int doProcessInsertValues ( TABLE* table, uint32_t size, cal_connection_info& ci ci.tableValuesMap.clear(); ci.colNameList.clear(); + pDMLPackage->set_isCacheInsert(ci.isCacheInsert); + if (!pDMLPackage) { rc = -1; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 4dff702ab..6acc214ea 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3040,9 +3040,13 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF; - // For now, disable cpimport for cache inserts if (ci->isCacheInsert) - ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF; + { + if (get_use_cpimport_for_cache_inserts(thd)) + ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS; + else + ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF; + } // ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS means ALWAYS use // cpimport, whether it's in a transaction or not. User should use this option diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 234fe69ae..55e4b14d7 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -348,6 +348,15 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_BOOL( + use_cpimport_for_cache_inserts, + PLUGIN_VAR_RQCMDARG, + "Use cpimport for the cache flush into ColumnStore", + NULL, + NULL, + 0 +); + static MYSQL_THDVAR_ULONGLONG( cache_flush_threshold, PLUGIN_VAR_RQCMDARG, @@ -388,6 +397,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(varbin_always_hex), MYSQL_SYSVAR(replication_slave), MYSQL_SYSVAR(cache_inserts), + MYSQL_SYSVAR(use_cpimport_for_cache_inserts), MYSQL_SYSVAR(cache_flush_threshold), NULL }; @@ -650,6 +660,15 @@ void set_cache_inserts(THD* thd, bool value) THDVAR(thd, cache_inserts) = value; } +bool get_use_cpimport_for_cache_inserts(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, use_cpimport_for_cache_inserts); +} +void set_use_cpimport_for_cache_inserts(THD* thd, bool value) +{ + THDVAR(thd, use_cpimport_for_cache_inserts) = value; +} + ulonglong get_cache_flush_threshold(THD* thd) { return ( thd == NULL ) ? 500000 : THDVAR(thd, cache_flush_threshold); diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index a1c9afe9f..e74585862 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -130,6 +130,9 @@ void set_replication_slave(THD* thd, bool value); bool get_cache_inserts(THD* thd); void set_cache_inserts(THD* thd, bool value); +bool get_use_cpimport_for_cache_inserts(THD* thd); +void set_use_cpimport_for_cache_inserts(THD* thd, bool value); + ulonglong get_cache_flush_threshold(THD* thd); void set_cache_flush_threshold(THD* thd, ulonglong value); diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index abd9c5600..0465672c2 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -620,9 +620,14 @@ void PackageHandler::run() logging::Message::Args args1; logging::Message msg(1); args1.add("Start SQL statement: "); - ostringstream oss; - oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|"; - args1.add(oss.str()); + + if (!insertPkg.get_isCacheInsert()) + { + ostringstream oss; + oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|"; + args1.add(oss.str()); + } + msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid);