From 88987f1db3d6bfaaecec72774df5e32dfa25312b Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 8 Oct 2004 10:50:50 +0200 Subject: [PATCH 1/3] NDB wl-2151 Fix bounds setting table handler vs TUX mysql-test/ndb/ndb_range_bounds.pl: wl-2151 Fix bounds setting table handler vs TUX ndb/include/kernel/signaldata/TuxBound.hpp: wl-2151 Fix bounds setting table handler vs TUX ndb/include/ndbapi/NdbIndexScanOperation.hpp: wl-2151 Fix bounds setting table handler vs TUX ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: wl-2151 Fix bounds setting table handler vs TUX ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp: wl-2151 Fix bounds setting table handler vs TUX ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp: wl-2151 Fix bounds setting table handler vs TUX ndb/test/ndbapi/testOIBasic.cpp: wl-2151 Fix bounds setting table handler vs TUX sql/ha_ndbcluster.cc: wl-2151 Fix bounds setting table handler vs TUX sql/ha_ndbcluster.h: wl-2151 Fix bounds setting table handler vs TUX --- mysql-test/ndb/ndb_range_bounds.pl | 133 ++++++++++ ndb/include/kernel/signaldata/TuxBound.hpp | 1 - ndb/include/ndbapi/NdbIndexScanOperation.hpp | 46 ++-- ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 1 - ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp | 66 ++--- ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp | 107 ++++---- ndb/test/ndbapi/testOIBasic.cpp | 18 +- sql/ha_ndbcluster.cc | 265 ++++++++++--------- sql/ha_ndbcluster.h | 3 +- 9 files changed, 385 insertions(+), 255 deletions(-) create mode 100644 mysql-test/ndb/ndb_range_bounds.pl diff --git a/mysql-test/ndb/ndb_range_bounds.pl b/mysql-test/ndb/ndb_range_bounds.pl new file mode 100644 index 00000000000..264a543752b --- /dev/null +++ b/mysql-test/ndb/ndb_range_bounds.pl @@ -0,0 +1,133 @@ +# +# test range scan bounds +# output to mysql-test/t/ndb_range_bounds.test +# +# give option --all to generate all cases +# + +use strict; +use integer; + +my $all = shift; +!defined($all) || ($all eq '--all' && !defined(shift)) + or die "only available option is --all"; + +my $table = 't'; + +print < 1, + 'exp' => '9 = 9', + 'cnt' => scalar @$val, + }; +} + +sub mkone ($$$\@) { + my($col, $op, $key, $val) = @_; + my $cnt = scalar cut($op, $key, @$val); + return { + 'exp' => "$col $op $key", + 'cnt' => $cnt, + }; +} + +sub mktwo ($$$$$\@) { + my($col, $op1, $key1, $op2, $key2, $val) = @_; + my $cnt = scalar cut($op2, $key2, cut($op1, $key1, @$val)); + return { + 'exp' => "$col $op1 $key1 and $col $op2 $key2", + 'cnt' => $cnt, + }; +} + +sub mkall ($$$\@) { + my($col, $key1, $key2, $val) = @_; + my @a = (); + my $p = mkdummy(@$val); + push(@a, $p) if $all; + my @ops1 = $all ? qw(< <= = >= >) : qw(= >= >); + my @ops2 = $all ? qw(< <= = >= >) : qw(< <=); + for my $op1 (@ops1) { + my $p = mkone($col, $op1, $key1, @$val); + push(@a, $p) if $all || $p->{cnt} != 0; + for my $op2 (@ops2) { + my $p = mktwo($col, $op1, $key1, $op2, $key2, @$val); + push(@a, $p) if $all || $p->{cnt} != 0; + } + } + return \@a; +} + +for my $nn ("bcd", "") { + my %nn; + for my $x (qw(b c d)) { + $nn{$x} = $nn =~ /$x/ ? 
"not null" : "null"; + } + print <{cnt} * @val * @val; + print "select count(*) - $cnt1 from $table"; + print " where $p1->{exp};\n"; + for my $p2 (@$a2) { + my $cnt2 = $p1->{cnt} * $p2->{cnt} * @val; + print "select count(*) - $cnt2 from $table"; + print " where $p1->{exp} and $p2->{exp};\n"; + for my $p3 (@$a3) { + my $cnt3 = $p1->{cnt} * $p2->{cnt} * $p3->{cnt}; + print "select count(*) - $cnt3 from $table"; + print " where $p1->{exp} and $p2->{exp} and $p3->{exp};\n"; + } + } + } + print <= 2 and b > 3" is ok but "a > 2 and b >= 3" is not. + * + * The scan may currently return tuples for which the bounds are not + * satisfied. For example, "a <= 2 and b <= 3" scans the index up to + * (a=2, b=3) but also returns any (a=1, b=4). * * NULL is treated like a normal value which is less than any not-NULL - * value and equal to another NULL value. To search for NULL use + * value and equal to another NULL value. To compare against NULL use * setBound with null pointer (0). * - * An index stores also all-NULL keys (this may become optional). - * Doing index scan with empty bound set returns all table tuples. + * An index stores also all-NULL keys. Doing index scan with empty + * bound set returns all table tuples. * * @param attrName Attribute name, alternatively: * @param anAttrId Index column id (starting from 0) @@ -117,8 +109,6 @@ public: */ int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len = 0); - /** @} *********************************************************************/ - /** * Reset bounds and put operation in list that will be * sent on next execute diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index c5e2bd900e9..ed81f0735cb 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -7683,7 +7683,6 @@ void Dblqh::accScanConfScanLab(Signal* signal) Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4; if (scanptr.p->rangeScan) { jam(); - // bound info length is in first of the 5 header words TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend(); req->errorCode = RNIL; req->tuxScanPtrI = scanptr.p->scanAccPtr; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp index 549720cc17c..ddab77b97b5 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp @@ -87,21 +87,23 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons /* * Scan bound vs node prefix or entry. * - * Compare lower or upper bound and index attribute data. The attribute - * data may be partial in which case CmpUnknown may be returned. - * Returns -1 if the boundary is to the left of the compared key and +1 - * if the boundary is to the right of the compared key. + * Compare lower or upper bound and index entry data. The entry data + * may be partial in which case CmpUnknown may be returned. Otherwise + * returns -1 if the bound is to the left of the entry and +1 if the + * bound is to the right of the entry. * - * To get this behaviour we treat equality a little bit special. If the - * boundary is a lower bound then the boundary is to the left of all - * equal keys and if it is an upper bound then the boundary is to the - * right of all equal keys. + * The routine is similar to cmpSearchKey, but 0 is never returned. + * Suppose all attributes compare equal. Recall that all bounds except + * possibly the last one are non-strict. 
Use the given bound direction + * (0-lower 1-upper) and strictness of last bound to return -1 or +1. * - * When searching for the first key we are using the lower bound to try - * to find the first key that is to the right of the boundary. Then we - * start scanning from this tuple (including the tuple itself) until we - * find the first key which is to the right of the boundary. Then we - * stop and do not include that key in the scan result. + * Following example illustrates this. We are at (a=2, b=3). + * + * dir bounds strict return + * 0 a >= 2 and b >= 3 no -1 + * 0 a >= 2 and b > 3 yes +1 + * 1 a <= 2 and b <= 3 no +1 + * 1 a <= 2 and b < 3 yes -1 */ int Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen) @@ -111,12 +113,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ndbrequire(dir <= 1); // number of words of data left unsigned len2 = maxlen; - /* - * No boundary means full scan, low boundary is to the right of all - * keys. Thus we should always return -1. For upper bound we are to - * the right of all keys, thus we should always return +1. We achieve - * this behaviour by initializing type to 4. - */ + // in case of no bounds, init last type to something non-strict unsigned type = 4; while (boundCount != 0) { if (len2 <= AttributeHeaderSize) { @@ -124,7 +121,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne return NdbSqlUtil::CmpUnknown; } len2 -= AttributeHeaderSize; - // get and skip bound type + // get and skip bound type (it is used after the loop) type = boundInfo[0]; boundInfo += 1; if (! boundInfo.ah().isNULL()) { @@ -166,30 +163,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne entryData += AttributeHeaderSize + entryData.ah().getDataSize(); boundCount -= 1; } - if (dir == 0) { - jam(); - /* - * Looking for the lower bound. If strict lower bound then the - * boundary is to the right of the compared key and otherwise (equal - * included in range) then the boundary is to the left of the key. - */ - if (type == 1) { - jam(); - return +1; - } - return -1; - } else { - jam(); - /* - * Looking for the upper bound. If strict upper bound then the - * boundary is to the left of all equal keys and otherwise (equal - * included in the range) then the boundary is to the right of all - * equal keys. - */ - if (type == 3) { - jam(); - return -1; - } - return +1; - } + // all attributes were equal + const int strict = (type & 0x1); + return (dir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1)); } diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 7414691ab78..cf39185e403 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -108,15 +108,23 @@ Dbtux::execACC_SCANREQ(Signal* signal) /* * Receive bounds for scan in single direct call. The bounds can arrive * in any order. Attribute ids are those of index table. + * + * Replace EQ by equivalent LE + GE. Check for conflicting bounds. + * Check that sets of lower and upper bounds are on initial sequences of + * keys and that all but possibly last bound is non-strict. + * + * Finally save the sets of lower and upper bounds (i.e. start key and + * end key). Full bound type (< 4) is included but only the strict bit + * is used since lower and upper have now been separated. 
*/ void Dbtux::execTUX_BOUND_INFO(Signal* signal) { jamEntry(); struct BoundInfo { + int type; unsigned offset; unsigned size; - int type; }; TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend(); const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig; @@ -124,18 +132,11 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) // get records ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI); Index& index = *c_indexPool.getPtr(scan.m_indexId); - // collect bound info for each index attribute - BoundInfo boundInfo[MaxIndexAttributes][2]; + // collect lower and upper bounds + BoundInfo boundInfo[2][MaxIndexAttributes]; // largest attrId seen plus one - Uint32 maxAttrId = 0; - // skip 5 words + Uint32 maxAttrId[2] = { 0, 0 }; unsigned offset = 0; - if (req->boundAiLength < offset) { - jam(); - scan.m_state = ScanOp::Invalid; - sig->errorCode = TuxBoundInfo::InvalidAttrInfo; - return; - } const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; // walk through entries while (offset + 2 <= req->boundAiLength) { @@ -156,32 +157,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } - while (maxAttrId <= attrId) { - BoundInfo* b = boundInfo[maxAttrId++]; - b[0].type = b[1].type = -1; - } - BoundInfo* b = boundInfo[attrId]; - if (type == 0 || type == 1 || type == 4) { - if (b[0].type != -1) { - jam(); - scan.m_state = ScanOp::Invalid; - sig->errorCode = TuxBoundInfo::InvalidBounds; - return; + for (unsigned j = 0; j <= 1; j++) { + // check if lower/upper bit matches + const unsigned luBit = (j << 1); + if ((type & 0x2) != luBit && type != 4) + continue; + // EQ -> LE, GE + const unsigned type2 = (type & 0x1) | luBit; + // fill in any gap + while (maxAttrId[j] <= attrId) { + BoundInfo& b = boundInfo[j][maxAttrId[j]++]; + b.type = -1; } - b[0].offset = offset; - b[0].size = 2 + dataSize; - b[0].type = type; - } - if (type == 2 || type == 3 || type == 4) { - if (b[1].type != -1) { - jam(); - scan.m_state = ScanOp::Invalid; - sig->errorCode = TuxBoundInfo::InvalidBounds; - return; + BoundInfo& b = boundInfo[j][attrId]; + if (b.type != -1) { + // compare with previous bound + if (b.type != type2 || + b.size != 2 + dataSize || + memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) { + jam(); + scan.m_state = ScanOp::Invalid; + sig->errorCode = TuxBoundInfo::InvalidBounds; + return; + } + } else { + // enter new bound + b.type = type2; + b.offset = offset; + b.size = 2 + dataSize; } - b[1].offset = offset; - b[1].size = 2 + dataSize; - b[1].type = type; } // jump to next offset += 2 + dataSize; @@ -192,34 +196,27 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } - // save the bounds in index attribute id order - scan.m_boundCnt[0] = 0; - scan.m_boundCnt[1] = 0; - for (unsigned i = 0; i < maxAttrId; i++) { - jam(); - const BoundInfo* b = boundInfo[i]; - // current limitation - check all but last is equality - if (i + 1 < maxAttrId) { - if (b[0].type != 4 || b[1].type != 4) { + for (unsigned j = 0; j <= 1; j++) { + // save lower/upper bound in index attribute id order + for (unsigned i = 0; i < maxAttrId[j]; i++) { + jam(); + const BoundInfo& b = boundInfo[j][i]; + // check for gap or strict bound before last + if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidBounds; return; } - } - for (unsigned j = 0; j <= 1; j++) { - if (b[j].type != -1) { + bool ok = 
scan.m_bound[j]->append(&data[b.offset], b.size); + if (! ok) { jam(); - bool ok = scan.m_bound[j]->append(&data[b[j].offset], b[j].size); - if (! ok) { - jam(); - scan.m_state = ScanOp::Invalid; - sig->errorCode = TuxBoundInfo::OutOfBuffers; - return; - } - scan.m_boundCnt[j]++; + scan.m_state = ScanOp::Invalid; + sig->errorCode = TuxBoundInfo::OutOfBuffers; + return; } } + scan.m_boundCnt[j] = maxAttrId[j]; } // no error sig->errorCode = 0; diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index f9eb3514926..10994fd7847 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -1965,9 +1965,21 @@ BSet::calcpk(Par par, unsigned i) int BSet::setbnd(Par par) const { - for (unsigned j = 0; j < m_bvals; j++) { - const BVal& bval = *m_bval[j]; - CHK(bval.setbnd(par) == 0); + if (m_bvals != 0) { + unsigned p1 = urandom(m_bvals); + unsigned p2 = 10009; // prime + // random order + for (unsigned j = 0; j < m_bvals; j++) { + unsigned k = p1 + p2 * j; + const BVal& bval = *m_bval[k % m_bvals]; + CHK(bval.setbnd(par) == 0); + } + // duplicate + if (urandom(5) == 0) { + unsigned k = urandom(m_bvals); + const BVal& bval = *m_bval[k]; + CHK(bval.setbnd(par) == 0); + } } return 0; } diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 6d643298a79..ef3a067d04f 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -1222,115 +1222,159 @@ inline int ha_ndbcluster::next_result(byte *buf) DBUG_RETURN(HA_ERR_END_OF_FILE); } - /* - Set bounds for a ordered index scan, use key_range + Set bounds for ordered index scan. */ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, - const key_range *key, - int bound) + const key_range *keys[2]) { - uint key_len, key_store_len, tot_len, key_tot_len; - byte *key_ptr; - KEY* key_info= table->key_info + active_index; - KEY_PART_INFO* key_part= key_info->key_part; - KEY_PART_INFO* end= key_part+key_info->key_parts; - Field* field; - bool key_nullable, key_null; + const KEY *const key_info= table->key_info + active_index; + const uint key_parts= key_info->key_parts; + uint key_tot_len[2]; + uint tot_len; + int i, j; DBUG_ENTER("set_bounds"); - DBUG_PRINT("enter", ("bound: %d", bound)); - DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts)); - DBUG_PRINT("enter", ("key->length: %d", key->length)); - DBUG_PRINT("enter", ("key->flag: %d", key->flag)); + DBUG_PRINT("info", ("key_parts=%d", key_parts)); - // Set bounds using key data - tot_len= 0; - key_ptr= (byte *) key->key; - key_tot_len= key->length; - for (; key_part != end; key_part++) + for (j= 0; j <= 1; j++) { - field= key_part->field; - key_len= key_part->length; - key_store_len= key_part->store_length; - key_nullable= (bool) key_part->null_bit; - key_null= (field->maybe_null() && *key_ptr); - tot_len+= key_store_len; - - const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"}; - DBUG_ASSERT(bound >= 0 && bound <= 4); - DBUG_PRINT("info", ("Set Bound%s on %s %s %s", - bounds[bound], - field->field_name, - key_nullable ? "NULLABLE" : "", - key_null ? "NULL":"")); - DBUG_PRINT("info", ("Total length %d", tot_len)); - - DBUG_DUMP("key", (char*) key_ptr, key_store_len); - - if (op->setBound(field->field_name, - bound, - key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr), - key_null ? 
0 : key_len) != 0) - ERR_RETURN(op->getNdbError()); - - key_ptr+= key_store_len; - - if (tot_len >= key_tot_len) - break; - - /* - Only one bound which is not EQ can be set - so if this bound was not EQ, bail out and make - a best effort attempt - */ - if (bound != NdbIndexScanOperation::BoundEQ) - break; + const key_range *key= keys[j]; + if (key != NULL) + { + // for key->flag see ha_rkey_function + DBUG_PRINT("info", ("key %d length=%d flag=%d", + j, key->length, key->flag)); + key_tot_len[j]= key->length; + } + else + { + DBUG_PRINT("info", ("key %d not present", j)); + key_tot_len[j]= 0; + } } + tot_len= 0; + for (i= 0; i < key_parts; i++) + { + KEY_PART_INFO *key_part= &key_info->key_part[i]; + Field *field= key_part->field; + uint part_len= key_part->length; + uint part_store_len= key_part->store_length; + bool part_nullable= (bool) key_part->null_bit; + // Info about each key part + struct part_st { + bool part_last; + const key_range *key; + const byte *part_ptr; + bool part_null; + int bound_type; + const char* bound_ptr; + }; + struct part_st part[2]; + + for (j= 0; j <= 1; j++) + { + struct part_st &p = part[j]; + p.key= NULL; + p.bound_type= -1; + if (tot_len < key_tot_len[j]) + { + p.part_last= (tot_len + part_store_len >= key_tot_len[j]); + p.key= keys[j]; + p.part_ptr= &p.key->key[tot_len]; + p.part_null= (field->maybe_null() && *p.part_ptr); + p.bound_ptr= (const char *) + p.part_null ? 0 : part_nullable ? p.part_ptr + 1 : p.part_ptr; + + if (j == 0) + { + switch (p.key->flag) + { + case HA_READ_KEY_EXACT: + p.bound_type= NdbIndexScanOperation::BoundEQ; + break; + case HA_READ_KEY_OR_NEXT: + p.bound_type= NdbIndexScanOperation::BoundLE; + break; + case HA_READ_AFTER_KEY: + if (! p.part_last) + p.bound_type= NdbIndexScanOperation::BoundLE; + else + p.bound_type= NdbIndexScanOperation::BoundLT; + break; + default: + break; + } + } + if (j == 1) { + switch (p.key->flag) + { + case HA_READ_BEFORE_KEY: + if (! p.part_last) + p.bound_type= NdbIndexScanOperation::BoundGE; + else + p.bound_type= NdbIndexScanOperation::BoundGT; + break; + case HA_READ_AFTER_KEY: // weird + p.bound_type= NdbIndexScanOperation::BoundGE; + break; + default: + break; + } + } + + if (p.bound_type == -1) + { + DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag)); + DBUG_ASSERT(false); + // Stop setting bounds but continue with what we have + DBUG_RETURN(0); + } + } + } + + // Seen with e.g. 
b = 1 and c > 1 + if (part[0].bound_type == NdbIndexScanOperation::BoundLE && + part[1].bound_type == NdbIndexScanOperation::BoundGE && + memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0) + { + DBUG_PRINT("info", ("replace LE/GE pair by EQ")); + part[0].bound_type= NdbIndexScanOperation::BoundEQ; + part[1].bound_type= -1; + } + // Not seen but was in previous version + if (part[0].bound_type == NdbIndexScanOperation::BoundEQ && + part[1].bound_type == NdbIndexScanOperation::BoundGE && + memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0) + { + DBUG_PRINT("info", ("remove GE from EQ/GE pair")); + part[1].bound_type= -1; + } + + for (j= 0; j <= 1; j++) + { + struct part_st &p = part[j]; + // Set bound if not done with this key + if (p.key != NULL) + { + DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d", + j, i, tot_len, part_len, p.part_last, p.bound_type)); + DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len); + + // Set bound if not cancelled via type -1 + if (p.bound_type != -1) + if (op->setBound(field->field_name, p.bound_type, p.bound_ptr)) + ERR_RETURN(op->getNdbError()); + } + } + + tot_len+= part_store_len; + } DBUG_RETURN(0); } -#ifndef DBUG_OFF - -const char* key_flag_strs[] = -{ "HA_READ_KEY_EXACT", - "HA_READ_KEY_OR_NEXT", - "HA_READ_KEY_OR_PREV", - "HA_READ_AFTER_KEY", - "HA_READ_BEFORE_KEY", - "HA_READ_PREFIX", - "HA_READ_PREFIX_LAST", - "HA_READ_PREFIX_LAST_OR_PREV", - "HA_READ_MBR_CONTAIN", - "HA_READ_MBR_INTERSECT", - "HA_READ_MBR_WITHIN", - "HA_READ_MBR_DISJOINT", - "HA_READ_MBR_EQUAL" -}; - -const int no_of_key_flags = sizeof(key_flag_strs)/sizeof(char*); - -void print_key(const key_range* key, const char* info) -{ - if (key) - { - const char* str= key->flag < no_of_key_flags ? - key_flag_strs[key->flag] : "Unknown flag"; - - DBUG_LOCK_FILE; - fprintf(DBUG_FILE,"%s: %s, length=%d, key=", info, str, key->length); - uint i; - for (i=0; ilength-1; i++) - fprintf(DBUG_FILE,"%0d ", key->key[i]); - fprintf(DBUG_FILE, "\n"); - DBUG_UNLOCK_FILE; - } - return; -} -#endif - /* Start ordered index scan in NDB */ @@ -1347,11 +1391,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_ENTER("ordered_index_scan"); DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted)); DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); - - DBUG_EXECUTE("enter", print_key(start_key, "start_key");); - DBUG_EXECUTE("enter", print_key(end_key, "end_key");); - if(m_active_cursor == 0) + if (m_active_cursor == 0) { restart= false; NdbOperation::LockMode lm= @@ -1372,29 +1413,15 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, if(op->reset_bounds()) DBUG_RETURN(ndb_err(m_active_trans)); } - - if (start_key && - set_bounds(op, start_key, - (start_key->flag == HA_READ_KEY_EXACT) ? - NdbIndexScanOperation::BoundEQ : - (start_key->flag == HA_READ_AFTER_KEY) ? - NdbIndexScanOperation::BoundLT : - NdbIndexScanOperation::BoundLE)) - DBUG_RETURN(1); - if (end_key) { - if (start_key && start_key->flag == HA_READ_KEY_EXACT) - { - DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key")); - } - else if (set_bounds(op, end_key, - (end_key->flag == HA_READ_AFTER_KEY) ? 
- NdbIndexScanOperation::BoundGE : - NdbIndexScanOperation::BoundGT)) - DBUG_RETURN(1); + const key_range *keys[2]= { start_key, end_key }; + int ret= set_bounds(op, keys); + if (ret) + DBUG_RETURN(ret); } - if(!restart) + + if (!restart) { DBUG_RETURN(define_read_attrs(buf, op)); } diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index 36452033516..16862559b74 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -214,8 +214,7 @@ class ha_ndbcluster: public handler int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); - int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *key, - int bound); + int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *keys[2]); int key_cmp(uint keynr, const byte * old_row, const byte * new_row); void print_results(); From cbd5ddc63fd841c89c9d0683a2b9809c1d51f3c7 Mon Sep 17 00:00:00 2001 From: unknown Date: Sat, 9 Oct 2004 16:22:16 +0200 Subject: [PATCH 2/3] NDB tux optim 15 - fix wasted space in index node entries ndb/src/kernel/blocks/dbtux/Dbtux.hpp: tux optim 15 - fix wasted space in index node entries ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp: tux optim 15 - fix wasted space in index node entries ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp: tux optim 15 - fix wasted space in index node entries ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp: tux optim 15 - fix wasted space in index node entries ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp: tux optim 15 - fix wasted space in index node entries ndb/src/kernel/blocks/dbtux/Times.txt: tux optim 15 - fix wasted space in index node entries ndb/test/ndbapi/testOIBasic.cpp: tux optim 15 - fix wasted space in index node entries --- ndb/src/kernel/blocks/dbtux/Dbtux.hpp | 83 +++++++++++++++------- ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp | 13 ++-- ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp | 4 +- ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp | 14 ++++ ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp | 12 ++-- ndb/src/kernel/blocks/dbtux/Times.txt | 7 ++ ndb/test/ndbapi/testOIBasic.cpp | 7 +- 7 files changed, 96 insertions(+), 44 deletions(-) diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index 8dca52cec04..77f63d3d829 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -172,12 +172,21 @@ private: * Physical tuple address in TUP. Provides fast access to table tuple * or index node. Valid within the db node and across timeslices. * Not valid between db nodes or across restarts. + * + * To avoid wasting an Uint16 the pageid is split in two. */ struct TupLoc { - Uint32 m_pageId; // page i-value + private: + Uint16 m_pageId1; // page i-value (big-endian) + Uint16 m_pageId2; Uint16 m_pageOffset; // page offset in words + public: TupLoc(); TupLoc(Uint32 pageId, Uint16 pageOffset); + Uint32 getPageId() const; + void setPageId(Uint32 pageId); + Uint32 getPageOffset() const; + void setPageOffset(Uint32 pageOffset); bool operator==(const TupLoc& loc) const; bool operator!=(const TupLoc& loc) const; }; @@ -224,18 +233,13 @@ private: * work entry part 5 * * There are 3 links to other nodes: left child, right child, parent. - * These are in TupLoc format but the pageIds and pageOffsets are - * stored in separate arrays (saves 1 word). - * * Occupancy (number of entries) is at least 1 except temporarily when - * a node is about to be removed. 
If occupancy is 1, only max entry - * is present but both min and max prefixes are set. + * a node is about to be removed. */ struct TreeNode; friend struct TreeNode; struct TreeNode { - Uint32 m_linkPI[3]; // link to 0-left child 1-right child 2-parent - Uint16 m_linkPO[3]; // page offsets for above real page ids + TupLoc m_link[3]; // link to 0-left child 1-right child 2-parent unsigned m_side : 2; // we are 0-left child 1-right child 2-root int m_balance : 2; // balance -1, 0, +1 unsigned pad1 : 4; @@ -805,22 +809,52 @@ Dbtux::ConstData::operator=(Data data) inline Dbtux::TupLoc::TupLoc() : - m_pageId(RNIL), + m_pageId1(RNIL >> 16), + m_pageId2(RNIL & 0xFFFF), m_pageOffset(0) { } inline Dbtux::TupLoc::TupLoc(Uint32 pageId, Uint16 pageOffset) : - m_pageId(pageId), + m_pageId1(pageId >> 16), + m_pageId2(pageId & 0xFFFF), m_pageOffset(pageOffset) { } +inline Uint32 +Dbtux::TupLoc::getPageId() const +{ + return (m_pageId1 << 16) | m_pageId2; +} + +inline void +Dbtux::TupLoc::setPageId(Uint32 pageId) +{ + m_pageId1 = (pageId >> 16); + m_pageId2 = (pageId & 0xFFFF); +} + +inline Uint32 +Dbtux::TupLoc::getPageOffset() const +{ + return (Uint32)m_pageOffset; +} + +inline void +Dbtux::TupLoc::setPageOffset(Uint32 pageOffset) +{ + m_pageOffset = (Uint16)pageOffset; +} + inline bool Dbtux::TupLoc::operator==(const TupLoc& loc) const { - return m_pageId == loc.m_pageId && m_pageOffset == loc.m_pageOffset; + return + m_pageId1 == loc.m_pageId1 && + m_pageId2 == loc.m_pageId2 && + m_pageOffset == loc.m_pageOffset; } inline bool @@ -851,13 +885,13 @@ Dbtux::TreeEnt::eq(const TreeEnt ent) const inline int Dbtux::TreeEnt::cmp(const TreeEnt ent) const { - if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId) + if (m_tupLoc.getPageId() < ent.m_tupLoc.getPageId()) return -1; - if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId) + if (m_tupLoc.getPageId() > ent.m_tupLoc.getPageId()) return +1; - if (m_tupLoc.m_pageOffset < ent.m_tupLoc.m_pageOffset) + if (m_tupLoc.getPageOffset() < ent.m_tupLoc.getPageOffset()) return -1; - if (m_tupLoc.m_pageOffset > ent.m_tupLoc.m_pageOffset) + if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset()) return +1; if (m_tupVersion < ent.m_tupVersion) return -1; @@ -880,12 +914,9 @@ Dbtux::TreeNode::TreeNode() : m_occup(0), m_nodeScan(RNIL) { - m_linkPI[0] = NullTupLoc.m_pageId; - m_linkPO[0] = NullTupLoc.m_pageOffset; - m_linkPI[1] = NullTupLoc.m_pageId; - m_linkPO[1] = NullTupLoc.m_pageOffset; - m_linkPI[2] = NullTupLoc.m_pageId; - m_linkPO[2] = NullTupLoc.m_pageOffset; + m_link[0] = NullTupLoc; + m_link[1] = NullTupLoc; + m_link[2] = NullTupLoc; } // Dbtux::TreeHead @@ -913,7 +944,6 @@ Dbtux::TreeHead::getSize(AccSize acc) const case AccFull: return m_nodeSize; } - abort(); return 0; } @@ -1088,13 +1118,13 @@ inline Dbtux::TupLoc Dbtux::NodeHandle::getLink(unsigned i) { ndbrequire(i <= 2); - return TupLoc(m_node->m_linkPI[i], m_node->m_linkPO[i]); + return m_node->m_link[i]; } inline unsigned Dbtux::NodeHandle::getChilds() { - return (getLink(0) != NullTupLoc) + (getLink(1) != NullTupLoc); + return (m_node->m_link[0] != NullTupLoc) + (m_node->m_link[1] != NullTupLoc); } inline unsigned @@ -1125,8 +1155,7 @@ inline void Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc) { ndbrequire(i <= 2); - m_node->m_linkPI[i] = loc.m_pageId; - m_node->m_linkPO[i] = loc.m_pageOffset; + m_node->m_link[i] = loc; } inline void @@ -1224,7 +1253,7 @@ Dbtux::getTupAddr(const Frag& frag, TreeEnt ent) const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const TupLoc tupLoc = 
ent.m_tupLoc; Uint32 tupAddr = NullTupAddr; - c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupAddr); + c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), tupAddr); jamEntry(); return tupAddr; } diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index 8d31d2c6a55..d88119976be 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -256,8 +256,8 @@ operator<<(NdbOut& out, const Dbtux::TupLoc& loc) if (loc == Dbtux::NullTupLoc) { out << "null"; } else { - out << dec << loc.m_pageId; - out << "." << dec << loc.m_pageOffset; + out << dec << loc.getPageId(); + out << "." << dec << loc.getPageOffset(); } return out; } @@ -274,13 +274,10 @@ operator<<(NdbOut& out, const Dbtux::TreeEnt& ent) NdbOut& operator<<(NdbOut& out, const Dbtux::TreeNode& node) { - Dbtux::TupLoc link0(node.m_linkPI[0], node.m_linkPO[0]); - Dbtux::TupLoc link1(node.m_linkPI[1], node.m_linkPO[1]); - Dbtux::TupLoc link2(node.m_linkPI[2], node.m_linkPO[2]); out << "[TreeNode " << hex << &node; - out << " [left " << link0 << "]"; - out << " [right " << link1 << "]"; - out << " [up " << link2 << "]"; + out << " [left " << node.m_link[0] << "]"; + out << " [right " << node.m_link[1] << "]"; + out << " [up " << node.m_link[2] << "]"; out << " [side " << dec << node.m_side << "]"; out << " [occup " << dec << node.m_occup << "]"; out << " [balance " << dec << (int)node.m_balance << "]"; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp index 39cd8e25184..ded02696a89 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp @@ -245,7 +245,7 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData) const Uint32 numAttrs = frag.m_numAttrs - start; // skip to start position in keyAttrs only keyAttrs += start; - int ret = c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, keyAttrs, numAttrs, keyData); + int ret = c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), tupVersion, keyAttrs, numAttrs, keyData); jamEntry(); // TODO handle error ndbrequire(ret > 0); @@ -256,7 +256,7 @@ Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize) { const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const TupLoc tupLoc = ent.m_tupLoc; - int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, pkData); + int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData); jamEntry(); // TODO handle error ndbrequire(ret > 0); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index 3c0af3ca79d..e0b7fec19cf 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -235,6 +235,20 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) tree.m_minOccup = tree.m_maxOccup - maxSlack; // root node does not exist (also set by ctor) tree.m_root = NullTupLoc; +#ifdef VM_TRACE + if (debugFlags & DebugMeta) { + if (fragOpPtr.p->m_fragNo == 0) { + debugOut << "Index id=" << indexPtr.i; + debugOut << " nodeSize=" << tree.m_nodeSize; + debugOut << " headSize=" << NodeHeadSize; + debugOut << " prefSize=" << tree.m_prefSize; + debugOut << " entrySize=" << TreeEntSize; + debugOut << " minOccup=" << tree.m_minOccup; + debugOut << " maxOccup=" << tree.m_maxOccup; + 
debugOut << endl; + } + } +#endif // fragment is defined c_fragOpPool.release(fragOpPtr); } diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp index a1bfa2179bb..51c865b2dbc 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp @@ -24,8 +24,8 @@ int Dbtux::allocNode(Signal* signal, NodeHandle& node) { Frag& frag = node.m_frag; - Uint32 pageId = NullTupLoc.m_pageId; - Uint32 pageOffset = NullTupLoc.m_pageOffset; + Uint32 pageId = NullTupLoc.getPageId(); + Uint32 pageOffset = NullTupLoc.getPageOffset(); Uint32* node32 = 0; int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); jamEntry(); @@ -60,8 +60,8 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc) { Frag& frag = node.m_frag; ndbrequire(loc != NullTupLoc); - Uint32 pageId = loc.m_pageId; - Uint32 pageOffset = loc.m_pageOffset; + Uint32 pageId = loc.getPageId(); + Uint32 pageOffset = loc.getPageOffset(); Uint32* node32 = 0; c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); jamEntry(); @@ -100,8 +100,8 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) Frag& frag = node.m_frag; ndbrequire(node.getOccup() == 0); TupLoc loc = node.m_loc; - Uint32 pageId = loc.m_pageId; - Uint32 pageOffset = loc.m_pageOffset; + Uint32 pageId = loc.getPageId(); + Uint32 pageOffset = loc.getPageOffset(); Uint32* node32 = reinterpret_cast(node.m_node); c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); jamEntry(); diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt index 03473353a52..698e93b80ef 100644 --- a/ndb/src/kernel/blocks/dbtux/Times.txt +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -108,4 +108,11 @@ charsets mc02/a 35 ms 60 ms 71 pct [ case b: TUX can no longer use pointers to TUP data ] +optim 15 mc02/a 34 ms 60 ms 72 pct + mc02/b 42 ms 85 ms 100 pct + mc02/c 5 ms 12 ms 110 pct + mc02/d 178 ms 242 ms 35 pct + +[ corrected wasted space in index node ] + vim: set et: diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index 10994fd7847..214816a1ba1 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -212,6 +212,8 @@ struct Par : public Opt { // value calculation unsigned m_range; unsigned m_pctrange; + // choice of key + bool m_randomkey; // do verify after read bool m_verify; // deadlock possible @@ -227,6 +229,7 @@ struct Par : public Opt { m_totrows(m_threads * m_rows), m_range(m_rows), m_pctrange(0), + m_randomkey(false), m_verify(false), m_deadlock(false) { } @@ -2119,7 +2122,8 @@ pkupdate(Par par) Lst lst; bool deadlock = false; for (unsigned j = 0; j < par.m_rows; j++) { - unsigned i = thrrow(par, j); + unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); + unsigned i = thrrow(par, j2); set.lock(); if (! 
set.exist(i) || set.pending(i)) {
       set.unlock();
       continue;
     }
@@ -2722,6 +2726,7 @@ pkupdateindexbuild(Par par)
   if (par.m_no == 0) {
     CHK(createindex(par) == 0);
   } else {
+    par.m_randomkey = true;
     CHK(pkupdate(par) == 0);
   }
   return 0;

From f5571c3def3ca43a120443207131fd07c4d04daa Mon Sep 17 00:00:00 2001
From: unknown
Date: Sun, 10 Oct 2004 18:21:05 +0200
Subject: [PATCH 3/3] NDB tux optim 16 - binary search on bounding node when adding entry

ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  tux optim 16 - binary search on bounding node when adding entry
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp:
  tux optim 16 - binary search on bounding node when adding entry
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp:
  tux optim 16 - binary search on bounding node when adding entry
ndb/src/kernel/blocks/dbtux/Times.txt:
  tux optim 16 - binary search on bounding node when adding entry
---
 ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp  |  5 +-
 ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp  |  4 +-
 ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp | 62 ++++++++++++++-------
 ndb/src/kernel/blocks/dbtux/Times.txt       |  5 ++
 4 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
index d88119976be..9207c938e3a 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
@@ -424,8 +424,9 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
     }
     data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
     const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
-    for (unsigned pos = 0; pos < numpos; pos++)
-      out << " " << entList[pos];
+    // print entries in logical order
+    for (unsigned pos = 1; pos <= numpos; pos++)
+      out << " " << entList[pos % numpos];
     out << "]";
   }
   out << "]";
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
index 24b030bf8ec..565e64f9aeb 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
@@ -120,7 +120,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
   searchToAdd(signal, frag, c_searchKey, ent, treePos);
 #ifdef VM_TRACE
   if (debugFlags & DebugMaint) {
-    debugOut << treePos << endl;
+    debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
   }
 #endif
   if (treePos.m_match) {
@@ -154,7 +154,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
   searchToRemove(signal, frag, c_searchKey, ent, treePos);
 #ifdef VM_TRACE
   if (debugFlags & DebugMaint) {
-    debugOut << treePos << endl;
+    debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
   }
 #endif
   if (! 
treePos.m_match) { diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp index bffbb8f5594..7568e27ac74 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp @@ -31,10 +31,11 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear const unsigned numAttrs = frag.m_numAttrs; NodeHandle currNode(frag); currNode.m_loc = tree.m_root; + // assume success + treePos.m_match = false; if (currNode.m_loc == NullTupLoc) { // empty tree jam(); - treePos.m_match = false; return; } NodeHandle glbNode(frag); // potential g.l.b of final node @@ -93,6 +94,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear jam(); treePos.m_loc = currNode.m_loc; treePos.m_pos = 0; + // failed treePos.m_match = true; return; } @@ -100,9 +102,16 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear } // access rest of current node accessNode(signal, currNode, AccFull); - for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) { + // anticipate + treePos.m_loc = currNode.m_loc; + // binary search + int lo = -1; + int hi = currNode.getOccup(); + int ret; + while (1) { jam(); - int ret; + // hi - lo > 1 implies lo < j < hi + int j = (hi + lo) / 2; // read and compare attributes unsigned start = 0; readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey); @@ -113,25 +122,38 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear // keys are equal, compare entry values ret = searchEnt.cmp(currNode.getEnt(j)); } - if (ret <= 0) { - jam(); - treePos.m_loc = currNode.m_loc; + if (ret < 0) + hi = j; + else if (ret > 0) + lo = j; + else { treePos.m_pos = j; - treePos.m_match = (ret == 0); + // failed + treePos.m_match = true; return; } + if (hi - lo == 1) + break; } - if (! 
bottomNode.isNull()) { + if (ret < 0) { jam(); - // backwards compatible for now - treePos.m_loc = bottomNode.m_loc; - treePos.m_pos = 0; - treePos.m_match = false; + treePos.m_pos = hi; return; } - treePos.m_loc = currNode.m_loc; - treePos.m_pos = currNode.getOccup(); - treePos.m_match = false; + if (hi < currNode.getOccup()) { + jam(); + treePos.m_pos = hi; + return; + } + if (bottomNode.isNull()) { + jam(); + treePos.m_pos = hi; + return; + } + jam(); + // backwards compatible for now + treePos.m_loc = bottomNode.m_loc; + treePos.m_pos = 0; } /* @@ -150,9 +172,12 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s const unsigned numAttrs = frag.m_numAttrs; NodeHandle currNode(frag); currNode.m_loc = tree.m_root; + // assume success + treePos.m_match = true; if (currNode.m_loc == NullTupLoc) { // empty tree jam(); + // failed treePos.m_match = false; return; } @@ -206,27 +231,26 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s jam(); treePos.m_loc = currNode.m_loc; treePos.m_pos = 0; - treePos.m_match = true; return; } break; } // access rest of current node accessNode(signal, currNode, AccFull); + // anticipate + treePos.m_loc = currNode.m_loc; // pos 0 was handled above for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) { jam(); // compare only the entry if (searchEnt.eq(currNode.getEnt(j))) { jam(); - treePos.m_loc = currNode.m_loc; treePos.m_pos = j; - treePos.m_match = true; return; } } - treePos.m_loc = currNode.m_loc; treePos.m_pos = currNode.getOccup(); + // failed treePos.m_match = false; } diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt index 698e93b80ef..8fbb695ef82 100644 --- a/ndb/src/kernel/blocks/dbtux/Times.txt +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -115,4 +115,9 @@ optim 15 mc02/a 34 ms 60 ms 72 pct [ corrected wasted space in index node ] +optim 16 mc02/a 34 ms 53 ms 53 pct + mc02/b 42 ms 75 ms 75 pct + +[ case a, b: binary search of bounding node when adding entry ] + vim: set et:
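
The cmpScanBound() rewrite in PATCH 1/3 reduces the old if/else ladder to a single expression: when every bounded attribute compares equal, only the scan direction and the strictness of the last bound decide on which side of the entry the bound falls. The standalone sketch below illustrates that rule; the function name and the small driver are made up for this note and are not the Dbtux code. It reproduces the four rows of the comment table in the diff.

#include <cassert>

// Decision rule from the new Dbtux::cmpScanBound(): all compared
// attributes were equal, so the side is determined by the bound
// direction (0 = lower bound, 1 = upper bound) and by whether the
// last bound is strict.  Returns -1 for "bound lies to the left of
// the entry" and +1 for "bound lies to the right of the entry".
static int boundSideWhenEqual(unsigned dir, bool lastBoundStrict)
{
  if (dir == 0)
    return lastBoundStrict ? +1 : -1;  // a >= x keeps the entry, a > x skips it
  return lastBoundStrict ? -1 : +1;    // a <= x keeps the entry, a < x stops before it
}

int main()
{
  // The four rows of the comment table, positioned at an equal entry.
  assert(boundSideWhenEqual(0, false) == -1);  // a >= 2 and b >= 3
  assert(boundSideWhenEqual(0, true)  == +1);  // a >= 2 and b >  3
  assert(boundSideWhenEqual(1, false) == +1);  // a <= 2 and b <= 3
  assert(boundSideWhenEqual(1, true)  == -1);  // a <= 2 and b <  3
  return 0;
}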
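
On the handler side, the new ha_ndbcluster::set_bounds() in PATCH 1/3 walks the start key and the end key in one pass and maps each key_range flag to an NDB scan bound per key part. The sketch below restates that mapping outside the server; the enums are local stand-ins, not the real HA_READ_* or NdbIndexScanOperation constants, and it assumes the patch's convention that start keys become LE/LT/EQ bounds, end keys become GE/GT bounds, and only the last covered key part may carry a strict bound.

#include <cassert>

// Local stand-ins for the MySQL key flags and NDB bound types used by
// the patch; the values of BoundLE..BoundEQ mirror the patch's usage.
enum KeyFlag { READ_KEY_EXACT, READ_KEY_OR_NEXT, READ_AFTER_KEY, READ_BEFORE_KEY };
enum BoundType { BoundNone = -1, BoundLE = 0, BoundLT = 1, BoundGE = 2, BoundGT = 3, BoundEQ = 4 };

// isStart: true for the start key, false for the end key.
// isLastPart: true when this key part is the last one covered by the key.
static BoundType mapBound(bool isStart, bool isLastPart, KeyFlag flag)
{
  if (isStart) {
    switch (flag) {
    case READ_KEY_EXACT:   return BoundEQ;
    case READ_KEY_OR_NEXT: return BoundLE;                        // inclusive lower bound
    case READ_AFTER_KEY:   return isLastPart ? BoundLT : BoundLE; // strict only on last part
    default:               return BoundNone;
    }
  }
  switch (flag) {
  case READ_BEFORE_KEY:    return isLastPart ? BoundGT : BoundGE; // strict only on last part
  case READ_AFTER_KEY:     return BoundGE;                        // inclusive upper bound
  default:                 return BoundNone;
  }
}

int main()
{
  assert(mapBound(true,  true,  READ_KEY_OR_NEXT) == BoundLE);  // a >= 10
  assert(mapBound(true,  true,  READ_AFTER_KEY)   == BoundLT);  // a > 10, last part strict
  assert(mapBound(true,  false, READ_AFTER_KEY)   == BoundLE);  // earlier parts stay non-strict
  assert(mapBound(false, true,  READ_BEFORE_KEY)  == BoundGT);  // a < 10
  return 0;
}

The patch additionally collapses a matching LE/GE pair on the same part into a single EQ bound (the "b = 1 and c > 1" case noted in the code); TUX itself splits EQ back into an equivalent LE + GE pair when it receives the bounds in execTUX_BOUND_INFO().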
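
PATCH 2/3 ("tux optim 15") recovers wasted space by storing TupLoc's 32-bit page id as two 16-bit halves, so the struct packs tightly against the 16-bit page offset and TreeNode can keep plain TupLoc links instead of parallel arrays. A minimal sketch of the idea follows; the typedefs, the RNIL value and the size assertions are illustrative and assume a typical ABI where a Uint32 member forces 4-byte alignment.

#include <cassert>
#include <cstdint>

typedef std::uint16_t Uint16;
typedef std::uint32_t Uint32;
static const Uint32 RNIL = 0xFFFFFF00;   // illustrative "null" page id, not the NDB definition

// New layout: the page id is split into high and low halves so no
// padding is needed next to the 16-bit page offset.
struct TupLocPacked {
  Uint16 m_pageId1;     // high half of page i-value
  Uint16 m_pageId2;     // low half of page i-value
  Uint16 m_pageOffset;  // page offset in words
  Uint32 getPageId() const { return ((Uint32)m_pageId1 << 16) | m_pageId2; }
  void setPageId(Uint32 pageId) {
    m_pageId1 = (Uint16)(pageId >> 16);
    m_pageId2 = (Uint16)(pageId & 0xFFFF);
  }
};

// Old layout, for comparison: the Uint32 forces alignment padding.
struct TupLocUnpacked {
  Uint32 m_pageId;
  Uint16 m_pageOffset;
};

int main()
{
  TupLocPacked loc;
  loc.setPageId(RNIL);
  loc.m_pageOffset = 0;
  assert(loc.getPageId() == RNIL);      // page id round-trips through the two halves
  assert(sizeof(TupLocPacked) == 6);    // three Uint16, no padding (typical ABI)
  assert(sizeof(TupLocUnpacked) == 8);  // 4 + 2 + 2 bytes padding (typical ABI)
  return 0;
}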
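
PATCH 3/3 replaces the linear scan of the bounding node in searchToAdd() with a binary search for the insertion position. The sketch below shows just that in-node search on a plain sorted array of ints; the real code compares index attributes first and uses the tuple entry as a tie-breaker, and still falls back to the bottom-node handling shown in the diff, so this is an illustration and not the Dbtux implementation.

#include <cassert>
#include <vector>

// Find the position where key should be inserted into one sorted node.
// Invariant: ent[0..lo] < key < ent[hi..size-1]; lo starts at -1 and hi
// at size, so the insertion position is hi when the interval closes.
// Returns -1 if the key is already present (an add would be a duplicate).
static int searchToAddPos(const std::vector<int>& ent, int key)
{
  int lo = -1;
  int hi = (int)ent.size();
  while (hi - lo > 1) {
    int j = (hi + lo) / 2;   // lo < j < hi
    if (key < ent[j])
      hi = j;
    else if (key > ent[j])
      lo = j;
    else
      return -1;             // exact match: duplicate entry
  }
  return hi;
}

int main()
{
  std::vector<int> node;
  node.push_back(10); node.push_back(20); node.push_back(30); node.push_back(40);
  assert(searchToAddPos(node, 5)  == 0);   // before all entries
  assert(searchToAddPos(node, 25) == 2);   // between 20 and 30
  assert(searchToAddPos(node, 45) == 4);   // after all entries
  assert(searchToAddPos(node, 30) == -1);  // duplicate
  return 0;
}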