You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-27 21:01:50 +03:00
MCOL-769 Add WE command to get LBIDs written to
This commit is contained in:
@ -3410,6 +3410,47 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId)
|
||||
{
|
||||
uint8_t rc = 0;
|
||||
uint32_t txnId;
|
||||
vector<LBID_t> lbidList;
|
||||
|
||||
bs >> txnId;
|
||||
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
|
||||
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
|
||||
try
|
||||
{
|
||||
mapIter = m_txnLBIDMap.find(txnId);
|
||||
if (mapIter != m_txnLBIDMap.end())
|
||||
{
|
||||
SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
|
||||
std::tr1::unordered_map<BRM::LBID_t, uint32_t> ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin();
|
||||
while (listIter != spTxnLBIDRec->m_LBIDMap.end())
|
||||
{
|
||||
lbidList.push_back(listIter->first);
|
||||
listIter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(...) {}
|
||||
bs.restart();
|
||||
try
|
||||
{
|
||||
serializeInlineVector (bs, lbidList);
|
||||
}
|
||||
catch (exception& ex)
|
||||
{
|
||||
// Append to errmsg in case we already have an error
|
||||
if (err.length() > 0)
|
||||
err += "; ";
|
||||
err += ex.what();
|
||||
rc = 1;
|
||||
return rc;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string & err)
|
||||
{
|
||||
uint8_t rc = 0;
|
||||
|
@ -96,6 +96,7 @@ class WE_DMLCommandProc
|
||||
EXPORT uint8_t processPurgeFDCache(ByteStream& bs, std::string & err);
|
||||
EXPORT uint8_t processEndTransaction(ByteStream& bs, std::string & err);
|
||||
EXPORT uint8_t processFixRows(ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId);
|
||||
EXPORT uint8_t getWrittenLbids(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId);
|
||||
int validateColumnHWMs(
|
||||
execplan::CalpontSystemCatalog::RIDList& ridList,
|
||||
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr,
|
||||
|
@ -83,6 +83,7 @@ enum ServerMessages
|
||||
WE_SRV_FIX_ROWS,
|
||||
WE_SVR_WRITE_CREATE_SYSCOLUMN,
|
||||
WE_SVR_BATCH_INSERT_BINARY,
|
||||
WE_SVR_GET_WRITTEN_LBIDS,
|
||||
|
||||
WE_CLT_SRV_DATA=100,
|
||||
WE_CLT_SRV_EOD,
|
||||
|
@ -149,11 +149,16 @@ void DmlReadThread::operator()()
|
||||
//cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl;
|
||||
break;
|
||||
}
|
||||
case WE_SVR_BATCH_INSERT_BINARY:
|
||||
case WE_SVR_BATCH_INSERT_BINARY:
|
||||
{
|
||||
rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId);
|
||||
break;
|
||||
}
|
||||
case WE_SVR_GET_WRITTEN_LBIDS:
|
||||
{
|
||||
rc = fWeDMLprocessor->getWrittenLbids(ibs, errMsg, PMId);
|
||||
break;
|
||||
}
|
||||
case WE_SVR_BATCH_INSERT_END:
|
||||
{
|
||||
rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);
|
||||
@ -383,7 +388,7 @@ void DmlReadThread::operator()()
|
||||
obs << errMsg;
|
||||
}
|
||||
|
||||
if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId ==WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS))
|
||||
if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId ==WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS) || (msgId == WE_SVR_GET_WRITTEN_LBIDS))
|
||||
{
|
||||
obs += ibs;
|
||||
//cout << " sending back hwm info with ibs length " << endl;
|
||||
|
Reference in New Issue
Block a user