diff --git a/ndb/include/kernel/AttributeHeader.hpp b/ndb/include/kernel/AttributeHeader.hpp index 91190fdd223..669bbe6511f 100644 --- a/ndb/include/kernel/AttributeHeader.hpp +++ b/ndb/include/kernel/AttributeHeader.hpp @@ -30,6 +30,12 @@ class AttributeHeader { friend class Suma; public: + /** + * Psuedo columns + */ + STATIC_CONST( FRAGMENT = 0xFFFE ); + STATIC_CONST( ROW_COUNT = 0xFFFD ); + /** Initialize AttributeHeader at location aHeaderPtr */ static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId, Uint32 aDataSize); diff --git a/ndb/include/kernel/GlobalSignalNumbers.h b/ndb/include/kernel/GlobalSignalNumbers.h index 8941fa6b381..0a799ca6d95 100644 --- a/ndb/include/kernel/GlobalSignalNumbers.h +++ b/ndb/include/kernel/GlobalSignalNumbers.h @@ -23,14 +23,8 @@ * * When adding a new signal, remember to update MAX_GSN and SignalNames.cpp */ - - - const GlobalSignalNumber MAX_GSN = 712; - - - struct GsnName { GlobalSignalNumber gsn; const char * name; @@ -898,7 +892,6 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES; #define GSN_TUX_MAINT_REF 679 // not used 680 -// not used 712 // not used 681 /** @@ -951,6 +944,6 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES; #define GSN_TUX_BOUND_INFO 710 #define GSN_ACC_LOCKREQ 711 - +#define GSN_READ_ROWCOUNT_REQ 712 #endif diff --git a/ndb/include/kernel/signaldata/TupKey.hpp b/ndb/include/kernel/signaldata/TupKey.hpp index 304bebbec88..ffd57d81e64 100644 --- a/ndb/include/kernel/signaldata/TupKey.hpp +++ b/ndb/include/kernel/signaldata/TupKey.hpp @@ -80,7 +80,7 @@ class TupKeyConf { friend bool printTUPKEYCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo); public: - STATIC_CONST( SignalLength = 6 ); + STATIC_CONST( SignalLength = 5 ); private: @@ -88,11 +88,10 @@ private: * DATA VARIABLES */ Uint32 userPtr; - Uint32 pageId; - Uint32 pageIndex; Uint32 readLength; Uint32 writeLength; Uint32 noFiredTriggers; + Uint32 lastRow; }; class TupKeyRef { diff --git a/ndb/include/ndbapi/NdbOperation.hpp b/ndb/include/ndbapi/NdbOperation.hpp index c48dccd4864..9bf5a0817a4 100644 --- a/ndb/include/ndbapi/NdbOperation.hpp +++ b/ndb/include/ndbapi/NdbOperation.hpp @@ -609,6 +609,20 @@ public: int interpret_exit_nok(Uint32 ErrorCode); int interpret_exit_nok(); + + /** + * Interpreted program instruction: + * + * For scanning transactions, + * return this row, but no more from this fragment + * + * For non-scanning transactions, + * abort the whole transaction. + * + * @return -1 if unsuccessful. + */ + int interpret_exit_last_row(); + /** * Interpreted program instruction: * Define a subroutine in an interpreted operation. diff --git a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index cc3e646f219..24ca9558ca3 100644 --- a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -926,6 +926,7 @@ private: void execACC_OVER_REC(Signal* signal); void execACC_SAVE_PAGES(Signal* signal); void execNEXTOPERATION(Signal* signal); + void execREAD_ROWCOUNTREQ(Signal* signal); // Received signals void execSTTOR(Signal* signal); diff --git a/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp b/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp index b22fd6ce641..c2c4821a70c 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp @@ -148,6 +148,7 @@ Dbacc::Dbacc(const class Configuration & conf): addRecSignal(GSN_ACC_OVER_REC, &Dbacc::execACC_OVER_REC); addRecSignal(GSN_ACC_SAVE_PAGES, &Dbacc::execACC_SAVE_PAGES); addRecSignal(GSN_NEXTOPERATION, &Dbacc::execNEXTOPERATION); + addRecSignal(GSN_READ_ROWCOUNT_REQ, &Dbacc::execREAD_ROWCOUNTREQ); // Received signals addRecSignal(GSN_STTOR, &Dbacc::execSTTOR); diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 9cfac0ad2a2..ce30170f36a 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -13384,3 +13384,17 @@ void Dbacc::execSET_VAR_REQ(Signal* signal) #endif }//execSET_VAR_REQ() + +void +Dbacc::execREAD_ROWCOUNTREQ(Signal* signal){ + jamEntry(); + fragrecptr.i = signal->theData[0]; + ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + rootfragrecptr.i = fragrecptr.p->myroot; + ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec); + Uint64 tmp = rootfragrecptr.p->noOfElements; + Uint32 * src = (Uint32*)&tmp; + signal->theData[0] = src[0]; + signal->theData[1] = src[1]; +} + diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 9fcb6faf3e3..e945fb55761 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -575,6 +575,7 @@ public: Uint8 scanReleaseCounter; Uint8 scanTcWaiting; Uint8 scanKeyinfoFlag; + Uint8 m_last_row; }; // Size 272 bytes typedef Ptr ScanRecordPtr; @@ -2097,7 +2098,8 @@ private: void execSTART_EXEC_SR(Signal* signal); void execEXEC_SRREQ(Signal* signal); void execEXEC_SRCONF(Signal* signal); - + void execREAD_ROWCOUNTREQ(Signal* signal); + void execDUMP_STATE_ORD(Signal* signal); void execACC_COM_BLOCK(Signal* signal); void execACC_COM_UNBLOCK(Signal* signal); diff --git a/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp index 4bb31185cfe..5e5ca57dc7f 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp @@ -323,6 +323,8 @@ Dblqh::Dblqh(const class Configuration & conf): addRecSignal(GSN_TUX_ADD_ATTRCONF, &Dblqh::execTUX_ADD_ATTRCONF); addRecSignal(GSN_TUX_ADD_ATTRREF, &Dblqh::execTUX_ADD_ATTRREF); + addRecSignal(GSN_READ_ROWCOUNT_REQ, &Dblqh::execREAD_ROWCOUNTREQ); + initData(); #ifdef VM_TRACE diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 6b4a78380be..6e020c1de63 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -2511,6 +2511,21 @@ Dblqh::updatePackedList(Signal* signal, HostRecord * ahostptr, Uint16 hostId) }//if }//Dblqh::updatePackedList() +void +Dblqh::execREAD_ROWCOUNTREQ(Signal* signal){ + jamEntry(); + TcConnectionrecPtr regTcPtr; + regTcPtr.i = signal->theData[0]; + ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec); + + FragrecordPtr regFragptr; + regFragptr.i = regTcPtr.p->fragmentptr; + ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord); + + signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr]; + EXECUTE_DIRECT(DBACC, GSN_READ_ROWCOUNT_REQ, signal, 1); +} + /* ************>> */ /* TUPKEYCONF > */ /* ************>> */ @@ -7014,6 +7029,14 @@ void Dblqh::continueScanNextReqLab(Signal* signal) return; }//if + if(scanptr.p->m_last_row){ + jam(); + scanptr.p->scanCompletedStatus = ZTRUE; + scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; + sendScanFragConf(signal, ZFALSE); + return; + } + // Update timer on tcConnectRecord tcConnectptr.p->tcTimer = cLqhTimeOutCount; @@ -7959,13 +7982,10 @@ bool Dblqh::keyinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length) * ------------------------------------------------------------------------- */ void Dblqh::scanTupkeyConfLab(Signal* signal) { - UintR tdata3; - UintR tdata4; - UintR tdata5; + const TupKeyConf * conf = (TupKeyConf *)signal->getDataPtr(); + UintR tdata4 = conf->readLength; + UintR tdata5 = conf->lastRow; - tdata3 = signal->theData[2]; - tdata4 = signal->theData[3]; - tdata5 = signal->theData[4]; tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; scanptr.i = tcConnectptr.p->tcScanRec; releaseActiveFrag(signal); @@ -7996,15 +8016,15 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); scanptr.p->scanOpLength[scanptr.p->scanCompletedOperations] = tdata4; scanptr.p->scanCompletedOperations++; - if ((scanptr.p->scanCompletedOperations == - scanptr.p->scanConcurrentOperations) && - (scanptr.p->scanLockHold == ZTRUE)) { + scanptr.p->m_last_row = conf->lastRow; + + const bool done = (scanptr.p->scanCompletedOperations == scanptr.p->scanConcurrentOperations) | conf->lastRow; + if (done && (scanptr.p->scanLockHold == ZTRUE)) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); return; - } else if (scanptr.p->scanCompletedOperations == - scanptr.p->scanConcurrentOperations) { + } else if (done){ jam(); scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations; scanReleaseLocksLab(signal); @@ -8310,6 +8330,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanLocalFragid = 0; scanptr.p->scanTcWaiting = ZTRUE; scanptr.p->scanNumber = ~0; + scanptr.p->m_last_row = 0; for (Uint32 i = 0; i < scanConcurrentOperations; i++) { jam(); diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index b792edf9333..df752111291 100644 --- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -622,7 +622,10 @@ struct Operationrec { Uint32 tcOpIndex; Uint32 gci; Uint32 noFiredTriggers; - Uint32 hashValue; // only used in TUP_COMMITREQ + union { + Uint32 hashValue; // only used in TUP_COMMITREQ + Uint32 lastRow; + }; Bitmask changeMask; }; typedef Ptr OperationrecPtr; @@ -1623,6 +1626,7 @@ private: //------------------------------------------------------------------ //------------------------------------------------------------------ bool nullFlagCheck(Uint32 attrDes2); + bool readRowcount(Uint32 userPtr, Uint32* outBuffer); //------------------------------------------------------------------ //------------------------------------------------------------------ diff --git a/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp index 0dc196d5f56..a36af46315d 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -708,7 +708,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) regOperPtr->tupleState = TUPLE_BLOCKED; regOperPtr->changeMask.clear(); - + if (Rstoredid != ZNIL) { ndbrequire(initStoredOperationrec(regOperPtr, Rstoredid) == ZOK); }//if @@ -844,20 +844,18 @@ void Dbtup::sendTUPKEYCONF(Signal* signal, TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtrSend(); Uint32 RuserPointer = regOperPtr->userpointer; - Uint32 RfragPageId = regOperPtr->fragPageId; - Uint32 RpageIndex = regOperPtr->pageIndex; Uint32 RattroutbufLen = regOperPtr->attroutbufLen; Uint32 RnoFiredTriggers = regOperPtr->noFiredTriggers; BlockReference Ruserblockref = regOperPtr->userblockref; + Uint32 lastRow = regOperPtr->lastRow; regOperPtr->transstate = STARTED; regOperPtr->tupleState = NO_OTHER_OP; tupKeyConf->userPtr = RuserPointer; - tupKeyConf->pageId = RfragPageId; - tupKeyConf->pageIndex = RpageIndex; tupKeyConf->readLength = RattroutbufLen; tupKeyConf->writeLength = TlogSize; tupKeyConf->noFiredTriggers = RnoFiredTriggers; + tupKeyConf->lastRow = lastRow; EXECUTE_DIRECT(refToBlock(Ruserblockref), GSN_TUPKEYCONF, signal, TupKeyConf::SignalLength); @@ -920,6 +918,7 @@ int Dbtup::handleReadReq(Signal* signal, return -1; } else { jam(); + regOperPtr->lastRow = 0; if (interpreterStartLab(signal, pagePtr, Ttupheadoffset) != -1) { return 0; }//if @@ -1978,12 +1977,19 @@ int Dbtup::interpreterNextLab(Signal* signal, } case Interpreter::EXIT_OK: - case Interpreter::EXIT_OK_LAST: jam(); #ifdef TRACE_INTERPRETER ndbout_c(" - exit_ok"); #endif return TdataWritten; + + case Interpreter::EXIT_OK_LAST: + jam(); +#if 1 + ndbout_c(" - exit_ok_last"); +#endif + operPtr.p->lastRow = 1; + return TdataWritten; case Interpreter::EXIT_REFUSE: jam(); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index a5f56a356f9..fd3f8807aaa 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -187,6 +187,14 @@ int Dbtup::readAttributes(Page* const pagePtr, } else { return (Uint32)-1; }//if + } else if(attributeId == AttributeHeader::FRAGMENT){ + AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 1); + outBuffer[tmpAttrBufIndex+1] = fragptr.p->fragmentId; + tOutBufIndex = tmpAttrBufIndex + 2; + } else if(attributeId == AttributeHeader::ROW_COUNT){ + AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 2); + readRowcount(operPtr.p->userpointer, outBuffer+tmpAttrBufIndex+1); + tOutBufIndex = tmpAttrBufIndex + 3; } else { terrorCode = ZATTRIBUTE_ID_ERROR; return (Uint32)-1; @@ -195,6 +203,7 @@ int Dbtup::readAttributes(Page* const pagePtr, return tOutBufIndex; }//Dbtup::readAttributes() +#if 0 int Dbtup::readAttributesWithoutHeader(Page* const pagePtr, Uint32 tupHeadOffset, Uint32* inBuffer, @@ -247,6 +256,7 @@ int Dbtup::readAttributesWithoutHeader(Page* const pagePtr, ndbrequire(attrBufIndex == inBufLen); return tOutBufIndex; }//Dbtup::readAttributes() +#endif bool Dbtup::readFixedSizeTHOneWordNotNULL(Uint32* outBuffer, @@ -893,4 +903,13 @@ Dbtup::updateDynSmallVarSize(Uint32* inBuffer, return false; }//Dbtup::updateDynSmallVarSize() - +bool +Dbtup::readRowcount(Uint32 userPtr, Uint32* outBuffer){ + Uint32 tmp[sizeof(SignalHeader)+25]; + Signal * signal = (Signal*)&tmp; + signal->theData[0] = userPtr; + + EXECUTE_DIRECT(DBLQH, GSN_READ_ROWCOUNT_REQ, signal, 1); + outBuffer[0] = signal->theData[0]; + outBuffer[1] = signal->theData[1]; +} diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index 08ed6e84271..6d995e06582 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -325,7 +325,7 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue) if (theStatus == FinalGetValue) { ; // Simply continue with getValue } else if (theStatus == ExecInterpretedValue) { - if (insertATTRINFO(Interpreter::EXIT_OK_LAST) == -1) + if (insertATTRINFO(Interpreter::EXIT_OK) == -1) return NULL; theInterpretedSize = theTotalCurrAI_Len - (theInitialReadSize + 5); @@ -415,7 +415,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, // We insert an exit from interpretation since we are now starting // to set values in the tuple by setValue. //-------------------------------------------------------------------- - if (insertATTRINFO(Interpreter::EXIT_OK_LAST) == -1){ + if (insertATTRINFO(Interpreter::EXIT_OK) == -1){ return -1; } theInterpretedSize = theTotalCurrAI_Len - diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp index 7ee76bf2f3e..cd89f953213 100644 --- a/ndb/src/ndbapi/NdbOperationExec.cpp +++ b/ndb/src/ndbapi/NdbOperationExec.cpp @@ -354,7 +354,7 @@ NdbOperation::prepareSendInterpreted() Uint32 tTotalCurrAI_Len = theTotalCurrAI_Len; Uint32 tInitReadSize = theInitialReadSize; if (theStatus == ExecInterpretedValue) { - if (insertATTRINFO(Interpreter::EXIT_OK_LAST) != -1) { + if (insertATTRINFO(Interpreter::EXIT_OK) != -1) { //------------------------------------------------------------------------- // Since we read the total length before inserting the last entry in the // signals we need to add one to the total length. diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp index 3a7e0dda85e..57d741ab3d6 100644 --- a/ndb/src/ndbapi/NdbOperationInt.cpp +++ b/ndb/src/ndbapi/NdbOperationInt.cpp @@ -888,6 +888,18 @@ NdbOperation::interpret_exit_ok() return 0; } +int +NdbOperation::interpret_exit_last_row() +{ + INT_DEBUG(("interpret_exit_last_row")); + if (initial_interpreterCheck() == -1) + return -1; + if (insertATTRINFO(Interpreter::EXIT_OK_LAST) == -1) + return -1; + theErrorLine++; + return 0; +} + /************************************************************************************************ int NdbOperation::interpret_exit_nok(Uint32 ErrorCode)