mirror of
https://github.com/MariaDB/server.git
synced 2025-05-28 13:01:41 +03:00
Merge perch.ndb.mysql.com:/home/jonas/src/50-work
into perch.ndb.mysql.com:/home/jonas/src/51-work storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp: Auto merged storage/ndb/include/ndbapi/NdbTransaction.hpp: merge storage/ndb/src/ndbapi/NdbScanOperation.cpp: merge storage/ndb/src/ndbapi/NdbTransaction.cpp: merge
This commit is contained in:
commit
05ba11bff4
@ -658,8 +658,11 @@ private:
|
||||
// Release all cursor operations in connection
|
||||
void releaseOps(NdbOperation*);
|
||||
void releaseScanOperations(NdbIndexScanOperation*);
|
||||
void releaseScanOperation(NdbIndexScanOperation*);
|
||||
|
||||
bool releaseScanOperation(NdbIndexScanOperation** listhead,
|
||||
NdbIndexScanOperation** listtail,
|
||||
NdbIndexScanOperation* op);
|
||||
void releaseExecutedScanOperation(NdbIndexScanOperation*);
|
||||
|
||||
// Set the transaction identity of the transaction
|
||||
void setTransactionId(Uint64 aTransactionId);
|
||||
|
||||
|
@ -10015,73 +10015,84 @@ void Dbdih::startNextChkpt(Signal* signal)
|
||||
nodePtr.i = replicaPtr.p->procNode;
|
||||
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
|
||||
|
||||
if (replicaPtr.p->lcpOngoingFlag &&
|
||||
replicaPtr.p->lcpIdStarted < lcpId) {
|
||||
jam();
|
||||
//-------------------------------------------------------------------
|
||||
// We have found a replica on a node that performs local checkpoint
|
||||
// that is alive and that have not yet been started.
|
||||
//-------------------------------------------------------------------
|
||||
|
||||
if (nodePtr.p->noOfStartedChkpt < 2) {
|
||||
jam();
|
||||
/**
|
||||
* Send LCP_FRAG_ORD to LQH
|
||||
*/
|
||||
if (c_lcpState.m_participatingLQH.get(nodePtr.i))
|
||||
{
|
||||
if (replicaPtr.p->lcpOngoingFlag &&
|
||||
replicaPtr.p->lcpIdStarted < lcpId)
|
||||
{
|
||||
jam();
|
||||
//-------------------------------------------------------------------
|
||||
// We have found a replica on a node that performs local checkpoint
|
||||
// that is alive and that have not yet been started.
|
||||
//-------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Mark the replica so with lcpIdStarted == true
|
||||
*/
|
||||
replicaPtr.p->lcpIdStarted = lcpId;
|
||||
|
||||
Uint32 i = nodePtr.p->noOfStartedChkpt;
|
||||
nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
|
||||
nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
|
||||
nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
|
||||
nodePtr.p->noOfStartedChkpt = i + 1;
|
||||
|
||||
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
|
||||
} else if (nodePtr.p->noOfQueuedChkpt < 2) {
|
||||
jam();
|
||||
/**
|
||||
* Put LCP_FRAG_ORD "in queue"
|
||||
*/
|
||||
|
||||
/**
|
||||
* Mark the replica so with lcpIdStarted == true
|
||||
*/
|
||||
replicaPtr.p->lcpIdStarted = lcpId;
|
||||
|
||||
Uint32 i = nodePtr.p->noOfQueuedChkpt;
|
||||
nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
|
||||
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
|
||||
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
|
||||
nodePtr.p->noOfQueuedChkpt = i + 1;
|
||||
} else {
|
||||
jam();
|
||||
|
||||
if(save){
|
||||
if (nodePtr.p->noOfStartedChkpt < 2)
|
||||
{
|
||||
jam();
|
||||
/**
|
||||
* Stop increasing value on first that was "full"
|
||||
* Send LCP_FRAG_ORD to LQH
|
||||
*/
|
||||
c_lcpState.currentFragment = curr;
|
||||
save = false;
|
||||
}
|
||||
|
||||
busyNodes.set(nodePtr.i);
|
||||
if(busyNodes.count() == lcpNodes){
|
||||
|
||||
/**
|
||||
* There were no possibility to start the local checkpoint
|
||||
* and it was not possible to queue it up. In this case we
|
||||
* stop the start of local checkpoints until the nodes with a
|
||||
* backlog have performed more checkpoints. We will return and
|
||||
* will not continue the process of starting any more checkpoints.
|
||||
* Mark the replica so with lcpIdStarted == true
|
||||
*/
|
||||
return;
|
||||
replicaPtr.p->lcpIdStarted = lcpId;
|
||||
|
||||
Uint32 i = nodePtr.p->noOfStartedChkpt;
|
||||
nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
|
||||
nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
|
||||
nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
|
||||
nodePtr.p->noOfStartedChkpt = i + 1;
|
||||
|
||||
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
|
||||
}
|
||||
else if (nodePtr.p->noOfQueuedChkpt < 2)
|
||||
{
|
||||
jam();
|
||||
/**
|
||||
* Put LCP_FRAG_ORD "in queue"
|
||||
*/
|
||||
|
||||
/**
|
||||
* Mark the replica so with lcpIdStarted == true
|
||||
*/
|
||||
replicaPtr.p->lcpIdStarted = lcpId;
|
||||
|
||||
Uint32 i = nodePtr.p->noOfQueuedChkpt;
|
||||
nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
|
||||
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
|
||||
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
|
||||
nodePtr.p->noOfQueuedChkpt = i + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
jam();
|
||||
|
||||
if(save)
|
||||
{
|
||||
/**
|
||||
* Stop increasing value on first that was "full"
|
||||
*/
|
||||
c_lcpState.currentFragment = curr;
|
||||
save = false;
|
||||
}
|
||||
|
||||
busyNodes.set(nodePtr.i);
|
||||
if(busyNodes.count() == lcpNodes)
|
||||
{
|
||||
/**
|
||||
* There were no possibility to start the local checkpoint
|
||||
* and it was not possible to queue it up. In this case we
|
||||
* stop the start of local checkpoints until the nodes with a
|
||||
* backlog have performed more checkpoints. We will return and
|
||||
* will not continue the process of starting any more checkpoints.
|
||||
*/
|
||||
return;
|
||||
}//if
|
||||
}//if
|
||||
}//if
|
||||
}
|
||||
}//while
|
||||
}
|
||||
}//while
|
||||
}
|
||||
curr.fragmentId++;
|
||||
if (curr.fragmentId >= tabPtr.p->totalfragments) {
|
||||
jam();
|
||||
|
@ -694,9 +694,27 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
|
||||
theNdbCon = NULL;
|
||||
m_transConnection = NULL;
|
||||
|
||||
if (releaseOp && tTransCon) {
|
||||
if (tTransCon)
|
||||
{
|
||||
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
|
||||
tTransCon->releaseScanOperation(tOp);
|
||||
|
||||
bool ret = true;
|
||||
if (theStatus != WaitResponse)
|
||||
{
|
||||
/**
|
||||
* Not executed yet
|
||||
*/
|
||||
ret =
|
||||
tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
|
||||
&tTransCon->m_theLastScanOperation,
|
||||
tOp);
|
||||
}
|
||||
else if (releaseOp)
|
||||
{
|
||||
ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
|
||||
0, tOp);
|
||||
}
|
||||
assert(ret);
|
||||
}
|
||||
|
||||
tCon->theScanningOp = 0;
|
||||
|
@ -979,50 +979,59 @@ Remark: Release scan op when hupp'ed trans closed (save memory)
|
||||
void
|
||||
NdbTransaction::releaseScanOperation(NdbIndexScanOperation* cursorOp)
|
||||
{
|
||||
DBUG_ENTER("NdbTransaction::releaseScanOperation");
|
||||
DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation");
|
||||
DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
|
||||
|
||||
releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp);
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}//NdbTransaction::releaseExecutedScanOperation()
|
||||
|
||||
// here is one reason to make op lists doubly linked
|
||||
if (cursorOp->m_executed)
|
||||
bool
|
||||
NdbTransaction::releaseScanOperation(NdbIndexScanOperation** listhead,
|
||||
NdbIndexScanOperation** listtail,
|
||||
NdbIndexScanOperation* op)
|
||||
{
|
||||
if (* listhead == op)
|
||||
{
|
||||
if (m_firstExecutedScanOp == cursorOp) {
|
||||
m_firstExecutedScanOp = (NdbIndexScanOperation*)cursorOp->theNext;
|
||||
cursorOp->release();
|
||||
theNdb->releaseScanOperation(cursorOp);
|
||||
} else if (m_firstExecutedScanOp != NULL) {
|
||||
NdbIndexScanOperation* tOp = m_firstExecutedScanOp;
|
||||
while (tOp->theNext != NULL) {
|
||||
if (tOp->theNext == cursorOp) {
|
||||
tOp->theNext = cursorOp->theNext;
|
||||
cursorOp->release();
|
||||
theNdb->releaseScanOperation(cursorOp);
|
||||
break;
|
||||
}
|
||||
tOp = (NdbIndexScanOperation*)tOp->theNext;
|
||||
}
|
||||
* listhead = (NdbIndexScanOperation*)op->theNext;
|
||||
if (listtail && *listtail == op)
|
||||
{
|
||||
assert(* listhead == 0);
|
||||
* listtail = 0;
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
if (m_theFirstScanOperation == cursorOp) {
|
||||
m_theFirstScanOperation = (NdbIndexScanOperation*)cursorOp->theNext;
|
||||
cursorOp->release();
|
||||
theNdb->releaseScanOperation(cursorOp);
|
||||
} else if (m_theFirstScanOperation != NULL) {
|
||||
NdbIndexScanOperation* tOp = m_theFirstScanOperation;
|
||||
while (tOp->theNext != NULL) {
|
||||
if (tOp->theNext == cursorOp) {
|
||||
tOp->theNext = cursorOp->theNext;
|
||||
cursorOp->release();
|
||||
theNdb->releaseScanOperation(cursorOp);
|
||||
break;
|
||||
}
|
||||
tOp = (NdbIndexScanOperation*)tOp->theNext;
|
||||
NdbIndexScanOperation* tmp = * listhead;
|
||||
while (tmp != NULL)
|
||||
{
|
||||
if (tmp->theNext == op)
|
||||
{
|
||||
tmp->theNext = (NdbIndexScanOperation*)op->theNext;
|
||||
if (listtail && *listtail == op)
|
||||
{
|
||||
assert(op->theNext == 0);
|
||||
*listtail = tmp;
|
||||
}
|
||||
break;
|
||||
}
|
||||
tmp = (NdbIndexScanOperation*)tmp->theNext;
|
||||
}
|
||||
if (tmp == NULL)
|
||||
op = NULL;
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}//NdbTransaction::releaseScanOperation()
|
||||
|
||||
if (op != NULL)
|
||||
{
|
||||
op->release();
|
||||
theNdb->releaseScanOperation(op);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/*****************************************************************************
|
||||
NdbOperation* getNdbOperation(const char* aTableName);
|
||||
|
Loading…
x
Reference in New Issue
Block a user