You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-27 21:01:50 +03:00
MCOL-769 Add command to get the uncommitted lbids
We need this to be able to commit them
This commit is contained in:
@ -438,6 +438,7 @@ const uint8_t GET_SYSTEM_STATE = 54;
|
|||||||
const uint8_t SET_SYSTEM_STATE = 55;
|
const uint8_t SET_SYSTEM_STATE = 55;
|
||||||
const uint8_t GET_UNIQUE_UINT64 = 56;
|
const uint8_t GET_UNIQUE_UINT64 = 56;
|
||||||
const uint8_t CLEAR_SYSTEM_STATE = 57;
|
const uint8_t CLEAR_SYSTEM_STATE = 57;
|
||||||
|
const uint8_t GET_UNCOMMITTED_LBIDS = 58;
|
||||||
|
|
||||||
/* OID Manager interface */
|
/* OID Manager interface */
|
||||||
const uint8_t ALLOC_OIDS = 60;
|
const uint8_t ALLOC_OIDS = 60;
|
||||||
|
@ -379,6 +379,7 @@ void MasterDBRMNode::msgProcessor()
|
|||||||
case SET_SYSTEM_STATE: doSetSystemState(msg, p); continue;
|
case SET_SYSTEM_STATE: doSetSystemState(msg, p); continue;
|
||||||
case CLEAR_SYSTEM_STATE: doClearSystemState(msg, p); continue;
|
case CLEAR_SYSTEM_STATE: doClearSystemState(msg, p); continue;
|
||||||
case SM_RESET: doSessionManagerReset(msg, p); continue;
|
case SM_RESET: doSessionManagerReset(msg, p); continue;
|
||||||
|
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process TableLock calls */
|
/* Process TableLock calls */
|
||||||
@ -1292,6 +1293,98 @@ void MasterDBRMNode::doSIDTIDMap(ByteStream &msg, ThreadParams *p)
|
|||||||
catch (...) { }
|
catch (...) { }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MasterDBRMNode::doGetUncommittedLbids(ByteStream &msg, ThreadParams *p)
|
||||||
|
{
|
||||||
|
ByteStream reply;
|
||||||
|
vector<LBID_t> lbidList;
|
||||||
|
VSS vss;
|
||||||
|
ExtentMap em;
|
||||||
|
bool locked = false;
|
||||||
|
vector<LBID_t>::iterator lbidIt;
|
||||||
|
typedef pair<int64_t, int64_t> range_t;
|
||||||
|
range_t range;
|
||||||
|
vector<range_t> ranges;
|
||||||
|
vector<range_t>::iterator rangeIt;
|
||||||
|
ByteStream::byte cmd;
|
||||||
|
ByteStream::quadbyte transID;
|
||||||
|
msg >> cmd;
|
||||||
|
msg >> transID;
|
||||||
|
try {
|
||||||
|
vss.lock(VSS::READ);
|
||||||
|
locked = true;
|
||||||
|
|
||||||
|
// Get a full list of uncommitted LBIDs related to this transactin.
|
||||||
|
vss.getUncommittedLBIDs(transID, lbidList);
|
||||||
|
|
||||||
|
vss.release(VSS::READ);
|
||||||
|
locked = false;
|
||||||
|
|
||||||
|
if(lbidList.size() > 0) {
|
||||||
|
|
||||||
|
// Sort the vector.
|
||||||
|
std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());
|
||||||
|
|
||||||
|
// Get the LBID range for the first block in the list.
|
||||||
|
lbidIt = lbidList.begin();
|
||||||
|
if (em.lookup(*lbidIt, range.first, range.second) < 0) {
|
||||||
|
reply.reset();
|
||||||
|
reply << (uint8_t) ERR_FAILURE;
|
||||||
|
try {
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...) { }
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ranges.push_back(range);
|
||||||
|
|
||||||
|
// Loop through the LBIDs and add the new ranges.
|
||||||
|
++lbidIt;
|
||||||
|
while(lbidIt != lbidList.end()) {
|
||||||
|
if (*lbidIt > range.second) {
|
||||||
|
if (em.lookup(*lbidIt, range.first, range.second) < 0) {
|
||||||
|
reply.reset();
|
||||||
|
reply << (uint8_t) ERR_FAILURE;
|
||||||
|
try {
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...) { }
|
||||||
|
return;
|
||||||
|
|
||||||
|
}
|
||||||
|
ranges.push_back(range);
|
||||||
|
}
|
||||||
|
++lbidIt;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset the lbidList and return only the first LBID in each extent that was changed
|
||||||
|
// in the transaction.
|
||||||
|
lbidList.clear();
|
||||||
|
for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++) {
|
||||||
|
lbidList.push_back(rangeIt->first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reply << (uint8_t) ERR_OK;
|
||||||
|
serializeInlineVector(reply, lbidList);
|
||||||
|
try {
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...) { }
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (exception &e) {
|
||||||
|
if (locked)
|
||||||
|
vss.release(VSS::READ);
|
||||||
|
reply.reset();
|
||||||
|
reply << (uint8_t) ERR_FAILURE;
|
||||||
|
try {
|
||||||
|
p->sock->write(reply);
|
||||||
|
}
|
||||||
|
catch (...) { }
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void MasterDBRMNode::doGetUniqueUint32(ByteStream &msg, ThreadParams *p)
|
void MasterDBRMNode::doGetUniqueUint32(ByteStream &msg, ThreadParams *p)
|
||||||
{
|
{
|
||||||
ByteStream reply;
|
ByteStream reply;
|
||||||
|
@ -193,6 +193,8 @@ private:
|
|||||||
void doSetSystemState(messageqcpp::ByteStream &msg, ThreadParams *p);
|
void doSetSystemState(messageqcpp::ByteStream &msg, ThreadParams *p);
|
||||||
void doClearSystemState(messageqcpp::ByteStream &msg, ThreadParams *p);
|
void doClearSystemState(messageqcpp::ByteStream &msg, ThreadParams *p);
|
||||||
void doSessionManagerReset(messageqcpp::ByteStream &msg, ThreadParams *p);
|
void doSessionManagerReset(messageqcpp::ByteStream &msg, ThreadParams *p);
|
||||||
|
void doGetUncommittedLbids(messageqcpp::ByteStream &msg, ThreadParams *p);
|
||||||
|
|
||||||
|
|
||||||
/* OID Manager interface */
|
/* OID Manager interface */
|
||||||
OIDServer oids;
|
OIDServer oids;
|
||||||
|
Reference in New Issue
Block a user