1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge branch 'develop' into MCOL-3536

Conflicts:
	CMakeLists.txt
This commit is contained in:
David Hall
2020-06-04 16:22:59 -05:00
24 changed files with 429 additions and 79 deletions

View File

@ -77,9 +77,8 @@ local Pipeline(branch, platform, event) = {
name: 'submodules',
image: 'alpine/git',
commands: [
'git submodule update --recursive --remote',
'git submodule update --init --recursive --remote',
'git config cmake.update-submodules no',
'ls -la /drone/src/storage-manager',
],
},
{

1
.gitignore vendored
View File

@ -25,7 +25,6 @@ VERSION.dep
cmake_install.cmake
install_manifest.txt
CTestTestfile.cmake
config.h
config.status
stamp-h1
export/

View File

@ -1,22 +1,7 @@
CMAKE_MINIMUM_REQUIRED(VERSION 2.8.12)
IF (NOT COMMAND MESSAGE_ONCE)
# lifted from the server's definition of message_once
IF ("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}.${CMAKE_PATCH_VERSION}" VERSION_LESS "2.8.7")
FUNCTION(MESSAGE_ONCE id out)
MESSAGE(STATUS "${out}")
ENDFUNCTION()
ELSE()
FUNCTION(MESSAGE_ONCE id out)
STRING(MD5 hash "${out}")
IF(NOT __msg1_${id} STREQUAL "${hash}")
MESSAGE(STATUS "${out}")
ENDIF()
SET(__msg1_${id} ${hash} CACHE INTERNAL "")
ENDFUNCTION()
ENDIF()
ENDIF()
PROJECT(Columnstore)
IF(NOT INSTALL_LAYOUT)
IF(NOT CMAKE_BUILD_TYPE)
@ -108,6 +93,7 @@ LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/cmake)
SET (ENGINE_SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR})
INCLUDE(columnstore_version)
INCLUDE(misc)
OPTION(USE_CCACHE "reduce compile time with ccache." FALSE)
if(NOT USE_CCACHE)
@ -168,6 +154,13 @@ if (NOT SNAPPY_FOUND)
return()
endif()
FIND_PACKAGE(CURL)
if (NOT CURL_FOUND)
MESSAGE_ONCE(CS_NO_CURL "libcurl development headers not found")
return()
endif()
FIND_PROGRAM(AWK_EXECUTABLE awk DOC "path to the awk executable")
if(NOT AWK_EXECUTABLE)
MESSAGE_ONCE(CS_NO_AWK "awk not found!")

14
cmake/misc.cmake Normal file
View File

@ -0,0 +1,14 @@
IF ("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}.${CMAKE_PATCH_VERSION}" VERSION_LESS "2.8.7")
FUNCTION(MESSAGE_ONCE id out)
MESSAGE(STATUS "${out}")
ENDFUNCTION()
ELSE()
FUNCTION(MESSAGE_ONCE id out)
STRING(MD5 hash "${out}")
IF(NOT __msg1_${id} STREQUAL "${hash}")
MESSAGE(STATUS "${out}")
ENDIF()
SET(__msg1_${id} ${hash} CACHE INTERNAL "")
ENDFUNCTION()
ENDIF()

View File

