1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

Merge pull request #1197 from tntnatbry/develop-batchinserts-improve

Improve batch inserts.
This commit is contained in:
Patrick LeBlanc
2020-06-02 14:05:47 -05:00
committed by GitHub
13 changed files with 306 additions and 36 deletions

View File

@ -72,6 +72,28 @@ int DMLTable::read(messageqcpp::ByteStream& bytestream)
return retval; return retval;
} }
void DMLTable::readMetaData(messageqcpp::ByteStream& bytestream)
{
// read the table name
bytestream >> fName;
// read the schema name
bytestream >> fSchema;
}
void DMLTable::readRowData(messageqcpp::ByteStream& bytestream)
{
messageqcpp::ByteStream::quadbyte rowNum;
bytestream >> rowNum;
for (unsigned int i = 0; i < rowNum; i++)
{
Row* aRow = new Row();
aRow->read(bytestream);
fRows.push_back(aRow);
}
}
int DMLTable::write(messageqcpp::ByteStream& bytestream) int DMLTable::write(messageqcpp::ByteStream& bytestream)
{ {
int retval = 1; int retval = 1;

View File

@ -91,6 +91,20 @@ public:
int read(messageqcpp::ByteStream& bytestream); int read(messageqcpp::ByteStream& bytestream);
/** @brief read a DMLTable metadata from a ByteStream
*
* @param bytestream the ByteStream to read from
*/
void readMetaData(messageqcpp::ByteStream& bytestream);
/** @brief read a DMLTable row data from a ByteStream
*
* @param bytestream the ByteStream to read from
*/
void readRowData(messageqcpp::ByteStream& bytestream);
/** @brief write a DMLTable to a ByteStream /** @brief write a DMLTable to a ByteStream
* *
* @param bytestream the ByteStream to write to * @param bytestream the ByteStream to write to

View File

@ -66,16 +66,16 @@ int InsertDMLPackage::write(messageqcpp::ByteStream& bytestream)
bytestream << (uint8_t)fLogging; bytestream << (uint8_t)fLogging;
bytestream << (uint8_t)fLogending; bytestream << (uint8_t)fLogending;
if (fTable != 0)
{
retval = fTable->write(bytestream);
}
bytestream << fTableOid; bytestream << fTableOid;
bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsInsertSelect); bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsInsertSelect);
bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsBatchInsert); bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsBatchInsert);
bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsAutocommitOn); bytestream << static_cast<const messageqcpp::ByteStream::byte>(fIsAutocommitOn);
if (fTable != 0)
{
retval = fTable->write(bytestream);
}
return retval; return retval;
} }
@ -100,15 +100,50 @@ int InsertDMLPackage::read(messageqcpp::ByteStream& bytestream)
bytestream >> logending; bytestream >> logending;
fLogending = (logending != 0); fLogending = (logending != 0);
bytestream >> fTableOid;
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsInsertSelect);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsBatchInsert);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsAutocommitOn);
fTable = new DMLTable(); fTable = new DMLTable();
retval = fTable->read(bytestream); retval = fTable->read(bytestream);
bytestream >> fTableOid;
bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsInsertSelect);
bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsBatchInsert);
bytestream >> reinterpret_cast< messageqcpp::ByteStream::byte&>(fIsAutocommitOn);
return retval; return retval;
} }
void InsertDMLPackage::readMetaData(messageqcpp::ByteStream& bytestream)
{
messageqcpp::ByteStream::quadbyte session_id;
bytestream >> session_id;
fSessionID = session_id;
bytestream >> fUuid;
std::string dmlStatement;
bytestream >> fDMLStatement;
bytestream >> fSQLStatement;
bytestream >> fSchemaName;
bytestream >> fTimeZone;
uint8_t logging;
bytestream >> logging;
fLogging = (logging != 0);
uint8_t logending;
bytestream >> logending;
fLogending = (logending != 0);
bytestream >> fTableOid;
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsInsertSelect);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsBatchInsert);
bytestream >> reinterpret_cast<messageqcpp::ByteStream::byte&>(fIsAutocommitOn);
fTable = new DMLTable();
fTable->readMetaData(bytestream);
}
// Has to be called after InsertDMLPackage::readMetaData()
void InsertDMLPackage::readRowData(messageqcpp::ByteStream& bytestream)
{
fTable->readRowData(bytestream);
}
int InsertDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows) int InsertDMLPackage::buildFromBuffer(std::string& buffer, int columns, int rows)
{ {
#ifdef DML_PACKAGE_DEBUG #ifdef DML_PACKAGE_DEBUG

View File

@ -73,6 +73,18 @@ public:
*/ */
EXPORT int read(messageqcpp::ByteStream& bytestream); EXPORT int read(messageqcpp::ByteStream& bytestream);
/** @brief read InsertDMLPackage metadata from bytestream
*
* @param bytestream the ByteStream to read from
*/
EXPORT void readMetaData(messageqcpp::ByteStream& bytestream);
/** @brief read InsertDMLPackage row data from bytestream
*
* @param bytestream the ByteStream to read from
*/
EXPORT void readRowData(messageqcpp::ByteStream& bytestream);
/** @brief build a InsertDMLPackage from a string buffer /** @brief build a InsertDMLPackage from a string buffer
* *
* @param buffer * @param buffer

View File

@ -548,7 +548,7 @@ void PackageHandler::run()
dmlpackage::InsertDMLPackage insertPkg; dmlpackage::InsertDMLPackage insertPkg;
//boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream); //boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream);
messageqcpp::ByteStream bsSave = *(fByteStream.get()); messageqcpp::ByteStream bsSave = *(fByteStream.get());
insertPkg.read(*(fByteStream.get())); insertPkg.readMetaData(*(fByteStream.get()));
#ifdef MCOL_140 #ifdef MCOL_140
if (fConcurrentSupport) if (fConcurrentSupport)
@ -584,8 +584,8 @@ void PackageHandler::run()
//cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl; //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl;
if (insertPkg.get_isBatchInsert()) if (insertPkg.get_isBatchInsert())
{ {
fByteStream->reset();
//cout << "This is batch insert " << endl; //cout << "This is batch insert " << endl;
//boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream(fByteStream));
BatchInsertProc* batchProcessor = NULL; BatchInsertProc* batchProcessor = NULL;
{ {
boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock);
@ -900,7 +900,11 @@ void PackageHandler::run()
} }
else // Single Insert else // Single Insert
{ {
//insertPkg.readTable(*(fByteStream.get())); // make sure insertPkg.readMetaData() is called before
// this on fByteStream!
// TODO: Similar to batch inserts, don't
// deserialize the row data here for single inserts.
insertPkg.readRowData(*(fByteStream.get()));
insertPkg.set_TxnID(fTxnid); insertPkg.set_TxnID(fTxnid);
fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID()));
result = fProcessor->processPackage(insertPkg); result = fProcessor->processPackage(insertPkg);

View File

@ -507,6 +507,7 @@ const uint8_t RELEASE_LBID_RANGES = 91;
/* More main BRM functions 100-110 */ /* More main BRM functions 100-110 */
const uint8_t BULK_UPDATE_DBROOT = 100; const uint8_t BULK_UPDATE_DBROOT = 100;
const uint8_t GET_SYSTEM_CATALOG = 101; const uint8_t GET_SYSTEM_CATALOG = 101;
const uint8_t BULK_WRITE_VB_ENTRY = 102;
/* Error codes returned by the DBRM functions. */ /* Error codes returned by the DBRM functions. */

View File

@ -2226,6 +2226,42 @@ int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
return err; return err;
} }
int DBRM::bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("bulkWriteVBEntry");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BULK_WRITE_VB_ENTRY << (uint32_t) transID;
serializeInlineVector(command, lbids);
command << (uint32_t) vbOID;
serializeInlineVector(command, vbFBOs);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
struct _entry struct _entry
{ {
_entry(LBID_t l) : lbid(l) { }; _entry(LBID_t l) : lbid(l) { };

View File

@ -608,6 +608,20 @@ public:
EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
uint32_t vbFBO) DBRM_THROW; uint32_t vbFBO) DBRM_THROW;
/** @brief Bulk registers a version buffer entry.
*
* Similar to writeVBEntry, but registers the version buffer
* entries in bulk for a list of lbids and vbFBOs, for a given
* transID and vbOID.
* @note The version buffer locations must hold the 'copy' lock
* first.
* @return 0 on success, non-0 on error (see brmtypes.h)
*/
EXPORT int bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) DBRM_THROW;
/** @brief Retrieves a list of uncommitted LBIDs. /** @brief Retrieves a list of uncommitted LBIDs.
* *
* Retrieves a list of uncommitted LBIDs for the given transaction ID. * Retrieves a list of uncommitted LBIDs for the given transaction ID.

View File

@ -375,6 +375,10 @@ void SlaveComm::processCommand(ByteStream& msg)
do_writeVBEntry(msg); do_writeVBEntry(msg);
break; break;
case BULK_WRITE_VB_ENTRY:
do_bulkWriteVBEntry(msg);
break;
case BEGIN_VB_COPY: case BEGIN_VB_COPY:
do_beginVBCopy(msg); do_beginVBCopy(msg);
break; break;
@ -1758,6 +1762,49 @@ void SlaveComm::do_writeVBEntry(ByteStream& msg)
doSaveDelta = true; doSaveDelta = true;
} }
void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg)
{
VER_t transID;
std::vector<BRM::LBID_t> lbids;
OID_t vbOID;
std::vector<uint32_t> vbFBOs;
uint32_t tmp;
int err;
ByteStream reply;
#ifdef BRM_VERBOSE
cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl;
#endif
msg >> tmp;
transID = tmp;
deserializeInlineVector(msg, lbids);
msg >> tmp;
vbOID = tmp;
deserializeInlineVector(msg, vbFBOs);
if (printOnly)
{
cout << "bulkWriteVBEntry: transID=" << transID << endl;
for (size_t i = 0; i < lbids.size(); i++)
cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" <<
vbOID << " vbFBO=" << vbFBOs[i] << endl;
return;
}
err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs);
reply << (uint8_t) err;
#ifdef BRM_VERBOSE
cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl;
#endif
if (!standalone)
master.write(reply);
doSaveDelta = true;
}
void SlaveComm::do_beginVBCopy(ByteStream& msg) void SlaveComm::do_beginVBCopy(ByteStream& msg)
{ {
VER_t transID; VER_t transID;

View File

@ -91,6 +91,7 @@ private:
void do_bulkSetHWM(messageqcpp::ByteStream& msg); void do_bulkSetHWM(messageqcpp::ByteStream& msg);
void do_bulkSetHWMAndCP(messageqcpp::ByteStream& msg); void do_bulkSetHWMAndCP(messageqcpp::ByteStream& msg);
void do_writeVBEntry(messageqcpp::ByteStream& msg); void do_writeVBEntry(messageqcpp::ByteStream& msg);
void do_bulkWriteVBEntry(messageqcpp::ByteStream& msg);
void do_beginVBCopy(messageqcpp::ByteStream& msg); void do_beginVBCopy(messageqcpp::ByteStream& msg);
void do_endVBCopy(messageqcpp::ByteStream& msg); void do_endVBCopy(messageqcpp::ByteStream& msg);
void do_vbRollback1(messageqcpp::ByteStream& msg); void do_vbRollback1(messageqcpp::ByteStream& msg);

View File

@ -523,6 +523,70 @@ int SlaveDBRMNode::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
return 0; return 0;
} }
int SlaveDBRMNode::bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) throw()
{
VER_t oldVerID;
/*
LBIDRange r;
r.start = lbid;
r.size = 1;
if (!copylocks.isLocked(r))
cout << "Copylock error: lbid " << lbid << " isn't locked\n";
*/
try
{
vbbm.lock(VBBM::WRITE);
locked[0] = true;
vss.lock(VSS::WRITE);
locked[1] = true;
for (size_t i = 0; i < lbids.size(); i++)
{
// figure out the current version of the block
// NOTE! This will currently error out to preserve the assumption that
// larger version numbers imply more recent changes. If we ever change that
// assumption, we'll need to revise the vbRollback() fcns as well.
oldVerID = vss.getCurrentVersion(lbids[i], NULL);
if (oldVerID == transID)
continue;
else if (oldVerID > transID)
{
ostringstream str;
str << "WorkerDBRMNode::bulkWriteVBEntry(): Overlapping transactions detected. "
"Transaction " << transID << " cannot overwrite blocks written by "
"transaction " << oldVerID;
log(str.str());
return ERR_OLDTXN_OVERWRITING_NEWTXN;
}
vbbm.insert(lbids[i], oldVerID, vbOID, vbFBOs[i]);
if (oldVerID > 0)
vss.setVBFlag(lbids[i], oldVerID, true);
else
vss.insert(lbids[i], oldVerID, true, false);
// XXXPAT: There's a problem if we use transID as the new version here.
// Need to use at least oldVerID + 1. OldverID can be > TransID
vss.insert(lbids[i], transID, false, true);
}
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
int SlaveDBRMNode::beginVBCopy(VER_t transID, uint16_t vbOID, int SlaveDBRMNode::beginVBCopy(VER_t transID, uint16_t vbOID,
const LBIDRange_v& ranges, VBRange_v& freeList, bool flushPMCache) throw() const LBIDRange_v& ranges, VBRange_v& freeList, bool flushPMCache) throw()
{ {

View File

@ -364,6 +364,20 @@ public:
EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, EXPORT int writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
uint32_t vbFBO) throw(); uint32_t vbFBO) throw();
/** @brief Bulk registers a version buffer entry.
*
* Similar to writeVBEntry, but registers the version buffer
* entries in bulk for a list of lbids and vbFBOs, for a given
* transID and vbOID.
* @note The version buffer locations must hold the 'copy' lock
* first.
* @return 0 on success, -1 on error
*/
EXPORT int bulkWriteVBEntry(VER_t transID,
const std::vector<BRM::LBID_t>& lbids,
OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) throw();
/** @brief Atomically prepare to copy data to the version buffer /** @brief Atomically prepare to copy data to the version buffer
* *
* Atomically sets the copy flag on the specified LBID ranges * Atomically sets the copy flag on the specified LBID ranges

View File

@ -1737,38 +1737,44 @@ int BRMWrapper::writeVB(IDBDataFile* pSourceFile, const VER_t transID, const OID
if (rc != NO_ERROR) if (rc != NO_ERROR)
goto cleanup; goto cleanup;
for (; processedBlocks < (k + rangeListCount); processedBlocks++) std::vector<BRM::LBID_t> lbids(k);
std::vector<uint32_t> vbFBOs(k);
size_t idx = 0;
for (; processedBlocks < (k + rangeListCount); processedBlocks++, idx++)
{ {
rc = blockRsltnMgrPtr->writeVBEntry(transID, rangeList[processedBlocks].start, lbids[idx] = rangeList[processedBlocks].start;
freeList[i].vbOID, freeList[i].vbFBO + (processedBlocks - rangeListCount)); vbFBOs[idx] = freeList[i].vbFBO + (processedBlocks - rangeListCount);
}
//cout << (uint64_t)rangeList[processedBlocks].start << endl; rc = blockRsltnMgrPtr->bulkWriteVBEntry(transID, lbids, freeList[i].vbOID,
if (rc != NO_ERROR) vbFBOs);
if (rc != NO_ERROR)
{
switch (rc)
{ {
switch (rc) case ERR_DEADLOCK:
{ rc = ERR_BRM_DEAD_LOCK;
case ERR_DEADLOCK: break;
rc = ERR_BRM_DEAD_LOCK;
break;
case ERR_VBBM_OVERFLOW: case ERR_VBBM_OVERFLOW:
rc = ERR_BRM_VB_OVERFLOW; rc = ERR_BRM_VB_OVERFLOW;
break; break;
case ERR_NETWORK: case ERR_NETWORK:
rc = ERR_BRM_NETWORK; rc = ERR_BRM_NETWORK;
break; break;
case ERR_READONLY: case ERR_READONLY:
rc = ERR_BRM_READONLY; rc = ERR_BRM_READONLY;
break; break;
default: default:
rc = ERR_BRM_WR_VB_ENTRY; rc = ERR_BRM_WR_VB_ENTRY;
}
goto cleanup;
} }
goto cleanup;
} }
} }
} }