mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Merge
This commit is contained in:
@@ -87,6 +87,12 @@ static int unpackfrm(const void **data, uint *len,
|
|||||||
static int ndb_get_table_statistics(Ndb*, const char *,
|
static int ndb_get_table_statistics(Ndb*, const char *,
|
||||||
struct Ndb_statistics *);
|
struct Ndb_statistics *);
|
||||||
|
|
||||||
|
// Util thread variables
|
||||||
|
static pthread_t ndb_util_thread;
|
||||||
|
pthread_mutex_t LOCK_ndb_util_thread;
|
||||||
|
pthread_cond_t COND_ndb_util_thread;
|
||||||
|
extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
|
||||||
|
ulong ndb_cache_check_time;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Dummy buffer to read zero pack_length fields
|
Dummy buffer to read zero pack_length fields
|
||||||
@@ -2354,22 +2360,21 @@ void ha_ndbcluster::print_results()
|
|||||||
|
|
||||||
if (!_db_on_)
|
if (!_db_on_)
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
|
|
||||||
char buf_type[MAX_FIELD_WIDTH], buf_val[MAX_FIELD_WIDTH];
|
char buf_type[MAX_FIELD_WIDTH], buf_val[MAX_FIELD_WIDTH];
|
||||||
String type(buf_type, sizeof(buf_type), &my_charset_bin);
|
String type(buf_type, sizeof(buf_type), &my_charset_bin);
|
||||||
String val(buf_val, sizeof(buf_val), &my_charset_bin);
|
String val(buf_val, sizeof(buf_val), &my_charset_bin);
|
||||||
for (uint f=0; f<table->s->fields;f++)
|
for (uint f=0; f<table->s->fields;f++)
|
||||||
{
|
{
|
||||||
// Use DBUG_PRINT since DBUG_FILE cannot be filtered out
|
/* Use DBUG_PRINT since DBUG_FILE cannot be filtered out */
|
||||||
char buf[2000];
|
char buf[2000];
|
||||||
Field *field;
|
Field *field;
|
||||||
void* ptr;
|
void* ptr;
|
||||||
const NDBCOL *col= NULL;
|
|
||||||
NdbValue value;
|
NdbValue value;
|
||||||
NdbBlob *ndb_blob;
|
NdbBlob *ndb_blob;
|
||||||
|
|
||||||
buf[0]= 0;
|
buf[0]= 0;
|
||||||
field= table->field[f];
|
field= table->field[f];
|
||||||
if (!(value= m_value[f]).ptr)
|
if (!(value= m_value[f]).ptr)
|
||||||
{
|
{
|
||||||
my_snprintf(buf, sizeof(buf), "not read");
|
my_snprintf(buf, sizeof(buf), "not read");
|
||||||
@@ -2377,8 +2382,6 @@ void ha_ndbcluster::print_results()
|
|||||||
}
|
}
|
||||||
|
|
||||||
ptr= field->ptr;
|
ptr= field->ptr;
|
||||||
DBUG_DUMP("field->ptr", (char*)ptr, field->pack_length());
|
|
||||||
col= tab->getColumn(f);
|
|
||||||
|
|
||||||
if (! (field->flags & BLOB_FLAG))
|
if (! (field->flags & BLOB_FLAG))
|
||||||
{
|
{
|
||||||
@@ -2404,9 +2407,9 @@ void ha_ndbcluster::print_results()
|
|||||||
goto print_value;
|
goto print_value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
print_value:
|
print_value:
|
||||||
DBUG_PRINT("value", ("%u,%s: %s", f, col->getName(), buf));
|
DBUG_PRINT("value", ("%u,%s: %s", f, field->field_name, buf));
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
@@ -3051,8 +3054,12 @@ int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const char **ha_ndbcluster::bas_ext() const
|
static const char *ha_ndb_bas_ext[]= { ha_ndb_ext, NullS };
|
||||||
{ static const char *ext[]= { ha_ndb_ext, NullS }; return ext; }
|
const char**
|
||||||
|
ha_ndbcluster::bas_ext() const
|
||||||
|
{
|
||||||
|
return ha_ndb_bas_ext;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -3220,7 +3227,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
|||||||
m_transaction_on= FALSE;
|
m_transaction_on= FALSE;
|
||||||
else
|
else
|
||||||
m_transaction_on= thd->variables.ndb_use_transactions;
|
m_transaction_on= thd->variables.ndb_use_transactions;
|
||||||
// m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
|
|
||||||
|
|
||||||
m_active_trans= thd->transaction.all.ndb_tid ?
|
m_active_trans= thd->transaction.all.ndb_tid ?
|
||||||
(NdbTransaction*)thd->transaction.all.ndb_tid:
|
(NdbTransaction*)thd->transaction.all.ndb_tid:
|
||||||
@@ -3647,6 +3653,52 @@ static int create_ndb_column(NDBCOL &col,
|
|||||||
Create a table in NDB Cluster
|
Create a table in NDB Cluster
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
|
||||||
|
{
|
||||||
|
if (form->s->max_rows == 0) /* default setting, don't set fragmentation */
|
||||||
|
return;
|
||||||
|
/**
|
||||||
|
* get the number of fragments right
|
||||||
|
*/
|
||||||
|
uint no_fragments;
|
||||||
|
{
|
||||||
|
#if MYSQL_VERSION_ID >= 50000
|
||||||
|
uint acc_row_size= 25 + /*safety margin*/ 2;
|
||||||
|
#else
|
||||||
|
uint acc_row_size= pk_length*4;
|
||||||
|
/* add acc overhead */
|
||||||
|
if (pk_length <= 8) /* main page will set the limit */
|
||||||
|
acc_row_size+= 25 + /*safety margin*/ 2;
|
||||||
|
else /* overflow page will set the limit */
|
||||||
|
acc_row_size+= 4 + /*safety margin*/ 4;
|
||||||
|
#endif
|
||||||
|
ulonglong acc_fragment_size= 512*1024*1024;
|
||||||
|
ulonglong max_rows= form->s->max_rows;
|
||||||
|
#if MYSQL_VERSION_ID >= 50100
|
||||||
|
no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1;
|
||||||
|
#else
|
||||||
|
no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1
|
||||||
|
+1/*correct rounding*/)/2;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
{
|
||||||
|
uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
|
||||||
|
NDBTAB::FragmentType ftype;
|
||||||
|
if (no_fragments > 2*no_nodes)
|
||||||
|
{
|
||||||
|
ftype= NDBTAB::FragAllLarge;
|
||||||
|
if (no_fragments > 4*no_nodes)
|
||||||
|
push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
|
||||||
|
"Ndb might have problems storing the max amount of rows specified");
|
||||||
|
}
|
||||||
|
else if (no_fragments > no_nodes)
|
||||||
|
ftype= NDBTAB::FragAllMedium;
|
||||||
|
else
|
||||||
|
ftype= NDBTAB::FragAllSmall;
|
||||||
|
tab.setFragmentType(ftype);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int ha_ndbcluster::create(const char *name,
|
int ha_ndbcluster::create(const char *name,
|
||||||
TABLE *form,
|
TABLE *form,
|
||||||
HA_CREATE_INFO *info)
|
HA_CREATE_INFO *info)
|
||||||
@@ -3748,7 +3800,9 @@ int ha_ndbcluster::create(const char *name,
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ndb_set_fragmentation(tab, form, pk_length);
|
||||||
|
|
||||||
if ((my_errno= check_ndb_connection()))
|
if ((my_errno= check_ndb_connection()))
|
||||||
DBUG_RETURN(my_errno);
|
DBUG_RETURN(my_errno);
|
||||||
|
|
||||||
@@ -4014,7 +4068,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
|
|||||||
m_force_send(TRUE),
|
m_force_send(TRUE),
|
||||||
m_autoincrement_prefetch(32),
|
m_autoincrement_prefetch(32),
|
||||||
m_transaction_on(TRUE),
|
m_transaction_on(TRUE),
|
||||||
m_use_local_query_cache(FALSE),
|
|
||||||
m_cond_stack(NULL),
|
m_cond_stack(NULL),
|
||||||
m_multi_cursor(NULL)
|
m_multi_cursor(NULL)
|
||||||
{
|
{
|
||||||
@@ -4070,6 +4123,7 @@ ha_ndbcluster::~ha_ndbcluster()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Open a table for further use
|
Open a table for further use
|
||||||
- fetch metadata for this table from NDB
|
- fetch metadata for this table from NDB
|
||||||
@@ -4170,16 +4224,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
|
|||||||
|
|
||||||
Ndb* check_ndb_in_thd(THD* thd)
|
Ndb* check_ndb_in_thd(THD* thd)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("check_ndb_in_thd");
|
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
|
||||||
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
|
|
||||||
|
|
||||||
if (!thd_ndb)
|
if (!thd_ndb)
|
||||||
{
|
{
|
||||||
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
|
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
|
||||||
DBUG_RETURN(NULL);
|
return NULL;
|
||||||
thd->transaction.thd_ndb= thd_ndb;
|
thd->transaction.thd_ndb= thd_ndb;
|
||||||
}
|
}
|
||||||
DBUG_RETURN(thd_ndb->ndb);
|
return thd_ndb->ndb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -4532,13 +4584,21 @@ bool ndbcluster_init()
|
|||||||
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
|
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
|
||||||
(hash_get_key) ndbcluster_get_key,0,0);
|
(hash_get_key) ndbcluster_get_key,0,0);
|
||||||
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
|
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
|
||||||
|
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
|
||||||
|
pthread_cond_init(&COND_ndb_util_thread, NULL);
|
||||||
|
|
||||||
|
|
||||||
|
// Create utility thread
|
||||||
|
pthread_t tmp;
|
||||||
|
if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
|
||||||
|
{
|
||||||
|
DBUG_PRINT("error", ("Could not create ndb utility thread"));
|
||||||
|
goto ndbcluster_init_error;
|
||||||
|
}
|
||||||
|
|
||||||
ndbcluster_inited= 1;
|
ndbcluster_inited= 1;
|
||||||
#ifdef USE_DISCOVER_ON_STARTUP
|
|
||||||
if (ndb_discover_tables() != 0)
|
|
||||||
goto ndbcluster_init_error;
|
|
||||||
#endif
|
|
||||||
DBUG_RETURN(FALSE);
|
DBUG_RETURN(FALSE);
|
||||||
|
|
||||||
ndbcluster_init_error:
|
ndbcluster_init_error:
|
||||||
ndbcluster_end();
|
ndbcluster_end();
|
||||||
DBUG_RETURN(TRUE);
|
DBUG_RETURN(TRUE);
|
||||||
@@ -4548,12 +4608,19 @@ bool ndbcluster_init()
|
|||||||
/*
|
/*
|
||||||
End use of the NDB Cluster table handler
|
End use of the NDB Cluster table handler
|
||||||
- free all global variables allocated by
|
- free all global variables allocated by
|
||||||
ndcluster_init()
|
ndbcluster_init()
|
||||||
*/
|
*/
|
||||||
|
|
||||||
bool ndbcluster_end()
|
bool ndbcluster_end()
|
||||||
{
|
{
|
||||||
DBUG_ENTER("ndbcluster_end");
|
DBUG_ENTER("ndbcluster_end");
|
||||||
|
|
||||||
|
// Kill ndb utility thread
|
||||||
|
(void) pthread_mutex_lock(&LOCK_ndb_util_thread);
|
||||||
|
DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
|
||||||
|
(void) pthread_cond_signal(&COND_ndb_util_thread);
|
||||||
|
(void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
|
||||||
|
|
||||||
if(g_ndb)
|
if(g_ndb)
|
||||||
delete g_ndb;
|
delete g_ndb;
|
||||||
g_ndb= NULL;
|
g_ndb= NULL;
|
||||||
@@ -4564,6 +4631,8 @@ bool ndbcluster_end()
|
|||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
hash_free(&ndbcluster_open_tables);
|
hash_free(&ndbcluster_open_tables);
|
||||||
pthread_mutex_destroy(&ndbcluster_mutex);
|
pthread_mutex_destroy(&ndbcluster_mutex);
|
||||||
|
pthread_mutex_destroy(&LOCK_ndb_util_thread);
|
||||||
|
pthread_cond_destroy(&COND_ndb_util_thread);
|
||||||
ndbcluster_inited= 0;
|
ndbcluster_inited= 0;
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
@@ -4756,16 +4825,174 @@ const char* ha_ndbcluster::index_type(uint key_number)
|
|||||||
return "HASH";
|
return "HASH";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8 ha_ndbcluster::table_cache_type()
|
uint8 ha_ndbcluster::table_cache_type()
|
||||||
{
|
{
|
||||||
if (m_use_local_query_cache)
|
DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT");
|
||||||
return HA_CACHE_TBL_TRANSACT;
|
DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT);
|
||||||
else
|
|
||||||
return HA_CACHE_TBL_NOCACHE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
|
||||||
|
Uint64 *commit_count)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ndb_get_commitcount");
|
||||||
|
|
||||||
|
if (ndb_cache_check_time > 0)
|
||||||
|
{
|
||||||
|
/* Use cached commit_count from share */
|
||||||
|
char name[FN_REFLEN];
|
||||||
|
NDB_SHARE *share;
|
||||||
|
(void)strxnmov(name, FN_REFLEN,
|
||||||
|
"./",dbname,"/",tabname,NullS);
|
||||||
|
DBUG_PRINT("info", ("name: %s", name));
|
||||||
|
pthread_mutex_lock(&ndbcluster_mutex);
|
||||||
|
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
|
||||||
|
(byte*) name,
|
||||||
|
strlen(name))))
|
||||||
|
{
|
||||||
|
pthread_mutex_unlock(&ndbcluster_mutex);
|
||||||
|
DBUG_RETURN(1);
|
||||||
|
}
|
||||||
|
*commit_count= share->commit_count;
|
||||||
|
DBUG_PRINT("info", ("commit_count: %d", *commit_count));
|
||||||
|
pthread_mutex_unlock(&ndbcluster_mutex);
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Get commit_count from NDB */
|
||||||
|
Ndb *ndb;
|
||||||
|
if (!(ndb= check_ndb_in_thd(thd)))
|
||||||
|
DBUG_RETURN(1);
|
||||||
|
ndb->setDatabaseName(dbname);
|
||||||
|
|
||||||
|
struct Ndb_statistics stat;
|
||||||
|
if (ndb_get_table_statistics(ndb, tabname, &stat))
|
||||||
|
DBUG_RETURN(1);
|
||||||
|
*commit_count= stat.commit_count;
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Handling the shared NDB_SHARE structure that is needed to
|
Check if a cached query can be used.
|
||||||
|
This is done by comparing the supplied engine_data to commit_count of
|
||||||
|
the table.
|
||||||
|
The commit_count is either retrieved from the share for the table, where
|
||||||
|
it has been cached by the util thread. If the util thread is not started,
|
||||||
|
NDB has to be contacetd to retrieve the commit_count, this will introduce
|
||||||
|
a small delay while waiting for NDB to answer.
|
||||||
|
|
||||||
|
|
||||||
|
SYNOPSIS
|
||||||
|
ndbcluster_cache_retrieval_allowed
|
||||||
|
thd thread handle
|
||||||
|
full_name concatenation of database name,
|
||||||
|
the null character '\0', and the table
|
||||||
|
name
|
||||||
|
full_name_len length of the full name,
|
||||||
|
i.e. len(dbname) + len(tablename) + 1
|
||||||
|
|
||||||
|
engine_data parameter retrieved when query was first inserted into
|
||||||
|
the cache. If the value of engine_data is changed,
|
||||||
|
all queries for this table should be invalidated.
|
||||||
|
|
||||||
|
RETURN VALUE
|
||||||
|
TRUE Yes, use the query from cache
|
||||||
|
FALSE No, don't use the cached query, and if engine_data
|
||||||
|
has changed, all queries for this table should be invalidated
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
static my_bool
|
||||||
|
ndbcluster_cache_retrieval_allowed(THD *thd,
|
||||||
|
char *full_name, uint full_name_len,
|
||||||
|
ulonglong *engine_data)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
|
||||||
|
|
||||||
|
Uint64 commit_count;
|
||||||
|
bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
|
||||||
|
char *dbname= full_name;
|
||||||
|
char *tabname= dbname+strlen(dbname)+1;
|
||||||
|
|
||||||
|
DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d",
|
||||||
|
dbname, tabname, is_autocommit));
|
||||||
|
|
||||||
|
if (!is_autocommit)
|
||||||
|
DBUG_RETURN(FALSE);
|
||||||
|
|
||||||
|
if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
|
||||||
|
{
|
||||||
|
*engine_data+= 1; /* invalidate */
|
||||||
|
DBUG_RETURN(FALSE);
|
||||||
|
}
|
||||||
|
DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu",
|
||||||
|
*engine_data, commit_count));
|
||||||
|
if (*engine_data != commit_count)
|
||||||
|
{
|
||||||
|
*engine_data= commit_count; /* invalidate */
|
||||||
|
DBUG_PRINT("exit",("Do not use cache, commit_count has changed"));
|
||||||
|
DBUG_RETURN(FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data));
|
||||||
|
DBUG_RETURN(TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
Register a table for use in the query cache. Fetch the commit_count
|
||||||
|
for the table and return it in engine_data, this will later be used
|
||||||
|
to check if the table has changed, before the cached query is reused.
|
||||||
|
|
||||||
|
SYNOPSIS
|
||||||
|
ha_ndbcluster::can_query_cache_table
|
||||||
|
thd thread handle
|
||||||
|
full_name concatenation of database name,
|
||||||
|
the null character '\0', and the table
|
||||||
|
name
|
||||||
|
full_name_len length of the full name,
|
||||||
|
i.e. len(dbname) + len(tablename) + 1
|
||||||
|
qc_engine_callback function to be called before using cache on this table
|
||||||
|
engine_data out, commit_count for this table
|
||||||
|
|
||||||
|
RETURN VALUE
|
||||||
|
TRUE Yes, it's ok to cahce this query
|
||||||
|
FALSE No, don't cach the query
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
my_bool
|
||||||
|
ha_ndbcluster::register_query_cache_table(THD *thd,
|
||||||
|
char *full_name, uint full_name_len,
|
||||||
|
qc_engine_callback *engine_callback,
|
||||||
|
ulonglong *engine_data)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ha_ndbcluster::register_query_cache_table");
|
||||||
|
|
||||||
|
bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
|
||||||
|
DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
|
||||||
|
m_dbname,m_tabname,is_autocommit));
|
||||||
|
if (!is_autocommit)
|
||||||
|
DBUG_RETURN(FALSE);
|
||||||
|
|
||||||
|
Uint64 commit_count;
|
||||||
|
if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
|
||||||
|
{
|
||||||
|
*engine_data= 0;
|
||||||
|
DBUG_PRINT("error", ("Could not get commitcount"))
|
||||||
|
DBUG_RETURN(FALSE);
|
||||||
|
}
|
||||||
|
*engine_data= commit_count;
|
||||||
|
*engine_callback= ndbcluster_cache_retrieval_allowed;
|
||||||
|
DBUG_PRINT("exit",("*engine_data=%llu", *engine_data));
|
||||||
|
DBUG_RETURN(TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Handling the shared NDB_SHARE structure that is needed to
|
||||||
provide table locking.
|
provide table locking.
|
||||||
It's also used for sharing data with other NDB handlers
|
It's also used for sharing data with other NDB handlers
|
||||||
in the same MySQL Server. There is currently not much
|
in the same MySQL Server. There is currently not much
|
||||||
@@ -4802,8 +5029,14 @@ static NDB_SHARE* get_share(const char *table_name)
|
|||||||
}
|
}
|
||||||
thr_lock_init(&share->lock);
|
thr_lock_init(&share->lock);
|
||||||
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
|
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
|
||||||
|
share->commit_count= 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
DBUG_PRINT("share",
|
||||||
|
("table_name: %s, length: %d, use_count: %d, commit_count: %d",
|
||||||
|
share->table_name, share->table_name_length, share->use_count,
|
||||||
|
share->commit_count));
|
||||||
|
|
||||||
share->use_count++;
|
share->use_count++;
|
||||||
pthread_mutex_unlock(&ndbcluster_mutex);
|
pthread_mutex_unlock(&ndbcluster_mutex);
|
||||||
return share;
|
return share;
|
||||||
@@ -5372,6 +5605,165 @@ ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
|
|||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char*
|
||||||
|
ha_ndbcluster::update_table_comment(
|
||||||
|
/* out: table comment + additional */
|
||||||
|
const char* comment)/* in: table comment defined by user */
|
||||||
|
{
|
||||||
|
uint length= strlen(comment);
|
||||||
|
if(length > 64000 - 3)
|
||||||
|
{
|
||||||
|
return((char*)comment); /* string too long */
|
||||||
|
}
|
||||||
|
|
||||||
|
Ndb* ndb;
|
||||||
|
if (!(ndb= get_ndb()))
|
||||||
|
{
|
||||||
|
return((char*)comment);
|
||||||
|
}
|
||||||
|
|
||||||
|
ndb->setDatabaseName(m_dbname);
|
||||||
|
NDBDICT* dict= ndb->getDictionary();
|
||||||
|
const NDBTAB* tab;
|
||||||
|
if (!(tab= dict->getTable(m_tabname)))
|
||||||
|
{
|
||||||
|
return((char*)comment);
|
||||||
|
}
|
||||||
|
|
||||||
|
char *str;
|
||||||
|
const char *fmt="%s%snumber_of_replicas: %d";
|
||||||
|
const unsigned fmt_len_plus_extra= length + strlen(fmt);
|
||||||
|
if ((str= my_malloc(fmt_len_plus_extra, MYF(0))) == NULL)
|
||||||
|
{
|
||||||
|
return (char*)comment;
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(str,fmt_len_plus_extra,fmt,comment,
|
||||||
|
length > 0 ? " ":"",
|
||||||
|
tab->getReplicaCount());
|
||||||
|
return str;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Utility thread main loop
|
||||||
|
extern "C" pthread_handler_decl(ndb_util_thread_func,
|
||||||
|
arg __attribute__((unused)))
|
||||||
|
{
|
||||||
|
THD *thd; /* needs to be first for thread_stack */
|
||||||
|
int error= 0;
|
||||||
|
struct timespec abstime;
|
||||||
|
|
||||||
|
my_thread_init();
|
||||||
|
DBUG_ENTER("ndb_util_thread");
|
||||||
|
DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));
|
||||||
|
|
||||||
|
thd= new THD; /* note that contructor of THD uses DBUG_ */
|
||||||
|
THD_CHECK_SENTRY(thd);
|
||||||
|
|
||||||
|
pthread_detach_this_thread();
|
||||||
|
ndb_util_thread= pthread_self();
|
||||||
|
|
||||||
|
thd->thread_stack= (char*)&thd; /* remember where our stack is */
|
||||||
|
if (thd->store_globals())
|
||||||
|
{
|
||||||
|
thd->cleanup();
|
||||||
|
delete thd;
|
||||||
|
DBUG_RETURN(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<NDB_SHARE> util_open_tables;
|
||||||
|
set_timespec(abstime, ndb_cache_check_time);
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
|
||||||
|
pthread_mutex_lock(&LOCK_ndb_util_thread);
|
||||||
|
error= pthread_cond_timedwait(&COND_ndb_util_thread,
|
||||||
|
&LOCK_ndb_util_thread,
|
||||||
|
&abstime);
|
||||||
|
pthread_mutex_unlock(&LOCK_ndb_util_thread);
|
||||||
|
|
||||||
|
DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
|
||||||
|
ndb_cache_check_time));
|
||||||
|
|
||||||
|
if (abort_loop)
|
||||||
|
break; /* Shutting down server */
|
||||||
|
|
||||||
|
if (ndb_cache_check_time == 0)
|
||||||
|
{
|
||||||
|
set_timespec(abstime, 10);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set new time to wake up */
|
||||||
|
set_timespec(abstime, ndb_cache_check_time);
|
||||||
|
|
||||||
|
/* Lock mutex and fill list with pointers to all open tables */
|
||||||
|
NDB_SHARE *share;
|
||||||
|
pthread_mutex_lock(&ndbcluster_mutex);
|
||||||
|
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
|
||||||
|
{
|
||||||
|
share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
|
||||||
|
share->use_count++; /* Make sure the table can't be closed */
|
||||||
|
|
||||||
|
DBUG_PRINT("ndb_util_thread",
|
||||||
|
("Found open table[%d]: %s, use_count: %d",
|
||||||
|
i, share->table_name, share->use_count));
|
||||||
|
|
||||||
|
/* Store pointer to table */
|
||||||
|
util_open_tables.push_back(share);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&ndbcluster_mutex);
|
||||||
|
|
||||||
|
/* Iterate through the open files list */
|
||||||
|
List_iterator_fast<NDB_SHARE> it(util_open_tables);
|
||||||
|
while (share= it++)
|
||||||
|
{
|
||||||
|
/* Split tab- and dbname */
|
||||||
|
char buf[FN_REFLEN];
|
||||||
|
char *tabname, *db;
|
||||||
|
uint length= dirname_length(share->table_name);
|
||||||
|
tabname= share->table_name+length;
|
||||||
|
memcpy(buf, share->table_name, length-1);
|
||||||
|
buf[length-1]= 0;
|
||||||
|
db= buf+dirname_length(buf);
|
||||||
|
DBUG_PRINT("ndb_util_thread",
|
||||||
|
("Fetching commit count for: %s, db: %s, tab: %s",
|
||||||
|
share->table_name, db, tabname));
|
||||||
|
|
||||||
|
/* Contact NDB to get commit count for table */
|
||||||
|
g_ndb->setDatabaseName(db);
|
||||||
|
struct Ndb_statistics stat;;
|
||||||
|
if(ndb_get_table_statistics(g_ndb, tabname, &stat) == 0)
|
||||||
|
{
|
||||||
|
DBUG_PRINT("ndb_util_thread",
|
||||||
|
("Table: %s, rows: %llu, commit_count: %llu",
|
||||||
|
share->table_name, stat.row_count, stat.commit_count));
|
||||||
|
share->commit_count= stat.commit_count;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
DBUG_PRINT("ndb_util_thread",
|
||||||
|
("Error: Could not get commit count for table %s",
|
||||||
|
share->table_name));
|
||||||
|
share->commit_count++; /* Invalidate */
|
||||||
|
}
|
||||||
|
/* Decrease the use count and possibly free share */
|
||||||
|
free_share(share);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Clear the list of open tables */
|
||||||
|
util_open_tables.empty();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
thd->cleanup();
|
||||||
|
delete thd;
|
||||||
|
DBUG_PRINT("exit", ("ndb_util_thread"));
|
||||||
|
my_thread_end();
|
||||||
|
pthread_exit(0);
|
||||||
|
DBUG_RETURN(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Condition pushdown
|
Condition pushdown
|
||||||
*/
|
*/
|
||||||
|
@@ -268,6 +268,9 @@ typedef struct st_table TABLE;
|
|||||||
struct st_foreign_key_info;
|
struct st_foreign_key_info;
|
||||||
typedef struct st_foreign_key_info FOREIGN_KEY_INFO;
|
typedef struct st_foreign_key_info FOREIGN_KEY_INFO;
|
||||||
|
|
||||||
|
/* Forward declaration for Condition Pushdown to Handler (CPDH) */
|
||||||
|
typedef struct Item COND;
|
||||||
|
|
||||||
typedef struct st_ha_check_opt
|
typedef struct st_ha_check_opt
|
||||||
{
|
{
|
||||||
ulong sort_buffer_size;
|
ulong sort_buffer_size;
|
||||||
@@ -601,7 +604,6 @@ public:
|
|||||||
*engine_callback= 0;
|
*engine_callback= 0;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
RETURN
|
RETURN
|
||||||
true Primary key (if there is one) is clustered key covering all fields
|
true Primary key (if there is one) is clustered key covering all fields
|
||||||
@@ -613,6 +615,12 @@ public:
|
|||||||
{
|
{
|
||||||
return memcmp(ref1, ref2, ref_length);
|
return memcmp(ref1, ref2, ref_length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Condition pushdown to storage engines
|
||||||
|
*/
|
||||||
|
virtual const COND *cond_push(const COND *cond) { return cond; };
|
||||||
|
virtual void cond_pop() { return; };
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Some extern variables used with handlers */
|
/* Some extern variables used with handlers */
|
||||||
|
@@ -408,6 +408,7 @@ struct system_variables
|
|||||||
ulong table_type;
|
ulong table_type;
|
||||||
ulong tmp_table_size;
|
ulong tmp_table_size;
|
||||||
ulong tx_isolation;
|
ulong tx_isolation;
|
||||||
|
ulong completion_type;
|
||||||
/* Determines which non-standard SQL behaviour should be enabled */
|
/* Determines which non-standard SQL behaviour should be enabled */
|
||||||
ulong sql_mode;
|
ulong sql_mode;
|
||||||
/* check of key presence in updatable view */
|
/* check of key presence in updatable view */
|
||||||
@@ -431,6 +432,11 @@ struct system_variables
|
|||||||
my_bool new_mode;
|
my_bool new_mode;
|
||||||
my_bool query_cache_wlock_invalidate;
|
my_bool query_cache_wlock_invalidate;
|
||||||
my_bool engine_condition_pushdown;
|
my_bool engine_condition_pushdown;
|
||||||
|
#ifdef HAVE_REPLICATION
|
||||||
|
ulong sync_replication;
|
||||||
|
ulong sync_replication_slave_id;
|
||||||
|
ulong sync_replication_timeout;
|
||||||
|
#endif /* HAVE_REPLICATION */
|
||||||
#ifdef HAVE_INNOBASE_DB
|
#ifdef HAVE_INNOBASE_DB
|
||||||
my_bool innodb_table_locks;
|
my_bool innodb_table_locks;
|
||||||
#endif /* HAVE_INNOBASE_DB */
|
#endif /* HAVE_INNOBASE_DB */
|
||||||
@@ -1022,10 +1028,13 @@ public:
|
|||||||
bool charset_is_system_charset, charset_is_collation_connection;
|
bool charset_is_system_charset, charset_is_collation_connection;
|
||||||
bool slow_command;
|
bool slow_command;
|
||||||
bool no_trans_update, abort_on_warning;
|
bool no_trans_update, abort_on_warning;
|
||||||
|
bool got_warning; /* Set on call to push_warning() */
|
||||||
longlong row_count_func; /* For the ROW_COUNT() function */
|
longlong row_count_func; /* For the ROW_COUNT() function */
|
||||||
sp_rcontext *spcont; // SP runtime context
|
sp_rcontext *spcont; // SP runtime context
|
||||||
sp_cache *sp_proc_cache;
|
sp_cache *sp_proc_cache;
|
||||||
sp_cache *sp_func_cache;
|
sp_cache *sp_func_cache;
|
||||||
|
bool shortcut_make_view; /* Don't do full mysql_make_view()
|
||||||
|
during pre-opening of tables. */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If we do a purge of binary logs, log index info of the threads
|
If we do a purge of binary logs, log index info of the threads
|
||||||
@@ -1510,9 +1519,11 @@ public:
|
|||||||
select_max_min_finder_subselect(Item_subselect *item, bool mx)
|
select_max_min_finder_subselect(Item_subselect *item, bool mx)
|
||||||
:select_subselect(item), cache(0), fmax(mx)
|
:select_subselect(item), cache(0), fmax(mx)
|
||||||
{}
|
{}
|
||||||
|
void cleanup();
|
||||||
bool send_data(List<Item> &items);
|
bool send_data(List<Item> &items);
|
||||||
bool cmp_real();
|
bool cmp_real();
|
||||||
bool cmp_int();
|
bool cmp_int();
|
||||||
|
bool cmp_decimal();
|
||||||
bool cmp_str();
|
bool cmp_str();
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1586,9 +1597,10 @@ class user_var_entry
|
|||||||
ulong length, update_query_id, used_query_id;
|
ulong length, update_query_id, used_query_id;
|
||||||
Item_result type;
|
Item_result type;
|
||||||
|
|
||||||
double val(my_bool *null_value);
|
double val_real(my_bool *null_value);
|
||||||
longlong val_int(my_bool *null_value);
|
longlong val_int(my_bool *null_value);
|
||||||
String *val_str(my_bool *null_value, String *str, uint decimals);
|
String *val_str(my_bool *null_value, String *str, uint decimals);
|
||||||
|
my_decimal *val_decimal(my_bool *null_value, my_decimal *result);
|
||||||
DTCollation collation;
|
DTCollation collation;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1617,9 +1629,11 @@ public:
|
|||||||
~Unique();
|
~Unique();
|
||||||
inline bool unique_add(void *ptr)
|
inline bool unique_add(void *ptr)
|
||||||
{
|
{
|
||||||
|
DBUG_ENTER("unique_add");
|
||||||
|
DBUG_PRINT("info", ("tree %u - %u", tree.elements_in_tree, max_elements));
|
||||||
if (tree.elements_in_tree > max_elements && flush())
|
if (tree.elements_in_tree > max_elements && flush())
|
||||||
return 1;
|
DBUG_RETURN(1);
|
||||||
return !tree_insert(&tree, ptr, 0, tree.custom_arg);
|
DBUG_RETURN(!tree_insert(&tree, ptr, 0, tree.custom_arg));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool get(TABLE *table);
|
bool get(TABLE *table);
|
||||||
|
Reference in New Issue
Block a user