@ -72,6 +72,28 @@ int DMLTable::read(messageqcpp::ByteStream& bytestream)
return retval;
}
void DMLTable::readMetaData(messageqcpp::ByteStream& bytestream)
{
// read the table name
bytestream >> fName;
// read the schema name
bytestream >> fSchema;
}
void DMLTable::readRowData(messageqcpp::ByteStream& bytestream)
{
messageqcpp::ByteStream::quadbyte rowNum;
bytestream >> rowNum;
for (unsigned int i = 0; i < rowNum; i++)
{
Row* aRow = new Row();
aRow->read(bytestream);
fRows.push_back(aRow);
}
}
int DMLTable::write(messageqcpp::ByteStream& bytestream)
{
int retval = 1;

View File

@ -91,6 +91,20 @@ public:
int read(messageqcpp::ByteStream& bytestream);
/** @brief read a DMLTable metadata from a ByteStream
*
* @param bytestream the ByteStream to read from
*/
void readMetaData(messageqcpp::ByteStream& bytestream);
/** @brief read a DMLTable row data from a ByteStream
*
* @param bytestream the ByteStream to read from
*/
void readRowData(messageqcpp::ByteStream& bytestream);
/** @brief write a DMLTable to a ByteStream
*
* @param bytestream the ByteStream to write to

View File

@ -66,16 +66,16 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream)
bytestream << (uint8_t)fLogging;
bytestream << (uint8_t)fLogending;
if (fTable != 0)
{
retval = fTable->write(bytestream);
}
bytestream << fTableOid;
bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsInsertSelect);
bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsBatchInsert);
bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsAutocommitOn);
if (fTable != 0)
{
retval = fTable->write(bytestream);
}
return retval;
}
@ -100,15 +100,50 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream)
bytestream >> logending;
fLogending = (logending != 0);
bytestream >> fTableOid;
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsInsertSelect);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsBatchInsert);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsAutocommitOn);
fTable = new DMLTable();
retval = fTable->read(bytestream);
bytestream >> fTableOid;
bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsInsertSelect);
bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsBatchInsert);
bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsAutocommitOn);
return retval;
}
void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream)
{
messageqcpp::ByteStream::quadbyte session_id;
bytestream >> session_id;
fSessionID = session_id;
bytestream >> fUuid;
std::string dmlStatement;
bytestream >> fDMLStatement;
bytestream >> fSQLStatement;
bytestream >> fSchemaName;
bytestream >> fTimeZone;
uint8_t logging;
bytestream >> logging;
fLogging = (logging != 0);
uint8_t logending;
bytestream >> logending;
fLogending = (logending != 0);
bytestream >> fTableOid;
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsInsertSelect);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsBatchInsert);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsAutocommitOn);
fTable = new DMLTable();
fTable->readMetaData(bytestream);
}
// Has to be called after InsertDMLPackage::readMetaData()
void InsertDMLPackage::readRowData(messageqcpp::ByteStream& bytestream)
{
fTable->readRowData(bytestream);
}
int InsertDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows)
{
#ifdef DML_PACKAGE_DEBUG

View File

@ -73,6 +73,18 @@ public:
*/
EXPORT int read(messageqcpp::ByteStream& bytestream);
/** @brief read InsertDMLPackage metadata from bytestream
*
* @param bytestream the ByteStream to read from
*/
EXPORT void readMetaData(messageqcpp::ByteStream& bytestream);
/** @brief read InsertDMLPackage row data from bytestream
*
* @param bytestream the ByteStream to read from
*/
EXPORT void readRowData(messageqcpp::ByteStream& bytestream);
/** @brief build a InsertDMLPackage from a string buffer
*
* @param buffer

View File

@ -448,7 +448,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows)
int rc;
try
{
rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows);
rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack);
}
catch (std::runtime_error& e)
{
@ -464,7 +464,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
int rc;
try
{
rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows);
rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack);
*found_rows = *update_rows;
}
catch (std::runtime_error& e)
@ -487,7 +487,7 @@ int ha_mcs::direct_delete_rows(ha_rows *deleted_rows)
int rc;
try
{
rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows);
rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows, condStack);
}
catch (std::runtime_error& e)
{
@ -629,7 +629,7 @@ int ha_mcs::rnd_init(bool scan)
{
try
{
rc = ha_mcs_impl_rnd_init(table);
rc = ha_mcs_impl_rnd_init(table, condStack);
}
catch (std::runtime_error& e)
{
@ -1110,7 +1110,7 @@ const COND* ha_mcs::cond_push(const COND* cond)
COND* ret_cond = NULL;
try
{
ret_cond = ha_mcs_impl_cond_push(const_cast<COND*>(cond), table);
ret_cond = ha_mcs_impl_cond_push(const_cast<COND*>(cond), table, condStack);
}
catch (std::runtime_error& e)
{
@ -1119,6 +1119,23 @@ const COND* ha_mcs::cond_push(const COND* cond)
DBUG_RETURN(ret_cond);
}
void ha_mcs::cond_pop()
{
DBUG_ENTER("ha_mcs::cond_pop");
THD* thd = current_thd;
if ((((thd->lex)->sql_command == SQLCOM_UPDATE) ||
((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) ||
((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) &&
!condStack.empty())
{
condStack.pop_back();
}
DBUG_VOID_RETURN;
}
struct st_mysql_storage_engine columnstore_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

View File

@ -18,6 +18,7 @@
MA 02110-1301, USA. */
#ifndef HA_MCS_H__
#define HA_MCS_H__
#include <my_config.h>
#include "idb_mysql.h"
#include "ha_mcs_sysvars.h"
@ -44,6 +45,11 @@ class ha_mcs: public handler
THR_LOCK_DATA lock; ///< MySQL lock
COLUMNSTORE_SHARE* share; ///< Shared lock info
ulonglong int_table_flags;
// We are using a vector here to mimick the stack functionality
// using push_back() and pop_back()
// as apparently there is a linker error on the std::stack<COND*>::pop()
// call on Ubuntu18.
std::vector<COND*> condStack;
public:
ha_mcs(handlerton* hton, TABLE_SHARE* table_arg);
@ -222,6 +228,7 @@ public:
THR_LOCK_DATA** store_lock(THD* thd, THR_LOCK_DATA** to,
enum thr_lock_type lock_type); ///< required
const COND* cond_push(const COND* cond);
void cond_pop() override;
uint8 table_cache_type()
{
return HA_CACHE_TBL_NOCACHE;

View File

@ -6346,10 +6346,12 @@ int processFrom(bool &isUnion,
int processWhere(SELECT_LEX &select_lex,
gp_walk_info &gwi,
SCSEP &csep,
List<Item> &on_expr_list)
List<Item> &on_expr_list,
const std::vector<COND*>& condStack)
{
JOIN* join = select_lex.join;
Item_cond* icp = 0;
bool isUpdateDelete = false;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
@ -6367,7 +6369,7 @@ int processWhere(SELECT_LEX &select_lex,
((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI )))
{
icp = reinterpret_cast<Item_cond*>(select_lex.where);
isUpdateDelete = true;
}
if (icp)
@ -6406,6 +6408,51 @@ int processWhere(SELECT_LEX &select_lex,
return ER_INTERNAL_ERROR;
}
}
else if (isUpdateDelete)
{
// MCOL-4023 For updates/deletes, we iterate over the pushed down condStack
if (!condStack.empty())
{
std::vector<COND*>::const_iterator condStackIter = condStack.begin();
while (condStackIter != condStack.end())
{
COND* cond = *condStackIter++;
cond->traverse_cond(gp_walk, &gwi, Item::POSTFIX);
if (gwi.fatalParseError)
{
if (gwi.thd->derived_tables_processing)
{
gwi.cs_vtable_is_update_with_derive = true;
return -1;
}
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
return ER_INTERNAL_ERROR;
}
}
}
// if condStack is empty(), check the select_lex for where conditions
// as a last resort
else if ((icp = reinterpret_cast<Item_cond*>(select_lex.where)) != 0)
{
icp->traverse_cond(gp_walk, &gwi, Item::POSTFIX);
if (gwi.fatalParseError)
{
if (gwi.thd->derived_tables_processing)
{
gwi.cs_vtable_is_update_with_derive = true;
return -1;
}
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
return ER_INTERNAL_ERROR;
}
}
}
else if (join && join->zero_result_cause)
{
gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM));
@ -6702,7 +6749,8 @@ int processLimitAndOffset(
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
SCSEP& csep,
bool isUnion,
bool isSelectHandlerTop)
bool isSelectHandlerTop,
const std::vector<COND*>& condStack)
{
#ifdef DEBUG_WALK_COND
cerr << "getSelectPlan()" << endl;
@ -6738,7 +6786,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
gwi.clauseType = WHERE;
if ((rc = processWhere(select_lex, gwi, csep, on_expr_list)))
if ((rc = processWhere(select_lex, gwi, csep, on_expr_list, condStack)))
{
return rc;
}

View File

@ -1194,7 +1194,7 @@ vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, in
return returnVal;
}
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& condStack)
{
if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info());
@ -1780,7 +1780,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
gwi.clauseType = WHERE;
if (getSelectPlan(gwi, select_lex, updateCP, false) != 0) //@Bug 3030 Modify the error message for unsupported functions
if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions
{
if (gwi.cs_vtable_is_update_with_derive)
{
@ -2284,7 +2284,7 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name)
return 0;
}
int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows)
int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack)
{
THD* thd = current_thd;
int rc = 0;
@ -2308,7 +2308,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows)
if (execute)
{
rc = doUpdateDelete(thd, gwi);
rc = doUpdateDelete(thd, gwi, condStack);
}
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
@ -2320,7 +2320,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows)
return rc;
}
int ha_mcs_impl_rnd_init(TABLE* table)
int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
{
IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl );
THD* thd = current_thd;
@ -2384,7 +2384,7 @@ int ha_mcs_impl_rnd_init(TABLE* table)
//Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi);
return doUpdateDelete(thd, gwi, condStack);
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
@ -3985,7 +3985,7 @@ int ha_mcs_impl_delete_row(const uchar* buf)
return 0;
}
COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table)
COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condStack)
{
THD* thd = current_thd;
@ -3993,7 +3993,10 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table)
((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) ||
((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI))
return cond;
{
condStack.push_back(cond);
return nullptr;
}
string alias;
alias.assign(table->alias.ptr(), table->alias.length());
@ -4959,9 +4962,10 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
// MCOL-4023 We need to test this code path.
//Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi);
return doUpdateDelete(thd, gwi, std::vector<COND*>());
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);

