You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-26 10:01:05 +03:00
@ -380,6 +380,7 @@ void MasterDBRMNode::msgProcessor()
|
||||
case SET_SYSTEM_STATE: doSetSystemState(msg, p); continue;
|
||||
case CLEAR_SYSTEM_STATE: doClearSystemState(msg, p); continue;
|
||||
case SM_RESET: doSessionManagerReset(msg, p); continue;
|
||||
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
||||
}
|
||||
|
||||
/* Process TableLock calls */
|
||||
@ -1353,6 +1354,98 @@ void MasterDBRMNode::doSIDTIDMap(ByteStream &msg, ThreadParams *p)
|
||||
catch (...) { }
|
||||
}
|
||||
|
||||
void MasterDBRMNode::doGetUncommittedLbids(ByteStream &msg, ThreadParams *p)
|
||||
{
|
||||
ByteStream reply;
|
||||
vector<LBID_t> lbidList;
|
||||
VSS vss;
|
||||
ExtentMap em;
|
||||
bool locked = false;
|
||||
vector<LBID_t>::iterator lbidIt;
|
||||
typedef pair<int64_t, int64_t> range_t;
|
||||
range_t range;
|
||||
vector<range_t> ranges;
|
||||
vector<range_t>::iterator rangeIt;
|
||||
ByteStream::byte cmd;
|
||||
ByteStream::quadbyte transID;
|
||||
msg >> cmd;
|
||||
msg >> transID;
|
||||
try {
|
||||
vss.lock(VSS::READ);
|
||||
locked = true;
|
||||
|
||||
// Get a full list of uncommitted LBIDs related to this transactin.
|
||||
vss.getUncommittedLBIDs(transID, lbidList);
|
||||
|
||||
vss.release(VSS::READ);
|
||||
locked = false;
|
||||
|
||||
if(lbidList.size() > 0) {
|
||||
|
||||
// Sort the vector.
|
||||
std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());
|
||||
|
||||
// Get the LBID range for the first block in the list.
|
||||
lbidIt = lbidList.begin();
|
||||
if (em.lookup(*lbidIt, range.first, range.second) < 0) {
|
||||
reply.reset();
|
||||
reply << (uint8_t) ERR_FAILURE;
|
||||
try {
|
||||
p->sock->write(reply);
|
||||
}
|
||||
catch (...) { }
|
||||
|
||||
return;
|
||||
}
|
||||
ranges.push_back(range);
|
||||
|
||||
// Loop through the LBIDs and add the new ranges.
|
||||
++lbidIt;
|
||||
while(lbidIt != lbidList.end()) {
|
||||
if (*lbidIt > range.second) {
|
||||
if (em.lookup(*lbidIt, range.first, range.second) < 0) {
|
||||
reply.reset();
|
||||
reply << (uint8_t) ERR_FAILURE;
|
||||
try {
|
||||
p->sock->write(reply);
|
||||
}
|
||||
catch (...) { }
|
||||
return;
|
||||
|
||||
}
|
||||
ranges.push_back(range);
|
||||
}
|
||||
++lbidIt;
|
||||
}
|
||||
|
||||
// Reset the lbidList and return only the first LBID in each extent that was changed
|
||||
// in the transaction.
|
||||
lbidList.clear();
|
||||
for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++) {
|
||||
lbidList.push_back(rangeIt->first);
|
||||
}
|
||||
}
|
||||
reply << (uint8_t) ERR_OK;
|
||||
serializeInlineVector(reply, lbidList);
|
||||
try {
|
||||
p->sock->write(reply);
|
||||
}
|
||||
catch (...) { }
|
||||
return;
|
||||
}
|
||||
catch (exception &e) {
|
||||
if (locked)
|
||||
vss.release(VSS::READ);
|
||||
reply.reset();
|
||||
reply << (uint8_t) ERR_FAILURE;
|
||||
try {
|
||||
p->sock->write(reply);
|
||||
}
|
||||
catch (...) { }
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void MasterDBRMNode::doGetUniqueUint32(ByteStream &msg, ThreadParams *p)
|
||||
{
|
||||
ByteStream reply;
|
||||
|
Reference in New Issue
Block a user