diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 05bc6345d24..246664a03de 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -87,6 +87,12 @@ static int unpackfrm(const void **data, uint *len, static int ndb_get_table_statistics(Ndb*, const char *, struct Ndb_statistics *); +// Util thread variables +static pthread_t ndb_util_thread; +pthread_mutex_t LOCK_ndb_util_thread; +pthread_cond_t COND_ndb_util_thread; +extern "C" pthread_handler_decl(ndb_util_thread_func, arg); +ulong ndb_cache_check_time; /* Dummy buffer to read zero pack_length fields @@ -2354,22 +2360,21 @@ void ha_ndbcluster::print_results() if (!_db_on_) DBUG_VOID_RETURN; - + char buf_type[MAX_FIELD_WIDTH], buf_val[MAX_FIELD_WIDTH]; - String type(buf_type, sizeof(buf_type), &my_charset_bin); + String type(buf_type, sizeof(buf_type), &my_charset_bin); String val(buf_val, sizeof(buf_val), &my_charset_bin); for (uint f=0; fs->fields;f++) { - // Use DBUG_PRINT since DBUG_FILE cannot be filtered out + /* Use DBUG_PRINT since DBUG_FILE cannot be filtered out */ char buf[2000]; Field *field; void* ptr; - const NDBCOL *col= NULL; NdbValue value; NdbBlob *ndb_blob; buf[0]= 0; - field= table->field[f]; + field= table->field[f]; if (!(value= m_value[f]).ptr) { my_snprintf(buf, sizeof(buf), "not read"); @@ -2377,8 +2382,6 @@ void ha_ndbcluster::print_results() } ptr= field->ptr; - DBUG_DUMP("field->ptr", (char*)ptr, field->pack_length()); - col= tab->getColumn(f); if (! 
(field->flags & BLOB_FLAG)) { @@ -2404,9 +2407,9 @@ void ha_ndbcluster::print_results() goto print_value; } } - + print_value: - DBUG_PRINT("value", ("%u,%s: %s", f, col->getName(), buf)); + DBUG_PRINT("value", ("%u,%s: %s", f, field->field_name, buf)); } #endif DBUG_VOID_RETURN; @@ -3051,8 +3054,12 @@ int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) } -const char **ha_ndbcluster::bas_ext() const -{ static const char *ext[]= { ha_ndb_ext, NullS }; return ext; } +static const char *ha_ndb_bas_ext[]= { ha_ndb_ext, NullS }; +const char** +ha_ndbcluster::bas_ext() const +{ + return ha_ndb_bas_ext; +} /* @@ -3220,7 +3227,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) m_transaction_on= FALSE; else m_transaction_on= thd->variables.ndb_use_transactions; - // m_use_local_query_cache= thd->variables.ndb_use_local_query_cache; m_active_trans= thd->transaction.all.ndb_tid ? (NdbTransaction*)thd->transaction.all.ndb_tid: @@ -3647,6 +3653,52 @@ static int create_ndb_column(NDBCOL &col, Create a table in NDB Cluster */ +static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length) +{ + if (form->s->max_rows == 0) /* default setting, don't set fragmentation */ + return; + /** + * get the number of fragments right + */ + uint no_fragments; + { +#if MYSQL_VERSION_ID >= 50000 + uint acc_row_size= 25 + /*safety margin*/ 2; +#else + uint acc_row_size= pk_length*4; + /* add acc overhead */ + if (pk_length <= 8) /* main page will set the limit */ + acc_row_size+= 25 + /*safety margin*/ 2; + else /* overflow page will set the limit */ + acc_row_size+= 4 + /*safety margin*/ 4; +#endif + ulonglong acc_fragment_size= 512*1024*1024; + ulonglong max_rows= form->s->max_rows; +#if MYSQL_VERSION_ID >= 50100 + no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1; +#else + no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1 + +1/*correct rounding*/)/2; +#endif + } + { + uint no_nodes= g_ndb_cluster_connection->no_db_nodes(); 
+ NDBTAB::FragmentType ftype; + if (no_fragments > 2*no_nodes) + { + ftype= NDBTAB::FragAllLarge; + if (no_fragments > 4*no_nodes) + push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "Ndb might have problems storing the max amount of rows specified"); + } + else if (no_fragments > no_nodes) + ftype= NDBTAB::FragAllMedium; + else + ftype= NDBTAB::FragAllSmall; + tab.setFragmentType(ftype); + } +} + int ha_ndbcluster::create(const char *name, TABLE *form, HA_CREATE_INFO *info) @@ -3748,7 +3800,9 @@ int ha_ndbcluster::create(const char *name, break; } } - + + ndb_set_fragmentation(tab, form, pk_length); + if ((my_errno= check_ndb_connection())) DBUG_RETURN(my_errno); @@ -4014,7 +4068,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_force_send(TRUE), m_autoincrement_prefetch(32), m_transaction_on(TRUE), - m_use_local_query_cache(FALSE), m_cond_stack(NULL), m_multi_cursor(NULL) { @@ -4070,6 +4123,7 @@ ha_ndbcluster::~ha_ndbcluster() } + /* Open a table for further use - fetch metadata for this table from NDB @@ -4170,16 +4224,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb) Ndb* check_ndb_in_thd(THD* thd) { - DBUG_ENTER("check_ndb_in_thd"); - Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb; - + Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb; if (!thd_ndb) { if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) - DBUG_RETURN(NULL); + return NULL; thd->transaction.thd_ndb= thd_ndb; } - DBUG_RETURN(thd_ndb->ndb); + return thd_ndb->ndb; } @@ -4532,13 +4584,21 @@ bool ndbcluster_init() (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0, (hash_get_key) ndbcluster_get_key,0,0); pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_ndb_util_thread, NULL); + + // Create utility thread + pthread_t tmp; + if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0)) + { + DBUG_PRINT("error", ("Could not create ndb 
utility thread")); + goto ndbcluster_init_error; + } + ndbcluster_inited= 1; -#ifdef USE_DISCOVER_ON_STARTUP - if (ndb_discover_tables() != 0) - goto ndbcluster_init_error; -#endif DBUG_RETURN(FALSE); + ndbcluster_init_error: ndbcluster_end(); DBUG_RETURN(TRUE); @@ -4548,12 +4608,19 @@ bool ndbcluster_init() /* End use of the NDB Cluster table handler - free all global variables allocated by - ndcluster_init() + ndbcluster_init() */ bool ndbcluster_end() { DBUG_ENTER("ndbcluster_end"); + + // Kill ndb utility thread + (void) pthread_mutex_lock(&LOCK_ndb_util_thread); + DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread)); + (void) pthread_cond_signal(&COND_ndb_util_thread); + (void) pthread_mutex_unlock(&LOCK_ndb_util_thread); + if(g_ndb) delete g_ndb; g_ndb= NULL; @@ -4564,6 +4631,8 @@ bool ndbcluster_end() DBUG_RETURN(0); hash_free(&ndbcluster_open_tables); pthread_mutex_destroy(&ndbcluster_mutex); + pthread_mutex_destroy(&LOCK_ndb_util_thread); + pthread_cond_destroy(&COND_ndb_util_thread); ndbcluster_inited= 0; DBUG_RETURN(0); } @@ -4756,16 +4825,174 @@ const char* ha_ndbcluster::index_type(uint key_number) return "HASH"; } } + uint8 ha_ndbcluster::table_cache_type() { - if (m_use_local_query_cache) - return HA_CACHE_TBL_TRANSACT; - else - return HA_CACHE_TBL_NOCACHE; + DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT"); + DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT); } + +uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, + Uint64 *commit_count) +{ + DBUG_ENTER("ndb_get_commitcount"); + + if (ndb_cache_check_time > 0) + { + /* Use cached commit_count from share */ + char name[FN_REFLEN]; + NDB_SHARE *share; + (void)strxnmov(name, FN_REFLEN, + "./",dbname,"/",tabname,NullS); + DBUG_PRINT("info", ("name: %s", name)); + pthread_mutex_lock(&ndbcluster_mutex); + if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) name, + strlen(name)))) + { + pthread_mutex_unlock(&ndbcluster_mutex); + DBUG_RETURN(1); + } 
+ *commit_count= share->commit_count; + DBUG_PRINT("info", ("commit_count: %d", *commit_count)); + pthread_mutex_unlock(&ndbcluster_mutex); + DBUG_RETURN(0); + } + + /* Get commit_count from NDB */ + Ndb *ndb; + if (!(ndb= check_ndb_in_thd(thd))) + DBUG_RETURN(1); + ndb->setDatabaseName(dbname); + + struct Ndb_statistics stat; + if (ndb_get_table_statistics(ndb, tabname, &stat)) + DBUG_RETURN(1); + *commit_count= stat.commit_count; + DBUG_RETURN(0); +} + + /* - Handling the shared NDB_SHARE structure that is needed to + Check if a cached query can be used. + This is done by comparing the supplied engine_data to commit_count of + the table. + The commit_count is either retrieved from the share for the table, where + it has been cached by the util thread. If the util thread is not started, + NDB has to be contacted to retrieve the commit_count, this will introduce + a small delay while waiting for NDB to answer. + + + SYNOPSIS + ndbcluster_cache_retrieval_allowed + thd thread handle + full_name concatenation of database name, + the null character '\0', and the table + name + full_name_len length of the full name, + i.e. len(dbname) + len(tablename) + 1 + + engine_data parameter retrieved when query was first inserted into + the cache. If the value of engine_data is changed, + all queries for this table should be invalidated.
+ + RETURN VALUE + TRUE Yes, use the query from cache + FALSE No, don't use the cached query, and if engine_data + has changed, all queries for this table should be invalidated + +*/ + +static my_bool +ndbcluster_cache_retrieval_allowed(THD *thd, + char *full_name, uint full_name_len, + ulonglong *engine_data) +{ + DBUG_ENTER("ndbcluster_cache_retrieval_allowed"); + + Uint64 commit_count; + bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)); + char *dbname= full_name; + char *tabname= dbname+strlen(dbname)+1; + + DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d", + dbname, tabname, is_autocommit)); + + if (!is_autocommit) + DBUG_RETURN(FALSE); + + if (ndb_get_commitcount(thd, dbname, tabname, &commit_count)) + { + *engine_data+= 1; /* invalidate */ + DBUG_RETURN(FALSE); + } + DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu", + *engine_data, commit_count)); + if (*engine_data != commit_count) + { + *engine_data= commit_count; /* invalidate */ + DBUG_PRINT("exit",("Do not use cache, commit_count has changed")); + DBUG_RETURN(FALSE); + } + + DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data)); + DBUG_RETURN(TRUE); +} + + +/** + Register a table for use in the query cache. Fetch the commit_count + for the table and return it in engine_data, this will later be used + to check if the table has changed, before the cached query is reused. + + SYNOPSIS + ha_ndbcluster::can_query_cache_table + thd thread handle + full_name concatenation of database name, + the null character '\0', and the table + name + full_name_len length of the full name, + i.e. 
len(dbname) + len(tablename) + 1 + qc_engine_callback function to be called before using cache on this table + engine_data out, commit_count for this table + + RETURN VALUE + TRUE Yes, it's ok to cache this query + FALSE No, don't cache the query + +*/ + +my_bool +ha_ndbcluster::register_query_cache_table(THD *thd, + char *full_name, uint full_name_len, + qc_engine_callback *engine_callback, + ulonglong *engine_data) +{ + DBUG_ENTER("ha_ndbcluster::register_query_cache_table"); + + bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)); + DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d", + m_dbname,m_tabname,is_autocommit)); + if (!is_autocommit) + DBUG_RETURN(FALSE); + + Uint64 commit_count; + if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count)) + { + *engine_data= 0; + DBUG_PRINT("error", ("Could not get commitcount")) + DBUG_RETURN(FALSE); + } + *engine_data= commit_count; + *engine_callback= ndbcluster_cache_retrieval_allowed; + DBUG_PRINT("exit",("*engine_data=%llu", *engine_data)); + DBUG_RETURN(TRUE); +} + + +/* + Handling the shared NDB_SHARE structure that is needed to provide table locking. It's also used for sharing data with other NDB handlers in the same MySQL Server.
There is currently not much @@ -4802,8 +5029,14 @@ static NDB_SHARE* get_share(const char *table_name) } thr_lock_init(&share->lock); pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); + share->commit_count= 0; } } + DBUG_PRINT("share", + ("table_name: %s, length: %d, use_count: %d, commit_count: %d", + share->table_name, share->table_name_length, share->use_count, + share->commit_count)); + share->use_count++; pthread_mutex_unlock(&ndbcluster_mutex); return share; @@ -5372,6 +5605,165 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr) DBUG_RETURN(0); } +char* +ha_ndbcluster::update_table_comment( + /* out: table comment + additional */ + const char* comment)/* in: table comment defined by user */ +{ + uint length= strlen(comment); + if(length > 64000 - 3) + { + return((char*)comment); /* string too long */ + } + + Ndb* ndb; + if (!(ndb= get_ndb())) + { + return((char*)comment); + } + + ndb->setDatabaseName(m_dbname); + NDBDICT* dict= ndb->getDictionary(); + const NDBTAB* tab; + if (!(tab= dict->getTable(m_tabname))) + { + return((char*)comment); + } + + char *str; + const char *fmt="%s%snumber_of_replicas: %d"; + const unsigned fmt_len_plus_extra= length + strlen(fmt); + if ((str= my_malloc(fmt_len_plus_extra, MYF(0))) == NULL) + { + return (char*)comment; + } + + snprintf(str,fmt_len_plus_extra,fmt,comment, + length > 0 ? 
" ":"", + tab->getReplicaCount()); + return str; +} + + +// Utility thread main loop +extern "C" pthread_handler_decl(ndb_util_thread_func, + arg __attribute__((unused))) +{ + THD *thd; /* needs to be first for thread_stack */ + int error= 0; + struct timespec abstime; + + my_thread_init(); + DBUG_ENTER("ndb_util_thread"); + DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time)); + + thd= new THD; /* note that contructor of THD uses DBUG_ */ + THD_CHECK_SENTRY(thd); + + pthread_detach_this_thread(); + ndb_util_thread= pthread_self(); + + thd->thread_stack= (char*)&thd; /* remember where our stack is */ + if (thd->store_globals()) + { + thd->cleanup(); + delete thd; + DBUG_RETURN(NULL); + } + + List util_open_tables; + set_timespec(abstime, ndb_cache_check_time); + for (;;) + { + + pthread_mutex_lock(&LOCK_ndb_util_thread); + error= pthread_cond_timedwait(&COND_ndb_util_thread, + &LOCK_ndb_util_thread, + &abstime); + pthread_mutex_unlock(&LOCK_ndb_util_thread); + + DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d", + ndb_cache_check_time)); + + if (abort_loop) + break; /* Shutting down server */ + + if (ndb_cache_check_time == 0) + { + set_timespec(abstime, 10); + continue; + } + + /* Set new time to wake up */ + set_timespec(abstime, ndb_cache_check_time); + + /* Lock mutex and fill list with pointers to all open tables */ + NDB_SHARE *share; + pthread_mutex_lock(&ndbcluster_mutex); + for (uint i= 0; i < ndbcluster_open_tables.records; i++) + { + share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i); + share->use_count++; /* Make sure the table can't be closed */ + + DBUG_PRINT("ndb_util_thread", + ("Found open table[%d]: %s, use_count: %d", + i, share->table_name, share->use_count)); + + /* Store pointer to table */ + util_open_tables.push_back(share); + } + pthread_mutex_unlock(&ndbcluster_mutex); + + /* Iterate through the open files list */ + List_iterator_fast it(util_open_tables); + while (share= it++) + { + /* Split 
tab- and dbname */ + char buf[FN_REFLEN]; + char *tabname, *db; + uint length= dirname_length(share->table_name); + tabname= share->table_name+length; + memcpy(buf, share->table_name, length-1); + buf[length-1]= 0; + db= buf+dirname_length(buf); + DBUG_PRINT("ndb_util_thread", + ("Fetching commit count for: %s, db: %s, tab: %s", + share->table_name, db, tabname)); + + /* Contact NDB to get commit count for table */ + g_ndb->setDatabaseName(db); + struct Ndb_statistics stat;; + if(ndb_get_table_statistics(g_ndb, tabname, &stat) == 0) + { + DBUG_PRINT("ndb_util_thread", + ("Table: %s, rows: %llu, commit_count: %llu", + share->table_name, stat.row_count, stat.commit_count)); + share->commit_count= stat.commit_count; + } + else + { + DBUG_PRINT("ndb_util_thread", + ("Error: Could not get commit count for table %s", + share->table_name)); + share->commit_count++; /* Invalidate */ + } + /* Decrease the use count and possibly free share */ + free_share(share); + } + + /* Clear the list of open tables */ + util_open_tables.empty(); + + } + + thd->cleanup(); + delete thd; + DBUG_PRINT("exit", ("ndb_util_thread")); + my_thread_end(); + pthread_exit(0); + DBUG_RETURN(NULL); +} + /* Condition pushdown */ diff --git a/sql/handler.h b/sql/handler.h index 8ad49456bf6..e9bc6652443 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -268,6 +268,9 @@ typedef struct st_table TABLE; struct st_foreign_key_info; typedef struct st_foreign_key_info FOREIGN_KEY_INFO; +/* Forward declaration for Condition Pushdown to Handler (CPDH) */ +typedef struct Item COND; + typedef struct st_ha_check_opt { ulong sort_buffer_size; @@ -601,7 +604,6 @@ public: *engine_callback= 0; return 1; } - /* RETURN true Primary key (if there is one) is clustered key covering all fields @@ -613,6 +615,12 @@ public: { return memcmp(ref1, ref2, ref_length); } + + /* + Condition pushdown to storage engines + */ + virtual const COND *cond_push(const COND *cond) { return cond; }; + virtual void cond_pop() { return; }; }; 
/* Some extern variables used with handlers */ diff --git a/sql/sql_class.h b/sql/sql_class.h index 502ce2e52a2..41af67f91b3 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -408,6 +408,7 @@ struct system_variables ulong table_type; ulong tmp_table_size; ulong tx_isolation; + ulong completion_type; /* Determines which non-standard SQL behaviour should be enabled */ ulong sql_mode; /* check of key presence in updatable view */ @@ -431,6 +432,11 @@ struct system_variables my_bool new_mode; my_bool query_cache_wlock_invalidate; my_bool engine_condition_pushdown; +#ifdef HAVE_REPLICATION + ulong sync_replication; + ulong sync_replication_slave_id; + ulong sync_replication_timeout; +#endif /* HAVE_REPLICATION */ #ifdef HAVE_INNOBASE_DB my_bool innodb_table_locks; #endif /* HAVE_INNOBASE_DB */ @@ -1022,10 +1028,13 @@ public: bool charset_is_system_charset, charset_is_collation_connection; bool slow_command; bool no_trans_update, abort_on_warning; + bool got_warning; /* Set on call to push_warning() */ longlong row_count_func; /* For the ROW_COUNT() function */ sp_rcontext *spcont; // SP runtime context sp_cache *sp_proc_cache; sp_cache *sp_func_cache; + bool shortcut_make_view; /* Don't do full mysql_make_view() + during pre-opening of tables. 
*/ /* If we do a purge of binary logs, log index info of the threads @@ -1510,9 +1519,11 @@ public: select_max_min_finder_subselect(Item_subselect *item, bool mx) :select_subselect(item), cache(0), fmax(mx) {} + void cleanup(); bool send_data(List &items); bool cmp_real(); bool cmp_int(); + bool cmp_decimal(); bool cmp_str(); }; @@ -1586,9 +1597,10 @@ class user_var_entry ulong length, update_query_id, used_query_id; Item_result type; - double val(my_bool *null_value); + double val_real(my_bool *null_value); longlong val_int(my_bool *null_value); String *val_str(my_bool *null_value, String *str, uint decimals); + my_decimal *val_decimal(my_bool *null_value, my_decimal *result); DTCollation collation; }; @@ -1617,9 +1629,11 @@ public: ~Unique(); inline bool unique_add(void *ptr) { + DBUG_ENTER("unique_add"); + DBUG_PRINT("info", ("tree %u - %u", tree.elements_in_tree, max_elements)); if (tree.elements_in_tree > max_elements && flush()) - return 1; - return !tree_insert(&tree, ptr, 0, tree.custom_arg); + DBUG_RETURN(1); + DBUG_RETURN(!tree_insert(&tree, ptr, 0, tree.custom_arg)); } bool get(TABLE *table);