View File

@ -29,7 +29,7 @@ extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO
extern int ha_mcs_impl_delete_table(const char* name);
extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked);
extern int ha_mcs_impl_close(void);
extern int ha_mcs_impl_rnd_init(TABLE* table);
extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack);
extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table);
extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false);
extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed);
@ -39,10 +39,10 @@ extern int ha_mcs_impl_rename_table(const char* from, const char* to);
extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all);
extern int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all);
extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd);
extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table);
extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>&);
extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type);
extern int ha_mcs_impl_update_row();
extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows);
extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack);
extern int ha_mcs_impl_delete_row();
extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table);

View File

@ -342,7 +342,7 @@ int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_in
int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi);
int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi);
int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false, const std::vector<COND*>& condStack = std::vector<COND*>());
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false);
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
void setError(THD* thd, uint32_t errcode, const std::string errmsg);

View File

@ -548,7 +548,7 @@ void PackageHandler::run()
dmlpackage::InsertDMLPackage insertPkg;
//boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream);
messageqcpp::ByteStream bsSave = *(fByteStream.get());
insertPkg.read(*(fByteStream.get()));
insertPkg.readMetaData(*(fByteStream.get()));
#ifdef MCOL_140
if (fConcurrentSupport)
@ -584,8 +584,8 @@ void PackageHandler::run()
//cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl;
if (insertPkg.get_isBatchInsert())
{
fByteStream->reset();
//cout << "This is batch insert " << endl;
//boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream(fByteStream));
BatchInsertProc* batchProcessor = NULL;
{
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
@ -900,7 +900,11 @@ void PackageHandler::run()
}
else // Single Insert
{
//insertPkg.readTable(*(fByteStream.get()));
// make sure insertPkg.readMetaData() is called before
// this on fByteStream!
// TODO: Similar to batch inserts, don't
// deserialize the row data here for single inserts.
insertPkg.readRowData(*(fByteStream.get()));
insertPkg.set_TxnID(fTxnid);
fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID()));
result = fProcessor->processPackage(insertPkg);

