mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Fixed indention, removed compiler varnings and fixed a bug
in FULLTEXT indexes.
This commit is contained in:
470
sql/slave.cc
470
sql/slave.cc
@ -516,7 +516,6 @@ command");
|
||||
static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
|
||||
{
|
||||
uint len = packet_error;
|
||||
NET* net = &mysql->net;
|
||||
int read_errno = EINTR; // for convinience lets think we start by
|
||||
// being in the interrupted state :-)
|
||||
// my_real_read() will time us out
|
||||
@ -543,251 +542,250 @@ static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
|
||||
}
|
||||
|
||||
DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
|
||||
len, net->read_pos[4]));
|
||||
len, mysql->net.read_pos[4]));
|
||||
|
||||
return len - 1;
|
||||
}
|
||||
|
||||
|
||||
static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
|
||||
{
|
||||
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1 , event_len);
|
||||
if(ev)
|
||||
{
|
||||
switch(ev->get_type_code())
|
||||
{
|
||||
case QUERY_EVENT:
|
||||
{
|
||||
Query_log_event* qev = (Query_log_event*)ev;
|
||||
int q_len = qev->q_len;
|
||||
init_sql_alloc(&thd->mem_root, 8192,0);
|
||||
thd->db = (char*)qev->db;
|
||||
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
|
||||
{
|
||||
thd->query = (char*)qev->query;
|
||||
thd->set_time((time_t)qev->when);
|
||||
thd->current_tablenr = 0;
|
||||
VOID(pthread_mutex_lock(&LOCK_thread_count));
|
||||
thd->query_id = query_id++;
|
||||
VOID(pthread_mutex_unlock(&LOCK_thread_count));
|
||||
thd->last_nx_table = thd->last_nx_db = 0;
|
||||
for(;;)
|
||||
{
|
||||
thd->query_error = 0; // clear error
|
||||
thd->last_nx_table = thd->last_nx_db = 0;
|
||||
thd->net.last_errno = 0;
|
||||
thd->net.last_error[0] = 0;
|
||||
mysql_parse(thd, thd->query, q_len); // try query
|
||||
if(!thd->query_error || slave_killed(thd)) // break if ok
|
||||
break;
|
||||
// if not ok
|
||||
if(thd->last_nx_table && thd->last_nx_db)
|
||||
{
|
||||
// for now, let's just fail if the table is not
|
||||
// there, and not try to be a smart alec...
|
||||
|
||||
// if table was not there
|
||||
//if(fetch_nx_table(thd,&glob_mi))
|
||||
// try to to fetch from master
|
||||
break; // if we can't, just break
|
||||
}
|
||||
else
|
||||
break; // if failed for some other reason, bail out
|
||||
|
||||
// if fetched the table from master successfully
|
||||
// we need to restore query info in thd because
|
||||
// fetch_nx_table executes create table
|
||||
thd->query = (char*)qev->query;
|
||||
thd->set_time((time_t)qev->when);
|
||||
thd->current_tablenr = 0;
|
||||
}
|
||||
}
|
||||
thd->db = 0;// prevent db from being freed
|
||||
thd->query = 0; // just to be sure
|
||||
close_thread_tables(thd);
|
||||
free_root(&thd->mem_root,0);
|
||||
if (thd->query_error)
|
||||
{
|
||||
sql_print_error("Slave: error running query '%s' ",
|
||||
qev->query);
|
||||
return 1;
|
||||
}
|
||||
delete ev;
|
||||
|
||||
if(thd->fatal_error)
|
||||
{
|
||||
sql_print_error("Slave: Fatal error running query '%s' ",
|
||||
thd->query);
|
||||
return 1;
|
||||
}
|
||||
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
}
|
||||
|
||||
case LOAD_EVENT:
|
||||
{
|
||||
Load_log_event* lev = (Load_log_event*)ev;
|
||||
init_sql_alloc(&thd->mem_root, 8192,0);
|
||||
thd->db = (char*)lev->db;
|
||||
thd->query = 0;
|
||||
thd->query_error = 0;
|
||||
|
||||
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
|
||||
{
|
||||
thd->set_time((time_t)lev->when);
|
||||
thd->current_tablenr = 0;
|
||||
VOID(pthread_mutex_lock(&LOCK_thread_count));
|
||||
thd->query_id = query_id++;
|
||||
VOID(pthread_mutex_unlock(&LOCK_thread_count));
|
||||
|
||||
enum enum_duplicates handle_dup = DUP_IGNORE;
|
||||
if(lev->sql_ex.opt_flags && REPLACE_FLAG)
|
||||
handle_dup = DUP_REPLACE;
|
||||
sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG );
|
||||
String field_term(&lev->sql_ex.field_term, 1),
|
||||
enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1),
|
||||
escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1);
|
||||
|
||||
ex.field_term = &field_term;
|
||||
if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
|
||||
ex.field_term->length(0);
|
||||
|
||||
ex.enclosed = &enclosed;
|
||||
if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
|
||||
ex.enclosed->length(0);
|
||||
|
||||
ex.line_term = &line_term;
|
||||
if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
|
||||
ex.line_term->length(0);
|
||||
|
||||
ex.line_start = &line_start;
|
||||
if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
|
||||
ex.line_start->length(0);
|
||||
|
||||
ex.escaped = &escaped;
|
||||
if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
|
||||
ex.escaped->length(0);
|
||||
|
||||
ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
|
||||
if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
|
||||
ex.field_term->length(0);
|
||||
|
||||
ex.skip_lines = lev->skip_lines;
|
||||
|
||||
TABLE_LIST tables;
|
||||
bzero((char*) &tables,sizeof(tables));
|
||||
tables.db = thd->db;
|
||||
tables.name = tables.real_name = (char*)lev->table_name;
|
||||
tables.lock_type = TL_WRITE;
|
||||
|
||||
if (open_tables(thd, &tables))
|
||||
{
|
||||
sql_print_error("Slave: error opening table %s ",
|
||||
tables.name);
|
||||
delete ev;
|
||||
return 1;
|
||||
}
|
||||
|
||||
List<Item> fields;
|
||||
lev->set_fields(fields);
|
||||
thd->net.vio = net->vio;
|
||||
// mysql_load will use thd->net to read the file
|
||||
thd->net.pkt_nr = net->pkt_nr;
|
||||
// make sure the client does get confused
|
||||
// about the packet sequence
|
||||
if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
|
||||
TL_WRITE))
|
||||
thd->query_error = 1;
|
||||
net->pkt_nr = thd->net.pkt_nr;
|
||||
}
|
||||
else // we will just ask the master to send us /dev/null if we do not want to
|
||||
// load the data :-)
|
||||
{
|
||||
(void)my_net_write(net, "\xfb/dev/null", 10);
|
||||
(void)net_flush(net);
|
||||
(void)my_net_read(net); // discard response
|
||||
send_ok(net); // the master expects it
|
||||
}
|
||||
|
||||
thd->net.vio = 0;
|
||||
thd->db = 0;// prevent db from being freed
|
||||
close_thread_tables(thd);
|
||||
if(thd->query_error)
|
||||
{
|
||||
int sql_error = thd->net.last_errno;
|
||||
if(!sql_error)
|
||||
sql_error = ER_UNKNOWN_ERROR;
|
||||
|
||||
sql_print_error("Slave: error '%s' running load data infile ",
|
||||
ER(sql_error));
|
||||
delete ev;
|
||||
return 1;
|
||||
}
|
||||
delete ev;
|
||||
|
||||
if(thd->fatal_error)
|
||||
{
|
||||
sql_print_error("Slave: Fatal error running query '%s' ",
|
||||
thd->query);
|
||||
return 1;
|
||||
}
|
||||
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
}
|
||||
|
||||
case START_EVENT:
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
|
||||
case STOP_EVENT:
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
case ROTATE_EVENT:
|
||||
{
|
||||
Rotate_log_event* rev = (Rotate_log_event*)ev;
|
||||
int ident_len = rev->ident_len;
|
||||
memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
|
||||
mi->log_file_name[ident_len] = 0;
|
||||
mi->pos = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
case INTVAR_EVENT:
|
||||
{
|
||||
Intvar_log_event* iev = (Intvar_log_event*)ev;
|
||||
switch(iev->type)
|
||||
{
|
||||
case LAST_INSERT_ID_EVENT:
|
||||
thd->last_insert_id_used = 1;
|
||||
thd->last_insert_id = iev->val;
|
||||
break;
|
||||
case INSERT_ID_EVENT:
|
||||
thd->next_insert_id = iev->val;
|
||||
break;
|
||||
|
||||
}
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
|
||||
event_len);
|
||||
if (ev)
|
||||
{
|
||||
switch(ev->get_type_code())
|
||||
{
|
||||
sql_print_error("Could not parse log event entry, check the master for binlog corruption\
|
||||
This may also be a network problem, or just a bug in the master or slave code");
|
||||
return 1;
|
||||
case QUERY_EVENT:
|
||||
{
|
||||
Query_log_event* qev = (Query_log_event*)ev;
|
||||
int q_len = qev->q_len;
|
||||
init_sql_alloc(&thd->mem_root, 8192,0);
|
||||
thd->db = (char*)qev->db;
|
||||
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
|
||||
{
|
||||
thd->query = (char*)qev->query;
|
||||
thd->set_time((time_t)qev->when);
|
||||
thd->current_tablenr = 0;
|
||||
VOID(pthread_mutex_lock(&LOCK_thread_count));
|
||||
thd->query_id = query_id++;
|
||||
VOID(pthread_mutex_unlock(&LOCK_thread_count));
|
||||
thd->last_nx_table = thd->last_nx_db = 0;
|
||||
for(;;)
|
||||
{
|
||||
thd->query_error = 0; // clear error
|
||||
thd->last_nx_table = thd->last_nx_db = 0;
|
||||
thd->net.last_errno = 0;
|
||||
thd->net.last_error[0] = 0;
|
||||
mysql_parse(thd, thd->query, q_len); // try query
|
||||
if(!thd->query_error || slave_killed(thd)) // break if ok
|
||||
break;
|
||||
// if not ok
|
||||
if(thd->last_nx_table && thd->last_nx_db)
|
||||
{
|
||||
// for now, let's just fail if the table is not
|
||||
// there, and not try to be a smart alec...
|
||||
|
||||
// if table was not there
|
||||
//if(fetch_nx_table(thd,&glob_mi))
|
||||
// try to to fetch from master
|
||||
break; // if we can't, just break
|
||||
}
|
||||
else
|
||||
break; // if failed for some other reason, bail out
|
||||
|
||||
// if fetched the table from master successfully
|
||||
// we need to restore query info in thd because
|
||||
// fetch_nx_table executes create table
|
||||
thd->query = (char*)qev->query;
|
||||
thd->set_time((time_t)qev->when);
|
||||
thd->current_tablenr = 0;
|
||||
}
|
||||
}
|
||||
thd->db = 0;// prevent db from being freed
|
||||
thd->query = 0; // just to be sure
|
||||
close_thread_tables(thd);
|
||||
free_root(&thd->mem_root,0);
|
||||
if (thd->query_error)
|
||||
{
|
||||
sql_print_error("Slave: error running query '%s' ",
|
||||
qev->query);
|
||||
return 1;
|
||||
}
|
||||
delete ev;
|
||||
|
||||
if(thd->fatal_error)
|
||||
{
|
||||
sql_print_error("Slave: Fatal error running query '%s' ",
|
||||
thd->query);
|
||||
return 1;
|
||||
}
|
||||
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
}
|
||||
|
||||
case LOAD_EVENT:
|
||||
{
|
||||
Load_log_event* lev = (Load_log_event*)ev;
|
||||
init_sql_alloc(&thd->mem_root, 8192,0);
|
||||
thd->db = (char*)lev->db;
|
||||
thd->query = 0;
|
||||
thd->query_error = 0;
|
||||
|
||||
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
|
||||
{
|
||||
thd->set_time((time_t)lev->when);
|
||||
thd->current_tablenr = 0;
|
||||
VOID(pthread_mutex_lock(&LOCK_thread_count));
|
||||
thd->query_id = query_id++;
|
||||
VOID(pthread_mutex_unlock(&LOCK_thread_count));
|
||||
|
||||
enum enum_duplicates handle_dup = DUP_IGNORE;
|
||||
if(lev->sql_ex.opt_flags && REPLACE_FLAG)
|
||||
handle_dup = DUP_REPLACE;
|
||||
sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG );
|
||||
String field_term(&lev->sql_ex.field_term, 1),
|
||||
enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1),
|
||||
escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1);
|
||||
|
||||
ex.field_term = &field_term;
|
||||
if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
|
||||
ex.field_term->length(0);
|
||||
|
||||
ex.enclosed = &enclosed;
|
||||
if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
|
||||
ex.enclosed->length(0);
|
||||
|
||||
ex.line_term = &line_term;
|
||||
if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
|
||||
ex.line_term->length(0);
|
||||
|
||||
ex.line_start = &line_start;
|
||||
if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
|
||||
ex.line_start->length(0);
|
||||
|
||||
ex.escaped = &escaped;
|
||||
if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
|
||||
ex.escaped->length(0);
|
||||
|
||||
ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
|
||||
if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
|
||||
ex.field_term->length(0);
|
||||
|
||||
ex.skip_lines = lev->skip_lines;
|
||||
|
||||
TABLE_LIST tables;
|
||||
bzero((char*) &tables,sizeof(tables));
|
||||
tables.db = thd->db;
|
||||
tables.name = tables.real_name = (char*)lev->table_name;
|
||||
tables.lock_type = TL_WRITE;
|
||||
|
||||
if (open_tables(thd, &tables))
|
||||
{
|
||||
sql_print_error("Slave: error opening table %s ",
|
||||
tables.name);
|
||||
delete ev;
|
||||
return 1;
|
||||
}
|
||||
|
||||
List<Item> fields;
|
||||
lev->set_fields(fields);
|
||||
thd->net.vio = net->vio;
|
||||
// mysql_load will use thd->net to read the file
|
||||
thd->net.pkt_nr = net->pkt_nr;
|
||||
// make sure the client does get confused
|
||||
// about the packet sequence
|
||||
if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
|
||||
TL_WRITE))
|
||||
thd->query_error = 1;
|
||||
net->pkt_nr = thd->net.pkt_nr;
|
||||
}
|
||||
else // we will just ask the master to send us /dev/null if we do not want to
|
||||
// load the data :-)
|
||||
{
|
||||
(void)my_net_write(net, "\xfb/dev/null", 10);
|
||||
(void)net_flush(net);
|
||||
(void)my_net_read(net); // discard response
|
||||
send_ok(net); // the master expects it
|
||||
}
|
||||
|
||||
thd->net.vio = 0;
|
||||
thd->db = 0;// prevent db from being freed
|
||||
close_thread_tables(thd);
|
||||
if(thd->query_error)
|
||||
{
|
||||
int sql_error = thd->net.last_errno;
|
||||
if(!sql_error)
|
||||
sql_error = ER_UNKNOWN_ERROR;
|
||||
|
||||
sql_print_error("Slave: error '%s' running load data infile ",
|
||||
ER(sql_error));
|
||||
delete ev;
|
||||
return 1;
|
||||
}
|
||||
delete ev;
|
||||
|
||||
if(thd->fatal_error)
|
||||
{
|
||||
sql_print_error("Slave: Fatal error running query '%s' ",
|
||||
thd->query);
|
||||
return 1;
|
||||
}
|
||||
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
}
|
||||
|
||||
case START_EVENT:
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
|
||||
case STOP_EVENT:
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
case ROTATE_EVENT:
|
||||
{
|
||||
Rotate_log_event* rev = (Rotate_log_event*)ev;
|
||||
int ident_len = rev->ident_len;
|
||||
memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
|
||||
mi->log_file_name[ident_len] = 0;
|
||||
mi->pos = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
case INTVAR_EVENT:
|
||||
{
|
||||
Intvar_log_event* iev = (Intvar_log_event*)ev;
|
||||
switch(iev->type)
|
||||
{
|
||||
case LAST_INSERT_ID_EVENT:
|
||||
thd->last_insert_id_used = 1;
|
||||
thd->last_insert_id = iev->val;
|
||||
break;
|
||||
case INSERT_ID_EVENT:
|
||||
thd->next_insert_id = iev->val;
|
||||
break;
|
||||
|
||||
}
|
||||
mi->inc_pos(event_len);
|
||||
flush_master_info(mi);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
sql_print_error("Could not parse log event entry, check the master for binlog corruption\n\
|
||||
This may also be a network problem, or just a bug in the master or slave code");
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// slave thread
|
||||
|
Reference in New Issue
Block a user