diff --git a/ndb/include/kernel/AttributeHeader.hpp b/ndb/include/kernel/AttributeHeader.hpp index b807b4ef4f1..13902aeb3c7 100644 --- a/ndb/include/kernel/AttributeHeader.hpp +++ b/ndb/include/kernel/AttributeHeader.hpp @@ -34,9 +34,10 @@ public: * Psuedo columns */ STATIC_CONST( PSUEDO = 0x8000 ); - STATIC_CONST( FRAGMENT = 0xFFFE ); - STATIC_CONST( ROW_COUNT = 0xFFFD ); - STATIC_CONST( COMMIT_COUNT = 0xFFFC ); + STATIC_CONST( FRAGMENT = 0xFFFE ); // Read fragment no + STATIC_CONST( ROW_COUNT = 0xFFFD ); // Read row count (committed) + STATIC_CONST( COMMIT_COUNT = 0xFFFC ); // Read commit count + STATIC_CONST( RANGE_NO = 0xFFFB ); // Read range no (when batched ranges) /** Initialize AttributeHeader at location aHeaderPtr */ static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId, diff --git a/ndb/include/ndbapi/NdbDictionary.hpp b/ndb/include/ndbapi/NdbDictionary.hpp index 4acca0f3d96..045ad39b6ac 100644 --- a/ndb/include/ndbapi/NdbDictionary.hpp +++ b/ndb/include/ndbapi/NdbDictionary.hpp @@ -387,6 +387,7 @@ public: static const Column * FRAGMENT; static const Column * ROW_COUNT; static const Column * COMMIT_COUNT; + static const Column * RANGE_NO; #endif private: diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp index ff38c3a147c..cc5b468c1fb 100644 --- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp +++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp @@ -45,7 +45,8 @@ public: NdbResultSet* readTuples(LockMode = LM_Read, Uint32 batch = 0, Uint32 parallel = 0, - bool order_by = false); + bool order_by = false, + bool read_range_no = false); inline NdbResultSet* readTuples(int parallell){ return readTuples(LM_Read, 0, parallell, false); @@ -119,7 +120,12 @@ public: * Marks end of a bound, * used when batching index reads (multiple ranges) */ - int end_of_bound(); + int end_of_bound(Uint32 range_no); + + /** + * Return range no for current row + */ + Uint32 get_range_no(); bool getSorted() const { return m_ordered; } private: diff --git a/ndb/include/ndbapi/NdbReceiver.hpp b/ndb/include/ndbapi/NdbReceiver.hpp index b95313db274..dba53bae4a6 100644 --- a/ndb/include/ndbapi/NdbReceiver.hpp +++ b/ndb/include/ndbapi/NdbReceiver.hpp @@ -68,7 +68,7 @@ private: Ndb* m_ndb; Uint32 m_id; Uint32 m_tcPtrI; - Uint32 m_key_info; + Uint32 m_hidden_count; ReceiverType m_type; void* m_owner; NdbReceiver* m_next; @@ -77,7 +77,7 @@ private: * At setup */ class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr); - void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size); + void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size, Uint32 range); void prepareSend(); void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&); diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 1bd90f9d8ec..861cda9b09e 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -156,6 +156,7 @@ protected: NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*); Uint32 m_ordered; + Uint32 m_read_range_no; int restart(bool forceSend = false); }; diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index ad19fc917e0..c233f25f6a3 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -2008,8 +2008,10 @@ public: BlockReference tcTuxBlockref; BlockReference tcTupBlockref; Uint32 commitAckMarker; - UintR noFiredTriggers; - + union { + Uint32 m_scan_curr_range_no; + UintR noFiredTriggers; + }; Uint16 errorCode; Uint16 logStartPageIndex; Uint16 logStartPageNo; diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index cf4cc410cee..9278a6b16e3 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -2619,12 +2619,20 @@ Dblqh::execREAD_PSUEDO_REQ(Signal* signal){ regTcPtr.i = signal->theData[0]; ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec); - FragrecordPtr regFragptr; - regFragptr.i = regTcPtr.p->fragmentptr; - ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord); - - signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr]; - EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2); + if(signal->theData[1] != AttributeHeader::RANGE_NO) + { + jam(); + FragrecordPtr regFragptr; + regFragptr.i = regTcPtr.p->fragmentptr; + ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord); + + signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr]; + EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2); + } + else + { + signal->theData[0] = regTcPtr.p->m_scan_curr_range_no; + } } /* ************>> */ @@ -7860,11 +7868,12 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP) } Uint32 first = (* (dst - left)); // First word in range - (* (dst - left)) = (first & 0xFFFF); // Remove length (16 upper bits) // Length of this range Uint8 offset; const Uint32 len = (first >> 16) ? (first >> 16) : totalLen; + tcPtrP->m_scan_curr_range_no = (first & 0xFFF0) >> 4; + (* (dst - left)) = (first & 0xF); // Remove length & range no if(len < left) { @@ -8740,6 +8749,7 @@ void Dblqh::initScanTc(Signal* signal, tcConnectptr.p->listState = TcConnectionrec::NOT_IN_LIST; tcConnectptr.p->commitAckMarker = RNIL; tcConnectptr.p->m_offset_current_keybuf = 0; + tcConnectptr.p->m_scan_curr_range_no = 0; tabptr.p->usageCount++; }//Dblqh::initScanTc() diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index 676a8312c48..b2c3634ae9e 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -998,6 +998,13 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){ outBuffer[0] = signal->theData[0]; outBuffer[1] = signal->theData[1]; return 2; + case AttributeHeader::RANGE_NO: + signal->theData[0] = operPtr.p->userpointer; + signal->theData[1] = attrId; + + EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2); + outBuffer[0] = signal->theData[0]; + return 1; default: return 0; } diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp index 462f04acb88..89dcfb953ff 100644 --- a/ndb/src/ndbapi/NdbDictionary.cpp +++ b/ndb/src/ndbapi/NdbDictionary.cpp @@ -942,4 +942,4 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0; const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0; const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0; - +const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 37dea73838f..3a8c25a04ae 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -218,6 +218,12 @@ NdbColumnImpl::create_psuedo(const char * name){ col->m_impl.m_attrId = AttributeHeader::COMMIT_COUNT; col->m_impl.m_attrSize = 8; col->m_impl.m_arraySize = 1; + } else if(!strcmp(name, "NDB$RANGE_NO")){ + col->setType(NdbDictionary::Column::Unsigned); + col->m_impl.m_attrId = AttributeHeader::RANGE_NO; + col->m_impl.m_attrSize = 4; + col->m_impl.m_arraySize = 1; + col->m_impl.m_extType = NdbSqlUtil::Type::Unsigned; } else { abort(); } @@ -685,6 +691,8 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb, NdbColumnImpl::create_psuedo("NDB$ROW_COUNT"); NdbDictionary::Column::COMMIT_COUNT= NdbColumnImpl::create_psuedo("NDB$COMMIT_COUNT"); + NdbDictionary::Column::RANGE_NO= + NdbColumnImpl::create_psuedo("NDB$RANGE_NO"); } m_globalHash->unlock(); return true; diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index 14f8d4b8440..52bbf26f5df 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -140,7 +140,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, } void -NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ +NdbReceiver::do_get_value(NdbReceiver * org, + Uint32 rows, + Uint32 key_size, + Uint32 range_no){ if(rows > m_defined_rows){ delete[] m_rows; m_defined_rows = rows; @@ -155,7 +158,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ key.m_attrSize = 4; key.m_nullable = true; // So that receive works w.r.t KEYINFO20 } - m_key_info = key_size; + m_hidden_count = (key_size ? 1 : 0) + range_no ; for(Uint32 i = 0; itheFirstRecAttr; while(tRecAttr != 0){ if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0) @@ -196,10 +205,9 @@ void NdbReceiver::copyout(NdbReceiver & dstRec){ NdbRecAttr* src = m_rows[m_current_row++]; NdbRecAttr* dst = dstRec.theFirstRecAttr; - Uint32 tmp = m_key_info; - if(tmp > 0){ + Uint32 tmp = m_hidden_count; + while(tmp--) src = src->next(); - } while(dst){ Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4; diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index a412a77822c..d519b074e8b 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -121,6 +121,7 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection) theOperationType = OpenScanRequest; theNdbCon->theMagicNumber = 0xFE11DF; theNoOfTupKeyLeft = tab->m_noOfDistributionKeys; + m_read_range_no = 0; return 0; } @@ -735,7 +736,9 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, req->requestInfo = reqInfo; for(Uint32 i = 0; ido_get_value(&theReceiver, batch_size, key_size); + m_receivers[i]->do_get_value(&theReceiver, batch_size, + key_size, + m_read_range_no); } return 0; } @@ -1197,8 +1200,17 @@ NdbResultSet* NdbIndexScanOperation::readTuples(LockMode lm, Uint32 batch, Uint32 parallel, - bool order_by){ + bool order_by, + bool read_range_no){ NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); + if(read_range_no) + { + m_read_range_no = 1; + Uint32 word = 0; + AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0); + if(insertATTRINFO(word) == -1) + rs = 0; + } if(rs && order_by){ m_ordered = 1; Uint32 cnt = m_accessTable->getNoOfColumns() - 1; @@ -1264,7 +1276,6 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, r1 = (skip ? r1->next() : r1); r2 = (skip ? r2->next() : r2); - while(cols > 0){ Uint32 * d1 = (Uint32*)r1->aRef(); Uint32 * d2 = (Uint32*)r2->aRef(); @@ -1366,7 +1377,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, s_idx, s_last); - Uint32 cols = m_sort_columns; + Uint32 cols = m_sort_columns + m_read_range_no; Uint32 skip = m_keyInfo; while(u_idx < u_last){ u_last--; @@ -1640,13 +1651,41 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){ } int -NdbIndexScanOperation::end_of_bound() +NdbIndexScanOperation::end_of_bound(Uint32 no) { - Uint32 bound_head = * m_first_bound_word; - bound_head |= (theTupKeyLen - m_this_bound_start) << 16; - * m_first_bound_word = bound_head; - - m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;; - m_this_bound_start = theTupKeyLen; + if(no < (1 << 13)) // Only 12-bits no of ranges + { + Uint32 bound_head = * m_first_bound_word; + bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4); + * m_first_bound_word = bound_head; + + m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;; + m_this_bound_start = theTupKeyLen; + return 0; + } + return -1; +} + +Uint32 +NdbIndexScanOperation::get_range_no() +{ + if(m_read_range_no) + { + Uint32 idx = m_current_api_receiver; + Uint32 last = m_api_receivers_count; + + Uint32 row; + NdbReceiver * tRec; + NdbRecAttr * tRecAttr; + if(idx < last && (tRec = m_api_receivers[idx]) + && ((row = tRec->m_current_row) <= tRec->m_defined_rows) + && (tRecAttr = tRec->m_rows[row-1])){ + + if(m_keyInfo) + tRecAttr = tRecAttr->next(); + Uint32 ret = *(Uint32*)tRecAttr->aRef(); + return ret; + } + } return 0; } diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 3149552c8ef..29ceffa8b7e 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -33,7 +33,7 @@ #include // Default value for parallelism -static const int parallelism= 240; +static const int parallelism= 0; // Default value for max number of transactions // createable against NDB from this handler @@ -1316,7 +1316,8 @@ inline int ha_ndbcluster::next_result(byte *buf) */ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, - const key_range *keys[2]) + const key_range *keys[2], + uint range_no) { const KEY *const key_info= table->key_info + active_index; const uint key_parts= key_info->key_parts; @@ -1418,7 +1419,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag)); DBUG_ASSERT(false); // Stop setting bounds but continue with what we have - op->end_of_bound(); + op->end_of_bound(range_no); DBUG_RETURN(0); } } @@ -1466,7 +1467,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, tot_len+= part_store_len; } - op->end_of_bound(); + op->end_of_bound(range_no); DBUG_RETURN(0); } @@ -4862,20 +4863,12 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, case ORDERED_INDEX: range: ranges[i].range_flag &= ~(uint)UNIQUE_RANGE; - if (sorted && scanOp != 0) - { - /** - * We currently don't support batching of ordered range scans - */ - i--; - curr= (byte*)buffer->buffer_end; - break; - } if (scanOp == 0) { if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) && - (m_active_cursor= scanOp->readTuples(lm, 0, parallelism, sorted)) && - !define_read_attrs(curr, scanOp)) + (m_active_cursor= scanOp->readTuples(lm, 0, + parallelism, sorted, true))&& + !define_read_attrs(curr, scanOp)) curr += reclength; else ERR_RETURN(scanOp ? @@ -4883,7 +4876,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, m_active_trans->getNdbError()); } const key_range *keys[2]= { &ranges[i].start_key, &ranges[i].end_key }; - if ((res= set_bounds(scanOp, keys))) + if ((res= set_bounds(scanOp, keys, i))) DBUG_RETURN(res); break; } @@ -5012,7 +5005,9 @@ found: /** * Found a record belonging to a scan */ - * multi_range_found_p= multi_ranges + multi_range_curr; + Uint32 range_no = ((NdbIndexScanOperation*)m_active_cursor->getOperation()) + ->get_range_no(); + * multi_range_found_p= multi_ranges + range_no; memcpy(table->record[0], m_multi_range_result_ptr, reclength); setup_recattr(m_active_cursor->getOperation()->getFirstRecAttr()); unpack_record(table->record[0]); diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index 4cede842c50..d2965c733fa 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -203,7 +203,7 @@ class ha_ndbcluster: public handler int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); - int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *keys[2]); + int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0); int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr); void print_results();