From d4d0ebdf5dbec95c60c4004520e073b46e2186b4 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Sun, 10 May 2020 19:38:06 -0400 Subject: [PATCH 1/6] Improve batch inserts. 1) Instead of making dbrm calls to writeVBEntry() per block, we make these calls per batch. This can have non-trivial reductions in the overhead of these calls if the batch size is large. 2) In dmlproc, do not deserialize the whole insertpackage, which consists of the complete record set per column, which would be wasteful as we only need some metadata fields from insertpackage here. This is only done for batch inserts at the moment, this should also be applied to single inserts. --- dbcon/dmlpackage/dmltable.cpp | 22 +++++++++ dbcon/dmlpackage/dmltable.h | 14 ++++++ dbcon/dmlpackage/insertdmlpackage.cpp | 53 ++++++++++++++++++---- dbcon/dmlpackage/insertdmlpackage.h | 12 +++++ dmlproc/dmlprocessor.cpp | 10 +++-- versioning/BRM/brmtypes.h | 1 + versioning/BRM/dbrm.cpp | 36 +++++++++++++++ versioning/BRM/dbrm.h | 14 ++++++ versioning/BRM/slavecomm.cpp | 47 ++++++++++++++++++++ versioning/BRM/slavecomm.h | 1 + versioning/BRM/slavedbrmnode.cpp | 64 +++++++++++++++++++++++++++ versioning/BRM/slavedbrmnode.h | 14 ++++++ writeengine/shared/we_brm.cpp | 54 ++++++++++++---------- 13 files changed, 306 insertions(+), 36 deletions(-) diff --git a/dbcon/dmlpackage/dmltable.cpp b/dbcon/dmlpackage/dmltable.cpp index 0beedc10c..0d8fd5646 100644 --- a/dbcon/dmlpackage/dmltable.cpp +++ b/dbcon/dmlpackage/dmltable.cpp @@ -72,6 +72,28 @@ int DMLTable::read(messageqcpp::ByteStream& bytestream) return retval; } +void DMLTable::readMetaData(messageqcpp::ByteStream& bytestream) +{ + // read the table name + bytestream >> fName; + + // read the schema name + bytestream >> fSchema; +} + +void DMLTable::readRowData(messageqcpp::ByteStream& bytestream) +{ + messageqcpp::ByteStream::quadbyte rowNum; + bytestream >> rowNum; + + for (unsigned int i = 0; i < rowNum; i++) + { + Row* aRow = new Row(); + aRow->read(bytestream); + 
fRows.push_back(aRow); + } +} + int DMLTable::write(messageqcpp::ByteStream& bytestream) { int retval = 1; diff --git a/dbcon/dmlpackage/dmltable.h b/dbcon/dmlpackage/dmltable.h index 7ff0eca14..8847318f4 100644 --- a/dbcon/dmlpackage/dmltable.h +++ b/dbcon/dmlpackage/dmltable.h @@ -91,6 +91,20 @@ public: int read(messageqcpp::ByteStream& bytestream); + /** @brief read a DMLTable metadata from a ByteStream + * + * @param bytestream the ByteStream to read from + */ + void readMetaData(messageqcpp::ByteStream& bytestream); + + + /** @brief read a DMLTable row data from a ByteStream + * + * @param bytestream the ByteStream to read from + */ + void readRowData(messageqcpp::ByteStream& bytestream); + + /** @brief write a DMLTable to a ByteStream * * @param bytestream the ByteStream to write to diff --git a/dbcon/dmlpackage/insertdmlpackage.cpp b/dbcon/dmlpackage/insertdmlpackage.cpp index 8c00ed2fc..b46e58da3 100644 --- a/dbcon/dmlpackage/insertdmlpackage.cpp +++ b/dbcon/dmlpackage/insertdmlpackage.cpp @@ -66,16 +66,16 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream) bytestream << (uint8_t)fLogging; bytestream << (uint8_t)fLogending; - if (fTable != 0) - { - retval = fTable->write(bytestream); - } - bytestream << fTableOid; bytestream << static_cast(fIsInsertSelect); bytestream << static_cast(fIsBatchInsert); bytestream << static_cast(fIsAutocommitOn); + if (fTable != 0) + { + retval = fTable->write(bytestream); + } + return retval; } @@ -100,15 +100,50 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream) bytestream >> logending; fLogending = (logending != 0); + bytestream >> fTableOid; + bytestream >> reinterpret_cast(fIsInsertSelect); + bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsAutocommitOn); + fTable = new DMLTable(); retval = fTable->read(bytestream); - bytestream >> fTableOid; - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsInsertSelect); - bytestream >> 
reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsBatchInsert); - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsAutocommitOn); return retval; } +void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream) +{ + messageqcpp::ByteStream::quadbyte session_id; + bytestream >> session_id; + fSessionID = session_id; + bytestream >> fUuid; + + std::string dmlStatement; + bytestream >> fDMLStatement; + bytestream >> fSQLStatement; + bytestream >> fSchemaName; + bytestream >> fTimeZone; + uint8_t logging; + bytestream >> logging; + fLogging = (logging != 0); + uint8_t logending; + bytestream >> logending; + fLogending = (logending != 0); + + bytestream >> fTableOid; + bytestream >> reinterpret_cast(fIsInsertSelect); + bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsAutocommitOn); + + fTable = new DMLTable(); + fTable->readMetaData(bytestream); +} + +// Has to be called after InsertDMLPackage::readMetaData() +void InsertDMLPackage::readRowData(messageqcpp::ByteStream& bytestream) +{ + fTable->readRowData(bytestream); +} + int InsertDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows) { #ifdef DML_PACKAGE_DEBUG diff --git a/dbcon/dmlpackage/insertdmlpackage.h b/dbcon/dmlpackage/insertdmlpackage.h index bd3d0245a..a2b76d8fd 100644 --- a/dbcon/dmlpackage/insertdmlpackage.h +++ b/dbcon/dmlpackage/insertdmlpackage.h @@ -73,6 +73,18 @@ public: */ EXPORT int read(messageqcpp::ByteStream& bytestream); + /** @brief read InsertDMLPackage metadata from bytestream + * + * @param bytestream the ByteStream to read from + */ + EXPORT void readMetaData(messageqcpp::ByteStream& bytestream); + + /** @brief read InsertDMLPackage row data from bytestream + * + * @param bytestream the ByteStream to read from + */ + EXPORT void readRowData(messageqcpp::ByteStream& bytestream); + /** @brief build a InsertDMLPackage from a string buffer * * @param buffer diff --git a/dmlproc/dmlprocessor.cpp 
b/dmlproc/dmlprocessor.cpp index ba13c4a21..112b23241 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -548,7 +548,7 @@ void PackageHandler::run() dmlpackage::InsertDMLPackage insertPkg; //boost::shared_ptr insertBs (new messageqcpp::ByteStream); messageqcpp::ByteStream bsSave = *(fByteStream.get()); - insertPkg.read(*(fByteStream.get())); + insertPkg.readMetaData(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) @@ -584,8 +584,8 @@ void PackageHandler::run() //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl; if (insertPkg.get_isBatchInsert()) { + fByteStream->reset(); //cout << "This is batch insert " << endl; - //boost::shared_ptr insertBs (new messageqcpp::ByteStream(fByteStream)); BatchInsertProc* batchProcessor = NULL; { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); @@ -900,7 +900,11 @@ void PackageHandler::run() } else // Single Insert { - //insertPkg.readTable(*(fByteStream.get())); + // make sure insertPkg.readMetaData() is called before + // this on fByteStream! + // TODO: Similar to batch inserts, don't + // deserialize the row data here for single inserts. + insertPkg.readRowData(*(fByteStream.get())); insertPkg.set_TxnID(fTxnid); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); result = fProcessor->processPackage(insertPkg); diff --git a/versioning/BRM/brmtypes.h b/versioning/BRM/brmtypes.h index 8de385c58..e0e3f64e7 100644 --- a/versioning/BRM/brmtypes.h +++ b/versioning/BRM/brmtypes.h @@ -507,6 +507,7 @@ const uint8_t RELEASE_LBID_RANGES = 91; /* More main BRM functions 100-110 */ const uint8_t BULK_UPDATE_DBROOT = 100; const uint8_t GET_SYSTEM_CATALOG = 101; +const uint8_t BULK_WRITE_VB_ENTRY = 102; /* Error codes returned by the DBRM functions. 
*/ diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index 92b3f086d..dff639666 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -2226,6 +2226,42 @@ int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, return err; } +int DBRM::bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) DBRM_THROW +{ + +#ifdef BRM_INFO + + if (fDebug) + { + TRACER_WRITELATER("bulkWriteVBEntry"); + TRACER_WRITE; + } + +#endif + + ByteStream command, response; + uint8_t err; + + command << BULK_WRITE_VB_ENTRY << (uint32_t) transID; + serializeInlineVector(command, lbids); + command << (uint32_t) vbOID; + serializeInlineVector(command, vbFBOs); + err = send_recv(command, response); + + if (err != ERR_OK) + return err; + + if (response.length() != 1) + return ERR_NETWORK; + + response >> err; + CHECK_EMPTY(response); + return err; +} + struct _entry { _entry(LBID_t l) : lbid(l) { }; diff --git a/versioning/BRM/dbrm.h b/versioning/BRM/dbrm.h index 8427d9f24..1aac747f4 100644 --- a/versioning/BRM/dbrm.h +++ b/versioning/BRM/dbrm.h @@ -608,6 +608,20 @@ public: EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) DBRM_THROW; + /** @brief Bulk registers a version buffer entry. + * + * Similar to writeVBEntry, but registers the version buffer + * entries in bulk for a list of lbids and vbFBOs, for a given + * transID and vbOID. + * @note The version buffer locations must hold the 'copy' lock + * first. + * @return 0 on success, non-0 on error (see brmtypes.h) + */ + EXPORT int bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) DBRM_THROW; + /** @brief Retrieves a list of uncommitted LBIDs. * * Retrieves a list of uncommitted LBIDs for the given transaction ID. 
diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index 097cf3959..f43a8c351 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -375,6 +375,10 @@ void SlaveComm::processCommand(ByteStream& msg) do_writeVBEntry(msg); break; + case BULK_WRITE_VB_ENTRY: + do_bulkWriteVBEntry(msg); + break; + case BEGIN_VB_COPY: do_beginVBCopy(msg); break; @@ -1758,6 +1762,49 @@ void SlaveComm::do_writeVBEntry(ByteStream& msg) doSaveDelta = true; } +void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg) +{ + VER_t transID; + std::vector lbids; + OID_t vbOID; + std::vector vbFBOs; + uint32_t tmp; + int err; + ByteStream reply; + +#ifdef BRM_VERBOSE + cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl; +#endif + + msg >> tmp; + transID = tmp; + deserializeInlineVector(msg, lbids); + msg >> tmp; + vbOID = tmp; + deserializeInlineVector(msg, vbFBOs); + + if (printOnly) + { + cout << "bulkWriteVBEntry: transID=" << transID << endl; + + for (size_t i = 0; i < lbids.size(); i++) + cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" << + vbOID << " vbFBO=" << vbFBOs[i] << endl; + return; + } + + err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs); + reply << (uint8_t) err; +#ifdef BRM_VERBOSE + cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl; +#endif + + if (!standalone) + master.write(reply); + + doSaveDelta = true; +} + void SlaveComm::do_beginVBCopy(ByteStream& msg) { VER_t transID; diff --git a/versioning/BRM/slavecomm.h b/versioning/BRM/slavecomm.h index f967a2239..a7b7b44f3 100644 --- a/versioning/BRM/slavecomm.h +++ b/versioning/BRM/slavecomm.h @@ -91,6 +91,7 @@ private: void do_bulkSetHWM(messageqcpp::ByteStream& msg); void do_bulkSetHWMAndCP(messageqcpp::ByteStream& msg); void do_writeVBEntry(messageqcpp::ByteStream& msg); + void do_bulkWriteVBEntry(messageqcpp::ByteStream& msg); void do_beginVBCopy(messageqcpp::ByteStream& msg); void do_endVBCopy(messageqcpp::ByteStream& 
msg); void do_vbRollback1(messageqcpp::ByteStream& msg); diff --git a/versioning/BRM/slavedbrmnode.cpp b/versioning/BRM/slavedbrmnode.cpp index cbee7e5cf..36d4b0cb7 100644 --- a/versioning/BRM/slavedbrmnode.cpp +++ b/versioning/BRM/slavedbrmnode.cpp @@ -523,6 +523,70 @@ int SlaveDBRMNode::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, return 0; } +int SlaveDBRMNode::bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) throw() +{ + VER_t oldVerID; + + /* + LBIDRange r; + r.start = lbid; + r.size = 1; + if (!copylocks.isLocked(r)) + cout << "Copylock error: lbid " << lbid << " isn't locked\n"; + */ + + try + { + vbbm.lock(VBBM::WRITE); + locked[0] = true; + vss.lock(VSS::WRITE); + locked[1] = true; + + for (size_t i = 0; i < lbids.size(); i++) + { + // figure out the current version of the block + // NOTE! This will currently error out to preserve the assumption that + // larger version numbers imply more recent changes. If we ever change that + // assumption, we'll need to revise the vbRollback() fcns as well. + oldVerID = vss.getCurrentVersion(lbids[i], NULL); + + if (oldVerID == transID) + continue; + else if (oldVerID > transID) + { + ostringstream str; + + str << "WorkerDBRMNode::bulkWriteVBEntry(): Overlapping transactions detected. " + "Transaction " << transID << " cannot overwrite blocks written by " + "transaction " << oldVerID; + log(str.str()); + return ERR_OLDTXN_OVERWRITING_NEWTXN; + } + + vbbm.insert(lbids[i], oldVerID, vbOID, vbFBOs[i]); + + if (oldVerID > 0) + vss.setVBFlag(lbids[i], oldVerID, true); + else + vss.insert(lbids[i], oldVerID, true, false); + + // XXXPAT: There's a problem if we use transID as the new version here. + // Need to use at least oldVerID + 1. 
OldverID can be > TransID + vss.insert(lbids[i], transID, false, true); + } + } + catch (exception& e) + { + cerr << e.what() << endl; + return -1; + } + + return 0; +} + int SlaveDBRMNode::beginVBCopy(VER_t transID, uint16_t vbOID, const LBIDRange_v& ranges, VBRange_v& freeList, bool flushPMCache) throw() { diff --git a/versioning/BRM/slavedbrmnode.h b/versioning/BRM/slavedbrmnode.h index f46dc3cb9..482df198f 100644 --- a/versioning/BRM/slavedbrmnode.h +++ b/versioning/BRM/slavedbrmnode.h @@ -364,6 +364,20 @@ public: EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) throw(); + /** @brief Bulk registers a version buffer entry. + * + * Similar to writeVBEntry, but registers the version buffer + * entries in bulk for a list of lbids and vbFBOs, for a given + * transID and vbOID. + * @note The version buffer locations must hold the 'copy' lock + * first. + * @return 0 on success, -1 on error + */ + EXPORT int bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) throw(); + /** @brief Atomically prepare to copy data to the version buffer * * Atomically sets the copy flag on the specified LBID ranges diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index 5b4f9339b..c1c6bcafb 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -1737,38 +1737,44 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID if (rc != NO_ERROR) goto cleanup; - for (; processedBlocks < (k + rangeListCount); processedBlocks++) + std::vector lbids(k); + std::vector vbFBOs(k); + size_t idx = 0; + + for (; processedBlocks < (k + rangeListCount); processedBlocks++, idx++) { - rc = blockRsltnMgrPtr->writeVBEntry(transID, rangeList[processedBlocks].start, - freeList[i].vbOID, freeList[i].vbFBO + (processedBlocks - rangeListCount)); + lbids[idx] = rangeList[processedBlocks].start; + vbFBOs[idx] = freeList[i].vbFBO + (processedBlocks - 
rangeListCount); + } - //cout << (uint64_t)rangeList[processedBlocks].start << endl; - if (rc != NO_ERROR) + rc = blockRsltnMgrPtr->bulkWriteVBEntry(transID, lbids, freeList[i].vbOID, + vbFBOs); + + if (rc != NO_ERROR) + { + switch (rc) { - switch (rc) - { - case ERR_DEADLOCK: - rc = ERR_BRM_DEAD_LOCK; - break; + case ERR_DEADLOCK: + rc = ERR_BRM_DEAD_LOCK; + break; - case ERR_VBBM_OVERFLOW: - rc = ERR_BRM_VB_OVERFLOW; - break; + case ERR_VBBM_OVERFLOW: + rc = ERR_BRM_VB_OVERFLOW; + break; - case ERR_NETWORK: - rc = ERR_BRM_NETWORK; - break; + case ERR_NETWORK: + rc = ERR_BRM_NETWORK; + break; - case ERR_READONLY: - rc = ERR_BRM_READONLY; - break; + case ERR_READONLY: + rc = ERR_BRM_READONLY; + break; - default: - rc = ERR_BRM_WR_VB_ENTRY; - } - - goto cleanup; + default: + rc = ERR_BRM_WR_VB_ENTRY; } + + goto cleanup; } } } From 01ff2652a6b3e4a64ca26bba40fcbacad35aa7b9 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Fri, 29 May 2020 22:30:34 -0400 Subject: [PATCH 2/6] MCOL-4023 Pushdown WHERE conditions for UPDATE/DELETE. For certain queries, such as: update cs1 set i = 41 where i = 42 or (i is null and 42 is null); the SELECT_LEX.where does not contain the required where conditions. Server sends the where conditions in the call to cond_push(), so we are storing them in a handler data member, condStack, and later push them down to getSelectPlan() for UPDATES/DELETEs. 
--- dbcon/mysql/ha_mcs.cpp | 27 +++++++++++++--- dbcon/mysql/ha_mcs.h | 7 +++++ dbcon/mysql/ha_mcs_execplan.cpp | 56 ++++++++++++++++++++++++++++++--- dbcon/mysql/ha_mcs_impl.cpp | 22 +++++++------ dbcon/mysql/ha_mcs_impl.h | 6 ++-- dbcon/mysql/ha_mcs_impl_if.h | 2 +- 6 files changed, 98 insertions(+), 22 deletions(-) diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 5c2c3560c..1c771063d 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -448,7 +448,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); + rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack); } catch (std::runtime_error& e) { @@ -464,7 +464,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); + rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack); *found_rows = *update_rows; } catch (std::runtime_error& e) @@ -487,7 +487,7 @@ int ha_mcs::direct_delete_rows(ha_rows *deleted_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows); + rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows, condStack); } catch (std::runtime_error& e) { @@ -629,7 +629,7 @@ int ha_mcs::rnd_init(bool scan) { try { - rc = ha_mcs_impl_rnd_init(table); + rc = ha_mcs_impl_rnd_init(table, condStack); } catch (std::runtime_error& e) { @@ -1110,7 +1110,7 @@ const COND* ha_mcs::cond_push(const COND* cond) COND* ret_cond = NULL; try { - ret_cond = ha_mcs_impl_cond_push(const_cast(cond), table); + ret_cond = ha_mcs_impl_cond_push(const_cast(cond), table, condStack); } catch (std::runtime_error& e) { @@ -1119,6 +1119,23 @@ const COND* ha_mcs::cond_push(const COND* cond) DBUG_RETURN(ret_cond); } +void ha_mcs::cond_pop() +{ + DBUG_ENTER("ha_mcs::cond_pop"); + + THD* thd = current_thd; + + if ((((thd->lex)->sql_command == SQLCOM_UPDATE) || + 
((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || + ((thd->lex)->sql_command == SQLCOM_DELETE) || + ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) && + !condStack.empty()) + { + condStack.pop_back(); + } + + DBUG_VOID_RETURN; +} struct st_mysql_storage_engine columnstore_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 8baf31e7e..a82ea07c5 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -18,6 +18,7 @@ MA 02110-1301, USA. */ #ifndef HA_MCS_H__ #define HA_MCS_H__ + #include <vector> #include "idb_mysql.h" #include "ha_mcs_sysvars.h" @@ -44,6 +45,11 @@ class ha_mcs: public handler THR_LOCK_DATA lock; ///< MySQL lock COLUMNSTORE_SHARE* share; ///< Shared lock info ulonglong int_table_flags; + // We are using a vector here to mimic the stack functionality + // using push_back() and pop_back() + // as apparently there is a linker error on the std::stack::pop() + // call on Ubuntu18. + std::vector<COND*> condStack; public: ha_mcs(handlerton* hton, TABLE_SHARE* table_arg); @@ -222,6 +228,7 @@ public: THR_LOCK_DATA** store_lock(THD* thd, THR_LOCK_DATA** to, enum thr_lock_type lock_type); ///< required const COND* cond_push(const COND* cond); + void cond_pop() override; uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 21ec8eddb..991cb556f 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -6333,10 +6333,12 @@ int processFrom(bool &isUnion, int processWhere(SELECT_LEX &select_lex, gp_walk_info &gwi, SCSEP &csep, - List &on_expr_list) + List &on_expr_list, + const std::vector<COND*>& condStack) { JOIN* join = select_lex.join; Item_cond* icp = 0; + bool isUpdateDelete = false; if (join != 0) icp = reinterpret_cast<Item_cond*>(join->conds); @@ -6354,7 +6356,7 @@ int processWhere(SELECT_LEX &select_lex, ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) || ((gwi.thd->lex)->sql_command == 
SQLCOM_DELETE_MULTI ))) { - icp = reinterpret_cast(select_lex.where); + isUpdateDelete = true; } if (icp) @@ -6393,6 +6395,51 @@ int processWhere(SELECT_LEX &select_lex, return ER_INTERNAL_ERROR; } } + else if (isUpdateDelete) + { + // MCOL-4023 For updates/deletes, we iterate over the pushed down condStack + if (!condStack.empty()) + { + std::vector::const_iterator condStackIter = condStack.begin(); + + while (condStackIter != condStack.end()) + { + COND* cond = *condStackIter++; + + cond->traverse_cond(gp_walk, &gwi, Item::POSTFIX); + + if (gwi.fatalParseError) + { + if (gwi.thd->derived_tables_processing) + { + gwi.cs_vtable_is_update_with_derive = true; + return -1; + } + + setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); + return ER_INTERNAL_ERROR; + } + } + } + // if condStack is empty(), check the select_lex for where conditions + // as a last resort + else if ((icp = reinterpret_cast(select_lex.where)) != 0) + { + icp->traverse_cond(gp_walk, &gwi, Item::POSTFIX); + + if (gwi.fatalParseError) + { + if (gwi.thd->derived_tables_processing) + { + gwi.cs_vtable_is_update_with_derive = true; + return -1; + } + + setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); + return ER_INTERNAL_ERROR; + } + } + } else if (join && join->zero_result_cause) { gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM)); @@ -6688,7 +6735,8 @@ int processLimitAndOffset( int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion, - bool isSelectHandlerTop) + bool isSelectHandlerTop, + const std::vector& condStack) { #ifdef DEBUG_WALK_COND cerr << "getSelectPlan()" << endl; @@ -6724,7 +6772,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? 
true : false; gwi.clauseType = WHERE; - if ((rc = processWhere(select_lex, gwi, csep, on_expr_list))) + if ((rc = processWhere(select_lex, gwi, csep, on_expr_list, condStack))) { return rc; } diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index c920c8657..c6838b025 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -1194,7 +1194,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in return returnVal; } -uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) +uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& condStack) { if (get_fe_conn_info_ptr() == nullptr) set_fe_conn_info_ptr((void*)new cal_connection_info()); @@ -1780,7 +1780,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) gwi.clauseType = WHERE; - if (getSelectPlan(gwi, select_lex, updateCP, false) != 0) //@Bug 3030 Modify the error message for unsupported functions + if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions { if (gwi.cs_vtable_is_update_with_derive) { @@ -2284,7 +2284,7 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name) return 0; } -int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) +int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector& condStack) { THD* thd = current_thd; int rc = 0; @@ -2308,7 +2308,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) if (execute) { - rc = doUpdateDelete(thd, gwi); + rc = doUpdateDelete(thd, gwi, condStack); } cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); @@ -2320,7 +2320,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) return rc; } -int ha_mcs_impl_rnd_init(TABLE* table) +int ha_mcs_impl_rnd_init(TABLE* table, const std::vector& condStack) { IDEBUG( cout << "rnd_init for table " << 
table->s->table_name.str << endl ); THD* thd = current_thd; @@ -2384,7 +2384,7 @@ int ha_mcs_impl_rnd_init(TABLE* table) //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, condStack); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); @@ -3985,7 +3985,7 @@ int ha_mcs_impl_delete_row(const uchar* buf) return 0; } -COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) +COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condStack) { THD* thd = current_thd; @@ -3993,7 +3993,10 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) - return cond; + { + condStack.push_back(cond); + return nullptr; + } string alias; alias.assign(table->alias.ptr(), table->alias.length()); @@ -4959,9 +4962,10 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; + // MCOL-4023 We need to test this code path. 
//Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, std::vector()); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 87ce22ae7..9024130ba 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -29,7 +29,7 @@ extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO extern int ha_mcs_impl_delete_table(const char* name); extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked); extern int ha_mcs_impl_close(void); -extern int ha_mcs_impl_rnd_init(TABLE* table); +extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector& condStack); extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed); @@ -39,10 +39,10 @@ extern int ha_mcs_impl_rename_table(const char* from, const char* to); extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd); -extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table); +extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector&); extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_mcs_impl_update_row(); -extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows); +extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector& condStack); extern 
int ha_mcs_impl_delete_row(); extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 9975605b7..3d2ae4cfe 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -342,7 +342,7 @@ int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_in int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); -int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false); +int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false, const std::vector& condStack = std::vector()); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg); From 2acc03242b2e821a341361b71afcf5c11283956e Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Mon, 1 Jun 2020 17:10:14 -0400 Subject: [PATCH 3/6] Bring in the definition of MESSAGE_ONCE to get the engine to build standalone. 
--- CMakeLists.txt | 3 +++ cmake/misc.cmake | 14 ++++++++++++++ 2 files changed, 17 insertions(+) create mode 100644 cmake/misc.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index cb65a11d3..de8b6405a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8.12) +PROJECT(Columnstore) + IF(NOT INSTALL_LAYOUT) IF(NOT CMAKE_BUILD_TYPE) SET(CMAKE_BUILD_TYPE RELWITHDEBINFO CACHE STRING @@ -91,6 +93,7 @@ LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/cmake) SET (ENGINE_SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}) INCLUDE(columnstore_version) +INCLUDE(misc) OPTION(USE_CCACHE "reduce compile time with ccache." FALSE) if(NOT USE_CCACHE) diff --git a/cmake/misc.cmake b/cmake/misc.cmake new file mode 100644 index 000000000..027b49561 --- /dev/null +++ b/cmake/misc.cmake @@ -0,0 +1,14 @@ +IF ("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}.${CMAKE_PATCH_VERSION}" VERSION_LESS "2.8.7") + FUNCTION(MESSAGE_ONCE id out) + MESSAGE(STATUS "${out}") + ENDFUNCTION() +ELSE() + FUNCTION(MESSAGE_ONCE id out) + STRING(MD5 hash "${out}") + IF(NOT __msg1_${id} STREQUAL "${hash}") + MESSAGE(STATUS "${out}") + ENDIF() + SET(__msg1_${id} ${hash} CACHE INTERNAL "") + ENDFUNCTION() +ENDIF() + From 0395779d52d26706e72bff9b384f429080f61fdd Mon Sep 17 00:00:00 2001 From: mariadb-RomanNavrotskiy Date: Mon, 1 Jun 2020 23:59:04 +0200 Subject: [PATCH 4/6] ci: fetch engine submodules --- .drone.jsonnet | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.drone.jsonnet b/.drone.jsonnet index ec4ebacd3..fe5b8f6c2 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -77,9 +77,8 @@ local Pipeline(branch, platform, event) = { name: 'submodules', image: 'alpine/git', commands: [ - 'git submodule update --recursive --remote', + 'git submodule update --init --recursive --remote', 'git config cmake.update-submodules no', - 'ls -la /drone/src/storage-manager', ], }, { From 5a275e6483ed404e281d751672d4a6df562caa52 Mon Sep 17 00:00:00 2001 From: Sergei 
Golubchik Date: Tue, 2 Jun 2020 14:39:04 +0200 Subject: [PATCH 5/6] don't abort the build if no libcurl --- CMakeLists.txt | 7 +++++++ utils/libmarias3/CMakeLists.txt | 1 - 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index de8b6405a..2b2173ad8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -154,6 +154,13 @@ if (NOT SNAPPY_FOUND) return() endif() +FIND_PACKAGE(CURL) +if (NOT CURL_FOUND) + MESSAGE_ONCE(CS_NO_CURL "libcurl development headers not found") + return() +endif() + + FIND_PROGRAM(AWK_EXECUTABLE awk DOC "path to the awk executable") if(NOT AWK_EXECUTABLE) MESSAGE_ONCE(CS_NO_AWK "awk not found!") diff --git a/utils/libmarias3/CMakeLists.txt b/utils/libmarias3/CMakeLists.txt index 5de4533a5..71be94bab 100644 --- a/utils/libmarias3/CMakeLists.txt +++ b/utils/libmarias3/CMakeLists.txt @@ -11,7 +11,6 @@ SET(S3_SOURCES ${S3API_DIR}/src/debug.c ADD_LIBRARY(marias3 SHARED ${S3_SOURCES}) -FIND_PACKAGE(CURL REQUIRED) TARGET_LINK_LIBRARIES(marias3 curl) INCLUDE_DIRECTORIES(${S3API_DIR}) From dd6694e0399b116f822465ed1b1d0a7ff05d624a Mon Sep 17 00:00:00 2001 From: benthompson15 Date: Tue, 2 Jun 2020 11:14:24 -0500 Subject: [PATCH 6/6] fix out-of-source builds --- .gitignore | 1 - utils/libmarias3/CMakeLists.txt | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 02fe345cb..05ad703b0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,7 +25,6 @@ VERSION.dep cmake_install.cmake install_manifest.txt CTestTestfile.cmake -config.h config.status stamp-h1 export/ diff --git a/utils/libmarias3/CMakeLists.txt b/utils/libmarias3/CMakeLists.txt index 71be94bab..31caae340 100644 --- a/utils/libmarias3/CMakeLists.txt +++ b/utils/libmarias3/CMakeLists.txt @@ -12,7 +12,7 @@ SET(S3_SOURCES ${S3API_DIR}/src/debug.c ADD_LIBRARY(marias3 SHARED ${S3_SOURCES}) TARGET_LINK_LIBRARIES(marias3 curl) -INCLUDE_DIRECTORIES(${S3API_DIR}) +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${S3API_DIR}) 
set(S3API_DEPS marias3 curl CACHE INTERNAL "S3API_DEPS")