From 1c75becb7c25ed2f2adb582e39a003e6be46862c Mon Sep 17 00:00:00 2001 From: "joreland@mysql.com" <> Date: Wed, 17 Nov 2004 10:07:52 +0100 Subject: [PATCH] wl#2126 - read_multi_range ndb part --- ndb/include/ndbapi/NdbConnection.hpp | 4 + ndb/include/ndbapi/NdbOperation.hpp | 14 +- ndb/src/ndbapi/NdbConnection.cpp | 16 +- ndb/src/ndbapi/NdbOperationExec.cpp | 5 +- sql/ha_ndbcluster.cc | 273 +++++++++++++++++++++------ sql/ha_ndbcluster.h | 12 ++ 6 files changed, 243 insertions(+), 81 deletions(-) diff --git a/ndb/include/ndbapi/NdbConnection.hpp b/ndb/include/ndbapi/NdbConnection.hpp index 7af5d27b922..500dea52916 100644 --- a/ndb/include/ndbapi/NdbConnection.hpp +++ b/ndb/include/ndbapi/NdbConnection.hpp @@ -430,6 +430,10 @@ public: */ const NdbOperation * getNextCompletedOperation(const NdbOperation * op)const; + + const NdbOperation* getFirstDefinedOperation()const{return theFirstOpInList;} + const NdbOperation* getLastDefinedOperation()const{return theLastOpInList;} + /** @} *********************************************************************/ /** diff --git a/ndb/include/ndbapi/NdbOperation.hpp b/ndb/include/ndbapi/NdbOperation.hpp index f32a418f6d2..1ca5b95840b 100644 --- a/ndb/include/ndbapi/NdbOperation.hpp +++ b/ndb/include/ndbapi/NdbOperation.hpp @@ -718,6 +718,7 @@ public: }; LockMode getLockMode() const { return theLockMode; } + void setAbortOption(Int8 ao) { m_abortOption = ao; } /** * Set/get distribution/partition key @@ -746,10 +747,13 @@ protected: void initInterpreter(); void next(NdbOperation*); // Set next pointer - NdbOperation* next(); // Get next pointer +public: + const NdbOperation* next() const; +protected: - enum OperationStatus{ + enum OperationStatus + { Init, OperationDefined, TupleKeyDefined, @@ -995,6 +999,12 @@ NdbOperation::next() return theNext; } +inline +const NdbOperation* +NdbOperation::next() const +{ + return theNext; +} /****************************************************************************** OperationStatus Status(); diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 8cf7c46deee..ed437e5e072 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -232,21 +232,6 @@ Remark: Handle time-out on a transaction object. void NdbConnection::handleExecuteCompletion() { - - if (theCompletionStatus == CompletedFailure) { - NdbOperation* tOpTemp = theFirstExecOpInList; - while (tOpTemp != NULL) { -/***************************************************************************** - * Ensure that all executing operations report failed for each - * read attribute when failure occurs. - * We do not want any operations to report both failure and - * success on different read attributes. - ****************************************************************************/ - tOpTemp->handleFailedAI_ElemLen(); - tOpTemp = tOpTemp->next(); - }//while - theReturnStatus = ReturnFailure; - }//if /*************************************************************************** * Move the NdbOperation objects from the list of executing * operations to list of completed @@ -1512,6 +1497,7 @@ transactions. /**********************************************************************/ theCompletionStatus = CompletedFailure; theCommitStatus = Aborted; + theReturnStatus = ReturnFailure; return 0; } else { #ifdef NDB_NO_DROPPED_SIGNAL diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp index fc263609eb4..9258ce618e8 100644 --- a/ndb/src/ndbapi/NdbOperationExec.cpp +++ b/ndb/src/ndbapi/NdbOperationExec.cpp @@ -543,10 +543,11 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) theStatus = Finished; // blobs want this if (m_abortOption != IgnoreError) + { theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; - + } theError.code = aSignal->readData(4); - theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), m_abortOption); + theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), ao); if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read return theNdbCon->OpCompleteFailure(ao, m_abortOption != IgnoreError); diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 17aaaf20fa2..428db43af28 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -55,8 +55,9 @@ static const char *ha_ndb_ext=".ndb"; #define ERR_RETURN(err) \ { \ - ERR_PRINT(err); \ - DBUG_RETURN(ndb_to_mysql_error(&err)); \ + const NdbError& tmp= err; \ + ERR_PRINT(tmp); \ + DBUG_RETURN(ndb_to_mysql_error(&tmp)); \ } // Typedefs for long names @@ -980,20 +981,21 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op) int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) { + int res; + DBUG_ENTER("pk_read"); + DBUG_PRINT("enter", ("key_len: %u", key_len)); + DBUG_DUMP("key", (char*)key, key_len); uint no_fields= table->fields, i; NdbConnection *trans= m_active_trans; NdbOperation *op; THD *thd= current_thd; - DBUG_ENTER("pk_read"); - DBUG_PRINT("enter", ("key_len: %u", key_len)); - DBUG_DUMP("key", (char*)key, key_len); NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || op->readTuple(lm) != 0) ERR_RETURN(trans->getNdbError()); - + if (table->primary_key == MAX_KEY) { // This table has no primary key, use "hidden" primary key @@ -1001,34 +1003,19 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) DBUG_DUMP("key", (char*)key, 8); if (set_hidden_key(op, no_fields, key)) ERR_RETURN(trans->getNdbError()); - + // Read key at the same time, for future reference if (get_ndb_value(op, NULL, no_fields, NULL)) ERR_RETURN(trans->getNdbError()); } else { - int res; if ((res= set_primary_key(op, key))) return res; } - // Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS - for (i= 0; i < no_fields; i++) - { - Field *field= table->field[i]; - if ((thd->query_id == field->query_id) || - m_retrieve_all_fields) - { - if (get_ndb_value(op, field, i, buf)) - ERR_RETURN(trans->getNdbError()); - } - else - { - // Attribute was not to be read - m_value[i].ptr= NULL; - } - } + if((res= define_read_attrs(buf, op))) + DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) { @@ -1042,7 +1029,6 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) DBUG_RETURN(0); } - /* Read one complementing record from NDB using primary key from old_data */ @@ -1101,6 +1087,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) int ha_ndbcluster::unique_index_read(const byte *key, uint key_len, byte *buf) { + int res; NdbConnection *trans= m_active_trans; NdbIndexOperation *op; THD *thd= current_thd; @@ -1133,22 +1120,8 @@ int ha_ndbcluster::unique_index_read(const byte *key, key_ptr+= key_part->length; } - // Get non-index attribute(s) - for (i= 0; i < table->fields; i++) - { - Field *field= table->field[i]; - if ((thd->query_id == field->query_id) || - (field->flags & PRI_KEY_FLAG)) - { - if (get_ndb_value(op, field, i, buf)) - ERR_RETURN(op->getNdbError()); - } - else - { - // Attribute was not to be read - m_value[i].ptr= NULL; - } - } + if((res= define_read_attrs(buf, op))) + DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) { @@ -1446,11 +1419,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) if (get_ndb_value(op, NULL, hidden_no, NULL)) ERR_RETURN(op->getNdbError()); } - - if (execute_no_commit(this,trans) != 0) - DBUG_RETURN(ndb_err(trans)); - DBUG_PRINT("exit", ("Scan started successfully")); - DBUG_RETURN(next_result(buf)); + DBUG_RETURN(0); } /* @@ -1461,6 +1430,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, const key_range *end_key, bool sorted, byte* buf) { + int res; bool restart; NdbConnection *trans= m_active_trans; NdbResultSet *cursor; @@ -1497,23 +1467,21 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, { const key_range *keys[2]= { start_key, end_key }; - int ret= set_bounds(op, keys); - if (ret) - DBUG_RETURN(ret); + res= set_bounds(op, keys); + if (res) + DBUG_RETURN(res); } - if (!restart) + if (!restart && (res= define_read_attrs(buf, op))) { - DBUG_RETURN(define_read_attrs(buf, op)); + DBUG_RETURN(res); } - else - { - if (execute_no_commit(this,trans) != 0) - DBUG_RETURN(ndb_err(trans)); - - DBUG_RETURN(next_result(buf)); - } -} + + if (execute_no_commit(this,trans) != 0) + DBUG_RETURN(ndb_err(trans)); + + DBUG_RETURN(next_result(buf)); +} /* Start a filtered scan in NDB. @@ -1533,6 +1501,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, byte *buf, enum ha_rkey_function find_flag) { + int res; NdbConnection *trans= m_active_trans; NdbResultSet *cursor; NdbScanOperation *op; @@ -1596,9 +1565,14 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, sf.end(); } - DBUG_RETURN(define_read_attrs(buf, op)); -} + if((res= define_read_attrs(buf, op))) + DBUG_RETURN(res); + if (execute_no_commit(this,trans) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} /* Start full table scan in NDB @@ -1607,6 +1581,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, int ha_ndbcluster::full_table_scan(byte *buf) { uint i; + int res; NdbResultSet *cursor; NdbScanOperation *op; NdbConnection *trans= m_active_trans; @@ -1620,7 +1595,14 @@ int ha_ndbcluster::full_table_scan(byte *buf) !(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; - DBUG_RETURN(define_read_attrs(buf, op)); + + if((res= define_read_attrs(buf, op))) + DBUG_RETURN(res); + + if (execute_no_commit(this,trans) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); } /* @@ -4554,4 +4536,171 @@ int ha_ndbcluster::write_ndb_file() DBUG_RETURN(error); } +int +ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p, + key_multi_range *ranges, + uint range_count, + bool sorted, + handler_buffer *buffer) +{ + DBUG_ENTER("ha_ndbcluster::read_multi_range_first"); + + int res; + uint i; + KEY* key_info= table->key_info + active_index; + NDB_INDEX_TYPE index_type = get_index_type(active_index); + ulong reclength = table->reclength; + NdbOperation* op; + + switch(index_type){ + case UNIQUE_INDEX: + case PRIMARY_KEY_INDEX: + break; + case PRIMARY_KEY_ORDERED_INDEX: + case UNIQUE_ORDERED_INDEX: + /** + * Currently not supported on ordered indexes + */ + for(i= 0; ikey_length && + ranges[i].start_key.flag == HA_READ_KEY_EXACT) + continue; + + /** + * Mark that we using hander:: implementation + */ + m_disable_multi_read= true; + return handler::read_multi_range_first(found_range_p, + ranges, + range_count, + sorted, + buffer); + } + break; + default: + case ORDERED_INDEX: + m_disable_multi_read= true; + return handler::read_multi_range_first(found_range_p, + ranges, + range_count, + sorted, + buffer); + } + + m_disable_multi_read= false; + + multi_ranges= ranges; + multi_range_count= range_count; + multi_range_sorted= sorted; + multi_range_buffer= buffer; + multi_range_found_p= found_range_p; + + byte* curr = buffer->buffer; + NdbOperation::LockMode lm= + (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); + const NDBTAB *tab= (const NDBTAB *) m_table; + const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index; + const NdbOperation* lastOp = m_active_trans->getLastDefinedOperation(); + + switch(index_type){ + case PRIMARY_KEY_INDEX: + case PRIMARY_KEY_ORDERED_INDEX: + for(i= 0; ibuffer_end; i++) + { + if ((op= m_active_trans->getNdbOperation(tab)) && + !op->readTuple(lm) && + !set_primary_key(op, ranges[i].start_key.key) && + !define_read_attrs(curr, op) && + (op->setAbortOption(IgnoreError), true)) + curr += reclength; + else + ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError()); + } + break; + case UNIQUE_INDEX: + case UNIQUE_ORDERED_INDEX: + for(i= 0; ibuffer_end; i++) + { + if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) && + !op->readTuple(lm) && + !set_primary_key(op, ranges[i].start_key.key) && + !define_read_attrs(curr, op) && + (op->setAbortOption(IgnoreError), true)) + curr += reclength; + else + ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError()); + } + } + + if(i != range_count) + { + buffer->end_of_used_area= buffer->buffer_end; + } + else + { + buffer->end_of_used_area= curr; + } + + /** + * Set first operation in multi range + */ + m_current_multi_operation= + lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); + if(!(res= execute_no_commit_ie(this, m_active_trans))) + { + multi_range_curr= 0; + m_multi_range_result_ptr= buffer->buffer; + return read_multi_range_next(); + } + ERR_RETURN(m_active_trans->getNdbError()); +} + +int +ha_ndbcluster::read_multi_range_next() +{ + DBUG_ENTER("ha_ndbcluster::read_multi_range_next"); + if(m_disable_multi_read) + DBUG_RETURN(handler::read_multi_range_next()); + + ulong reclength = table->reclength; + const NdbOperation* op = m_current_multi_operation; + while(multi_range_curr < multi_range_count && op && op->getNdbError().code) + { + multi_range_curr++; + op = m_active_trans->getNextCompletedOperation(op); + m_multi_range_result_ptr += reclength; + } + + if(multi_range_curr < multi_range_count && op) + { + * multi_range_found_p= multi_ranges + multi_range_curr; + memcpy(table->record[0], m_multi_range_result_ptr, reclength); + unpack_record(table->record[0]); + table->status= 0; + + /** + * Move to next + */ + multi_range_curr++; + m_current_multi_operation = m_active_trans->getNextCompletedOperation(op); + m_multi_range_result_ptr += reclength; + DBUG_RETURN(0); + } + + if(multi_range_curr == multi_range_count) + { + DBUG_RETURN(HA_ERR_END_OF_FILE); + } + /** + * Read remaining ranges + */ + uint left = multi_range_count - multi_range_curr; + DBUG_RETURN(read_multi_range_first(multi_range_found_p, + multi_ranges + multi_range_curr, + left, + multi_range_sorted, + multi_range_buffer)); +} + #endif /* HAVE_NDBCLUSTER_DB */ diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index d61876b1357..6fc9665697d 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -111,6 +111,14 @@ class ha_ndbcluster: public handler byte* buf); int read_range_next(); + /** + * Multi range stuff + */ + int read_multi_range_first(key_multi_range **found_range_p, + key_multi_range *ranges, uint range_count, + bool sorted, handler_buffer *buffer); + int read_multi_range_next(void); + bool get_error_message(int error, String *buf); void info(uint); int extra(enum ha_extra_function operation); @@ -257,6 +265,10 @@ class ha_ndbcluster: public handler uint32 m_blobs_buffer_size; uint m_dupkey; + bool m_disable_multi_read; + byte* m_multi_range_result_ptr; + const NdbOperation* m_current_multi_operation; + void set_rec_per_key(); void records_update(); void no_uncommitted_rows_execute_failure();