mirror of
https://github.com/MariaDB/server.git
synced 2025-11-19 19:03:26 +03:00
Improvement of on-line discovery in injector thread
This commit is contained in:
@@ -519,6 +519,7 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global)
|
||||
{
|
||||
NDBINDEX *index = (NDBINDEX *) m_index[i].index;
|
||||
NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
|
||||
if (!index && !unique_index) continue;
|
||||
NDB_INDEX_TYPE idx_type= m_index[i].type;
|
||||
|
||||
switch (idx_type) {
|
||||
@@ -1076,7 +1077,7 @@ int ha_ndbcluster::get_metadata(const char *path)
|
||||
m_table= (void *)tab;
|
||||
m_table_info= NULL; // Set in external lock
|
||||
|
||||
DBUG_RETURN(open_indexes(ndb, table));
|
||||
DBUG_RETURN(open_indexes(ndb, table, FALSE));
|
||||
}
|
||||
|
||||
static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
|
||||
@@ -1249,7 +1250,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info,
|
||||
/*
|
||||
Associate index handles for each index of a table
|
||||
*/
|
||||
int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab)
|
||||
int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error)
|
||||
{
|
||||
uint i;
|
||||
int error= 0;
|
||||
@@ -1263,7 +1264,10 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab)
|
||||
for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
|
||||
{
|
||||
if ((error= add_index_handle(thd, dict, key_info, *key_name, i)))
|
||||
break;
|
||||
if (ignore_error)
|
||||
m_index[i].index= m_index[i].unique_index= NULL;
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
DBUG_RETURN(error);
|
||||
@@ -3699,7 +3703,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
|
||||
{
|
||||
m_table= (void *)tab;
|
||||
m_table_version = tab->getObjectVersion();
|
||||
if (!(my_errno= open_indexes(ndb, table)))
|
||||
if (!(my_errno= open_indexes(ndb, table, FALSE)))
|
||||
DBUG_RETURN(my_errno);
|
||||
}
|
||||
m_table_info= tab_info;
|
||||
|
||||
@@ -691,6 +691,9 @@ static void set_tabname(const char *pathname, char *tabname);
|
||||
|
||||
private:
|
||||
friend int ndbcluster_drop_database_impl(const char *path);
|
||||
friend int ndb_handle_schema_change(THD *thd,
|
||||
Ndb *ndb, NdbEventOperation *pOp,
|
||||
NDB_SHARE *share);
|
||||
int alter_table_name(const char *to);
|
||||
static int delete_table(ha_ndbcluster *h, Ndb *ndb,
|
||||
const char *path,
|
||||
@@ -708,7 +711,7 @@ private:
|
||||
int create_indexes(Ndb *ndb, TABLE *tab);
|
||||
void clear_index(int i);
|
||||
void clear_indexes();
|
||||
int open_indexes(Ndb *ndb, TABLE *tab);
|
||||
int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error);
|
||||
void renumber_indexes(Ndb *ndb, TABLE *tab);
|
||||
int drop_indexes(Ndb *ndb, TABLE *tab);
|
||||
int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
|
||||
|
||||
@@ -233,6 +233,72 @@ static void run_query(THD *thd, char *buf, char *end,
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
|
||||
TABLE_SHARE *table_share, TABLE *table)
|
||||
{
|
||||
int error;
|
||||
MEM_ROOT *mem_root= &share->mem_root;
|
||||
DBUG_ENTER("ndbcluster_binlog_open_table");
|
||||
|
||||
init_tmp_table_share(table_share, share->db, 0, share->table_name,
|
||||
share->key);
|
||||
if ((error= open_table_def(thd, table_share, 0)))
|
||||
{
|
||||
sql_print_error("Unable to get table share for %s, error=%d",
|
||||
share->key, error);
|
||||
DBUG_PRINT("error", ("open_table_def failed %d", error));
|
||||
my_free((gptr) table_share, MYF(0));
|
||||
table_share= 0;
|
||||
my_free((gptr) table, MYF(0));
|
||||
table= 0;
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
if ((error= open_table_from_share(thd, table_share, "", 0,
|
||||
(uint) READ_ALL, 0, table, FALSE)))
|
||||
{
|
||||
sql_print_error("Unable to open table for %s, error=%d(%d)",
|
||||
share->key, error, my_errno);
|
||||
DBUG_PRINT("error", ("open_table_from_share failed %d", error));
|
||||
my_free((gptr) table_share, MYF(0));
|
||||
table_share= 0;
|
||||
my_free((gptr) table, MYF(0));
|
||||
table= 0;
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
assign_new_table_id(table);
|
||||
if (!table->record[1] || table->record[1] == table->record[0])
|
||||
{
|
||||
table->record[1]= alloc_root(&table->mem_root,
|
||||
table->s->rec_buff_length);
|
||||
}
|
||||
table->in_use= injector_thd;
|
||||
|
||||
table->s->db.str= share->db;
|
||||
table->s->db.length= strlen(share->db);
|
||||
table->s->table_name.str= share->table_name;
|
||||
table->s->table_name.length= strlen(share->table_name);
|
||||
|
||||
share->table_share= table_share;
|
||||
share->table= table;
|
||||
#ifndef DBUG_OFF
|
||||
dbug_print_table("table", table);
|
||||
#endif
|
||||
/*
|
||||
! do not touch the contents of the table
|
||||
it may be in use by the injector thread
|
||||
*/
|
||||
share->ndb_value[0]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
|
||||
+ 1 /*extra for hidden key*/);
|
||||
share->ndb_value[1]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
|
||||
+1 /*extra for hidden key*/);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Initialize the binlog part of the NDB_SHARE
|
||||
*/
|
||||
@@ -260,64 +326,12 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
|
||||
}
|
||||
while (1)
|
||||
{
|
||||
int error;
|
||||
TABLE_SHARE *table_share=
|
||||
(TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
|
||||
TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
|
||||
int error;
|
||||
|
||||
init_tmp_table_share(table_share, share->db, 0, share->table_name,
|
||||
share->key);
|
||||
if ((error= open_table_def(thd, table_share, 0)))
|
||||
{
|
||||
sql_print_error("Unable to get table share for %s, error=%d",
|
||||
share->key, error);
|
||||
DBUG_PRINT("error", ("open_table_def failed %d", error));
|
||||
my_free((gptr) table_share, MYF(0));
|
||||
table_share= 0;
|
||||
my_free((gptr) table, MYF(0));
|
||||
table= 0;
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
|
||||
break;
|
||||
}
|
||||
if ((error= open_table_from_share(thd, table_share, "", 0,
|
||||
(uint) READ_ALL, 0, table, FALSE)))
|
||||
{
|
||||
sql_print_error("Unable to open table for %s, error=%d(%d)",
|
||||
share->key, error, my_errno);
|
||||
DBUG_PRINT("error", ("open_table_from_share failed %d", error));
|
||||
my_free((gptr) table_share, MYF(0));
|
||||
table_share= 0;
|
||||
my_free((gptr) table, MYF(0));
|
||||
table= 0;
|
||||
break;
|
||||
}
|
||||
assign_new_table_id(table);
|
||||
if (!table->record[1] || table->record[1] == table->record[0])
|
||||
{
|
||||
table->record[1]= alloc_root(&table->mem_root,
|
||||
table->s->rec_buff_length);
|
||||
}
|
||||
table->in_use= injector_thd;
|
||||
|
||||
table->s->db.str= share->db;
|
||||
table->s->db.length= strlen(share->db);
|
||||
table->s->table_name.str= share->table_name;
|
||||
table->s->table_name.length= strlen(share->table_name);
|
||||
|
||||
share->table_share= table_share;
|
||||
share->table= table;
|
||||
#ifndef DBUG_OFF
|
||||
dbug_print_table("table", table);
|
||||
#endif
|
||||
/*
|
||||
! do not touch the contents of the table
|
||||
it may be in use by the injector thread
|
||||
*/
|
||||
share->ndb_value[0]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
|
||||
+ 1 /*extra for hidden key*/);
|
||||
share->ndb_value[1]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
|
||||
+1 /*extra for hidden key*/);
|
||||
{
|
||||
int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
|
||||
share->subscriber_bitmap= (MY_BITMAP*)
|
||||
@@ -651,10 +665,10 @@ static int ndbcluster_create_apply_status_table(THD *thd)
|
||||
if so, remove it since there is none in Ndb
|
||||
*/
|
||||
{
|
||||
strxnmov(buf, sizeof(buf),
|
||||
mysql_data_home,
|
||||
"/" NDB_REP_DB "/" NDB_APPLY_TABLE,
|
||||
reg_ext, NullS);
|
||||
build_table_filename(buf, sizeof(buf),
|
||||
NDB_REP_DB,
|
||||
NDB_APPLY_TABLE,
|
||||
reg_ext);
|
||||
unpack_filename(buf,buf);
|
||||
my_delete(buf, MYF(0));
|
||||
}
|
||||
@@ -703,10 +717,10 @@ static int ndbcluster_create_schema_table(THD *thd)
|
||||
if so, remove it since there is none in Ndb
|
||||
*/
|
||||
{
|
||||
strxnmov(buf, sizeof(buf),
|
||||
mysql_data_home,
|
||||
"/" NDB_REP_DB "/" NDB_SCHEMA_TABLE,
|
||||
reg_ext, NullS);
|
||||
build_table_filename(buf, sizeof(buf),
|
||||
NDB_REP_DB,
|
||||
NDB_SCHEMA_TABLE,
|
||||
reg_ext);
|
||||
unpack_filename(buf,buf);
|
||||
my_delete(buf, MYF(0));
|
||||
}
|
||||
@@ -1287,7 +1301,8 @@ end:
|
||||
/*
|
||||
Handle _non_ data events from the storage nodes
|
||||
*/
|
||||
static int
|
||||
//static int
|
||||
int
|
||||
ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
||||
NDB_SHARE *share)
|
||||
{
|
||||
@@ -1299,50 +1314,37 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
||||
pOp->tableFrmChanged());
|
||||
|
||||
if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
|
||||
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
|
||||
(uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
|
||||
{
|
||||
NDBDICT *dict= ndb->getDictionary();
|
||||
NdbDictionary::Dictionary::List index_list;
|
||||
|
||||
ndb->setDatabaseName(dbname);
|
||||
// Invalidating indexes
|
||||
if (! dict->listIndexes(index_list, tabname))
|
||||
{
|
||||
for (unsigned i = 0; i < index_list.count; i++) {
|
||||
NdbDictionary::Dictionary::List::Element& index=
|
||||
index_list.elements[i];
|
||||
DBUG_PRINT("info", ("Invalidating index %s.%s",
|
||||
index.database, index.name));
|
||||
dict->invalidateIndex(index.name, tabname);
|
||||
}
|
||||
}
|
||||
// Invalidate table
|
||||
ha_ndbcluster::invalidate_dictionary_cache(share->table->s,
|
||||
ndb,
|
||||
dbname,
|
||||
tabname,
|
||||
TRUE);
|
||||
|
||||
TABLE_SHARE *table_share= share->table->s; //share->table_share;
|
||||
TABLE* table= share->table;
|
||||
|
||||
/*
|
||||
Invalidate table and all it's indexes
|
||||
*/
|
||||
ha_ndbcluster table_handler(table_share);
|
||||
table_handler.set_dbname(share->key);
|
||||
table_handler.set_tabname(share->key);
|
||||
table_handler.open_indexes(ndb, table, TRUE);
|
||||
table_handler.invalidate_dictionary_cache(TRUE);
|
||||
|
||||
if (online_alter_table)
|
||||
{
|
||||
char key[FN_REFLEN];
|
||||
const void *data= 0, *pack_data= 0;
|
||||
uint length, pack_length;
|
||||
int error;
|
||||
NDBDICT *dict= ndb->getDictionary();
|
||||
const NDBTAB *altered_table= pOp->getTable();
|
||||
|
||||
DBUG_PRINT("info", ("Detected frm change of table %s.%s",
|
||||
dbname, tabname));
|
||||
const NDBTAB *altered_table= pOp->getEvent()->getTable();
|
||||
bool remote_event=
|
||||
pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
|
||||
strxnmov(key, FN_LEN-1, mysql_data_home, "/",
|
||||
dbname, "/", tabname, NullS);
|
||||
build_table_filename(key, FN_LEN-1, dbname, tabname, NullS);
|
||||
/*
|
||||
If the frm of the altered table is different than the one on
|
||||
disk then overwrite it with the new table definition
|
||||
*/
|
||||
if (remote_event &&
|
||||
readfrm(key, &data, &length) == 0 &&
|
||||
if (readfrm(key, &data, &length) == 0 &&
|
||||
packfrm(data, length, &pack_data, &pack_length) == 0 &&
|
||||
cmp_frm(altered_table, pack_data, pack_length))
|
||||
{
|
||||
@@ -1359,6 +1361,12 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
||||
}
|
||||
pthread_mutex_unlock(&LOCK_open);
|
||||
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
|
||||
/*
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share,
|
||||
table_share, table)))
|
||||
sql_print_information("NDB: Failed to re-open table %s.%s",
|
||||
dbname, tabname);
|
||||
*/
|
||||
}
|
||||
}
|
||||
remote_drop_table= 1;
|
||||
@@ -1838,6 +1846,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
|
||||
/* Handle any trailing share */
|
||||
NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
|
||||
(byte*) key, key_len);
|
||||
|
||||
if (share && share_may_exist)
|
||||
{
|
||||
if (share->flags & NSF_NO_BINLOG ||
|
||||
|
||||
@@ -66,6 +66,7 @@
|
||||
#define MAX_FRAGMENT_DATA_BYTES (4+(2 * 8 * MAX_REPLICAS * MAX_NDB_NODES))
|
||||
#define MAX_NDB_PARTITIONS 1024
|
||||
#define MAX_RANGE_DATA (131072+MAX_NDB_PARTITIONS) //0.5 MByte of list data
|
||||
#define MAX_WORDS_META_FILE 16382
|
||||
|
||||
#define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1)
|
||||
/*
|
||||
|
||||
@@ -39,6 +39,7 @@ class AlterTableReq {
|
||||
friend class NdbEventOperationImpl;
|
||||
friend class NdbDictInterface;
|
||||
friend class Dbdict;
|
||||
friend class Suma;
|
||||
|
||||
/**
|
||||
* For printing
|
||||
|
||||
@@ -530,7 +530,7 @@ public:
|
||||
Config c_defaults;
|
||||
Uint32 m_diskless;
|
||||
|
||||
STATIC_CONST(NO_OF_PAGES_META_FILE = 2);
|
||||
STATIC_CONST(NO_OF_PAGES_META_FILE = MAX_WORDS_META_FILE/BACKUP_WORDS_PER_PAGE);
|
||||
|
||||
/**
|
||||
* Pools
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
#include <signaldata/GCPSave.hpp>
|
||||
#include <signaldata/CreateTab.hpp>
|
||||
#include <signaldata/DropTab.hpp>
|
||||
#include <signaldata/AlterTable.hpp>
|
||||
#include <signaldata/AlterTab.hpp>
|
||||
#include <signaldata/DihFragCount.hpp>
|
||||
#include <signaldata/SystemError.hpp>
|
||||
@@ -3440,7 +3441,7 @@ Suma::execDROP_TAB_CONF(Signal *signal)
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
static Uint32 b_dti_buf[10000];
|
||||
static Uint32 b_dti_buf[MAX_WORDS_META_FILE];
|
||||
|
||||
void
|
||||
Suma::execALTER_TAB_REQ(Signal *signal)
|
||||
@@ -3462,7 +3463,7 @@ Suma::execALTER_TAB_REQ(Signal *signal)
|
||||
}
|
||||
|
||||
DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i));
|
||||
|
||||
Table::State old_state = tabPtr.p->m_state;
|
||||
tabPtr.p->m_state = Table::ALTERED;
|
||||
// triggers must be removed, waiting for sub stop req for that
|
||||
|
||||
@@ -3520,6 +3521,11 @@ Suma::execALTER_TAB_REQ(Signal *signal)
|
||||
DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
|
||||
}
|
||||
}
|
||||
if (AlterTableReq::getFrmFlag(changeMask))
|
||||
{
|
||||
// Frm changes only are handled on-line
|
||||
tabPtr.p->m_state = old_state;
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@@ -1320,6 +1320,8 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
|
||||
|
||||
m_localHash.put(impl->m_internalName.c_str(), info);
|
||||
|
||||
addBlobTables(*impl);
|
||||
|
||||
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
|
||||
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user