1
0
mirror of https://github.com/MariaDB/server.git synced 2025-09-03 20:43:11 +03:00

Merge mysql.com:/home/jonas/src/mysql-4.1-fix

into mysql.com:/home/jonas/src/mysql-5.0-ndb
This commit is contained in:
joreland@mysql.com
2004-11-19 13:24:37 +01:00
21 changed files with 560 additions and 211 deletions

View File

@@ -35,6 +35,8 @@
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
#define DEBUG_NEXT_RESULT 0
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
NdbOperation(aNdb),
m_resultSet(0),
@@ -277,6 +279,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_delivered");
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@@ -300,6 +305,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_completed");
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@@ -393,8 +401,6 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
#define DEBUG_NEXT_RESULT 0
int NdbScanOperation::nextResult(bool fetchAllowed)
{
if(m_ordered)
@@ -527,7 +533,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
if(cnt > 0 || stopScanFlag){
if(cnt > 0)
{
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -543,33 +550,40 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
Uint32 sent = 0;
for(Uint32 i = 0; i<cnt; i++){
NdbReceiver * tRec = m_api_receivers[i];
m_sent_receivers[last+i] = tRec;
tRec->m_list_index = last+i;
prep_array[i] = tRec->m_tcPtrI;
tRec->prepareSend();
if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
{
m_sent_receivers[last+sent] = tRec;
tRec->m_list_index = last+sent;
tRec->prepareSend();
sent++;
}
}
memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
int ret;
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = cnt;
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+cnt);
ret = tp->sendSignal(&tSignal, nodeId);
int ret = 0;
if(sent)
{
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
if(cnt > 21 && !stopScanFlag){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = sent;
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+(stopScanFlag ? 0 : sent));
ret = tp->sendSignal(&tSignal, nodeId);
}
}
m_sent_receivers_count = last + cnt + stopScanFlag;
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
return ret;
}
return 0;
@@ -1376,10 +1390,22 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
if(idx == theParallelism)
return 0;
NdbReceiver* tRec = m_api_receivers[idx];
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
Uint32 last = m_sent_receivers_count;
Uint32* theData = tSignal.getDataPtrSend();
Uint32* prep_array = theData + 4;
m_current_api_receiver = idx + 1;
if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
{
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver completed, don't send");
return 0;
}
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 0;
Uint64 transId = theNdbCon->theTransactionId;
@@ -1389,17 +1415,10 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
/**
* Prepare ops
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = theData + 4;
NdbReceiver * tRec = m_api_receivers[idx];
m_sent_receivers[last] = tRec;
tRec->m_list_index = last;
prep_array[0] = tRec->m_tcPtrI;
tRec->prepareSend();
m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
@@ -1412,12 +1431,17 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){
if(seq != tp->getNodeSequence(nodeId))
{
theNdbCon->theReleaseOnClose = true;
return -1;
}
while(theError.code == 0 && m_sent_receivers_count){
/**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1435,18 +1459,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
if(send_next_scan(0, true) == -1){ // Close scan
theNdbCon->theReleaseOnClose = true;
return -1;
}
/**
* move all conf'ed into api
* so that send_next_scan can check if they needs to be closed
*/
Uint32 api = m_api_receivers_count;
Uint32 conf = m_conf_receivers_count;
if(m_ordered)
{
/**
* Ordered scan, keep the m_api_receivers "to the right"
*/
memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
(theParallelism - m_current_api_receiver) * sizeof(char*));
api = (theParallelism - m_current_api_receiver);
m_api_receivers_count = api;
}
if(DEBUG_NEXT_RESULT)
ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
m_ordered, api, conf,
m_sent_receivers_count, m_current_api_receiver, theParallelism);
if(api+conf)
{
/**
* There's something to close
* setup m_api_receivers (for send_next_scan)
*/
memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
m_api_receivers_count = api + conf;
m_conf_receivers_count = 0;
}
// Send close scan
if(send_next_scan(api+conf, true) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
}
/**
* wait for close scan conf
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1463,6 +1521,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
return -1;
}
}
return 0;
}