mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
bug#9749 - ndb lock upgrade
handle more cases... ndb/src/kernel/blocks/dbacc/Dbacc.hpp: bug#9749 - ndb lock upgrade ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: bug#9749 - ndb lock upgrade ndb/test/ndbapi/testOperations.cpp: bug#9749 - ndb lock upgrade
This commit is contained in:
@ -1100,6 +1100,8 @@ private:
|
|||||||
Uint32 executeNextOperation(Signal* signal);
|
Uint32 executeNextOperation(Signal* signal);
|
||||||
void releaselock(Signal* signal);
|
void releaselock(Signal* signal);
|
||||||
void takeOutFragWaitQue(Signal* signal);
|
void takeOutFragWaitQue(Signal* signal);
|
||||||
|
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
|
||||||
|
OperationrecPtr release_op);
|
||||||
void allocOverflowPage(Signal* signal);
|
void allocOverflowPage(Signal* signal);
|
||||||
bool getrootfragmentrec(Signal* signal, RootfragmentrecPtr&, Uint32 fragId);
|
bool getrootfragmentrec(Signal* signal, RootfragmentrecPtr&, Uint32 fragId);
|
||||||
void insertLockOwnersList(Signal* signal, const OperationrecPtr&);
|
void insertLockOwnersList(Signal* signal, const OperationrecPtr&);
|
||||||
|
@ -5802,9 +5802,148 @@ void Dbacc::commitOperation(Signal* signal)
|
|||||||
ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
|
ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
|
||||||
tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
|
tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
|
||||||
}//if
|
}//if
|
||||||
}//if
|
|
||||||
|
/**
|
||||||
|
* Check possible lock upgrade
|
||||||
|
* 1) Find lock owner
|
||||||
|
* 2) Count transactions in parallel que
|
||||||
|
* 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
|
||||||
|
* upgrade next serial
|
||||||
|
*/
|
||||||
|
if(operationRecPtr.p->lockMode)
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
/**
|
||||||
|
* Committing a non shared operation can't lead to lock upgrade
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationrecPtr lock_owner;
|
||||||
|
lock_owner.i = operationRecPtr.p->prevParallelQue;
|
||||||
|
ptrCheckGuard(lock_owner, coprecsize, operationrec);
|
||||||
|
Uint32 transid[2] = { lock_owner.p->transId1,
|
||||||
|
lock_owner.p->transId2 };
|
||||||
|
|
||||||
|
|
||||||
|
while(lock_owner.p->prevParallelQue != RNIL)
|
||||||
|
{
|
||||||
|
lock_owner.i = lock_owner.p->prevParallelQue;
|
||||||
|
ptrCheckGuard(lock_owner, coprecsize, operationrec);
|
||||||
|
|
||||||
|
if(lock_owner.p->transId1 != transid[0] ||
|
||||||
|
lock_owner.p->transId2 != transid[1])
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
/**
|
||||||
|
* If more than 1 trans in lock queue -> no lock upgrade
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
check_lock_upgrade(signal, lock_owner, operationRecPtr);
|
||||||
|
}
|
||||||
}//Dbacc::commitOperation()
|
}//Dbacc::commitOperation()
|
||||||
|
|
||||||
|
void
|
||||||
|
Dbacc::check_lock_upgrade(Signal* signal,
|
||||||
|
OperationrecPtr lock_owner,
|
||||||
|
OperationrecPtr release_op)
|
||||||
|
{
|
||||||
|
if((lock_owner.p->transId1 == release_op.p->transId1 &&
|
||||||
|
lock_owner.p->transId2 == release_op.p->transId2) ||
|
||||||
|
release_op.p->lockMode ||
|
||||||
|
lock_owner.p->nextSerialQue == RNIL)
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
/**
|
||||||
|
* No lock upgrade if same trans or lock owner has no serial queue
|
||||||
|
* or releasing non shared op
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationrecPtr next;
|
||||||
|
next.i = lock_owner.p->nextSerialQue;
|
||||||
|
ptrCheckGuard(next, coprecsize, operationrec);
|
||||||
|
|
||||||
|
if(lock_owner.p->transId1 != next.p->transId1 ||
|
||||||
|
lock_owner.p->transId2 != next.p->transId2)
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
/**
|
||||||
|
* No lock upgrad if !same trans in serial queue
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tgnptMainOpPtr = lock_owner;
|
||||||
|
getNoParallelTransaction(signal);
|
||||||
|
if (tgnptNrTransaction > 1)
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
/**
|
||||||
|
* No lock upgrade if more than 1 transaction in parallell queue
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationrecPtr tmp;
|
||||||
|
tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
|
||||||
|
if(tmp.i != RNIL)
|
||||||
|
{
|
||||||
|
ptrCheckGuard(tmp, coprecsize, operationrec);
|
||||||
|
ndbassert(tmp.p->prevSerialQue == next.i);
|
||||||
|
tmp.p->prevSerialQue = lock_owner.i;
|
||||||
|
}
|
||||||
|
next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
|
||||||
|
|
||||||
|
// Find end of parallell que
|
||||||
|
tmp = lock_owner;
|
||||||
|
tmp.p->lockMode= 1;
|
||||||
|
while(tmp.p->nextParallelQue != RNIL)
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
tmp.i = tmp.p->nextParallelQue;
|
||||||
|
ptrCheckGuard(tmp, coprecsize, operationrec);
|
||||||
|
tmp.p->lockMode= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
next.p->prevParallelQue = tmp.i;
|
||||||
|
tmp.p->nextParallelQue = next.i;
|
||||||
|
|
||||||
|
OperationrecPtr save = operationRecPtr;
|
||||||
|
|
||||||
|
Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads
|
||||||
|
Uint32 ThashValue = lock_owner.p->hashValue;
|
||||||
|
Uint32 localdata[2];
|
||||||
|
localdata[0] = lock_owner.p->localdata[0];
|
||||||
|
localdata[1] = lock_owner.p->localdata[1];
|
||||||
|
do {
|
||||||
|
next.p->elementIsDisappeared = TelementIsDisappeared;
|
||||||
|
next.p->hashValue = ThashValue;
|
||||||
|
next.p->localdata[0] = localdata[0];
|
||||||
|
next.p->localdata[1] = localdata[1];
|
||||||
|
|
||||||
|
operationRecPtr = next;
|
||||||
|
ndbassert(next.p->lockMode);
|
||||||
|
TelementIsDisappeared = executeNextOperation(signal);
|
||||||
|
if (next.p->nextParallelQue != RNIL)
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
next.i = next.p->nextParallelQue;
|
||||||
|
ptrCheckGuard(next, coprecsize, operationrec);
|
||||||
|
} else {
|
||||||
|
jam();
|
||||||
|
break;
|
||||||
|
}//if
|
||||||
|
} while (1);
|
||||||
|
|
||||||
|
operationRecPtr = save;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------------------- */
|
/* ------------------------------------------------------------------------- */
|
||||||
/* RELEASELOCK */
|
/* RELEASELOCK */
|
||||||
/* RESETS LOCK OF AN ELEMENT. */
|
/* RESETS LOCK OF AN ELEMENT. */
|
||||||
@ -5841,6 +5980,8 @@ void Dbacc::releaselock(Signal* signal)
|
|||||||
ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
|
ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
|
||||||
trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
|
trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
|
||||||
}//if
|
}//if
|
||||||
|
|
||||||
|
check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
|
||||||
/* --------------------------------------------------------------------------------- */
|
/* --------------------------------------------------------------------------------- */
|
||||||
/* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */
|
/* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */
|
||||||
/* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */
|
/* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */
|
||||||
|
@ -547,21 +547,64 @@ runLockUpgrade1(NDBT_Context* ctx, NDBT_Step* step){
|
|||||||
do
|
do
|
||||||
{
|
{
|
||||||
CHECK(hugoOps.startTransaction(pNdb) == 0);
|
CHECK(hugoOps.startTransaction(pNdb) == 0);
|
||||||
CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0);
|
if(ctx->getProperty("LOCK_UPGRADE", 1) == 1)
|
||||||
CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
|
{
|
||||||
|
CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0);
|
||||||
|
CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
|
||||||
|
|
||||||
ctx->setProperty("READ_DONE", 1);
|
ctx->setProperty("READ_DONE", 1);
|
||||||
ctx->broadcast();
|
ctx->broadcast();
|
||||||
ndbout_c("wait 2");
|
ndbout_c("wait 2");
|
||||||
ctx->getPropertyWait("READ_DONE", 2);
|
ctx->getPropertyWait("READ_DONE", 2);
|
||||||
ndbout_c("wait 2 - done");
|
ndbout_c("wait 2 - done");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ctx->setProperty("READ_DONE", 1);
|
||||||
|
ctx->broadcast();
|
||||||
|
ctx->getPropertyWait("READ_DONE", 2);
|
||||||
|
ndbout_c("wait 2 - done");
|
||||||
|
CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0);
|
||||||
|
CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
|
||||||
|
}
|
||||||
|
if(ctx->getProperty("LU_OP", o_INS) == o_INS)
|
||||||
|
{
|
||||||
|
CHECK(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
|
||||||
|
CHECK(hugoOps.pkInsertRecord(pNdb, 0, 1, 2) == 0);
|
||||||
|
}
|
||||||
|
else if(ctx->getProperty("LU_OP", o_UPD) == o_UPD)
|
||||||
|
{
|
||||||
|
CHECK(hugoOps.pkUpdateRecord(pNdb, 0, 1, 2) == 0);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CHECK(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
|
||||||
|
}
|
||||||
ctx->setProperty("READ_DONE", 3);
|
ctx->setProperty("READ_DONE", 3);
|
||||||
ctx->broadcast();
|
ctx->broadcast();
|
||||||
ndbout_c("before update");
|
ndbout_c("before update");
|
||||||
CHECK(hugoOps.pkUpdateRecord(pNdb, 0, 1, 2) == 0);
|
|
||||||
ndbout_c("wait update");
|
ndbout_c("wait update");
|
||||||
CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
|
CHECK(hugoOps.execute_Commit(pNdb) == 0);
|
||||||
CHECK(hugoOps.closeTransaction(pNdb));
|
CHECK(hugoOps.closeTransaction(pNdb) == 0);
|
||||||
|
|
||||||
|
CHECK(hugoOps.startTransaction(pNdb) == 0);
|
||||||
|
CHECK(hugoOps.pkReadRecord(pNdb, 0, 1) == 0);
|
||||||
|
int res= hugoOps.execute_Commit(pNdb);
|
||||||
|
if(ctx->getProperty("LU_OP", o_INS) == o_INS)
|
||||||
|
{
|
||||||
|
CHECK(res == 0);
|
||||||
|
CHECK(hugoOps.verifyUpdatesValue(2) == 0);
|
||||||
|
}
|
||||||
|
else if(ctx->getProperty("LU_OP", o_UPD) == o_UPD)
|
||||||
|
{
|
||||||
|
CHECK(res == 0);
|
||||||
|
CHECK(hugoOps.verifyUpdatesValue(2) == 0);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CHECK(res == 626);
|
||||||
|
}
|
||||||
|
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
@ -592,10 +635,10 @@ runLockUpgrade2(NDBT_Context* ctx, NDBT_Step* step){
|
|||||||
ndbout_c("wait 3 - done");
|
ndbout_c("wait 3 - done");
|
||||||
|
|
||||||
NdbSleep_MilliSleep(200);
|
NdbSleep_MilliSleep(200);
|
||||||
CHECK(hugoOps.execute_Commit(pNdb) == 0);
|
CHECK(hugoOps.execute_Commit(pNdb) == 0);
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
return NDBT_FAILED;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
@ -607,11 +650,16 @@ main(int argc, const char** argv){
|
|||||||
|
|
||||||
NDBT_TestSuite ts("testOperations");
|
NDBT_TestSuite ts("testOperations");
|
||||||
|
|
||||||
|
for(Uint32 i = 0; i <6; i++)
|
||||||
{
|
{
|
||||||
BaseString name("bug_9749");
|
BaseString name("bug_9749");
|
||||||
|
name.appfmt("_%d", i);
|
||||||
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,
|
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,
|
||||||
name.c_str(), "");
|
name.c_str(), "");
|
||||||
|
|
||||||
|
pt->setProperty("LOCK_UPGRADE", 1 + (i & 1));
|
||||||
|
pt->setProperty("LU_OP", 1 + (i >> 1));
|
||||||
|
|
||||||
pt->addInitializer(new NDBT_Initializer(pt,
|
pt->addInitializer(new NDBT_Initializer(pt,
|
||||||
"runClearTable",
|
"runClearTable",
|
||||||
runClearTable));
|
runClearTable));
|
||||||
|
Reference in New Issue
Block a user