diff --git a/.drone.jsonnet b/.drone.jsonnet index ec4ebacd3..fe5b8f6c2 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -77,9 +77,8 @@ local Pipeline(branch, platform, event) = { name: 'submodules', image: 'alpine/git', commands: [ - 'git submodule update --recursive --remote', + 'git submodule update --init --recursive --remote', 'git config cmake.update-submodules no', - 'ls -la /drone/src/storage-manager', ], }, { diff --git a/.gitignore b/.gitignore index 02fe345cb..05ad703b0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,7 +25,6 @@ VERSION.dep cmake_install.cmake install_manifest.txt CTestTestfile.cmake -config.h config.status stamp-h1 export/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d76a4e93..0f31249bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,22 +1,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8.12) -IF (NOT COMMAND MESSAGE_ONCE) - # lifted from the server's definition of message_once - IF ("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}.${CMAKE_PATCH_VERSION}" VERSION_LESS "2.8.7") - FUNCTION(MESSAGE_ONCE id out) - MESSAGE(STATUS "${out}") - ENDFUNCTION() - ELSE() - FUNCTION(MESSAGE_ONCE id out) - STRING(MD5 hash "${out}") - IF(NOT __msg1_${id} STREQUAL "${hash}") - MESSAGE(STATUS "${out}") - ENDIF() - SET(__msg1_${id} ${hash} CACHE INTERNAL "") - ENDFUNCTION() - ENDIF() -ENDIF() +PROJECT(Columnstore) IF(NOT INSTALL_LAYOUT) IF(NOT CMAKE_BUILD_TYPE) @@ -108,6 +93,7 @@ LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/cmake) SET (ENGINE_SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}) INCLUDE(columnstore_version) +INCLUDE(misc) OPTION(USE_CCACHE "reduce compile time with ccache." FALSE) if(NOT USE_CCACHE) @@ -168,6 +154,13 @@ if (NOT SNAPPY_FOUND) return() endif() +FIND_PACKAGE(CURL) +if (NOT CURL_FOUND) + MESSAGE_ONCE(CS_NO_CURL "libcurl development headers not found") + return() +endif() + + FIND_PROGRAM(AWK_EXECUTABLE awk DOC "path to the awk executable") if(NOT AWK_EXECUTABLE) MESSAGE_ONCE(CS_NO_AWK "awk not found!") diff --git a/cmake/misc.cmake b/cmake/misc.cmake new file mode 100644 index 000000000..027b49561 --- /dev/null +++ b/cmake/misc.cmake @@ -0,0 +1,14 @@ +IF ("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}.${CMAKE_PATCH_VERSION}" VERSION_LESS "2.8.7") + FUNCTION(MESSAGE_ONCE id out) + MESSAGE(STATUS "${out}") + ENDFUNCTION() +ELSE() + FUNCTION(MESSAGE_ONCE id out) + STRING(MD5 hash "${out}") + IF(NOT __msg1_${id} STREQUAL "${hash}") + MESSAGE(STATUS "${out}") + ENDIF() + SET(__msg1_${id} ${hash} CACHE INTERNAL "") + ENDFUNCTION() +ENDIF() + diff --git a/dbcon/dmlpackage/dmltable.cpp b/dbcon/dmlpackage/dmltable.cpp index 0beedc10c..0d8fd5646 100644 --- a/dbcon/dmlpackage/dmltable.cpp +++ b/dbcon/dmlpackage/dmltable.cpp @@ -72,6 +72,28 @@ int DMLTable::read(messageqcpp::ByteStream& bytestream) return retval; } +void DMLTable::readMetaData(messageqcpp::ByteStream& bytestream) +{ + // read the table name + bytestream >> fName; + + // read the schema name + bytestream >> fSchema; +} + +void DMLTable::readRowData(messageqcpp::ByteStream& bytestream) +{ + messageqcpp::ByteStream::quadbyte rowNum; + bytestream >> rowNum; + + for (unsigned int i = 0; i < rowNum; i++) + { + Row* aRow = new Row(); + aRow->read(bytestream); + fRows.push_back(aRow); + } +} + int DMLTable::write(messageqcpp::ByteStream& bytestream) { int retval = 1; diff --git a/dbcon/dmlpackage/dmltable.h b/dbcon/dmlpackage/dmltable.h index 7ff0eca14..8847318f4 100644 --- a/dbcon/dmlpackage/dmltable.h +++ b/dbcon/dmlpackage/dmltable.h @@ -91,6 +91,20 @@ public: int read(messageqcpp::ByteStream& bytestream); + /** @brief read a DMLTable metadata from a ByteStream + * + * @param bytestream the ByteStream to read from + */ + void readMetaData(messageqcpp::ByteStream& bytestream); + + + /** @brief read a DMLTable row data from a ByteStream + * + * @param bytestream the ByteStream to read from + */ + void readRowData(messageqcpp::ByteStream& bytestream); + + /** @brief write a DMLTable to a ByteStream * * @param bytestream the ByteStream to write to diff --git a/dbcon/dmlpackage/insertdmlpackage.cpp b/dbcon/dmlpackage/insertdmlpackage.cpp index 8c00ed2fc..b46e58da3 100644 --- a/dbcon/dmlpackage/insertdmlpackage.cpp +++ b/dbcon/dmlpackage/insertdmlpackage.cpp @@ -66,16 +66,16 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream) bytestream << (uint8_t)fLogging; bytestream << (uint8_t)fLogending; - if (fTable != 0) - { - retval = fTable->write(bytestream); - } - bytestream << fTableOid; bytestream << static_cast(fIsInsertSelect); bytestream << static_cast(fIsBatchInsert); bytestream << static_cast(fIsAutocommitOn); + if (fTable != 0) + { + retval = fTable->write(bytestream); + } + return retval; } @@ -100,15 +100,50 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream) bytestream >> logending; fLogending = (logending != 0); + bytestream >> fTableOid; + bytestream >> reinterpret_cast(fIsInsertSelect); + bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsAutocommitOn); + fTable = new DMLTable(); retval = fTable->read(bytestream); - bytestream >> fTableOid; - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsInsertSelect); - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsBatchInsert); - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsAutocommitOn); return retval; } +void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream) +{ + messageqcpp::ByteStream::quadbyte session_id; + bytestream >> session_id; + fSessionID = session_id; + bytestream >> fUuid; + + std::string dmlStatement; + bytestream >> fDMLStatement; + bytestream >> fSQLStatement; + bytestream >> fSchemaName; + bytestream >> fTimeZone; + uint8_t logging; + bytestream >> logging; + fLogging = (logging != 0); + uint8_t logending; + bytestream >> logending; + fLogending = (logending != 0); + + bytestream >> fTableOid; + bytestream >> reinterpret_cast(fIsInsertSelect); + bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsAutocommitOn); + + fTable = new DMLTable(); + fTable->readMetaData(bytestream); +} + +// Has to be called after InsertDMLPackage::readMetaData() +void InsertDMLPackage::readRowData(messageqcpp::ByteStream& bytestream) +{ + fTable->readRowData(bytestream); +} + int InsertDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows) { #ifdef DML_PACKAGE_DEBUG diff --git a/dbcon/dmlpackage/insertdmlpackage.h b/dbcon/dmlpackage/insertdmlpackage.h index bd3d0245a..a2b76d8fd 100644 --- a/dbcon/dmlpackage/insertdmlpackage.h +++ b/dbcon/dmlpackage/insertdmlpackage.h @@ -73,6 +73,18 @@ public: */ EXPORT int read(messageqcpp::ByteStream& bytestream); + /** @brief read InsertDMLPackage metadata from bytestream + * + * @param bytestream the ByteStream to read from + */ + EXPORT void readMetaData(messageqcpp::ByteStream& bytestream); + + /** @brief read InsertDMLPackage row data from bytestream + * + * @param bytestream the ByteStream to read from + */ + EXPORT void readRowData(messageqcpp::ByteStream& bytestream); + /** @brief build a InsertDMLPackage from a string buffer * * @param buffer diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 5c2c3560c..1c771063d 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -448,7 +448,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); + rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack); } catch (std::runtime_error& e) { @@ -464,7 +464,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); + rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack); *found_rows = *update_rows; } catch (std::runtime_error& e) @@ -487,7 +487,7 @@ int ha_mcs::direct_delete_rows(ha_rows *deleted_rows) int rc; try { - rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows); + rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows, condStack); } catch (std::runtime_error& e) { @@ -629,7 +629,7 @@ int ha_mcs::rnd_init(bool scan) { try { - rc = ha_mcs_impl_rnd_init(table); + rc = ha_mcs_impl_rnd_init(table, condStack); } catch (std::runtime_error& e) { @@ -1110,7 +1110,7 @@ const COND* ha_mcs::cond_push(const COND* cond) COND* ret_cond = NULL; try { - ret_cond = ha_mcs_impl_cond_push(const_cast(cond), table); + ret_cond = ha_mcs_impl_cond_push(const_cast(cond), table, condStack); } catch (std::runtime_error& e) { @@ -1119,6 +1119,23 @@ const COND* ha_mcs::cond_push(const COND* cond) DBUG_RETURN(ret_cond); } +void ha_mcs::cond_pop() +{ + DBUG_ENTER("ha_mcs::cond_pop"); + + THD* thd = current_thd; + + if ((((thd->lex)->sql_command == SQLCOM_UPDATE) || + ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || + ((thd->lex)->sql_command == SQLCOM_DELETE) || + ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) && + !condStack.empty()) + { + condStack.pop_back(); + } + + DBUG_VOID_RETURN; +} struct st_mysql_storage_engine columnstore_storage_engine = { MYSQL_HANDLERTON_INTERFACE_VERSION }; diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 8baf31e7e..a82ea07c5 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -18,6 +18,7 @@ MA 02110-1301, USA. */ #ifndef HA_MCS_H__ #define HA_MCS_H__ + #include #include "idb_mysql.h" #include "ha_mcs_sysvars.h" @@ -44,6 +45,11 @@ class ha_mcs: public handler THR_LOCK_DATA lock; ///< MySQL lock COLUMNSTORE_SHARE* share; ///< Shared lock info ulonglong int_table_flags; + // We are using a vector here to mimick the stack functionality + // using push_back() and pop_back() + // as apparently there is a linker error on the std::stack::pop() + // call on Ubuntu18. + std::vector condStack; public: ha_mcs(handlerton* hton, TABLE_SHARE* table_arg); @@ -222,6 +228,7 @@ public: THR_LOCK_DATA** store_lock(THD* thd, THR_LOCK_DATA** to, enum thr_lock_type lock_type); ///< required const COND* cond_push(const COND* cond); + void cond_pop() override; uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index ae6de6fb9..52b93fb58 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -6346,10 +6346,12 @@ int processFrom(bool &isUnion, int processWhere(SELECT_LEX &select_lex, gp_walk_info &gwi, SCSEP &csep, - List &on_expr_list) + List &on_expr_list, + const std::vector& condStack) { JOIN* join = select_lex.join; Item_cond* icp = 0; + bool isUpdateDelete = false; if (join != 0) icp = reinterpret_cast(join->conds); @@ -6367,7 +6369,7 @@ int processWhere(SELECT_LEX &select_lex, ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) || ((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI ))) { - icp = reinterpret_cast(select_lex.where); + isUpdateDelete = true; } if (icp) @@ -6406,6 +6408,51 @@ int processWhere(SELECT_LEX &select_lex, return ER_INTERNAL_ERROR; } } + else if (isUpdateDelete) + { + // MCOL-4023 For updates/deletes, we iterate over the pushed down condStack + if (!condStack.empty()) + { + std::vector::const_iterator condStackIter = condStack.begin(); + + while (condStackIter != condStack.end()) + { + COND* cond = *condStackIter++; + + cond->traverse_cond(gp_walk, &gwi, Item::POSTFIX); + + if (gwi.fatalParseError) + { + if (gwi.thd->derived_tables_processing) + { + gwi.cs_vtable_is_update_with_derive = true; + return -1; + } + + setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); + return ER_INTERNAL_ERROR; + } + } + } + // if condStack is empty(), check the select_lex for where conditions + // as a last resort + else if ((icp = reinterpret_cast(select_lex.where)) != 0) + { + icp->traverse_cond(gp_walk, &gwi, Item::POSTFIX); + + if (gwi.fatalParseError) + { + if (gwi.thd->derived_tables_processing) + { + gwi.cs_vtable_is_update_with_derive = true; + return -1; + } + + setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); + return ER_INTERNAL_ERROR; + } + } + } else if (join && join->zero_result_cause) { gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM)); @@ -6702,7 +6749,8 @@ int processLimitAndOffset( int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion, - bool isSelectHandlerTop) + bool isSelectHandlerTop, + const std::vector& condStack) { #ifdef DEBUG_WALK_COND cerr << "getSelectPlan()" << endl; @@ -6738,7 +6786,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false; gwi.clauseType = WHERE; - if ((rc = processWhere(select_lex, gwi, csep, on_expr_list))) + if ((rc = processWhere(select_lex, gwi, csep, on_expr_list, condStack))) { return rc; } diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 127abb078..bfd7a6846 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -1194,7 +1194,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in return returnVal; } -uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) +uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& condStack) { if (get_fe_conn_info_ptr() == nullptr) set_fe_conn_info_ptr((void*)new cal_connection_info()); @@ -1780,7 +1780,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) gwi.clauseType = WHERE; - if (getSelectPlan(gwi, select_lex, updateCP, false) != 0) //@Bug 3030 Modify the error message for unsupported functions + if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions { if (gwi.cs_vtable_is_update_with_derive) { @@ -2284,7 +2284,7 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name) return 0; } -int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) +int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector& condStack) { THD* thd = current_thd; int rc = 0; @@ -2308,7 +2308,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) if (execute) { - rc = doUpdateDelete(thd, gwi); + rc = doUpdateDelete(thd, gwi, condStack); } cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); @@ -2320,7 +2320,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) return rc; } -int ha_mcs_impl_rnd_init(TABLE* table) +int ha_mcs_impl_rnd_init(TABLE* table, const std::vector& condStack) { IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl ); THD* thd = current_thd; @@ -2384,7 +2384,7 @@ int ha_mcs_impl_rnd_init(TABLE* table) //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, condStack); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); @@ -3985,7 +3985,7 @@ int ha_mcs_impl_delete_row(const uchar* buf) return 0; } -COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) +COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condStack) { THD* thd = current_thd; @@ -3993,7 +3993,10 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) - return cond; + { + condStack.push_back(cond); + return nullptr; + } string alias; alias.assign(table->alias.ptr(), table->alias.length()); @@ -4959,9 +4962,10 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; + // MCOL-4023 We need to test this code path. //Update and delete code if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) - return doUpdateDelete(thd, gwi); + return doUpdateDelete(thd, gwi, std::vector()); uint32_t sessionID = tid2sid(thd->thread_id); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 87ce22ae7..9024130ba 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -29,7 +29,7 @@ extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO extern int ha_mcs_impl_delete_table(const char* name); extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked); extern int ha_mcs_impl_close(void); -extern int ha_mcs_impl_rnd_init(TABLE* table); +extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector& condStack); extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed); @@ -39,10 +39,10 @@ extern int ha_mcs_impl_rename_table(const char* from, const char* to); extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd); -extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table); +extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector&); extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_mcs_impl_update_row(); -extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows); +extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector& condStack); extern int ha_mcs_impl_delete_row(); extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 9975605b7..3d2ae4cfe 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -342,7 +342,7 @@ int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_in int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); -int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false); +int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false, const std::vector& condStack = std::vector()); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg); diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index ba13c4a21..112b23241 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -548,7 +548,7 @@ void PackageHandler::run() dmlpackage::InsertDMLPackage insertPkg; //boost::shared_ptr insertBs (new messageqcpp::ByteStream); messageqcpp::ByteStream bsSave = *(fByteStream.get()); - insertPkg.read(*(fByteStream.get())); + insertPkg.readMetaData(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) @@ -584,8 +584,8 @@ void PackageHandler::run() //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl; if (insertPkg.get_isBatchInsert()) { + fByteStream->reset(); //cout << "This is batch insert " << endl; - //boost::shared_ptr insertBs (new messageqcpp::ByteStream(fByteStream)); BatchInsertProc* batchProcessor = NULL; { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); @@ -900,7 +900,11 @@ void PackageHandler::run() } else // Single Insert { - //insertPkg.readTable(*(fByteStream.get())); + // make sure insertPkg.readMetaData() is called before + // this on fByteStream! + // TODO: Similar to batch inserts, don't + // deserialize the row data here for single inserts. + insertPkg.readRowData(*(fByteStream.get())); insertPkg.set_TxnID(fTxnid); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); result = fProcessor->processPackage(insertPkg); diff --git a/utils/libmarias3/CMakeLists.txt b/utils/libmarias3/CMakeLists.txt index 5de4533a5..31caae340 100644 --- a/utils/libmarias3/CMakeLists.txt +++ b/utils/libmarias3/CMakeLists.txt @@ -11,9 +11,8 @@ SET(S3_SOURCES ${S3API_DIR}/src/debug.c ADD_LIBRARY(marias3 SHARED ${S3_SOURCES}) -FIND_PACKAGE(CURL REQUIRED) TARGET_LINK_LIBRARIES(marias3 curl) -INCLUDE_DIRECTORIES(${S3API_DIR}) +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${S3API_DIR}) set(S3API_DEPS marias3 curl CACHE INTERNAL "S3API_DEPS") diff --git a/versioning/BRM/brmtypes.h b/versioning/BRM/brmtypes.h index 8de385c58..e0e3f64e7 100644 --- a/versioning/BRM/brmtypes.h +++ b/versioning/BRM/brmtypes.h @@ -507,6 +507,7 @@ const uint8_t RELEASE_LBID_RANGES = 91; /* More main BRM functions 100-110 */ const uint8_t BULK_UPDATE_DBROOT = 100; const uint8_t GET_SYSTEM_CATALOG = 101; +const uint8_t BULK_WRITE_VB_ENTRY = 102; /* Error codes returned by the DBRM functions. */ diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index 92b3f086d..dff639666 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -2226,6 +2226,42 @@ int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, return err; } +int DBRM::bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) DBRM_THROW +{ + +#ifdef BRM_INFO + + if (fDebug) + { + TRACER_WRITELATER("bulkWriteVBEntry"); + TRACER_WRITE; + } + +#endif + + ByteStream command, response; + uint8_t err; + + command << BULK_WRITE_VB_ENTRY << (uint32_t) transID; + serializeInlineVector(command, lbids); + command << (uint32_t) vbOID; + serializeInlineVector(command, vbFBOs); + err = send_recv(command, response); + + if (err != ERR_OK) + return err; + + if (response.length() != 1) + return ERR_NETWORK; + + response >> err; + CHECK_EMPTY(response); + return err; +} + struct _entry { _entry(LBID_t l) : lbid(l) { }; diff --git a/versioning/BRM/dbrm.h b/versioning/BRM/dbrm.h index 8427d9f24..1aac747f4 100644 --- a/versioning/BRM/dbrm.h +++ b/versioning/BRM/dbrm.h @@ -608,6 +608,20 @@ public: EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) DBRM_THROW; + /** @brief Bulk registers a version buffer entry. + * + * Similar to writeVBEntry, but registers the version buffer + * entries in bulk for a list of lbids and vbFBOs, for a given + * transID and vbOID. + * @note The version buffer locations must hold the 'copy' lock + * first. + * @return 0 on success, non-0 on error (see brmtypes.h) + */ + EXPORT int bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) DBRM_THROW; + /** @brief Retrieves a list of uncommitted LBIDs. * * Retrieves a list of uncommitted LBIDs for the given transaction ID. diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index 097cf3959..f43a8c351 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -375,6 +375,10 @@ void SlaveComm::processCommand(ByteStream& msg) do_writeVBEntry(msg); break; + case BULK_WRITE_VB_ENTRY: + do_bulkWriteVBEntry(msg); + break; + case BEGIN_VB_COPY: do_beginVBCopy(msg); break; @@ -1758,6 +1762,49 @@ void SlaveComm::do_writeVBEntry(ByteStream& msg) doSaveDelta = true; } +void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg) +{ + VER_t transID; + std::vector lbids; + OID_t vbOID; + std::vector vbFBOs; + uint32_t tmp; + int err; + ByteStream reply; + +#ifdef BRM_VERBOSE + cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl; +#endif + + msg >> tmp; + transID = tmp; + deserializeInlineVector(msg, lbids); + msg >> tmp; + vbOID = tmp; + deserializeInlineVector(msg, vbFBOs); + + if (printOnly) + { + cout << "bulkWriteVBEntry: transID=" << transID << endl; + + for (size_t i = 0; i < lbids.size(); i++) + cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" << + vbOID << " vbFBO=" << vbFBOs[i] << endl; + return; + } + + err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs); + reply << (uint8_t) err; +#ifdef BRM_VERBOSE + cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl; +#endif + + if (!standalone) + master.write(reply); + + doSaveDelta = true; +} + void SlaveComm::do_beginVBCopy(ByteStream& msg) { VER_t transID; diff --git a/versioning/BRM/slavecomm.h b/versioning/BRM/slavecomm.h index f967a2239..a7b7b44f3 100644 --- a/versioning/BRM/slavecomm.h +++ b/versioning/BRM/slavecomm.h @@ -91,6 +91,7 @@ private: void do_bulkSetHWM(messageqcpp::ByteStream& msg); void do_bulkSetHWMAndCP(messageqcpp::ByteStream& msg); void do_writeVBEntry(messageqcpp::ByteStream& msg); + void do_bulkWriteVBEntry(messageqcpp::ByteStream& msg); void do_beginVBCopy(messageqcpp::ByteStream& msg); void do_endVBCopy(messageqcpp::ByteStream& msg); void do_vbRollback1(messageqcpp::ByteStream& msg); diff --git a/versioning/BRM/slavedbrmnode.cpp b/versioning/BRM/slavedbrmnode.cpp index cbee7e5cf..36d4b0cb7 100644 --- a/versioning/BRM/slavedbrmnode.cpp +++ b/versioning/BRM/slavedbrmnode.cpp @@ -523,6 +523,70 @@ int SlaveDBRMNode::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, return 0; } +int SlaveDBRMNode::bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) throw() +{ + VER_t oldVerID; + + /* + LBIDRange r; + r.start = lbid; + r.size = 1; + if (!copylocks.isLocked(r)) + cout << "Copylock error: lbid " << lbid << " isn't locked\n"; + */ + + try + { + vbbm.lock(VBBM::WRITE); + locked[0] = true; + vss.lock(VSS::WRITE); + locked[1] = true; + + for (size_t i = 0; i < lbids.size(); i++) + { + // figure out the current version of the block + // NOTE! This will currently error out to preserve the assumption that + // larger version numbers imply more recent changes. If we ever change that + // assumption, we'll need to revise the vbRollback() fcns as well. + oldVerID = vss.getCurrentVersion(lbids[i], NULL); + + if (oldVerID == transID) + continue; + else if (oldVerID > transID) + { + ostringstream str; + + str << "WorkerDBRMNode::bulkWriteVBEntry(): Overlapping transactions detected. " + "Transaction " << transID << " cannot overwrite blocks written by " + "transaction " << oldVerID; + log(str.str()); + return ERR_OLDTXN_OVERWRITING_NEWTXN; + } + + vbbm.insert(lbids[i], oldVerID, vbOID, vbFBOs[i]); + + if (oldVerID > 0) + vss.setVBFlag(lbids[i], oldVerID, true); + else + vss.insert(lbids[i], oldVerID, true, false); + + // XXXPAT: There's a problem if we use transID as the new version here. + // Need to use at least oldVerID + 1. OldverID can be > TransID + vss.insert(lbids[i], transID, false, true); + } + } + catch (exception& e) + { + cerr << e.what() << endl; + return -1; + } + + return 0; +} + int SlaveDBRMNode::beginVBCopy(VER_t transID, uint16_t vbOID, const LBIDRange_v& ranges, VBRange_v& freeList, bool flushPMCache) throw() { diff --git a/versioning/BRM/slavedbrmnode.h b/versioning/BRM/slavedbrmnode.h index f46dc3cb9..482df198f 100644 --- a/versioning/BRM/slavedbrmnode.h +++ b/versioning/BRM/slavedbrmnode.h @@ -364,6 +364,20 @@ public: EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) throw(); + /** @brief Bulk registers a version buffer entry. + * + * Similar to writeVBEntry, but registers the version buffer + * entries in bulk for a list of lbids and vbFBOs, for a given + * transID and vbOID. + * @note The version buffer locations must hold the 'copy' lock + * first. + * @return 0 on success, -1 on error + */ + EXPORT int bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) throw(); + /** @brief Atomically prepare to copy data to the version buffer * * Atomically sets the copy flag on the specified LBID ranges diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index 5b4f9339b..c1c6bcafb 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -1737,38 +1737,44 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID if (rc != NO_ERROR) goto cleanup; - for (; processedBlocks < (k + rangeListCount); processedBlocks++) + std::vector lbids(k); + std::vector vbFBOs(k); + size_t idx = 0; + + for (; processedBlocks < (k + rangeListCount); processedBlocks++, idx++) { - rc = blockRsltnMgrPtr->writeVBEntry(transID, rangeList[processedBlocks].start, - freeList[i].vbOID, freeList[i].vbFBO + (processedBlocks - rangeListCount)); + lbids[idx] = rangeList[processedBlocks].start; + vbFBOs[idx] = freeList[i].vbFBO + (processedBlocks - rangeListCount); + } - //cout << (uint64_t)rangeList[processedBlocks].start << endl; - if (rc != NO_ERROR) + rc = blockRsltnMgrPtr->bulkWriteVBEntry(transID, lbids, freeList[i].vbOID, + vbFBOs); + + if (rc != NO_ERROR) + { + switch (rc) { - switch (rc) - { - case ERR_DEADLOCK: - rc = ERR_BRM_DEAD_LOCK; - break; + case ERR_DEADLOCK: + rc = ERR_BRM_DEAD_LOCK; + break; - case ERR_VBBM_OVERFLOW: - rc = ERR_BRM_VB_OVERFLOW; - break; + case ERR_VBBM_OVERFLOW: + rc = ERR_BRM_VB_OVERFLOW; + break; - case ERR_NETWORK: - rc = ERR_BRM_NETWORK; - break; + case ERR_NETWORK: + rc = ERR_BRM_NETWORK; + break; - case ERR_READONLY: - rc = ERR_BRM_READONLY; - break; + case ERR_READONLY: + rc = ERR_BRM_READONLY; + break; - default: - rc = ERR_BRM_WR_VB_ENTRY; - } - - goto cleanup; + default: + rc = ERR_BRM_WR_VB_ENTRY; } + + goto cleanup; } } }