1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-01 03:47:19 +03:00

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/home/jonas/src/mysql-4.1-ndb


ndb/include/ndbapi/NdbOperation.hpp:
  Auto merged
ndb/include/ndbapi/NdbScanOperation.hpp:
  Auto merged
ndb/src/ndbapi/NdbOperationDefine.cpp:
  Auto merged
This commit is contained in:
unknown
2004-10-04 13:02:16 +02:00
23 changed files with 352 additions and 227 deletions

View File

@ -35,6 +35,7 @@ class AttrInfo {
*/
friend class Dbtc;
friend class Dblqh;
friend class NdbScanOperation;
friend bool printATTRINFO(FILE *, const Uint32 *, Uint32, Uint16);

View File

@ -26,6 +26,7 @@ class KeyInfo {
friend class DbUtil;
friend class NdbOperation;
friend class NdbScanOperation;
friend class NdbIndexScanOperation;
/**
* Reciver(s)

View File

@ -35,13 +35,15 @@ class ScanFragReq {
public:
STATIC_CONST( SignalLength = 12 );
friend bool printSCAN_FRAGREQ(FILE *, const Uint32*, Uint32, Uint16);
public:
Uint32 senderData;
Uint32 resultRef; // Where to send the result
Uint32 savePointId;
Uint32 requestInfo;
Uint32 tableId;
Uint32 fragmentNo;
Uint32 fragmentNoKeyLen;
Uint32 schemaVersion;
Uint32 transId1;
Uint32 transId2;

View File

@ -55,7 +55,7 @@ private:
* DATA VARIABLES
*/
UintR apiConnectPtr; // DATA 0
UintR attrLen; // DATA 1
UintR attrLenKeyLen; // DATA 1
UintR requestInfo; // DATA 2
UintR tableId; // DATA 3
UintR tableSchemaVersion; // DATA 4

View File

@ -673,6 +673,7 @@ private:
void printState();
#endif
bool checkState_TransId(const Uint32 * transId) const;
void define_scan_op(NdbIndexScanOperation*);
};
inline

View File

@ -119,12 +119,18 @@ public:
/** @} *********************************************************************/
/**
* Reset bounds and put operation in list that will be
* sent on next execute
*/
int reset_bounds();
private:
NdbIndexScanOperation(Ndb* aNdb);
virtual ~NdbIndexScanOperation();
int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len);
int saveBoundATTRINFO();
int insertBOUNDS(Uint32 * data, Uint32 sz);
virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32);
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);

View File

@ -749,7 +749,6 @@ protected:
FinalGetValue,
SubroutineExec,
SubroutineEnd,
SetBound,
WaitResponse,
WaitCommitResponse,
Finished,
@ -921,9 +920,6 @@ protected:
Uint16 m_keyInfoGSN;
Uint16 m_attrInfoGSN;
// saveBoundATTRINFO() moves ATTRINFO here when setBound() is ready
NdbApiSignal* theBoundATTRINFO;
Uint32 theTotalBoundAI_Len;
// Blobs in this operation
NdbBlob* theBlobList;

View File

@ -93,6 +93,7 @@ protected:
virtual void release();
void closeScan();
int close_impl(class TransporterFacade*);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
@ -119,6 +120,7 @@ protected:
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int fix_receivers(Uint32 parallel);
void reset_receivers(Uint32 parallel, Uint32 ordered);
Uint32* m_array; // containing all arrays below
Uint32 m_allocated_receivers;
NdbReceiver** m_receivers; // All receivers

View File

@ -23,7 +23,8 @@ libsignaldataprint_la_SOURCES = \
FailRep.cpp DisconnectRep.cpp SignalDroppedRep.cpp \
SumaImpl.cpp NdbSttor.cpp CreateFragmentation.cpp \
UtilLock.cpp TuxMaint.cpp AccLock.cpp \
LqhTrans.cpp ReadNodesConf.cpp CntrStart.cpp
LqhTrans.cpp ReadNodesConf.cpp CntrStart.cpp \
ScanFrag.cpp
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_ndbapi.mk.am

View File

@ -37,8 +37,10 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo));
fprintf(output, " attrLen: %d, tableId: %d, tableSchemaVer: %d\n",
sig->attrLen, sig->tableId, sig->tableSchemaVersion);
Uint32 keyLen = (sig->attrLenKeyLen >> 16);
Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);
fprintf(output, " attrLen: %d, keyLen: %d tableId: %d, tableSchemaVer: %d\n",
attrLen, keyLen, sig->tableId, sig->tableSchemaVersion);
fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n",
sig->transId1, sig->transId2, sig->storedProcId);

