mirror of
https://github.com/MariaDB/server.git
synced 2025-08-29 00:08:14 +03:00
1670 lines
45 KiB
C++
1670 lines
45 KiB
C++
/* Copyright (C) 2003 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
#include <ndb_global.h>
|
|
#include <Ndb.hpp>
|
|
#include <NdbScanOperation.hpp>
|
|
#include <NdbIndexScanOperation.hpp>
|
|
#include <NdbTransaction.hpp>
|
|
#include "NdbApiSignal.hpp"
|
|
#include <NdbOut.hpp>
|
|
#include "NdbDictionaryImpl.hpp"
|
|
#include <NdbBlob.hpp>
|
|
|
|
#include <NdbRecAttr.hpp>
|
|
#include <NdbReceiver.hpp>
|
|
|
|
#include <stdlib.h>
|
|
#include <NdbSqlUtil.hpp>
|
|
|
|
#include <signaldata/ScanTab.hpp>
|
|
#include <signaldata/KeyInfo.hpp>
|
|
#include <signaldata/AttrInfo.hpp>
|
|
#include <signaldata/TcKeyReq.hpp>
|
|
|
|
#define DEBUG_NEXT_RESULT 0
|
|
|
|
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
|
|
NdbOperation(aNdb),
|
|
m_transConnection(NULL)
|
|
{
|
|
theParallelism = 0;
|
|
m_allocated_receivers = 0;
|
|
m_prepared_receivers = 0;
|
|
m_api_receivers = 0;
|
|
m_conf_receivers = 0;
|
|
m_sent_receivers = 0;
|
|
m_receivers = 0;
|
|
m_array = new Uint32[1]; // skip if on delete in fix_receivers
|
|
theSCAN_TABREQ = 0;
|
|
}
|
|
|
|
NdbScanOperation::~NdbScanOperation()
|
|
{
|
|
for(Uint32 i = 0; i<m_allocated_receivers; i++){
|
|
m_receivers[i]->release();
|
|
theNdb->releaseNdbScanRec(m_receivers[i]);
|
|
}
|
|
delete[] m_array;
|
|
}
|
|
|
|
void
|
|
NdbScanOperation::setErrorCode(int aErrorCode){
|
|
NdbTransaction* tmp = theNdbCon;
|
|
theNdbCon = m_transConnection;
|
|
NdbOperation::setErrorCode(aErrorCode);
|
|
theNdbCon = tmp;
|
|
}
|
|
|
|
void
|
|
NdbScanOperation::setErrorCodeAbort(int aErrorCode){
|
|
NdbTransaction* tmp = theNdbCon;
|
|
theNdbCon = m_transConnection;
|
|
NdbOperation::setErrorCodeAbort(aErrorCode);
|
|
theNdbCon = tmp;
|
|
}
|
|
|
|
|
|
/*****************************************************************************
|
|
* int init();
|
|
*
|
|
* Return Value: Return 0 : init was successful.
|
|
* Return -1: In all other case.
|
|
* Remark: Initiates operation record after allocation.
|
|
*****************************************************************************/
|
|
int
|
|
NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
|
|
{
|
|
m_transConnection = myConnection;
|
|
//NdbTransaction* aScanConnection = theNdb->startTransaction(myConnection);
|
|
NdbTransaction* aScanConnection = theNdb->hupp(myConnection);
|
|
if (!aScanConnection){
|
|
setErrorCodeAbort(theNdb->getNdbError().code);
|
|
return -1;
|
|
}
|
|
|
|
// NOTE! The hupped trans becomes the owner of the operation
|
|
if(NdbOperation::init(tab, aScanConnection) != 0){
|
|
return -1;
|
|
}
|
|
|
|
initInterpreter();
|
|
|
|
theStatus = GetValue;
|
|
theOperationType = OpenScanRequest;
|
|
theNdbCon->theMagicNumber = 0xFE11DF;
|
|
theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
|
|
m_read_range_no = 0;
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|
Uint32 batch,
|
|
Uint32 parallel)
|
|
{
|
|
m_ordered = m_descending = false;
|
|
Uint32 fragCount = m_currentTable->m_fragmentCount;
|
|
|
|
if (parallel > fragCount || parallel == 0) {
|
|
parallel = fragCount;
|
|
}
|
|
|
|
// It is only possible to call openScan if
|
|
// 1. this transcation don't already contain another scan operation
|
|
// 2. this transaction don't already contain other operations
|
|
// 3. theScanOp contains a NdbScanOperation
|
|
if (theNdbCon->theScanningOp != NULL){
|
|
setErrorCode(4605);
|
|
return -1;
|
|
}
|
|
|
|
theNdbCon->theScanningOp = this;
|
|
theLockMode = lm;
|
|
|
|
bool lockExcl, lockHoldMode, readCommitted;
|
|
switch(lm){
|
|
case NdbScanOperation::LM_Read:
|
|
lockExcl = false;
|
|
lockHoldMode = true;
|
|
readCommitted = false;
|
|
break;
|
|
case NdbScanOperation::LM_Exclusive:
|
|
lockExcl = true;
|
|
lockHoldMode = true;
|
|
readCommitted = false;
|
|
break;
|
|
case NdbScanOperation::LM_CommittedRead:
|
|
lockExcl = false;
|
|
lockHoldMode = false;
|
|
readCommitted = true;
|
|
break;
|
|
default:
|
|
setErrorCode(4003);
|
|
return -1;
|
|
}
|
|
|
|
m_keyInfo = lockExcl ? 1 : 0;
|
|
|
|
bool range = false;
|
|
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
|
|
{
|
|
if (m_currentTable == m_accessTable){
|
|
// Old way of scanning indexes, should not be allowed
|
|
m_currentTable = theNdb->theDictionary->
|
|
getTable(m_currentTable->m_primaryTable.c_str());
|
|
assert(m_currentTable != NULL);
|
|
}
|
|
assert (m_currentTable != m_accessTable);
|
|
// Modify operation state
|
|
theStatus = GetValue;
|
|
theOperationType = OpenRangeScanRequest;
|
|
range = true;
|
|
}
|
|
|
|
theParallelism = parallel;
|
|
|
|
if(fix_receivers(parallel) == -1){
|
|
setErrorCodeAbort(4000);
|
|
return -1;
|
|
}
|
|
|
|
theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
|
|
if (theSCAN_TABREQ == NULL) {
|
|
setErrorCodeAbort(4000);
|
|
return -1;
|
|
}//if
|
|
|
|
theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
|
|
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
|
|
req->apiConnectPtr = theNdbCon->theTCConPtr;
|
|
req->tableId = m_accessTable->m_tableId;
|
|
req->tableSchemaVersion = m_accessTable->m_version;
|
|
req->storedProcId = 0xFFFF;
|
|
req->buddyConPtr = theNdbCon->theBuddyConPtr;
|
|
|
|
Uint32 reqInfo = 0;
|
|
ScanTabReq::setParallelism(reqInfo, parallel);
|
|
ScanTabReq::setScanBatch(reqInfo, 0);
|
|
ScanTabReq::setLockMode(reqInfo, lockExcl);
|
|
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
|
|
ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
|
|
ScanTabReq::setRangeScanFlag(reqInfo, range);
|
|
req->requestInfo = reqInfo;
|
|
|
|
Uint64 transId = theNdbCon->getTransactionId();
|
|
req->transId1 = (Uint32) transId;
|
|
req->transId2 = (Uint32) (transId >> 32);
|
|
|
|
NdbApiSignal* tSignal = theSCAN_TABREQ->next();
|
|
if(!tSignal)
|
|
{
|
|
theSCAN_TABREQ->next(tSignal = theNdb->getSignal());
|
|
}
|
|
theLastKEYINFO = tSignal;
|
|
|
|
tSignal->setSignal(GSN_KEYINFO);
|
|
theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
|
|
theTotalNrOfKeyWordInSignal= 0;
|
|
|
|
getFirstATTRINFOScan();
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::fix_receivers(Uint32 parallel){
|
|
assert(parallel > 0);
|
|
if(parallel > m_allocated_receivers){
|
|
const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
|
|
|
|
Uint64 * tmp = new Uint64[(sz+7)/8];
|
|
// Save old receivers
|
|
memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
|
|
delete[] m_array;
|
|
m_array = (Uint32*)tmp;
|
|
|
|
m_receivers = (NdbReceiver**)tmp;
|
|
m_api_receivers = m_receivers + parallel;
|
|
m_conf_receivers = m_api_receivers + parallel;
|
|
m_sent_receivers = m_conf_receivers + parallel;
|
|
m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
|
|
|
|
// Only get/init "new" receivers
|
|
NdbReceiver* tScanRec;
|
|
for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
|
|
tScanRec = theNdb->getNdbScanRec();
|
|
if (tScanRec == NULL) {
|
|
setErrorCodeAbort(4000);
|
|
return -1;
|
|
}//if
|
|
m_receivers[i] = tScanRec;
|
|
tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
|
|
}
|
|
m_allocated_receivers = parallel;
|
|
}
|
|
|
|
reset_receivers(parallel, 0);
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Move receiver from send array to conf:ed array
|
|
*/
|
|
void
|
|
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
|
|
if(theError.code == 0){
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("receiver_delivered");
|
|
|
|
Uint32 idx = tRec->m_list_index;
|
|
Uint32 last = m_sent_receivers_count - 1;
|
|
if(idx != last){
|
|
NdbReceiver * move = m_sent_receivers[last];
|
|
m_sent_receivers[idx] = move;
|
|
move->m_list_index = idx;
|
|
}
|
|
m_sent_receivers_count = last;
|
|
|
|
last = m_conf_receivers_count;
|
|
m_conf_receivers[last] = tRec;
|
|
m_conf_receivers_count = last + 1;
|
|
tRec->m_list_index = last;
|
|
tRec->m_current_row = 0;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Remove receiver as it's completed
|
|
*/
|
|
void
|
|
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
|
|
if(theError.code == 0){
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("receiver_completed");
|
|
|
|
Uint32 idx = tRec->m_list_index;
|
|
Uint32 last = m_sent_receivers_count - 1;
|
|
if(idx != last){
|
|
NdbReceiver * move = m_sent_receivers[last];
|
|
m_sent_receivers[idx] = move;
|
|
move->m_list_index = idx;
|
|
}
|
|
m_sent_receivers_count = last;
|
|
}
|
|
}
|
|
|
|
/*****************************************************************************
|
|
* int getFirstATTRINFOScan( U_int32 aData )
|
|
*
|
|
* Return Value: Return 0: Successful
|
|
* Return -1: All other cases
|
|
* Parameters: None: Only allocate the first signal.
|
|
* Remark: When a scan is defined we need to use this method instead
|
|
* of insertATTRINFO for the first signal.
|
|
* This is because we need not to mess up the code in
|
|
* insertATTRINFO with if statements since we are not
|
|
* interested in the TCKEYREQ signal.
|
|
*****************************************************************************/
|
|
int
|
|
NdbScanOperation::getFirstATTRINFOScan()
|
|
{
|
|
NdbApiSignal* tSignal;
|
|
|
|
tSignal = theNdb->getSignal();
|
|
if (tSignal == NULL){
|
|
setErrorCodeAbort(4000);
|
|
return -1;
|
|
}
|
|
tSignal->setSignal(m_attrInfoGSN);
|
|
theAI_LenInCurrAI = 8;
|
|
theATTRINFOptr = &tSignal->getDataPtrSend()[8];
|
|
theFirstATTRINFO = tSignal;
|
|
theCurrentATTRINFO = tSignal;
|
|
theCurrentATTRINFO->next(NULL);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Constats for theTupleKeyDefined[][0]
|
|
*/
|
|
#define SETBOUND_EQ 1
|
|
#define FAKE_PTR 2
|
|
#define API_PTR 3
|
|
|
|
#define WAITFOR_SCAN_TIMEOUT 120000
|
|
|
|
int
|
|
NdbScanOperation::executeCursor(int nodeId){
|
|
NdbTransaction * tCon = theNdbCon;
|
|
TransporterFacade* tp = TransporterFacade::instance();
|
|
Guard guard(tp->theMutexPtr);
|
|
|
|
Uint32 magic = tCon->theMagicNumber;
|
|
Uint32 seq = tCon->theNodeSequence;
|
|
|
|
if (tp->get_node_alive(nodeId) &&
|
|
(tp->getNodeSequence(nodeId) == seq)) {
|
|
|
|
/**
|
|
* Only call prepareSendScan first time (incase of restarts)
|
|
* - check with theMagicNumber
|
|
*/
|
|
tCon->theMagicNumber = 0x37412619;
|
|
if(magic != 0x37412619 &&
|
|
prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
|
|
return -1;
|
|
|
|
|
|
if (doSendScan(nodeId) == -1)
|
|
return -1;
|
|
|
|
return 0;
|
|
} else {
|
|
if (!(tp->get_node_stopping(nodeId) &&
|
|
(tp->getNodeSequence(nodeId) == seq))){
|
|
TRACE_DEBUG("The node is hard dead when attempting to start a scan");
|
|
setErrorCode(4029);
|
|
tCon->theReleaseOnClose = true;
|
|
} else {
|
|
TRACE_DEBUG("The node is stopping when attempting to start a scan");
|
|
setErrorCode(4030);
|
|
}//if
|
|
tCon->theCommitStatus = NdbTransaction::Aborted;
|
|
}//if
|
|
return -1;
|
|
}
|
|
|
|
|
|
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
|
|
{
|
|
int res;
|
|
if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) {
|
|
// handle blobs
|
|
NdbBlob* tBlob = theBlobList;
|
|
while (tBlob != 0) {
|
|
if (tBlob->atNextResult() == -1)
|
|
return -1;
|
|
tBlob = tBlob->theNext;
|
|
}
|
|
/*
|
|
* Flush blob part ops on behalf of user because
|
|
* - nextResult is analogous to execute(NoCommit)
|
|
* - user is likely to want blob value before next execute
|
|
*/
|
|
if (m_transConnection->executePendingBlobOps() == -1)
|
|
return -1;
|
|
return 0;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
|
|
{
|
|
if(m_ordered)
|
|
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
|
|
forceSend);
|
|
|
|
/**
|
|
* Check current receiver
|
|
*/
|
|
int retVal = 2;
|
|
Uint32 idx = m_current_api_receiver;
|
|
Uint32 last = m_api_receivers_count;
|
|
m_curr_row = 0;
|
|
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
|
|
|
|
/**
|
|
* Check next buckets
|
|
*/
|
|
for(; idx < last; idx++){
|
|
NdbReceiver* tRec = m_api_receivers[idx];
|
|
if(tRec->nextResult()){
|
|
m_curr_row = tRec->copyout(theReceiver);
|
|
retVal = 0;
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* We have advanced atleast one bucket
|
|
*/
|
|
if(!fetchAllowed || !retVal){
|
|
m_current_api_receiver = idx;
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
|
|
return retVal;
|
|
}
|
|
|
|
Uint32 nodeId = theNdbCon->theDBnode;
|
|
TransporterFacade* tp = TransporterFacade::instance();
|
|
Guard guard(tp->theMutexPtr);
|
|
if(theError.code)
|
|
return -1;
|
|
|
|
Uint32 seq = theNdbCon->theNodeSequence;
|
|
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
|
|
forceSend) == 0){
|
|
|
|
idx = m_current_api_receiver;
|
|
last = m_api_receivers_count;
|
|
|
|
do {
|
|
if(theError.code){
|
|
setErrorCode(theError.code);
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
|
|
return -1;
|
|
}
|
|
|
|
Uint32 cnt = m_conf_receivers_count;
|
|
Uint32 sent = m_sent_receivers_count;
|
|
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
|
|
|
|
if(cnt > 0){
|
|
/**
|
|
* Just move completed receivers
|
|
*/
|
|
memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
|
|
last += cnt;
|
|
m_conf_receivers_count = 0;
|
|
} else if(retVal == 2 && sent > 0){
|
|
/**
|
|
* No completed...
|
|
*/
|
|
theNdb->theImpl->theWaiter.m_node = nodeId;
|
|
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
|
|
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
|
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
|
|
continue;
|
|
} else {
|
|
idx = last;
|
|
retVal = -2; //return_code;
|
|
}
|
|
} else if(retVal == 2){
|
|
/**
|
|
* No completed & no sent -> EndOfData
|
|
*/
|
|
theError.code = -1; // make sure user gets error if he tries again
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
|
|
return 1;
|
|
}
|
|
|
|
if(retVal == 0)
|
|
break;
|
|
|
|
for(; idx < last; idx++){
|
|
NdbReceiver* tRec = m_api_receivers[idx];
|
|
if(tRec->nextResult()){
|
|
m_curr_row = tRec->copyout(theReceiver);
|
|
retVal = 0;
|
|
break;
|
|
}
|
|
}
|
|
} while(retVal == 2);
|
|
} else {
|
|
retVal = -3;
|
|
}
|
|
|
|
m_api_receivers_count = last;
|
|
m_current_api_receiver = idx;
|
|
|
|
switch(retVal){
|
|
case 0:
|
|
case 1:
|
|
case 2:
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
|
|
return retVal;
|
|
case -1:
|
|
setErrorCode(4008); // Timeout
|
|
break;
|
|
case -2:
|
|
setErrorCode(4028); // Node fail
|
|
break;
|
|
case -3: // send_next_scan -> return fail (set error-code self)
|
|
if(theError.code == 0)
|
|
setErrorCode(4028); // seq changed = Node fail
|
|
break;
|
|
}
|
|
|
|
theNdbCon->theTransactionIsStarted = false;
|
|
theNdbCon->theReleaseOnClose = true;
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);
|
|
return -1;
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
|
|
bool forceSend){
|
|
if(cnt > 0){
|
|
NdbApiSignal tSignal(theNdb->theMyRef);
|
|
tSignal.setSignal(GSN_SCAN_NEXTREQ);
|
|
|
|
Uint32* theData = tSignal.getDataPtrSend();
|
|
theData[0] = theNdbCon->theTCConPtr;
|
|
theData[1] = stopScanFlag == true ? 1 : 0;
|
|
Uint64 transId = theNdbCon->theTransactionId;
|
|
theData[2] = transId;
|
|
theData[3] = (Uint32) (transId >> 32);
|
|
|
|
/**
|
|
* Prepare ops
|
|
*/
|
|
Uint32 last = m_sent_receivers_count;
|
|
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
|
|
Uint32 sent = 0;
|
|
for(Uint32 i = 0; i<cnt; i++){
|
|
NdbReceiver * tRec = m_api_receivers[i];
|
|
if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
|
|
{
|
|
m_sent_receivers[last+sent] = tRec;
|
|
tRec->m_list_index = last+sent;
|
|
tRec->prepareSend();
|
|
sent++;
|
|
}
|
|
}
|
|
memmove(m_api_receivers, m_api_receivers+cnt,
|
|
(theParallelism-cnt) * sizeof(char*));
|
|
|
|
int ret = 0;
|
|
if(sent)
|
|
{
|
|
Uint32 nodeId = theNdbCon->theDBnode;
|
|
TransporterFacade * tp = TransporterFacade::instance();
|
|
if(cnt > 21){
|
|
tSignal.setLength(4);
|
|
LinearSectionPtr ptr[3];
|
|
ptr[0].p = prep_array;
|
|
ptr[0].sz = sent;
|
|
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
|
|
} else {
|
|
tSignal.setLength(4+sent);
|
|
ret = tp->sendSignal(&tSignal, nodeId);
|
|
}
|
|
}
|
|
|
|
if (!ret) checkForceSend(forceSend);
|
|
|
|
m_sent_receivers_count = last + sent;
|
|
m_api_receivers_count -= cnt;
|
|
m_current_api_receiver = 0;
|
|
|
|
return ret;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
void NdbScanOperation::checkForceSend(bool forceSend)
|
|
{
|
|
if (forceSend) {
|
|
TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
|
|
} else {
|
|
TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
|
|
}//if
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
|
|
{
|
|
printf("NdbScanOperation::prepareSend\n");
|
|
abort();
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::doSend(int ProcessorId)
|
|
{
|
|
printf("NdbScanOperation::doSend\n");
|
|
return 0;
|
|
}
|
|
|
|
void NdbScanOperation::close(bool forceSend)
|
|
{
|
|
if(m_transConnection){
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("close() theError.code = %d "
|
|
"m_api_receivers_count = %d "
|
|
"m_conf_receivers_count = %d "
|
|
"m_sent_receivers_count = %d",
|
|
theError.code,
|
|
m_api_receivers_count,
|
|
m_conf_receivers_count,
|
|
m_sent_receivers_count);
|
|
|
|
TransporterFacade* tp = TransporterFacade::instance();
|
|
Guard guard(tp->theMutexPtr);
|
|
close_impl(tp, forceSend);
|
|
|
|
} while(0);
|
|
|
|
theNdbCon->theScanningOp = 0;
|
|
theNdb->closeTransaction(theNdbCon);
|
|
|
|
theNdbCon = 0;
|
|
m_transConnection = NULL;
|
|
}
|
|
|
|
void
|
|
NdbScanOperation::execCLOSE_SCAN_REP(){
|
|
m_conf_receivers_count = 0;
|
|
m_sent_receivers_count = 0;
|
|
}
|
|
|
|
void NdbScanOperation::release()
|
|
{
|
|
if(theNdbCon != 0 || m_transConnection != 0){
|
|
close();
|
|
}
|
|
for(Uint32 i = 0; i<m_allocated_receivers; i++){
|
|
m_receivers[i]->release();
|
|
}
|
|
|
|
NdbOperation::release();
|
|
|
|
if(theSCAN_TABREQ)
|
|
{
|
|
theNdb->releaseSignal(theSCAN_TABREQ);
|
|
theSCAN_TABREQ = 0;
|
|
}
|
|
}
|
|
|
|
/***************************************************************************
|
|
int prepareSendScan(Uint32 aTC_ConnectPtr,
|
|
Uint64 aTransactionId)
|
|
|
|
Return Value: Return 0 : preparation of send was succesful.
|
|
Return -1: In all other case.
|
|
Parameters: aTC_ConnectPtr: the Connect pointer to TC.
|
|
aTransactionId: the Transaction identity of the transaction.
|
|
Remark: Puts the the final data into ATTRINFO signal(s) after this
|
|
we know the how many signal to send and their sizes
|
|
***************************************************************************/
|
|
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
|
|
Uint64 aTransactionId){
|
|
|
|
if (theInterpretIndicator != 1 ||
|
|
(theOperationType != OpenScanRequest &&
|
|
theOperationType != OpenRangeScanRequest)) {
|
|
setErrorCodeAbort(4005);
|
|
return -1;
|
|
}
|
|
|
|
theErrorLine = 0;
|
|
|
|
// In preapareSendInterpreted we set the sizes (word 4-8) in the
|
|
// first ATTRINFO signal.
|
|
if (prepareSendInterpreted() == -1)
|
|
return -1;
|
|
|
|
if(m_ordered){
|
|
((NdbIndexScanOperation*)this)->fix_get_values();
|
|
}
|
|
|
|
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
|
|
|
|
/**
|
|
* Prepare all receivers
|
|
*/
|
|
theReceiver.prepareSend();
|
|
bool keyInfo = m_keyInfo;
|
|
Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
|
|
/**
|
|
* The number of records sent by each LQH is calculated and the kernel
|
|
* is informed of this number by updating the SCAN_TABREQ signal
|
|
*/
|
|
Uint32 batch_size, batch_byte_size, first_batch_size;
|
|
theReceiver.calculate_batch_size(key_size,
|
|
theParallelism,
|
|
batch_size,
|
|
batch_byte_size,
|
|
first_batch_size);
|
|
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
|
|
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
|
|
req->batch_byte_size= batch_byte_size;
|
|
req->first_batch_size= first_batch_size;
|
|
|
|
/**
|
|
* Set keyinfo flag
|
|
* (Always keyinfo when using blobs)
|
|
*/
|
|
Uint32 reqInfo = req->requestInfo;
|
|
ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
|
|
req->requestInfo = reqInfo;
|
|
|
|
for(Uint32 i = 0; i<theParallelism; i++){
|
|
m_receivers[i]->do_get_value(&theReceiver, batch_size,
|
|
key_size,
|
|
m_read_range_no);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/*****************************************************************************
|
|
int doSend()
|
|
|
|
Return Value: Return >0 : send was succesful, returns number of signals sent
|
|
Return -1: In all other case.
|
|
Parameters: aProcessorId: Receiving processor node
|
|
Remark: Sends the ATTRINFO signal(s)
|
|
*****************************************************************************/
|
|
int
|
|
NdbScanOperation::doSendScan(int aProcessorId)
|
|
{
|
|
Uint32 tSignalCount = 0;
|
|
NdbApiSignal* tSignal;
|
|
|
|
if (theInterpretIndicator != 1 ||
|
|
(theOperationType != OpenScanRequest &&
|
|
theOperationType != OpenRangeScanRequest)) {
|
|
setErrorCodeAbort(4005);
|
|
return -1;
|
|
}
|
|
|
|
assert(theSCAN_TABREQ != NULL);
|
|
tSignal = theSCAN_TABREQ;
|
|
|
|
Uint32 tupKeyLen = theTupKeyLen;
|
|
Uint32 len = theTotalNrOfKeyWordInSignal;
|
|
Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
|
|
Uint64 transId = theNdbCon->theTransactionId;
|
|
|
|
// Update the "attribute info length in words" in SCAN_TABREQ before
|
|
// sending it. This could not be done in openScan because
|
|
// we created the ATTRINFO signals after the SCAN_TABREQ signal.
|
|
ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
|
|
req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
|
|
Uint32 tmp = req->requestInfo;
|
|
ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
|
|
req->distributionKey = theDistributionKey;
|
|
req->requestInfo = tmp;
|
|
tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
|
|
|
|
TransporterFacade *tp = TransporterFacade::instance();
|
|
LinearSectionPtr ptr[3];
|
|
ptr[0].p = m_prepared_receivers;
|
|
ptr[0].sz = theParallelism;
|
|
if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
|
|
setErrorCode(4002);
|
|
return -1;
|
|
}
|
|
|
|
if (tupKeyLen > 0){
|
|
// must have at least one signal since it contains attrLen for bounds
|
|
assert(theLastKEYINFO != NULL);
|
|
tSignal = theLastKEYINFO;
|
|
tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
|
|
|
|
assert(theSCAN_TABREQ->next() != NULL);
|
|
tSignal = theSCAN_TABREQ->next();
|
|
|
|
NdbApiSignal* last;
|
|
do {
|
|
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
|
|
keyInfo->connectPtr = aTC_ConnectPtr;
|
|
keyInfo->transId[0] = Uint32(transId);
|
|
keyInfo->transId[1] = Uint32(transId >> 32);
|
|
|
|
if (tp->sendSignal(tSignal,aProcessorId) == -1){
|
|
setErrorCode(4002);
|
|
return -1;
|
|
}
|
|
|
|
tSignalCount++;
|
|
last = tSignal;
|
|
tSignal = tSignal->next();
|
|
} while(last != theLastKEYINFO);
|
|
}
|
|
|
|
tSignal = theFirstATTRINFO;
|
|
while (tSignal != NULL) {
|
|
AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
|
|
attrInfo->connectPtr = aTC_ConnectPtr;
|
|
attrInfo->transId[0] = Uint32(transId);
|
|
attrInfo->transId[1] = Uint32(transId >> 32);
|
|
|
|
if (tp->sendSignal(tSignal,aProcessorId) == -1){
|
|
setErrorCode(4002);
|
|
return -1;
|
|
}
|
|
tSignalCount++;
|
|
tSignal = tSignal->next();
|
|
}
|
|
theStatus = WaitResponse;
|
|
|
|
m_curr_row = 0;
|
|
m_sent_receivers_count = theParallelism;
|
|
if(m_ordered)
|
|
{
|
|
m_current_api_receiver = theParallelism;
|
|
m_api_receivers_count = theParallelism;
|
|
}
|
|
|
|
return tSignalCount;
|
|
}//NdbOperation::doSendScan()
|
|
|
|
/*****************************************************************************
|
|
* NdbOperation* takeOverScanOp(NdbTransaction* updateTrans);
|
|
*
|
|
* Parameters: The update transactions NdbTransaction pointer.
|
|
* Return Value: A reference to the transferred operation object
|
|
* or NULL if no success.
|
|
* Remark: Take over the scanning transactions NdbOperation
|
|
* object for a tuple to an update transaction,
|
|
* which is the last operation read in nextScanResult()
|
|
* (theNdbCon->thePreviousScanRec)
|
|
*
|
|
* FUTURE IMPLEMENTATION: (This note was moved from header file.)
|
|
* In the future, it will even be possible to transfer
|
|
* to a NdbTransaction on another Ndb-object.
|
|
* In this case the receiving NdbTransaction-object must call
|
|
* a method receiveOpFromScan to actually receive the information.
|
|
* This means that the updating transactions can be placed
|
|
* in separate threads and thus increasing the parallelism during
|
|
* the scan process.
|
|
****************************************************************************/
|
|
int
|
|
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
|
|
{
|
|
NdbRecAttr * tRecAttr = m_curr_row;
|
|
if(tRecAttr)
|
|
{
|
|
const Uint32 * src = (Uint32*)tRecAttr->aRef();
|
|
memcpy(data, src, 4*size);
|
|
return 0;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
NdbOperation*
|
|
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
|
|
{
|
|
|
|
NdbRecAttr * tRecAttr = m_curr_row;
|
|
if(tRecAttr)
|
|
{
|
|
NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
|
|
if (newOp == NULL){
|
|
return NULL;
|
|
}
|
|
pTrans->theSimpleState = 0;
|
|
|
|
const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
|
|
|
|
newOp->theTupKeyLen = len;
|
|
newOp->theOperationType = opType;
|
|
if (opType == DeleteRequest) {
|
|
newOp->theStatus = GetValue;
|
|
} else {
|
|
newOp->theStatus = SetValue;
|
|
}
|
|
|
|
const Uint32 * src = (Uint32*)tRecAttr->aRef();
|
|
const Uint32 tScanInfo = src[len] & 0x3FFFF;
|
|
const Uint32 tTakeOverFragment = src[len] >> 20;
|
|
{
|
|
UintR scanInfo = 0;
|
|
TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
|
|
TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
|
|
TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
|
|
newOp->theScanInfo = scanInfo;
|
|
newOp->theDistrKeyIndicator_ = 1;
|
|
newOp->theDistributionKey = tTakeOverFragment;
|
|
}
|
|
|
|
// Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
|
|
TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
|
|
Uint32 i = 0;
|
|
for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
|
|
tcKeyReq->keyInfo[i] = * src++;
|
|
}
|
|
|
|
if(i < len){
|
|
NdbApiSignal* tSignal = theNdb->getSignal();
|
|
newOp->theTCREQ->next(tSignal);
|
|
|
|
Uint32 left = len - i;
|
|
while(tSignal && left > KeyInfo::DataLength){
|
|
tSignal->setSignal(GSN_KEYINFO);
|
|
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
|
|
memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
|
|
src += KeyInfo::DataLength;
|
|
left -= KeyInfo::DataLength;
|
|
|
|
tSignal->next(theNdb->getSignal());
|
|
tSignal = tSignal->next();
|
|
}
|
|
|
|
if(tSignal && left > 0){
|
|
tSignal->setSignal(GSN_KEYINFO);
|
|
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
|
|
memcpy(keyInfo->keyData, src, 4 * left);
|
|
}
|
|
}
|
|
// create blob handles automatically
|
|
if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
|
|
for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
|
|
NdbColumnImpl* c = m_currentTable->m_columns[i];
|
|
assert(c != 0);
|
|
if (c->getBlobType()) {
|
|
if (newOp->getBlobHandle(pTrans, c) == NULL)
|
|
return NULL;
|
|
}
|
|
}
|
|
}
|
|
|
|
return newOp;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
NdbBlob*
|
|
NdbScanOperation::getBlobHandle(const char* anAttrName)
|
|
{
|
|
m_keyInfo = 1;
|
|
return NdbOperation::getBlobHandle(m_transConnection,
|
|
m_currentTable->getColumn(anAttrName));
|
|
}
|
|
|
|
NdbBlob*
|
|
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
|
|
{
|
|
m_keyInfo = 1;
|
|
return NdbOperation::getBlobHandle(m_transConnection,
|
|
m_currentTable->getColumn(anAttrId));
|
|
}
|
|
|
|
NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
|
|
: NdbScanOperation(aNdb)
|
|
{
|
|
}
|
|
|
|
NdbIndexScanOperation::~NdbIndexScanOperation(){
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::setBound(const char* anAttrName, int type,
|
|
const void* aValue, Uint32 len)
|
|
{
|
|
return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type,
|
|
const void* aValue, Uint32 len)
|
|
{
|
|
return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
|
|
const char* aValue,
|
|
Uint32 len){
|
|
return setBound(anAttrObject, BoundEQ, aValue, len);
|
|
}
|
|
|
|
NdbRecAttr*
|
|
NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
|
|
char* aValue){
|
|
if(!m_ordered){
|
|
return NdbScanOperation::getValue_impl(attrInfo, aValue);
|
|
}
|
|
|
|
int id = attrInfo->m_attrId; // In "real" table
|
|
assert(m_accessTable->m_index);
|
|
int sz = (int)m_accessTable->m_index->m_key_ids.size();
|
|
if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
|
|
return NdbScanOperation::getValue_impl(attrInfo, aValue);
|
|
}
|
|
|
|
assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
|
|
Uint32 marker = theTupleKeyDefined[id][0];
|
|
|
|
if(marker == SETBOUND_EQ){
|
|
return NdbScanOperation::getValue_impl(attrInfo, aValue);
|
|
} else if(marker == API_PTR){
|
|
return NdbScanOperation::getValue_impl(attrInfo, aValue);
|
|
}
|
|
|
|
assert(marker == FAKE_PTR);
|
|
|
|
UintPtr oldVal;
|
|
oldVal = theTupleKeyDefined[id][1];
|
|
#if (SIZEOF_CHARP == 8)
|
|
oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
|
|
#endif
|
|
theTupleKeyDefined[id][0] = API_PTR;
|
|
|
|
NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
|
|
tmp->setup(attrInfo, aValue);
|
|
|
|
return tmp;
|
|
}
|
|
|
|
#include <AttributeHeader.hpp>
|
|
/*
|
|
* Define bound on index column in range scan.
|
|
*/
|
|
int
|
|
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
|
|
int type, const void* aValue, Uint32 len)
|
|
{
|
|
if (theOperationType == OpenRangeScanRequest &&
|
|
(0 <= type && type <= 4) &&
|
|
len <= 8000) {
|
|
// insert bound type
|
|
Uint32 currLen = theTotalNrOfKeyWordInSignal;
|
|
Uint32 remaining = KeyInfo::DataLength - currLen;
|
|
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
|
|
bool tDistrKey = tAttrInfo->m_distributionKey;
|
|
|
|
len = aValue != NULL ? sizeInBytes : 0;
|
|
if (len != sizeInBytes && (len != 0)) {
|
|
setErrorCodeAbort(4209);
|
|
return -1;
|
|
}
|
|
|
|
// insert attribute header
|
|
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
|
|
Uint32 sizeInWords = (len + 3) / 4;
|
|
AttributeHeader ah(tIndexAttrId, sizeInWords);
|
|
const Uint32 ahValue = ah.m_value;
|
|
|
|
const Uint32 align = (UintPtr(aValue) & 7);
|
|
const bool aligned = (tDistrKey && type == BoundEQ) ?
|
|
(align == 0) : (align & 3) == 0;
|
|
|
|
const bool nobytes = (len & 0x3) == 0;
|
|
const Uint32 totalLen = 2 + sizeInWords;
|
|
Uint32 tupKeyLen = theTupKeyLen;
|
|
if(remaining > totalLen && aligned && nobytes){
|
|
Uint32 * dst = theKEYINFOptr + currLen;
|
|
* dst ++ = type;
|
|
* dst ++ = ahValue;
|
|
memcpy(dst, aValue, 4 * sizeInWords);
|
|
theTotalNrOfKeyWordInSignal = currLen + totalLen;
|
|
} else {
|
|
if(!aligned || !nobytes){
|
|
Uint32 tempData[2000];
|
|
tempData[0] = type;
|
|
tempData[1] = ahValue;
|
|
tempData[2 + (len >> 2)] = 0;
|
|
memcpy(tempData+2, aValue, len);
|
|
|
|
insertBOUNDS(tempData, 2+sizeInWords);
|
|
} else {
|
|
Uint32 buf[2] = { type, ahValue };
|
|
insertBOUNDS(buf, 2);
|
|
insertBOUNDS((Uint32*)aValue, sizeInWords);
|
|
}
|
|
}
|
|
theTupKeyLen = tupKeyLen + totalLen;
|
|
|
|
/**
|
|
* Do sorted stuff
|
|
*/
|
|
|
|
/**
|
|
* The primary keys for an ordered index is defined in the beginning
|
|
* so it's safe to use [tIndexAttrId]
|
|
* (instead of looping as is NdbOperation::equal_impl)
|
|
*/
|
|
if(type == BoundEQ && tDistrKey)
|
|
{
|
|
theNoOfTupKeyLeft--;
|
|
return handle_distribution_key((Uint64*)aValue, sizeInWords);
|
|
}
|
|
return 0;
|
|
} else {
|
|
setErrorCodeAbort(4228); // XXX wrong code
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
|
|
Uint32 len;
|
|
Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
|
|
Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
|
|
do {
|
|
len = (sz < remaining ? sz : remaining);
|
|
memcpy(dst, data, 4 * len);
|
|
|
|
if(sz >= remaining){
|
|
NdbApiSignal* tCurr = theLastKEYINFO;
|
|
tCurr->setLength(KeyInfo::MaxSignalLength);
|
|
NdbApiSignal* tSignal = tCurr->next();
|
|
if(tSignal)
|
|
;
|
|
else if((tSignal = theNdb->getSignal()) != 0)
|
|
{
|
|
tCurr->next(tSignal);
|
|
tSignal->setSignal(GSN_KEYINFO);
|
|
} else {
|
|
goto error;
|
|
}
|
|
theLastKEYINFO = tSignal;
|
|
theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
|
|
remaining = KeyInfo::DataLength;
|
|
sz -= len;
|
|
data += len;
|
|
} else {
|
|
len = (KeyInfo::DataLength - remaining) + len;
|
|
break;
|
|
}
|
|
} while(true);
|
|
theTotalNrOfKeyWordInSignal = len;
|
|
return 0;
|
|
|
|
error:
|
|
setErrorCodeAbort(4228); // XXX wrong code
|
|
return -1;
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::readTuples(LockMode lm,
|
|
Uint32 batch,
|
|
Uint32 parallel,
|
|
bool order_by,
|
|
bool order_desc,
|
|
bool read_range_no){
|
|
int res = NdbScanOperation::readTuples(lm, batch, 0);
|
|
if(!res && read_range_no)
|
|
{
|
|
m_read_range_no = 1;
|
|
Uint32 word = 0;
|
|
AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0);
|
|
if(insertATTRINFO(word) == -1)
|
|
res = -1;
|
|
}
|
|
if(!res && order_by){
|
|
m_ordered = true;
|
|
if (order_desc) {
|
|
m_descending = true;
|
|
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
|
|
ScanTabReq::setDescendingFlag(req->requestInfo, true);
|
|
}
|
|
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
|
|
m_sort_columns = cnt; // -1 for NDB$NODE
|
|
m_current_api_receiver = m_sent_receivers_count;
|
|
m_api_receivers_count = m_sent_receivers_count;
|
|
|
|
m_sort_columns = cnt;
|
|
for(Uint32 i = 0; i<cnt; i++){
|
|
const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
|
|
const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
|
|
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
|
|
UintPtr newVal = UintPtr(tmp);
|
|
theTupleKeyDefined[i][0] = FAKE_PTR;
|
|
theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
|
|
#if (SIZEOF_CHARP == 8)
|
|
theTupleKeyDefined[i][2] = (newVal >> 32);
|
|
#endif
|
|
}
|
|
}
|
|
m_this_bound_start = 0;
|
|
m_first_bound_word = theKEYINFOptr;
|
|
|
|
return res;
|
|
}
|
|
|
|
void
|
|
NdbIndexScanOperation::fix_get_values(){
|
|
/**
|
|
* Loop through all getValues and set buffer pointer to "API" pointer
|
|
*/
|
|
NdbRecAttr * curr = theReceiver.theFirstRecAttr;
|
|
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
|
|
assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
|
|
|
|
const NdbIndexImpl * idx = m_accessTable->m_index;
|
|
const NdbTableImpl * tab = m_currentTable;
|
|
for(Uint32 i = 0; i<cnt; i++){
|
|
Uint32 val = theTupleKeyDefined[i][0];
|
|
switch(val){
|
|
case FAKE_PTR:
|
|
curr->setup(curr->m_column, 0);
|
|
case API_PTR:
|
|
curr = curr->next();
|
|
break;
|
|
case SETBOUND_EQ:
|
|
break;
|
|
#ifdef VM_TRACE
|
|
default:
|
|
abort();
|
|
#endif
|
|
}
|
|
}
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
|
|
const NdbReceiver* t1,
|
|
const NdbReceiver* t2){
|
|
|
|
NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
|
|
NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
|
|
|
|
r1 = (skip ? r1->next() : r1);
|
|
r2 = (skip ? r2->next() : r2);
|
|
const int jdir = 1 - 2 * (int)m_descending;
|
|
assert(jdir == 1 || jdir == -1);
|
|
while(cols > 0){
|
|
Uint32 * d1 = (Uint32*)r1->aRef();
|
|
Uint32 * d2 = (Uint32*)r2->aRef();
|
|
unsigned r1_null = r1->isNULL();
|
|
if((r1_null ^ (unsigned)r2->isNULL())){
|
|
return (r1_null ? -1 : 1) * jdir;
|
|
}
|
|
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
|
|
Uint32 len = r1->theAttrSize * r1->theArraySize;
|
|
if(!r1_null){
|
|
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
|
|
int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
|
|
if(r){
|
|
assert(r != NdbSqlUtil::CmpUnknown);
|
|
return r * jdir;
|
|
}
|
|
}
|
|
cols--;
|
|
r1 = r1->next();
|
|
r2 = r2->next();
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
|
|
bool forceSend){
|
|
|
|
m_curr_row = 0;
|
|
Uint32 u_idx = 0, u_last = 0;
|
|
Uint32 s_idx = m_current_api_receiver; // first sorted
|
|
Uint32 s_last = theParallelism; // last sorted
|
|
|
|
NdbReceiver** arr = m_api_receivers;
|
|
NdbReceiver* tRec = arr[s_idx];
|
|
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
|
|
fetchAllowed,
|
|
(s_idx < s_last ? tRec->nextResult() : 0));
|
|
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
|
|
u_idx, u_last,
|
|
s_idx, s_last);
|
|
|
|
bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
|
|
|
|
if(fetchNeeded){
|
|
if(fetchAllowed){
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
|
|
TransporterFacade* tp = TransporterFacade::instance();
|
|
Guard guard(tp->theMutexPtr);
|
|
if(theError.code)
|
|
return -1;
|
|
Uint32 seq = theNdbCon->theNodeSequence;
|
|
Uint32 nodeId = theNdbCon->theDBnode;
|
|
if(seq == tp->getNodeSequence(nodeId) &&
|
|
!send_next_scan_ordered(s_idx, forceSend)){
|
|
Uint32 tmp = m_sent_receivers_count;
|
|
s_idx = m_current_api_receiver;
|
|
while(m_sent_receivers_count > 0 && !theError.code){
|
|
theNdb->theImpl->theWaiter.m_node = nodeId;
|
|
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
|
|
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
|
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
|
|
continue;
|
|
}
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
|
|
setErrorCode(4028);
|
|
return -1;
|
|
}
|
|
|
|
if(theError.code){
|
|
setErrorCode(theError.code);
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
|
|
return -1;
|
|
}
|
|
|
|
u_idx = 0;
|
|
u_last = m_conf_receivers_count;
|
|
m_conf_receivers_count = 0;
|
|
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
|
|
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
|
|
} else {
|
|
setErrorCode(4028);
|
|
return -1;
|
|
}
|
|
} else {
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
|
|
return 2;
|
|
}
|
|
} else {
|
|
u_idx = s_idx;
|
|
u_last = s_idx + 1;
|
|
s_idx++;
|
|
}
|
|
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
|
|
u_idx, u_last,
|
|
s_idx, s_last);
|
|
|
|
|
|
Uint32 cols = m_sort_columns + m_read_range_no;
|
|
Uint32 skip = m_keyInfo;
|
|
while(u_idx < u_last){
|
|
u_last--;
|
|
tRec = arr[u_last];
|
|
|
|
// Do binary search instead to find place
|
|
Uint32 place = s_idx;
|
|
for(; place < s_last; place++){
|
|
if(compare(skip, cols, tRec, arr[place]) <= 0){
|
|
break;
|
|
}
|
|
}
|
|
|
|
if(place != s_idx){
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
|
|
memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
|
|
}
|
|
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
|
|
m_api_receivers[place-1] = tRec;
|
|
s_idx--;
|
|
}
|
|
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
|
|
u_idx, u_last,
|
|
s_idx, s_last);
|
|
|
|
m_current_api_receiver = s_idx;
|
|
|
|
if(DEBUG_NEXT_RESULT)
|
|
for(Uint32 i = s_idx; i<s_last; i++)
|
|
ndbout_c("%p", arr[i]);
|
|
|
|
tRec = m_api_receivers[s_idx];
|
|
if(s_idx < s_last && tRec->nextResult()){
|
|
m_curr_row = tRec->copyout(theReceiver);
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
|
|
return 0;
|
|
}
|
|
|
|
theError.code = -1;
|
|
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
|
|
return 1;
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
|
|
if(idx == theParallelism)
|
|
return 0;
|
|
|
|
NdbReceiver* tRec = m_api_receivers[idx];
|
|
NdbApiSignal tSignal(theNdb->theMyRef);
|
|
tSignal.setSignal(GSN_SCAN_NEXTREQ);
|
|
|
|
Uint32 last = m_sent_receivers_count;
|
|
Uint32* theData = tSignal.getDataPtrSend();
|
|
Uint32* prep_array = theData + 4;
|
|
|
|
m_current_api_receiver = idx + 1;
|
|
if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
|
|
{
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("receiver completed, don't send");
|
|
return 0;
|
|
}
|
|
|
|
theData[0] = theNdbCon->theTCConPtr;
|
|
theData[1] = 0;
|
|
Uint64 transId = theNdbCon->theTransactionId;
|
|
theData[2] = transId;
|
|
theData[3] = (Uint32) (transId >> 32);
|
|
|
|
/**
|
|
* Prepare ops
|
|
*/
|
|
m_sent_receivers[last] = tRec;
|
|
tRec->m_list_index = last;
|
|
tRec->prepareSend();
|
|
m_sent_receivers_count = last + 1;
|
|
|
|
Uint32 nodeId = theNdbCon->theDBnode;
|
|
TransporterFacade * tp = TransporterFacade::instance();
|
|
tSignal.setLength(4+1);
|
|
int ret= tp->sendSignal(&tSignal, nodeId);
|
|
if (!ret) checkForceSend(forceSend);
|
|
return ret;
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
|
|
Uint32 seq = theNdbCon->theNodeSequence;
|
|
Uint32 nodeId = theNdbCon->theDBnode;
|
|
|
|
if(seq != tp->getNodeSequence(nodeId))
|
|
{
|
|
theNdbCon->theReleaseOnClose = true;
|
|
return -1;
|
|
}
|
|
|
|
/**
|
|
* Wait for outstanding
|
|
*/
|
|
while(theError.code == 0 && m_sent_receivers_count)
|
|
{
|
|
theNdb->theImpl->theWaiter.m_node = nodeId;
|
|
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
|
|
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
|
switch(return_code){
|
|
case 0:
|
|
break;
|
|
case -1:
|
|
setErrorCode(4008);
|
|
case -2:
|
|
m_api_receivers_count = 0;
|
|
m_conf_receivers_count = 0;
|
|
m_sent_receivers_count = 0;
|
|
theNdbCon->theReleaseOnClose = true;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
if(theError.code)
|
|
{
|
|
m_api_receivers_count = 0;
|
|
m_current_api_receiver = m_ordered ? theParallelism : 0;
|
|
}
|
|
|
|
|
|
/**
|
|
* move all conf'ed into api
|
|
* so that send_next_scan can check if they needs to be closed
|
|
*/
|
|
Uint32 api = m_api_receivers_count;
|
|
Uint32 conf = m_conf_receivers_count;
|
|
|
|
if(m_ordered)
|
|
{
|
|
/**
|
|
* Ordered scan, keep the m_api_receivers "to the right"
|
|
*/
|
|
memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
|
|
(theParallelism - m_current_api_receiver) * sizeof(char*));
|
|
api = (theParallelism - m_current_api_receiver);
|
|
m_api_receivers_count = api;
|
|
}
|
|
|
|
if(DEBUG_NEXT_RESULT)
|
|
ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
|
|
m_ordered, api, conf,
|
|
m_sent_receivers_count, m_current_api_receiver, theParallelism);
|
|
|
|
if(api+conf)
|
|
{
|
|
/**
|
|
* There's something to close
|
|
* setup m_api_receivers (for send_next_scan)
|
|
*/
|
|
memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
|
|
m_api_receivers_count = api + conf;
|
|
m_conf_receivers_count = 0;
|
|
}
|
|
|
|
// Send close scan
|
|
if(send_next_scan(api+conf, true, forceSend) == -1)
|
|
{
|
|
theNdbCon->theReleaseOnClose = true;
|
|
return -1;
|
|
}
|
|
|
|
/**
|
|
* wait for close scan conf
|
|
*/
|
|
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
|
|
{
|
|
theNdb->theImpl->theWaiter.m_node = nodeId;
|
|
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
|
|
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
|
switch(return_code){
|
|
case 0:
|
|
break;
|
|
case -1:
|
|
setErrorCode(4008);
|
|
case -2:
|
|
m_api_receivers_count = 0;
|
|
m_conf_receivers_count = 0;
|
|
m_sent_receivers_count = 0;
|
|
theNdbCon->theReleaseOnClose = true;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
|
|
for(Uint32 i = 0; i<parallell; i++){
|
|
m_receivers[i]->m_list_index = i;
|
|
m_prepared_receivers[i] = m_receivers[i]->getId();
|
|
m_sent_receivers[i] = m_receivers[i];
|
|
m_conf_receivers[i] = 0;
|
|
m_api_receivers[i] = 0;
|
|
m_receivers[i]->prepareSend();
|
|
}
|
|
|
|
m_api_receivers_count = 0;
|
|
m_current_api_receiver = 0;
|
|
m_sent_receivers_count = 0;
|
|
m_conf_receivers_count = 0;
|
|
}
|
|
|
|
int
|
|
NdbScanOperation::restart(bool forceSend)
|
|
{
|
|
|
|
TransporterFacade* tp = TransporterFacade::instance();
|
|
Guard guard(tp->theMutexPtr);
|
|
Uint32 nodeId = theNdbCon->theDBnode;
|
|
|
|
{
|
|
int res;
|
|
if((res= close_impl(tp, forceSend)))
|
|
{
|
|
return res;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Reset receivers
|
|
*/
|
|
reset_receivers(theParallelism, m_ordered);
|
|
|
|
theError.code = 0;
|
|
if (doSendScan(nodeId) == -1)
|
|
return -1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::reset_bounds(bool forceSend){
|
|
int res;
|
|
|
|
{
|
|
TransporterFacade* tp = TransporterFacade::instance();
|
|
Guard guard(tp->theMutexPtr);
|
|
res= close_impl(tp, forceSend);
|
|
}
|
|
|
|
if(!res)
|
|
{
|
|
theError.code = 0;
|
|
reset_receivers(theParallelism, m_ordered);
|
|
|
|
theLastKEYINFO = theSCAN_TABREQ->next();
|
|
theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData;
|
|
theTupKeyLen = 0;
|
|
theTotalNrOfKeyWordInSignal = 0;
|
|
theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
|
|
theDistrKeyIndicator_ = 0;
|
|
m_this_bound_start = 0;
|
|
m_first_bound_word = theKEYINFOptr;
|
|
m_transConnection
|
|
->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
|
|
this);
|
|
m_transConnection->define_scan_op(this);
|
|
return 0;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::end_of_bound(Uint32 no)
|
|
{
|
|
if(no < (1 << 13)) // Only 12-bits no of ranges
|
|
{
|
|
Uint32 bound_head = * m_first_bound_word;
|
|
bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4);
|
|
* m_first_bound_word = bound_head;
|
|
|
|
m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;;
|
|
m_this_bound_start = theTupKeyLen;
|
|
return 0;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
int
|
|
NdbIndexScanOperation::get_range_no()
|
|
{
|
|
NdbRecAttr* tRecAttr = m_curr_row;
|
|
if(m_read_range_no && tRecAttr)
|
|
{
|
|
if(m_keyInfo)
|
|
tRecAttr = tRecAttr->next();
|
|
Uint32 ret = *(Uint32*)tRecAttr->aRef();
|
|
return ret;
|
|
}
|
|
return -1;
|
|
}
|