diff --git a/dbcon/dmlpackage/dmltable.cpp b/dbcon/dmlpackage/dmltable.cpp index 0beedc10c..0d8fd5646 100644 --- a/dbcon/dmlpackage/dmltable.cpp +++ b/dbcon/dmlpackage/dmltable.cpp @@ -72,6 +72,28 @@ int DMLTable::read(messageqcpp::ByteStream& bytestream) return retval; } +void DMLTable::readMetaData(messageqcpp::ByteStream& bytestream) +{ + // read the table name + bytestream >> fName; + + // read the schema name + bytestream >> fSchema; +} + +void DMLTable::readRowData(messageqcpp::ByteStream& bytestream) +{ + messageqcpp::ByteStream::quadbyte rowNum; + bytestream >> rowNum; + + for (unsigned int i = 0; i < rowNum; i++) + { + Row* aRow = new Row(); + aRow->read(bytestream); + fRows.push_back(aRow); + } +} + int DMLTable::write(messageqcpp::ByteStream& bytestream) { int retval = 1; diff --git a/dbcon/dmlpackage/dmltable.h b/dbcon/dmlpackage/dmltable.h index 7ff0eca14..8847318f4 100644 --- a/dbcon/dmlpackage/dmltable.h +++ b/dbcon/dmlpackage/dmltable.h @@ -91,6 +91,20 @@ public: int read(messageqcpp::ByteStream& bytestream); + /** @brief read a DMLTable metadata from a ByteStream + * + * @param bytestream the ByteStream to read from + */ + void readMetaData(messageqcpp::ByteStream& bytestream); + + + /** @brief read a DMLTable row data from a ByteStream + * + * @param bytestream the ByteStream to read from + */ + void readRowData(messageqcpp::ByteStream& bytestream); + + /** @brief write a DMLTable to a ByteStream * * @param bytestream the ByteStream to write to diff --git a/dbcon/dmlpackage/insertdmlpackage.cpp b/dbcon/dmlpackage/insertdmlpackage.cpp index 8c00ed2fc..b46e58da3 100644 --- a/dbcon/dmlpackage/insertdmlpackage.cpp +++ b/dbcon/dmlpackage/insertdmlpackage.cpp @@ -66,16 +66,16 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream) bytestream << (uint8_t)fLogging; bytestream << (uint8_t)fLogending; - if (fTable != 0) - { - retval = fTable->write(bytestream); - } - bytestream << fTableOid; bytestream << static_cast(fIsInsertSelect); bytestream << static_cast(fIsBatchInsert); bytestream << static_cast(fIsAutocommitOn); + if (fTable != 0) + { + retval = fTable->write(bytestream); + } + return retval; } @@ -100,15 +100,50 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream) bytestream >> logending; fLogending = (logending != 0); + bytestream >> fTableOid; + bytestream >> reinterpret_cast(fIsInsertSelect); + bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsAutocommitOn); + fTable = new DMLTable(); retval = fTable->read(bytestream); - bytestream >> fTableOid; - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsInsertSelect); - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsBatchInsert); - bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsAutocommitOn); return retval; } +void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream) +{ + messageqcpp::ByteStream::quadbyte session_id; + bytestream >> session_id; + fSessionID = session_id; + bytestream >> fUuid; + + std::string dmlStatement; + bytestream >> fDMLStatement; + bytestream >> fSQLStatement; + bytestream >> fSchemaName; + bytestream >> fTimeZone; + uint8_t logging; + bytestream >> logging; + fLogging = (logging != 0); + uint8_t logending; + bytestream >> logending; + fLogending = (logending != 0); + + bytestream >> fTableOid; + bytestream >> reinterpret_cast(fIsInsertSelect); + bytestream >> reinterpret_cast(fIsBatchInsert); + bytestream >> reinterpret_cast(fIsAutocommitOn); + + fTable = new DMLTable(); + fTable->readMetaData(bytestream); +} + +// Has to be called after InsertDMLPackage::readMetaData() +void InsertDMLPackage::readRowData(messageqcpp::ByteStream& bytestream) +{ + fTable->readRowData(bytestream); +} + int InsertDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows) { #ifdef DML_PACKAGE_DEBUG diff --git a/dbcon/dmlpackage/insertdmlpackage.h b/dbcon/dmlpackage/insertdmlpackage.h index bd3d0245a..a2b76d8fd 100644 --- a/dbcon/dmlpackage/insertdmlpackage.h +++ b/dbcon/dmlpackage/insertdmlpackage.h @@ -73,6 +73,18 @@ public: */ EXPORT int read(messageqcpp::ByteStream& bytestream); + /** @brief read InsertDMLPackage metadata from bytestream + * + * @param bytestream the ByteStream to read from + */ + EXPORT void readMetaData(messageqcpp::ByteStream& bytestream); + + /** @brief read InsertDMLPackage row data from bytestream + * + * @param bytestream the ByteStream to read from + */ + EXPORT void readRowData(messageqcpp::ByteStream& bytestream); + /** @brief build a InsertDMLPackage from a string buffer * * @param buffer diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index ba13c4a21..112b23241 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -548,7 +548,7 @@ void PackageHandler::run() dmlpackage::InsertDMLPackage insertPkg; //boost::shared_ptr insertBs (new messageqcpp::ByteStream); messageqcpp::ByteStream bsSave = *(fByteStream.get()); - insertPkg.read(*(fByteStream.get())); + insertPkg.readMetaData(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) @@ -584,8 +584,8 @@ void PackageHandler::run() //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl; if (insertPkg.get_isBatchInsert()) { + fByteStream->reset(); //cout << "This is batch insert " << endl; - //boost::shared_ptr insertBs (new messageqcpp::ByteStream(fByteStream)); BatchInsertProc* batchProcessor = NULL; { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); @@ -900,7 +900,11 @@ void PackageHandler::run() } else // Single Insert { - //insertPkg.readTable(*(fByteStream.get())); + // make sure insertPkg.readMetaData() is called before + // this on fByteStream! + // TODO: Similar to batch inserts, don't + // deserialize the row data here for single inserts. + insertPkg.readRowData(*(fByteStream.get())); insertPkg.set_TxnID(fTxnid); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); result = fProcessor->processPackage(insertPkg); diff --git a/versioning/BRM/brmtypes.h b/versioning/BRM/brmtypes.h index 8de385c58..e0e3f64e7 100644 --- a/versioning/BRM/brmtypes.h +++ b/versioning/BRM/brmtypes.h @@ -507,6 +507,7 @@ const uint8_t RELEASE_LBID_RANGES = 91; /* More main BRM functions 100-110 */ const uint8_t BULK_UPDATE_DBROOT = 100; const uint8_t GET_SYSTEM_CATALOG = 101; +const uint8_t BULK_WRITE_VB_ENTRY = 102; /* Error codes returned by the DBRM functions. */ diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index 92b3f086d..dff639666 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -2226,6 +2226,42 @@ int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, return err; } +int DBRM::bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) DBRM_THROW +{ + +#ifdef BRM_INFO + + if (fDebug) + { + TRACER_WRITELATER("bulkWriteVBEntry"); + TRACER_WRITE; + } + +#endif + + ByteStream command, response; + uint8_t err; + + command << BULK_WRITE_VB_ENTRY << (uint32_t) transID; + serializeInlineVector(command, lbids); + command << (uint32_t) vbOID; + serializeInlineVector(command, vbFBOs); + err = send_recv(command, response); + + if (err != ERR_OK) + return err; + + if (response.length() != 1) + return ERR_NETWORK; + + response >> err; + CHECK_EMPTY(response); + return err; +} + struct _entry { _entry(LBID_t l) : lbid(l) { }; diff --git a/versioning/BRM/dbrm.h b/versioning/BRM/dbrm.h index 8427d9f24..1aac747f4 100644 --- a/versioning/BRM/dbrm.h +++ b/versioning/BRM/dbrm.h @@ -608,6 +608,20 @@ public: EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) DBRM_THROW; + /** @brief Bulk registers a version buffer entry. + * + * Similar to writeVBEntry, but registers the version buffer + * entries in bulk for a list of lbids and vbFBOs, for a given + * transID and vbOID. + * @note The version buffer locations must hold the 'copy' lock + * first. + * @return 0 on success, non-0 on error (see brmtypes.h) + */ + EXPORT int bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) DBRM_THROW; + /** @brief Retrieves a list of uncommitted LBIDs. * * Retrieves a list of uncommitted LBIDs for the given transaction ID. diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index 097cf3959..f43a8c351 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -375,6 +375,10 @@ void SlaveComm::processCommand(ByteStream& msg) do_writeVBEntry(msg); break; + case BULK_WRITE_VB_ENTRY: + do_bulkWriteVBEntry(msg); + break; + case BEGIN_VB_COPY: do_beginVBCopy(msg); break; @@ -1758,6 +1762,49 @@ void SlaveComm::do_writeVBEntry(ByteStream& msg) doSaveDelta = true; } +void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg) +{ + VER_t transID; + std::vector lbids; + OID_t vbOID; + std::vector vbFBOs; + uint32_t tmp; + int err; + ByteStream reply; + +#ifdef BRM_VERBOSE + cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl; +#endif + + msg >> tmp; + transID = tmp; + deserializeInlineVector(msg, lbids); + msg >> tmp; + vbOID = tmp; + deserializeInlineVector(msg, vbFBOs); + + if (printOnly) + { + cout << "bulkWriteVBEntry: transID=" << transID << endl; + + for (size_t i = 0; i < lbids.size(); i++) + cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" << + vbOID << " vbFBO=" << vbFBOs[i] << endl; + return; + } + + err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs); + reply << (uint8_t) err; +#ifdef BRM_VERBOSE + cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl; +#endif + + if (!standalone) + master.write(reply); + + doSaveDelta = true; +} + void SlaveComm::do_beginVBCopy(ByteStream& msg) { VER_t transID; diff --git a/versioning/BRM/slavecomm.h b/versioning/BRM/slavecomm.h index f967a2239..a7b7b44f3 100644 --- a/versioning/BRM/slavecomm.h +++ b/versioning/BRM/slavecomm.h @@ -91,6 +91,7 @@ private: void do_bulkSetHWM(messageqcpp::ByteStream& msg); void do_bulkSetHWMAndCP(messageqcpp::ByteStream& msg); void do_writeVBEntry(messageqcpp::ByteStream& msg); + void do_bulkWriteVBEntry(messageqcpp::ByteStream& msg); void do_beginVBCopy(messageqcpp::ByteStream& msg); void do_endVBCopy(messageqcpp::ByteStream& msg); void do_vbRollback1(messageqcpp::ByteStream& msg); diff --git a/versioning/BRM/slavedbrmnode.cpp b/versioning/BRM/slavedbrmnode.cpp index cbee7e5cf..36d4b0cb7 100644 --- a/versioning/BRM/slavedbrmnode.cpp +++ b/versioning/BRM/slavedbrmnode.cpp @@ -523,6 +523,70 @@ int SlaveDBRMNode::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, return 0; } +int SlaveDBRMNode::bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) throw() +{ + VER_t oldVerID; + + /* + LBIDRange r; + r.start = lbid; + r.size = 1; + if (!copylocks.isLocked(r)) + cout << "Copylock error: lbid " << lbid << " isn't locked\n"; + */ + + try + { + vbbm.lock(VBBM::WRITE); + locked[0] = true; + vss.lock(VSS::WRITE); + locked[1] = true; + + for (size_t i = 0; i < lbids.size(); i++) + { + // figure out the current version of the block + // NOTE! This will currently error out to preserve the assumption that + // larger version numbers imply more recent changes. If we ever change that + // assumption, we'll need to revise the vbRollback() fcns as well. + oldVerID = vss.getCurrentVersion(lbids[i], NULL); + + if (oldVerID == transID) + continue; + else if (oldVerID > transID) + { + ostringstream str; + + str << "WorkerDBRMNode::bulkWriteVBEntry(): Overlapping transactions detected. " + "Transaction " << transID << " cannot overwrite blocks written by " + "transaction " << oldVerID; + log(str.str()); + return ERR_OLDTXN_OVERWRITING_NEWTXN; + } + + vbbm.insert(lbids[i], oldVerID, vbOID, vbFBOs[i]); + + if (oldVerID > 0) + vss.setVBFlag(lbids[i], oldVerID, true); + else + vss.insert(lbids[i], oldVerID, true, false); + + // XXXPAT: There's a problem if we use transID as the new version here. + // Need to use at least oldVerID + 1. OldverID can be > TransID + vss.insert(lbids[i], transID, false, true); + } + } + catch (exception& e) + { + cerr << e.what() << endl; + return -1; + } + + return 0; +} + int SlaveDBRMNode::beginVBCopy(VER_t transID, uint16_t vbOID, const LBIDRange_v& ranges, VBRange_v& freeList, bool flushPMCache) throw() { diff --git a/versioning/BRM/slavedbrmnode.h b/versioning/BRM/slavedbrmnode.h index f46dc3cb9..482df198f 100644 --- a/versioning/BRM/slavedbrmnode.h +++ b/versioning/BRM/slavedbrmnode.h @@ -364,6 +364,20 @@ public: EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) throw(); + /** @brief Bulk registers a version buffer entry. + * + * Similar to writeVBEntry, but registers the version buffer + * entries in bulk for a list of lbids and vbFBOs, for a given + * transID and vbOID. + * @note The version buffer locations must hold the 'copy' lock + * first. + * @return 0 on success, -1 on error + */ + EXPORT int bulkWriteVBEntry(VER_t transID, + const std::vector& lbids, + OID_t vbOID, + const std::vector& vbFBOs) throw(); + /** @brief Atomically prepare to copy data to the version buffer * * Atomically sets the copy flag on the specified LBID ranges diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index 5b4f9339b..c1c6bcafb 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -1737,38 +1737,44 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID if (rc != NO_ERROR) goto cleanup; - for (; processedBlocks < (k + rangeListCount); processedBlocks++) + std::vector lbids(k); + std::vector vbFBOs(k); + size_t idx = 0; + + for (; processedBlocks < (k + rangeListCount); processedBlocks++, idx++) { - rc = blockRsltnMgrPtr->writeVBEntry(transID, rangeList[processedBlocks].start, - freeList[i].vbOID, freeList[i].vbFBO + (processedBlocks - rangeListCount)); + lbids[idx] = rangeList[processedBlocks].start; + vbFBOs[idx] = freeList[i].vbFBO + (processedBlocks - rangeListCount); + } - //cout << (uint64_t)rangeList[processedBlocks].start << endl; - if (rc != NO_ERROR) + rc = blockRsltnMgrPtr->bulkWriteVBEntry(transID, lbids, freeList[i].vbOID, + vbFBOs); + + if (rc != NO_ERROR) + { + switch (rc) { - switch (rc) - { - case ERR_DEADLOCK: - rc = ERR_BRM_DEAD_LOCK; - break; + case ERR_DEADLOCK: + rc = ERR_BRM_DEAD_LOCK; + break; - case ERR_VBBM_OVERFLOW: - rc = ERR_BRM_VB_OVERFLOW; - break; + case ERR_VBBM_OVERFLOW: + rc = ERR_BRM_VB_OVERFLOW; + break; - case ERR_NETWORK: - rc = ERR_BRM_NETWORK; - break; + case ERR_NETWORK: + rc = ERR_BRM_NETWORK; + break; - case ERR_READONLY: - rc = ERR_BRM_READONLY; - break; + case ERR_READONLY: + rc = ERR_BRM_READONLY; + break; - default: - rc = ERR_BRM_WR_VB_ENTRY; - } - - goto cleanup; + default: + rc = ERR_BRM_WR_VB_ENTRY; } + + goto cleanup; } } }