View File

@ -53,6 +53,7 @@
#include <signaldata/UtilPrepare.hpp>
#include <signaldata/UtilExecute.hpp>
#include <signaldata/ScanTab.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/LqhFrag.hpp>
#include <signaldata/LqhTransConf.hpp>
#include <signaldata/DropTab.hpp>
@ -250,6 +251,7 @@ SignalDataPrintFunctions[] = {
,{ GSN_TUX_MAINT_REQ, printTUX_MAINT_REQ }
,{ GSN_ACC_LOCKREQ, printACC_LOCKREQ }
,{ GSN_LQH_TRANSCONF, printLQH_TRANSCONF }
,{ GSN_SCAN_FRAGREQ, printSCAN_FRAGREQ }
};
const unsigned short NO_OF_PRINT_FUNCTIONS = sizeof(SignalDataPrintFunctions)/sizeof(NameFunctionPair);

View File

@ -3360,7 +3360,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
req->senderData = filePtr.i;
req->resultRef = reference();
req->schemaVersion = table.schemaVersion;
req->fragmentNo = fragNo;
req->fragmentNoKeyLen = fragNo;
req->requestInfo = 0;
req->savePointId = 0;
req->tableId = table.tableId;

View File

@ -2248,7 +2248,7 @@ private:
void sendAttrinfoLoop(Signal* signal);
void sendAttrinfoSignal(Signal* signal);
void sendLqhAttrinfoSignal(Signal* signal);
void sendKeyinfoAcc(Signal* signal);
void sendKeyinfoAcc(Signal* signal, Uint32 pos);
Uint32 initScanrec(const class ScanFragReq *);
void initScanTc(Signal* signal,
Uint32 transid1,

View File

@ -2803,8 +2803,10 @@ void Dblqh::execKEYINFO(Signal* signal)
return;
}//if
TcConnectionrec * const regTcPtr = tcConnectptr.p;
if (regTcPtr->transactionState !=
TcConnectionrec::WAIT_TUPKEYINFO) {
TcConnectionrec::TransactionState state = regTcPtr->transactionState;
if (state != TcConnectionrec::WAIT_TUPKEYINFO &&
state != TcConnectionrec::WAIT_SCAN_AI)
{
jam();
/*****************************************************************************/
/* TRANSACTION WAS ABORTED, THIS IS MOST LIKELY A SIGNAL BELONGING TO THE */
@ -2822,15 +2824,19 @@ void Dblqh::execKEYINFO(Signal* signal)
return;
}//if
jam();
abort();
terrorCode = errorCode;
abortErrorLab(signal);
return;
}//if
FragrecordPtr regFragptr;
regFragptr.i = regTcPtr->fragmentptr;
ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
fragptr = regFragptr;
endgettupkeyLab(signal);
if(state == TcConnectionrec::WAIT_TUPKEYINFO)
{
FragrecordPtr regFragptr;
regFragptr.i = regTcPtr->fragmentptr;
ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
fragptr = regFragptr;
endgettupkeyLab(signal);
}
return;
}//Dblqh::execKEYINFO()
@ -2838,9 +2844,9 @@ void Dblqh::execKEYINFO(Signal* signal)
/* FILL IN KEY DATA INTO DATA BUFFERS. */
/* ------------------------------------------------------------------------- */
Uint32 Dblqh::handleLongTupKey(Signal* signal,
Uint32 keyLength,
Uint32 primKeyLength,
Uint32* dataPtr)
Uint32 keyLength,
Uint32 primKeyLength,
Uint32* dataPtr)
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 dataPos = 0;
@ -3686,7 +3692,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
signal->theData[9] = sig3;
signal->theData[10] = sig4;
if (regTcPtr->primKeyLen > 4) {
sendKeyinfoAcc(signal);
sendKeyinfoAcc(signal, 11);
}//if
EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACCKEYREQ,
signal, 7 + regTcPtr->primKeyLen);
@ -3708,9 +3714,8 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ======= SEND KEYINFO TO ACC ======= */
/* */
/* ========================================================================== */
void Dblqh::sendKeyinfoAcc(Signal* signal)
void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti)
{
UintR Ti = 11;
DatabufPtr regDatabufptr;
regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
@ -7409,7 +7414,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
jamEntry();
const Uint32 reqinfo = scanFragReq->requestInfo;
const Uint32 fragId = scanFragReq->fragmentNo;
const Uint32 fragId = (scanFragReq->fragmentNoKeyLen & 0xFFFF);
const Uint32 keyLen = (scanFragReq->fragmentNoKeyLen >> 16);
tabptr.i = scanFragReq->tableId;
const Uint32 max_rows = scanFragReq->batch_size_rows;
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
@ -7473,6 +7479,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
transid2,
fragId,
ZNIL);
tcConnectptr.p->save1 = 4;
tcConnectptr.p->primKeyLen = keyLen + 4; // hard coded in execKEYINFO
errorCode = initScanrec(scanFragReq);
if (errorCode != ZOK) {
jam();
@ -7672,34 +7680,18 @@ void Dblqh::accScanConfScanLab(Signal* signal)
return;
}//if
scanptr.p->scanAccPtr = accScanConf->accPtr;
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
Uint32 boundAiLength = 0;
Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4;
if (scanptr.p->rangeScan) {
jam();
// bound info length is in first of the 5 header words
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
boundAiLength = regAttrinbufptr.p->attrbuf[0];
TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
req->errorCode = RNIL;
req->tuxScanPtrI = scanptr.p->scanAccPtr;
req->boundAiLength = boundAiLength;
Uint32* out = (Uint32*)req + TuxBoundInfo::SignalLength;
Uint32 sz = 0;
while (sz < boundAiLength) {
jam();
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
MEMCOPY_NO_WORDS(&out[sz],
&regAttrinbufptr.p->attrbuf[0],
dataLen);
sz += dataLen;
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
}
ndbrequire(sz == boundAiLength);
if(boundAiLength > 0)
sendKeyinfoAcc(signal, TuxBoundInfo::SignalLength);
EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
signal, TuxBoundInfo::SignalLength + boundAiLength);
signal, TuxBoundInfo::SignalLength + boundAiLength);
jamEntry();
if (req->errorCode != 0) {
jam();
@ -7716,12 +7708,14 @@ void Dblqh::accScanConfScanLab(Signal* signal)
signal->theData[1] = tcConnectptr.p->tableref;
signal->theData[2] = scanptr.p->scanSchemaVersion;
signal->theData[3] = ZSTORED_PROC_SCAN;
ndbrequire(boundAiLength <= scanptr.p->scanAiLength);
signal->theData[4] = scanptr.p->scanAiLength - boundAiLength;
signal->theData[4] = scanptr.p->scanAiLength;
sendSignal(tcConnectptr.p->tcTupBlockref,
GSN_STORED_PROCREQ, signal, 5, JBB);
signal->theData[0] = tcConnectptr.p->tupConnectrec;
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
while (regAttrinbufptr.i != RNIL) {
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
jam();

View File

@ -192,7 +192,8 @@ public:
OS_WAIT_ATTR = 14,
OS_WAIT_COMMIT_CONF = 15,
OS_WAIT_ABORT_CONF = 16,
OS_WAIT_COMPLETE_CONF = 17
OS_WAIT_COMPLETE_CONF = 17,
OS_WAIT_SCAN = 18
};
enum AbortState {
@ -1169,6 +1170,8 @@ public:
// Length of expected attribute information
Uint32 scanAiLength;
Uint32 scanKeyLen;
// Reference to ApiConnectRecord
Uint32 scanApiRec;
@ -1571,7 +1574,7 @@ private:
void diFcountReqLab(Signal* signal, ScanRecordPtr);
void signalErrorRefuseLab(Signal* signal);
void abort080Lab(Signal* signal);
void packKeyData000Lab(Signal* signal, BlockReference TBRef);
void packKeyData000Lab(Signal* signal, BlockReference TBRef, Uint32 len);
void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode);
void sendAbortedAfterTimeout(Signal* signal, int Tcheck);
void abort010Lab(Signal* signal);

View File

@ -1751,6 +1751,7 @@ void Dbtc::execKEYINFO(Signal* signal)
switch (apiConnectptr.p->apiConnectstate) {
case CS_RECEIVING:
case CS_REC_COMMITTING:
case CS_START_SCAN:
jam();
/*empty*/;
break;
@ -1804,12 +1805,54 @@ void Dbtc::execKEYINFO(Signal* signal)
jam();
tckeyreq020Lab(signal);
return;
case OS_WAIT_SCAN:
break;
default:
jam();
terrorCode = ZSTATE_ERROR;
abortErrorLab(signal);
return;
}//switch
UintR TdataPos = 0;
UintR TkeyLen = regCachePtr->keylen;
UintR Tlen = regCachePtr->save1;
do {
if (cfirstfreeDatabuf == RNIL) {
jam();
abort();
seizeDatabuferrorLab(signal);
return;
}//if
linkKeybuf(signal);
arrGuard(TdataPos, 19);
databufptr.p->data[0] = signal->theData[TdataPos + 3];
databufptr.p->data[1] = signal->theData[TdataPos + 4];
databufptr.p->data[2] = signal->theData[TdataPos + 5];
databufptr.p->data[3] = signal->theData[TdataPos + 6];
Tlen = Tlen + 4;
TdataPos = TdataPos + 4;
if (Tlen < TkeyLen) {
jam();
if (TdataPos >= tmaxData) {
jam();
/*----------------------------------------------------*/
/** EXIT AND WAIT FOR SIGNAL KEYINFO OR KEYINFO9 **/
/** WHEN EITHER OF THE SIGNALS IS RECEIVED A JUMP **/
/** TO LABEL "KEYINFO_LABEL" IS DONE. THEN THE **/
/** PROGRAM RETURNS TO LABEL TCKEYREQ020 **/
/*----------------------------------------------------*/
setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
regCachePtr->save1 = Tlen;
return;
}//if
} else {
jam();
return;
}//if
} while (1);
return;
}//Dbtc::execKEYINFO()
/*---------------------------------------------------------------------------*/
@ -1818,45 +1861,45 @@ void Dbtc::execKEYINFO(Signal* signal)
/* WE WILL ALWAYS PACK 4 WORDS AT A TIME. */
/*---------------------------------------------------------------------------*/
void Dbtc::packKeyData000Lab(Signal* signal,
BlockReference TBRef)
BlockReference TBRef,
Uint32 totalLen)
{
CacheRecord * const regCachePtr = cachePtr.p;
UintR Tmp;
Uint16 tdataPos;
jam();
tdataPos = 0;
Tmp = regCachePtr->keylen;
Uint32 len = 0;
databufptr.i = regCachePtr->firstKeybuf;
signal->theData[0] = tcConnectptr.i;
signal->theData[1] = apiConnectptr.p->transid[0];
signal->theData[2] = apiConnectptr.p->transid[1];
Uint32 * dst = signal->theData+3;
ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord);
do {
jam();
if (tdataPos == 20) {
jam();
/*---------------------------------------------------------------------*/
/* 4 MORE WORDS WILL NOT FIT IN THE 24 DATA WORDS IN A SIGNAL */
/*---------------------------------------------------------------------*/
sendKeyinfo(signal, TBRef, 20);
tdataPos = 0;
}//if
Tmp = Tmp - 4;
ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord);
cdata[tdataPos ] = databufptr.p->data[0];
cdata[tdataPos + 1] = databufptr.p->data[1];
cdata[tdataPos + 2] = databufptr.p->data[2];
cdata[tdataPos + 3] = databufptr.p->data[3];
tdataPos = tdataPos + 4;
if (Tmp <= 4) {
databufptr.i = databufptr.p->nextDatabuf;
dst[len + 0] = databufptr.p->data[0];
dst[len + 1] = databufptr.p->data[1];
dst[len + 2] = databufptr.p->data[2];
dst[len + 3] = databufptr.p->data[3];
len += 4;
if (totalLen <= 4) {
jam();
/*---------------------------------------------------------------------*/
/* LAST PACK OF KEY DATA HAVE BEEN SENT */
/*---------------------------------------------------------------------*/
/* THERE WERE UNSENT INFORMATION, SEND IT. */
/*---------------------------------------------------------------------*/
sendKeyinfo(signal, TBRef, tdataPos);
releaseKeys();
sendSignal(TBRef, GSN_KEYINFO, signal, 3 + len, JBB);
return;
}//if
databufptr.i = databufptr.p->nextDatabuf;
} else if(len == KeyInfo::DataLength){
jam();
len = 0;
sendSignal(TBRef, GSN_KEYINFO, signal, 3 + KeyInfo::DataLength, JBB);
}
totalLen -= 4;
ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord);
} while (1);
}//Dbtc::packKeyData000Lab()
@ -3006,7 +3049,8 @@ void Dbtc::packLqhkeyreq(Signal* signal,
UintR TfirstAttrbuf = regCachePtr->firstAttrbuf;
sendlqhkeyreq(signal, TBRef);
if (Tkeylen > 4) {
packKeyData000Lab(signal, TBRef);
packKeyData000Lab(signal, TBRef, Tkeylen - 4);
releaseKeys();
}//if
packLqhkeyreq040Lab(signal,
TfirstAttrbuf,
@ -8398,7 +8442,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
{
const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0];
const Uint32 reqinfo = scanTabReq->requestInfo;
const Uint32 aiLength = scanTabReq->attrLen;
const Uint32 aiLength = (scanTabReq->attrLenKeyLen & 0xFFFF);
const Uint32 keyLen = scanTabReq->attrLenKeyLen >> 16;
const Uint32 schemaVersion = scanTabReq->tableSchemaVersion;
const Uint32 transid1 = scanTabReq->transId1;
const Uint32 transid2 = scanTabReq->transId2;
@ -8472,8 +8517,12 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
seizeTcConnect(signal);
tcConnectptr.p->apiConnect = apiConnectptr.i;
tcConnectptr.p->tcConnectstate = OS_WAIT_SCAN;
apiConnectptr.p->lastTcConnect = tcConnectptr.i;
seizeCacheRecord(signal);
cachePtr.p->keylen = keyLen;
cachePtr.p->save1 = 0;
scanptr = seizeScanrec(signal);
ndbrequire(transP->apiScanRec == RNIL);
@ -8554,7 +8603,8 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanApiRec = apiConnectptr.i;
scanptr.p->scanAiLength = scanTabReq->attrLen;
scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF;
scanptr.p->scanKeyLen = scanTabReq->attrLenKeyLen >> 16;
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
@ -8799,6 +8849,7 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
if (apiConnectptr.p->cachePtr != RNIL) {
cachePtr.i = apiConnectptr.p->cachePtr;
ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
releaseKeys();
releaseAttrinfo();
}//if
tcConnectptr.i = scanPtr.p->scanTcrec;
@ -9458,7 +9509,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
req->senderData = scanFragptr.i;
req->requestInfo = requestInfo;
req->fragmentNo = scanFragP->scanFragId;
req->fragmentNoKeyLen = scanFragP->scanFragId | (scanP->scanKeyLen << 16);
req->resultRef = apiConnectptr.p->ndbapiBlockref;
req->savePointId = apiConnectptr.p->currSavePointId;
req->transId1 = apiConnectptr.p->transid[0];
@ -9468,6 +9519,11 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
if(scanP->scanKeyLen > 0)
{
tcConnectptr.i = scanFragptr.i;
packKeyData000Lab(signal, scanFragP->lqhBlockref, scanP->scanKeyLen);
}
updateBuddyTimer(apiConnectptr);
scanFragP->startFragTimer(ctcTimer);
}//Dbtc::sendScanFragReq()

View File

@ -129,14 +129,14 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
// largest attrId seen plus one
Uint32 maxAttrId = 0;
// skip 5 words
if (req->boundAiLength < 5) {
unsigned offset = 0;
if (req->boundAiLength < offset) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
unsigned offset = 5;
// walk through entries
while (offset + 2 <= req->boundAiLength) {
jam();

View File

@ -1891,7 +1891,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNo = fd.m_fragDesc.m_fragmentNo;
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
req->schemaVersion = tabPtr.p->m_schemaVersion;
req->transId1 = 0;
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);

View File

@ -1115,15 +1115,8 @@ NdbConnection::getNdbScanOperation(const NdbTableImpl * tab)
if (tOp == NULL)
goto getNdbOp_error1;
// Link scan operation into list of cursor operations
if (m_theLastScanOperation == NULL)
m_theFirstScanOperation = m_theLastScanOperation = tOp;
else {
m_theLastScanOperation->next(tOp);
m_theLastScanOperation = tOp;
}
tOp->next(NULL);
if (tOp->init(tab, this) != -1) {
define_scan_op(tOp);
return tOp;
} else {
theNdb->releaseScanOperation(tOp);
@ -1135,6 +1128,18 @@ getNdbOp_error1:
return NULL;
}//NdbConnection::getNdbScanOperation()
void
NdbConnection::define_scan_op(NdbIndexScanOperation * tOp){
// Link scan operation into list of cursor operations
if (m_theLastScanOperation == NULL)
m_theFirstScanOperation = m_theLastScanOperation = tOp;
else {
m_theLastScanOperation->next(tOp);
m_theLastScanOperation = tOp;
}
tOp->next(NULL);
}
NdbScanOperation*
NdbConnection::getNdbScanOperation(const NdbDictionary::Table * table)
{

View File

@ -78,7 +78,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
m_tcReqGSN(GSN_TCKEYREQ),
m_keyInfoGSN(GSN_KEYINFO),
m_attrInfoGSN(GSN_ATTRINFO),
theBoundATTRINFO(NULL),
theBlobList(NULL)
{
theReceiver.init(NdbReceiver::NDB_OPERATION, this);
@ -167,7 +166,6 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){
theScanInfo = 0;
theTotalNrOfKeyWordInSignal = 8;
theMagicNumber = 0xABCDEF01;
theBoundATTRINFO = NULL;
theBlobList = NULL;
tSignal = theNdb->getSignal();
@ -263,14 +261,6 @@ NdbOperation::release()
tSubroutine = tSubroutine->theNext;
theNdb->releaseNdbSubroutine(tSaveSubroutine);
}
tSignal = theBoundATTRINFO;
while (tSignal != NULL)
{
tSaveSignal = tSignal;
tSignal = tSignal->next();
theNdb->releaseSignal(tSaveSignal);
}
theBoundATTRINFO = NULL;
}
tBlob = theBlobList;
while (tBlob != NULL)

View File

@ -334,10 +334,6 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
if ((tAttrInfo != NULL) &&
(!tAttrInfo->m_indexOnly) &&
(theStatus != Init)){
if (theStatus == SetBound) {
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus != GetValue) {
if (theInterpretIndicator == 1) {
if (theStatus == FinalGetValue) {

View File

@ -216,10 +216,6 @@ int
NdbOperation::initial_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
if (theStatus == SetBound) {
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus == ExecInterpretedValue) {
return 0; // Simply continue with interpretation
} else if (theStatus == GetValue) {

View File

@ -32,6 +32,7 @@
#include <signaldata/ScanTab.hpp>
#include <signaldata/KeyInfo.hpp>
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
@ -116,10 +117,6 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
theStatus = GetValue;
theOperationType = OpenScanRequest;
theTotalBoundAI_Len = 0;
theBoundATTRINFO = NULL;
return 0;
}
@ -181,7 +178,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
assert (m_currentTable != m_accessTable);
// Modify operation state
theStatus = SetBound;
theStatus = GetValue;
theOperationType = OpenRangeScanRequest;
range = true;
}
@ -219,8 +216,14 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
req->transId1 = (Uint32) transId;
req->transId2 = (Uint32) (transId >> 32);
getFirstATTRINFOScan();
NdbApiSignal* tSignal =
theFirstKEYINFO = theLastKEYINFO = theNdb->getSignal();
tSignal->setSignal(GSN_KEYINFO);
theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
theTotalNrOfKeyWordInSignal= 0;
getFirstATTRINFOScan();
return getResultSet();
}
@ -355,6 +358,7 @@ NdbScanOperation::getFirstATTRINFOScan()
* After setBound() are done, move the accumulated ATTRINFO signals to
* a separate list. Then continue with normal scan.
*/
#if 0
int
NdbIndexScanOperation::saveBoundATTRINFO()
{
@ -401,6 +405,7 @@ NdbIndexScanOperation::saveBoundATTRINFO()
}
return res;
}
#endif
#define WAITFOR_SCAN_TIMEOUT 120000
@ -428,7 +433,6 @@ NdbScanOperation::executeCursor(int nodeId){
TRACE_DEBUG("The node is hard dead when attempting to start a scan");
setErrorCode(4029);
tCon->theReleaseOnClose = true;
abort();
} else {
TRACE_DEBUG("The node is stopping when attempting to start a scan");
setErrorCode(4030);
@ -635,7 +639,7 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan()
{
if(m_transConnection) do {
if(m_transConnection){
if(DEBUG_NEXT_RESULT)
ndbout_c("closeScan() theError.code = %d "
"m_api_receivers_count = %d "
@ -648,55 +652,8 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
close_impl(tp);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){
theNdbCon->theReleaseOnClose = true;
break;
}
while(theError.code == 0 && m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
theNdbCon->theReleaseOnClose = true;
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
send_next_scan(0, true); // Close scan
}
/**
* wait for close scan conf
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
}
}
} while(0);
theNdbCon->theScanningOp = 0;
@ -750,11 +707,6 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
return -1;
}
if (theStatus == SetBound) {
((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
theErrorLine = 0;
// In preapareSendInterpreted we set the sizes (word 4-8) in the
@ -769,23 +721,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF);
const Uint32 transId2 = (Uint32) (aTransactionId >> 32);
if (theOperationType == OpenRangeScanRequest) {
NdbApiSignal* tSignal = theBoundATTRINFO;
do{
tSignal->setData(aTC_ConnectPtr, 1);
tSignal->setData(transId1, 2);
tSignal->setData(transId2, 3);
tSignal = tSignal->next();
} while (tSignal != NULL);
}
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
NdbApiSignal* tSignal = theFirstATTRINFO;
do{
tSignal->setData(aTC_ConnectPtr, 1);
tSignal->setData(transId1, 2);
tSignal->setData(transId2, 3);
tSignal = tSignal->next();
} while (tSignal != NULL);
/**
* Prepare all receivers
@ -841,13 +777,18 @@ NdbScanOperation::doSendScan(int aProcessorId)
setErrorCode(4001);
return -1;
}
Uint32 tupKeyLen = theTupKeyLen;
Uint32 len = theTotalNrOfKeyWordInSignal;
Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
Uint64 transId = theNdbCon->theTransactionId;
// Update the "attribute info length in words" in SCAN_TABREQ before
// sending it. This could not be done in openScan because
// we created the ATTRINFO signals after the SCAN_TABREQ signal.
ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
req->attrLen = theTotalCurrAI_Len;
if (theOperationType == OpenRangeScanRequest)
req->attrLen += theTotalBoundAI_Len;
req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
TransporterFacade *tp = TransporterFacade::instance();
LinearSectionPtr ptr[3];
ptr[0].p = m_prepared_receivers;
@ -856,22 +797,41 @@ NdbScanOperation::doSendScan(int aProcessorId)
setErrorCode(4002);
return -1;
}
if (theOperationType == OpenRangeScanRequest) {
if (tupKeyLen > 0){
// must have at least one signal since it contains attrLen for bounds
assert(theBoundATTRINFO != NULL);
tSignal = theBoundATTRINFO;
while (tSignal != NULL) {
assert(theLastKEYINFO != NULL);
tSignal = theLastKEYINFO;
tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
assert(theFirstKEYINFO != NULL);
tSignal = theFirstKEYINFO;
NdbApiSignal* last;
do {
KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
keyInfo->connectPtr = aTC_ConnectPtr;
keyInfo->transId[0] = Uint32(transId);
keyInfo->transId[1] = Uint32(transId >> 32);
if (tp->sendSignal(tSignal,aProcessorId) == -1){
setErrorCode(4002);
return -1;
setErrorCode(4002);
return -1;
}
tSignalCount++;
last = tSignal;
tSignal = tSignal->next();
}
} while(last != theLastKEYINFO);
}
tSignal = theFirstATTRINFO;
while (tSignal != NULL) {
AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
attrInfo->connectPtr = aTC_ConnectPtr;
attrInfo->transId[0] = Uint32(transId);
attrInfo->transId[1] = Uint32(transId >> 32);
if (tp->sendSignal(tSignal,aProcessorId) == -1){
setErrorCode(4002);
return -1;
@ -1056,11 +1016,6 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
if (theStatus == SetBound) {
saveBoundATTRINFO();
theStatus = GetValue;
}
int id = attrInfo->m_attrId; // In "real" table
assert(m_accessTable->m_index);
int sz = (int)m_accessTable->m_index->m_key_ids.size();
@ -1101,12 +1056,13 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
int type, const void* aValue, Uint32 len)
{
if (theOperationType == OpenRangeScanRequest &&
theStatus == SetBound &&
(0 <= type && type <= 4) &&
len <= 8000) {
// insert bound type
insertATTRINFO(type);
Uint32 currLen = theTotalNrOfKeyWordInSignal;
Uint32 remaining = KeyInfo::DataLength - currLen;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint32 xfrmData[2000];
@ -1130,19 +1086,34 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords);
insertATTRINFO(ah.m_value);
if (len != 0) {
// insert attribute data
if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
else {
Uint32 tempData[2000];
memcpy(tempData, aValue, len);
const Uint32 ahValue = ah.m_value;
const bool aligned = (UintPtr(aValue) & 3) == 0;
const bool nobytes = (len & 0x3) == 0;
const Uint32 totalLen = 2 + sizeInWords;
Uint32 tupKeyLen = theTupKeyLen;
if(remaining > totalLen && aligned && nobytes){
Uint32 * dst = theKEYINFOptr + currLen;
* dst ++ = type;
* dst ++ = ahValue;
memcpy(dst, aValue, 4 * sizeInWords);
theTotalNrOfKeyWordInSignal = currLen + totalLen;
} else {
if(!aligned || !nobytes){
Uint32 tempData[2002];
tempData[0] = type;
tempData[1] = ahValue;
memcpy(tempData+2, aValue, len);
while ((len & 0x3) != 0)
((char*)tempData)[len++] = 0;
insertATTRINFOloop(tempData, sizeInWords);
((char*)&tempData[2])[len++] = 0;
insertBOUNDS(tempData, 2+sizeInWords);
} else {
Uint32 buf[2] = { type, ahValue };
insertBOUNDS(buf, 2);
insertBOUNDS((Uint32*)aValue, sizeInWords);
}
}
theTupKeyLen = tupKeyLen + totalLen;
/**
* Do sorted stuff
@ -1165,6 +1136,46 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
}
}
int
NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
Uint32 len;
Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
do {
len = (sz < remaining ? sz : remaining);
memcpy(dst, data, 4 * len);
if(sz >= remaining){
NdbApiSignal* tCurr = theLastKEYINFO;
tCurr->setLength(KeyInfo::MaxSignalLength);
NdbApiSignal* tSignal = tCurr->next();
if(tSignal)
;
else if((tSignal = theNdb->getSignal()) != 0)
{
tCurr->next(tSignal);
tSignal->setSignal(GSN_KEYINFO);
} else {
goto error;
}
theLastKEYINFO = tSignal;
theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
remaining = KeyInfo::DataLength;
sz -= len;
data += len;
} else {
len = (KeyInfo::DataLength - remaining) + len;
break;
}
} while(sz >= 0);
theTotalNrOfKeyWordInSignal = len;
return 0;
error:
setErrorCodeAbort(4228); // XXX wrong code
return -1;
}
NdbResultSet*
NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 batch,
@ -1173,9 +1184,23 @@ NdbIndexScanOperation::readTuples(LockMode lm,
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
if(rs && order_by){
m_ordered = 1;
m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count;
m_api_receivers_count = m_sent_receivers_count;
m_sort_columns = cnt;
for(Uint32 i = 0; i<cnt; i++){
const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
UintPtr newVal = UintPtr(tmp);
theTupleKeyDefined[i][0] = FAKE_PTR;
theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
}
}
return rs;
}
@ -1396,10 +1421,7 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
}
int
NdbScanOperation::restart(){
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
NdbScanOperation::close_impl(TransporterFacade* tp){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@ -1408,7 +1430,7 @@ NdbScanOperation::restart(){
return -1;
}
while(m_sent_receivers_count){
while(theError.code == 0 && m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1421,14 +1443,17 @@ NdbScanOperation::restart(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
theNdbCon->theReleaseOnClose = true;
return -1;
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
if(send_next_scan(0, true) == -1) // Close scan
if(send_next_scan(0, true) == -1){ // Close scan
theNdbCon->theReleaseOnClose = true;
return -1;
}
}
/**
@ -1447,15 +1472,15 @@ NdbScanOperation::restart(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
theNdbCon->theReleaseOnClose = true;
return -1;
}
}
return 0;
}
/**
* Reset receivers
*/
const Uint32 parallell = theParallelism;
void
NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
for(Uint32 i = 0; i<parallell; i++){
m_receivers[i]->m_list_index = i;
m_prepared_receivers[i] = m_receivers[i]->getId();
@ -1470,13 +1495,59 @@ NdbScanOperation::restart(){
m_sent_receivers_count = parallell;
m_conf_receivers_count = 0;
if(m_ordered){
if(ordered){
m_current_api_receiver = parallell;
m_api_receivers_count = parallell;
}
}
int
NdbScanOperation::restart()
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 nodeId = theNdbCon->theDBnode;
{
int res;
if((res= close_impl(tp)))
{
return res;
}
}
/**
* Reset receivers
*/
reset_receivers(theParallelism, m_ordered);
if (doSendScan(nodeId) == -1)
return -1;
return 0;
}
int
NdbIndexScanOperation::reset_bounds(){
int res;
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
res= close_impl(tp);
}
if(!res)
{
reset_receivers(theParallelism, m_ordered);
theLastKEYINFO = theFirstKEYINFO;
theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData;
theTotalNrOfKeyWordInSignal= 0;
m_transConnection->define_scan_op(this);
return 0;
}
return res;
}