View File

@ -11,9 +11,8 @@ SET(S3_SOURCES ${S3API_DIR}/src/debug.c
ADD_LIBRARY(marias3 SHARED ${S3_SOURCES})
FIND_PACKAGE(CURL REQUIRED)
TARGET_LINK_LIBRARIES(marias3 curl)
INCLUDE_DIRECTORIES(${S3API_DIR})
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${S3API_DIR})
set(S3API_DEPS marias3 curl CACHE INTERNAL "S3API_DEPS")

View File

@ -507,6 +507,7 @@ const uint8_t RELEASE_LBID_RANGES = 91;
/* More main BRM functions 100-110 */
const uint8_t BULK_UPDATE_DBROOT = 100;
const uint8_t GET_SYSTEM_CATALOG = 101;
const uint8_t BULK_WRITE_VB_ENTRY = 102;
/* Error codes returned by the DBRM functions. */

View File

@ -2226,6 +2226,42 @@ int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
return err;
}
int DBRM::bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("bulkWriteVBEntry");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BULK_WRITE_VB_ENTRY << (uint32_t) transID;
serializeInlineVector(command, lbids);
command << (uint32_t) vbOID;
serializeInlineVector(command, vbFBOs);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
struct _entry
{
_entry(LBID_t l) : lbid(l) { };

View File

@ -608,6 +608,20 @@ public:
EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
uint32_t vbFBO) DBRM_THROW;
/** @brief Bulk registers a version buffer entry.
*
* Similar to writeVBEntry, but registers the version buffer
* entries in bulk for a list of lbids and vbFBOs, for a given
* transID and vbOID.
* @note The version buffer locations must hold the 'copy' lock
* first.
* @return 0 on success, non-0 on error (see brmtypes.h)
*/
EXPORT int bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) DBRM_THROW;
/** @brief Retrieves a list of uncommitted LBIDs.
*
* Retrieves a list of uncommitted LBIDs for the given transaction ID.

View File

@ -375,6 +375,10 @@ void SlaveComm::processCommand(ByteStream& msg)
do_writeVBEntry(msg);
break;
case BULK_WRITE_VB_ENTRY:
do_bulkWriteVBEntry(msg);
break;
case BEGIN_VB_COPY:
do_beginVBCopy(msg);
break;
@ -1758,6 +1762,49 @@ void SlaveComm::do_writeVBEntry(ByteStream& msg)
doSaveDelta = true;
}
void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg)
{
VER_t transID;
std::vector<BRM::LBID_t> lbids;
OID_t vbOID;
std::vector<uint32_t> vbFBOs;
uint32_t tmp;
int err;
ByteStream reply;
#ifdef BRM_VERBOSE
cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl;
#endif
msg >> tmp;
transID = tmp;
deserializeInlineVector(msg, lbids);
msg >> tmp;
vbOID = tmp;
deserializeInlineVector(msg, vbFBOs);
if (printOnly)
{
cout << "bulkWriteVBEntry: transID=" << transID << endl;
for (size_t i = 0; i < lbids.size(); i++)
cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" <<
vbOID << " vbFBO=" << vbFBOs[i] << endl;
return;
}
err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs);
reply << (uint8_t) err;
#ifdef BRM_VERBOSE
cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl;
#endif
if (!standalone)
master.write(reply);
doSaveDelta = true;
}
void SlaveComm::do_beginVBCopy(ByteStream& msg)
{
VER_t transID;

View File

@ -91,6 +91,7 @@ private:
void do_bulkSetHWM(messageqcpp::ByteStream& msg);
void do_bulkSetHWMAndCP(messageqcpp::ByteStream& msg);
void do_writeVBEntry(messageqcpp::ByteStream& msg);
void do_bulkWriteVBEntry(messageqcpp::ByteStream& msg);
void do_beginVBCopy(messageqcpp::ByteStream& msg);
void do_endVBCopy(messageqcpp::ByteStream& msg);
void do_vbRollback1(messageqcpp::ByteStream& msg);

View File

@ -523,6 +523,70 @@ int SlaveDBRMNode::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
return 0;
}
int SlaveDBRMNode::bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) throw()
{
VER_t oldVerID;
/*
LBIDRange r;
r.start = lbid;
r.size = 1;
if (!copylocks.isLocked(r))
cout << "Copylock error: lbid " << lbid << " isn't locked\n";
*/
try
{
vbbm.lock(VBBM::WRITE);
locked[0] = true;
vss.lock(VSS::WRITE);
locked[1] = true;
for (size_t i = 0; i < lbids.size(); i++)
{
// figure out the current version of the block
// NOTE! This will currently error out to preserve the assumption that
// larger version numbers imply more recent changes. If we ever change that
// assumption, we'll need to revise the vbRollback() fcns as well.
oldVerID = vss.getCurrentVersion(lbids[i], NULL);
if (oldVerID == transID)
continue;
else if (oldVerID > transID)
{
ostringstream str;
str << "WorkerDBRMNode::bulkWriteVBEntry(): Overlapping transactions detected. "
"Transaction " << transID << " cannot overwrite blocks written by "
"transaction " << oldVerID;
log(str.str());
return ERR_OLDTXN_OVERWRITING_NEWTXN;
}
vbbm.insert(lbids[i], oldVerID, vbOID, vbFBOs[i]);
if (oldVerID > 0)
vss.setVBFlag(lbids[i], oldVerID, true);
else
vss.insert(lbids[i], oldVerID, true, false);
// XXXPAT: There's a problem if we use transID as the new version here.
// Need to use at least oldVerID + 1. OldverID can be > TransID
vss.insert(lbids[i], transID, false, true);
}
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
int SlaveDBRMNode::beginVBCopy(VER_t transID, uint16_t vbOID,
const LBIDRange_v& ranges, VBRange_v& freeList, bool flushPMCache) throw()
{

View File

@ -364,6 +364,20 @@ public:
EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
uint32_t vbFBO) throw();
/** @brief Bulk registers a version buffer entry.
*
* Similar to writeVBEntry, but registers the version buffer
* entries in bulk for a list of lbids and vbFBOs, for a given
* transID and vbOID.
* @note The version buffer locations must hold the 'copy' lock
* first.
* @return 0 on success, -1 on error
*/
EXPORT int bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) throw();
/** @brief Atomically prepare to copy data to the version buffer
*
* Atomically sets the copy flag on the specified LBID ranges

View File

@ -1737,12 +1737,19 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID
if (rc != NO_ERROR)
goto cleanup;
for (; processedBlocks < (k + rangeListCount); processedBlocks++)
{
rc = blockRsltnMgrPtr->writeVBEntry(transID, rangeList[processedBlocks].start,
freeList[i].vbOID, freeList[i].vbFBO + (processedBlocks - rangeListCount));
std::vector<BRM::LBID_t> lbids(k);
std::vector<uint32_t> vbFBOs(k);
size_t idx = 0;
for (; processedBlocks < (k + rangeListCount); processedBlocks++, idx++)
{
lbids[idx] = rangeList[processedBlocks].start;
vbFBOs[idx] = freeList[i].vbFBO + (processedBlocks - rangeListCount);
}
rc = blockRsltnMgrPtr->bulkWriteVBEntry(transID, lbids, freeList[i].vbOID,
vbFBOs);
//cout << (uint64_t)rangeList[processedBlocks].start << endl;
if (rc != NO_ERROR)
{
switch (rc)
@ -1771,7 +1778,6 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID
}
}
}
}
if (pTargetFile)
{