mirror of
https://github.com/MariaDB/server.git
synced 2025-09-05 08:04:25 +03:00
Merge
This commit is contained in:
@@ -453,10 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){
|
||||
return -1;
|
||||
}
|
||||
|
||||
int NdbScanOperation::nextResult(bool fetchAllowed)
|
||||
|
||||
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
|
||||
{
|
||||
if(m_ordered)
|
||||
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
|
||||
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
|
||||
forceSend);
|
||||
|
||||
/**
|
||||
* Check current receiver
|
||||
@@ -493,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
Guard guard(tp->theMutexPtr);
|
||||
Uint32 seq = theNdbCon->theNodeSequence;
|
||||
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
|
||||
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
|
||||
forceSend) == 0){
|
||||
|
||||
idx = m_current_api_receiver;
|
||||
last = m_api_receivers_count;
|
||||
@@ -584,9 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
|
||||
}
|
||||
|
||||
int
|
||||
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
|
||||
if(cnt > 0)
|
||||
{
|
||||
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
|
||||
bool forceSend){
|
||||
if(cnt > 0){
|
||||
NdbApiSignal tSignal(theNdb->theMyRef);
|
||||
tSignal.setSignal(GSN_SCAN_NEXTREQ);
|
||||
|
||||
@@ -633,6 +636,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
|
||||
}
|
||||
}
|
||||
|
||||
if (!ret) checkForceSend(forceSend);
|
||||
|
||||
m_sent_receivers_count = last + sent;
|
||||
m_api_receivers_count -= cnt;
|
||||
m_current_api_receiver = 0;
|
||||
@@ -642,6 +647,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
|
||||
return 0;
|
||||
}
|
||||
|
||||
void NdbScanOperation::checkForceSend(bool forceSend)
|
||||
{
|
||||
if (forceSend) {
|
||||
TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
|
||||
} else {
|
||||
TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
|
||||
}//if
|
||||
}
|
||||
|
||||
int
|
||||
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
|
||||
{
|
||||
@@ -657,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void NdbScanOperation::closeScan()
|
||||
void NdbScanOperation::closeScan(bool forceSend)
|
||||
{
|
||||
if(m_transConnection){
|
||||
if(DEBUG_NEXT_RESULT)
|
||||
@@ -672,7 +686,7 @@ void NdbScanOperation::closeScan()
|
||||
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
Guard guard(tp->theMutexPtr);
|
||||
close_impl(tp);
|
||||
close_impl(tp, forceSend);
|
||||
|
||||
} while(0);
|
||||
|
||||
@@ -1309,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
|
||||
}
|
||||
|
||||
int
|
||||
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
|
||||
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
|
||||
bool forceSend){
|
||||
|
||||
Uint32 u_idx = 0, u_last = 0;
|
||||
Uint32 s_idx = m_current_api_receiver; // first sorted
|
||||
@@ -1335,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
|
||||
Guard guard(tp->theMutexPtr);
|
||||
Uint32 seq = theNdbCon->theNodeSequence;
|
||||
Uint32 nodeId = theNdbCon->theDBnode;
|
||||
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
|
||||
if(seq == tp->getNodeSequence(nodeId) &&
|
||||
!send_next_scan_ordered(s_idx, forceSend)){
|
||||
Uint32 tmp = m_sent_receivers_count;
|
||||
s_idx = m_current_api_receiver;
|
||||
while(m_sent_receivers_count > 0 && !theError.code){
|
||||
@@ -1424,7 +1440,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
|
||||
}
|
||||
|
||||
int
|
||||
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
|
||||
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
|
||||
if(idx == theParallelism)
|
||||
return 0;
|
||||
|
||||
@@ -1461,11 +1477,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
|
||||
Uint32 nodeId = theNdbCon->theDBnode;
|
||||
TransporterFacade * tp = TransporterFacade::instance();
|
||||
tSignal.setLength(4+1);
|
||||
return tp->sendSignal(&tSignal, nodeId);
|
||||
int ret= tp->sendSignal(&tSignal, nodeId);
|
||||
if (!ret) checkForceSend(forceSend);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int
|
||||
NdbScanOperation::close_impl(TransporterFacade* tp){
|
||||
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
|
||||
Uint32 seq = theNdbCon->theNodeSequence;
|
||||
Uint32 nodeId = theNdbCon->theDBnode;
|
||||
|
||||
@@ -1532,7 +1550,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
|
||||
}
|
||||
|
||||
// Send close scan
|
||||
if(send_next_scan(api+conf, true) == -1)
|
||||
if(send_next_scan(api+conf, true, forceSend) == -1)
|
||||
{
|
||||
theNdbCon->theReleaseOnClose = true;
|
||||
return -1;
|
||||
@@ -1581,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
|
||||
}
|
||||
|
||||
int
|
||||
NdbScanOperation::restart()
|
||||
NdbScanOperation::restart(bool forceSend)
|
||||
{
|
||||
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
@@ -1590,7 +1608,7 @@ NdbScanOperation::restart()
|
||||
|
||||
{
|
||||
int res;
|
||||
if((res= close_impl(tp)))
|
||||
if((res= close_impl(tp, forceSend)))
|
||||
{
|
||||
return res;
|
||||
}
|
||||
@@ -1609,13 +1627,13 @@ NdbScanOperation::restart()
|
||||
}
|
||||
|
||||
int
|
||||
NdbIndexScanOperation::reset_bounds(){
|
||||
NdbIndexScanOperation::reset_bounds(bool forceSend){
|
||||
int res;
|
||||
|
||||
{
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
Guard guard(tp->theMutexPtr);
|
||||
res= close_impl(tp);
|
||||
res= close_impl(tp, forceSend);
|
||||
}
|
||||
|
||||
if(!res)
|
||||
|
Reference in New Issue
Block a user