mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
WL#3337 (Event scheduler new architecture)
Post-review fixes. Mostly whitespace, int-to-bool return value, fixed comments sql/Makefile.am: compile all submodules of Events before compiling the facade sql/event_data_objects.cc: - Use initialization list - Clean whitespaces - Shorten comments - Fix comments sql/event_data_objects.h: - Fix whitespace sql/event_db_repository.cc: - Change return type from int to bool where only one error code is returned. - Don't use macros but get the maximal number of characters in a column from the column - Fix comments - Make functions which has return value but it's not used - void. sql/event_db_repository.h: - Methods with only one error code int -> bool return value - Remove declaration of fill_schema_events, a function that does not exist sql/event_queue.cc: - Use initialization lists - Let find_n_remove_event delete the object thus making the code more robust. The caller could forget to destruct the object. In addition, find_n_remove_element() does not return a value. - Move check_system_tables() to class Events - Fix comments sql/event_queue.h: - Whitespace changes - init_queue() should allow passing of THD - check_system_tables moved to class Events - find_n_remove_event() is now void sql/event_scheduler.cc: - Initialize res before use - Remove end stop from message sql/event_scheduler.h: Add uninitialized state. The scheduler is in it before init_scheduler() is called. The rationale is that otherwise state has no value before the call. If the system tables were damaged the scheduler won't be initialized but in Events::deinit() Event_scheduler::stop() will be called and this will touch state, generating valgrind warning at minimum. sql/events.cc: - Whitespace changes - Fix comments - Make methods which have only one error code be bool instead of int - Create temporarily a THD to be used for the initialization of Event_queue - Event_queue::check_system_tables() moved to Events::check_system_tables - is_started() is renamed to is_execution_of_events_started() sql/events.h: - Whitespace changes - When a method returns only one error code it should be bool, not int - is_started() renamed to is_execution_of_events_started() sql/set_var.cc: is_started() is renamed to is_execution_of_events_started() sql/sql_db.cc: The return code is not used, thus don't return anything and drop_schema_events() is now void. sql/sql_yacc.yy: - Fix comments - Remove unneeded initialization which is performed in lex_init() sql/share/errmsg.txt: New error message sql/table.cc: - Fix comments - make table_check_intact() accespt const *table_def sql/table.h: Make table_check_intact() accespt const *table_def
This commit is contained in:
@ -73,35 +73,6 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
|
||||
}
|
||||
|
||||
|
||||
pthread_handler_t
|
||||
event_queue_loader_thread(void *arg)
|
||||
{
|
||||
/* needs to be first for thread_stack */
|
||||
THD *thd= (THD *)((struct event_queue_param *) arg)->thd;
|
||||
struct event_queue_param *param= (struct event_queue_param *) arg;
|
||||
thd->thread_stack= (char *) &thd;
|
||||
|
||||
if (post_init_event_thread(thd))
|
||||
goto end;
|
||||
|
||||
DBUG_ENTER("event_queue_loader_thread");
|
||||
|
||||
|
||||
pthread_mutex_lock(¶m->LOCK_loaded);
|
||||
param->queue->check_system_tables(thd);
|
||||
param->queue->load_events_from_db(thd);
|
||||
|
||||
param->loading_finished= TRUE;
|
||||
pthread_cond_signal(¶m->COND_loaded);
|
||||
|
||||
pthread_mutex_unlock(¶m->LOCK_loaded);
|
||||
|
||||
end:
|
||||
deinit_event_thread(thd);
|
||||
DBUG_RETURN(0); // Against gcc warnings
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Constructor of class Event_queue.
|
||||
|
||||
@ -110,14 +81,12 @@ end:
|
||||
*/
|
||||
|
||||
Event_queue::Event_queue()
|
||||
:mutex_last_unlocked_at_line(0), mutex_last_locked_at_line(0),
|
||||
mutex_last_attempted_lock_at_line(0),
|
||||
mutex_queue_data_locked(FALSE), mutex_queue_data_attempting_lock(FALSE)
|
||||
{
|
||||
mutex_last_unlocked_at_line= mutex_last_locked_at_line=
|
||||
mutex_last_attempted_lock_at_line= 0;
|
||||
|
||||
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
|
||||
mutex_last_attempted_lock_in_func= "";
|
||||
|
||||
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
|
||||
}
|
||||
|
||||
|
||||
@ -150,7 +119,12 @@ Event_queue::deinit_mutexes()
|
||||
|
||||
|
||||
/*
|
||||
Inits the queue
|
||||
This is a queue's constructor. Until this method is called, the
|
||||
queue is unusable. We don't use a C++ constructor instead in
|
||||
order to be able to check the return value. The queue is
|
||||
initialized once at server startup. Initialization can fail in
|
||||
case of a failure reading events from the database or out of
|
||||
memory.
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::init()
|
||||
@ -161,9 +135,9 @@ Event_queue::deinit_mutexes()
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
|
||||
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
|
||||
Event_scheduler *sched)
|
||||
{
|
||||
THD *new_thd;
|
||||
pthread_t th;
|
||||
bool res;
|
||||
struct event_queue_param *event_queue_param_value= NULL;
|
||||
@ -186,43 +160,16 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
|
||||
if (sizeof(my_time_t) != sizeof(time_t))
|
||||
{
|
||||
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
|
||||
"The scheduler may not work correctly. Stopping.");
|
||||
"The scheduler may not work correctly. Stopping");
|
||||
DBUG_ASSERT(0);
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (!(new_thd= new THD))
|
||||
goto err;
|
||||
|
||||
pre_init_event_thread(new_thd);
|
||||
new_thd->security_ctx->set_user((char*)"event_scheduler_loader");
|
||||
|
||||
event_queue_param_value= (struct event_queue_param *)
|
||||
my_malloc(sizeof(struct event_queue_param), MYF(0));
|
||||
|
||||
event_queue_param_value->thd= new_thd;
|
||||
event_queue_param_value->queue= this;
|
||||
event_queue_param_value->loading_finished= FALSE;
|
||||
pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
|
||||
pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
|
||||
|
||||
pthread_mutex_lock(&event_queue_param_value->LOCK_loaded);
|
||||
DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
|
||||
if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread,
|
||||
(void*)event_queue_param_value)))
|
||||
{
|
||||
do {
|
||||
pthread_cond_wait(&event_queue_param_value->COND_loaded,
|
||||
&event_queue_param_value->LOCK_loaded);
|
||||
} while (event_queue_param_value->loading_finished == FALSE);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
|
||||
pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded);
|
||||
pthread_cond_destroy(&event_queue_param_value->COND_loaded);
|
||||
my_free((char *)event_queue_param_value, MYF(0));
|
||||
|
||||
res= load_events_from_db(thd);
|
||||
UNLOCK_QUEUE_DATA();
|
||||
if (res)
|
||||
deinit_queue();
|
||||
|
||||
DBUG_RETURN(res);
|
||||
|
||||
err:
|
||||
@ -316,16 +263,15 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
|
||||
LEX_STRING *new_schema, LEX_STRING *new_name)
|
||||
{
|
||||
int res;
|
||||
Event_queue_element *old_element= NULL,
|
||||
*new_element;
|
||||
Event_queue_element *new_element;
|
||||
|
||||
DBUG_ENTER("Event_queue::update_event");
|
||||
DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
|
||||
|
||||
new_element= new Event_queue_element();
|
||||
|
||||
res= db_repository->load_named_event(thd, new_schema? *new_schema:dbname,
|
||||
new_name? *new_name:name, new_element);
|
||||
res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
|
||||
new_name ? *new_name:name, new_element);
|
||||
if (res)
|
||||
{
|
||||
delete new_element;
|
||||
@ -345,16 +291,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
|
||||
new_element->compute_next_execution_time();
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
if (!(old_element= find_n_remove_event(dbname, name)))
|
||||
{
|
||||
DBUG_PRINT("info", ("%s.%s not cached, probably was DISABLED",
|
||||
dbname.str, name.str));
|
||||
}
|
||||
find_n_remove_event(dbname, name);
|
||||
|
||||
/* If not disabled event */
|
||||
if (new_element)
|
||||
{
|
||||
DBUG_PRINT("info", ("new event in the Q 0x%lx old 0x%lx",
|
||||
new_element, old_element));
|
||||
DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element));
|
||||
queue_insert_safe(&queue, (byte *) new_element);
|
||||
}
|
||||
dbug_dump_queue(thd->query_start());
|
||||
@ -363,8 +305,6 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
|
||||
if (new_element)
|
||||
notify_observers();
|
||||
|
||||
if (old_element)
|
||||
delete old_element;
|
||||
end:
|
||||
DBUG_PRINT("info", ("res=%d", res));
|
||||
DBUG_RETURN(res);
|
||||
@ -385,19 +325,13 @@ void
|
||||
Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
|
||||
{
|
||||
int res;
|
||||
Event_queue_element *element;
|
||||
DBUG_ENTER("Event_queue::drop_event");
|
||||
DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
element= find_n_remove_event(dbname, name);
|
||||
find_n_remove_event(dbname, name);
|
||||
dbug_dump_queue(thd->query_start());
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
if (element)
|
||||
delete element;
|
||||
else
|
||||
DBUG_PRINT("info", ("No such event found, probably DISABLED"));
|
||||
|
||||
/*
|
||||
We don't signal here because the scheduler will catch the change
|
||||
@ -429,10 +363,10 @@ void
|
||||
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
|
||||
bool (*comparator)(LEX_STRING, Event_basic *))
|
||||
{
|
||||
uint i= 0;
|
||||
DBUG_ENTER("Event_queue::drop_matching_events");
|
||||
DBUG_PRINT("enter", ("pattern=%s", pattern.str));
|
||||
|
||||
uint i= 0;
|
||||
while (i < queue.elements)
|
||||
{
|
||||
Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
|
||||
@ -440,10 +374,10 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
|
||||
if (comparator(pattern, et))
|
||||
{
|
||||
/*
|
||||
The queue is ordered. If we remove an element, then all elements after
|
||||
it will shift one position to the left, if we imagine it as an array
|
||||
from left to the right. In this case we should not increment the
|
||||
counter and the (i < queue.elements) condition is ok.
|
||||
The queue is ordered. If we remove an element, then all elements
|
||||
after it will shift one position to the left, if we imagine it as
|
||||
an array from left to the right. In this case we should not
|
||||
increment the counter and the (i < queue.elements) condition is ok.
|
||||
*/
|
||||
queue_remove(&queue, i);
|
||||
delete et;
|
||||
@ -453,12 +387,12 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
|
||||
}
|
||||
/*
|
||||
We don't call notify_observers() . If we remove the top event:
|
||||
1. The queue is empty. The scheduler will wake up at some time and realize
|
||||
that the queue is empty. If create_event() comes inbetween it will
|
||||
signal the scheduler
|
||||
2. The queue is not empty, but the next event after the previous top, won't
|
||||
be executed any time sooner than the element we removed. Hence, we may
|
||||
not notify the scheduler and it will realize the change when it
|
||||
1. The queue is empty. The scheduler will wake up at some time and
|
||||
realize that the queue is empty. If create_event() comes inbetween
|
||||
it will signal the scheduler
|
||||
2. The queue is not empty, but the next event after the previous top,
|
||||
won't be executed any time sooner than the element we removed. Hence,
|
||||
we may not notify the scheduler and it will realize the change when it
|
||||
wakes up from timedwait.
|
||||
*/
|
||||
|
||||
@ -472,11 +406,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::drop_schema_events()
|
||||
thd THD
|
||||
db The schema name
|
||||
|
||||
RETURN VALUE
|
||||
>=0 Number of dropped events
|
||||
thd HD
|
||||
schema The schema name
|
||||
*/
|
||||
|
||||
void
|
||||
@ -516,16 +447,12 @@ Event_queue::notify_observers()
|
||||
db The schema of the event to find
|
||||
name The event to find
|
||||
|
||||
RETURN VALUE
|
||||
NULL Not found
|
||||
otherwise Address
|
||||
|
||||
NOTE
|
||||
The caller should do the locking also the caller is responsible for
|
||||
actual signalling in case an event is removed from the queue.
|
||||
*/
|
||||
|
||||
Event_queue_element *
|
||||
void
|
||||
Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
|
||||
{
|
||||
uint i;
|
||||
@ -539,11 +466,12 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
|
||||
if (event_basic_identifier_equal(db, name, et))
|
||||
{
|
||||
queue_remove(&queue, i);
|
||||
DBUG_RETURN(et);
|
||||
delete et;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
DBUG_RETURN(NULL);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@ -583,7 +511,7 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
|
||||
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
|
||||
{
|
||||
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
|
||||
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
|
||||
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
|
||||
}
|
||||
|
||||
@ -625,14 +553,17 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
|
||||
temp_job_data.load_from_row(table);
|
||||
|
||||
/* We load only on scheduler root just to check whether the body compiles */
|
||||
/*
|
||||
We load only on scheduler root just to check whether the body
|
||||
compiles.
|
||||
*/
|
||||
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
|
||||
case EVEX_MICROSECOND_UNSUP:
|
||||
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
|
||||
"supported but found in mysql.event");
|
||||
break;
|
||||
case EVEX_COMPILE_ERROR:
|
||||
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
|
||||
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
|
||||
et->dbname.str, et->name.str);
|
||||
break;
|
||||
default:
|
||||
@ -663,12 +594,10 @@ end:
|
||||
else
|
||||
{
|
||||
ret= 0;
|
||||
sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
|
||||
sql_print_information("SCHEDULER: Loaded %d event%s", count,
|
||||
(count == 1)?"":"s");
|
||||
}
|
||||
|
||||
/* Force close to free memory */
|
||||
thd->version--;
|
||||
|
||||
close_thread_tables(thd);
|
||||
|
||||
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
|
||||
@ -676,71 +605,6 @@ end:
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Opens mysql.db and mysql.user and checks whether:
|
||||
1. mysql.db has column Event_priv at column 20 (0 based);
|
||||
2. mysql.user has column Event_priv at column 29 (0 based);
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::check_system_tables()
|
||||
thd Thread
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error
|
||||
*/
|
||||
|
||||
void
|
||||
Event_queue::check_system_tables(THD *thd)
|
||||
{
|
||||
TABLE_LIST tables;
|
||||
bool not_used;
|
||||
Open_tables_state backup;
|
||||
bool ret;
|
||||
|
||||
DBUG_ENTER("Event_queue::check_system_tables");
|
||||
DBUG_PRINT("enter", ("thd=0x%lx", thd));
|
||||
|
||||
thd->reset_n_backup_open_tables_state(&backup);
|
||||
|
||||
bzero((char*) &tables, sizeof(tables));
|
||||
tables.db= (char*) "mysql";
|
||||
tables.table_name= tables.alias= (char*) "db";
|
||||
tables.lock_type= TL_READ;
|
||||
|
||||
if ((ret= simple_open_n_lock_tables(thd, &tables)))
|
||||
{
|
||||
sql_print_error("Cannot open mysql.db");
|
||||
goto end;
|
||||
}
|
||||
ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
|
||||
mysql_db_table_fields, &mysql_db_table_last_check,
|
||||
ER_CANNOT_LOAD_FROM_TABLE);
|
||||
close_thread_tables(thd);
|
||||
|
||||
bzero((char*) &tables, sizeof(tables));
|
||||
tables.db= (char*) "mysql";
|
||||
tables.table_name= tables.alias= (char*) "user";
|
||||
tables.lock_type= TL_READ;
|
||||
|
||||
if (simple_open_n_lock_tables(thd, &tables))
|
||||
sql_print_error("Cannot open mysql.db");
|
||||
else
|
||||
{
|
||||
if (tables.table->s->fields < 29 ||
|
||||
strncmp(tables.table->field[29]->field_name,
|
||||
STRING_WITH_LEN("Event_priv")))
|
||||
sql_print_error("mysql.user has no `Event_priv` column at position 29");
|
||||
close_thread_tables(thd);
|
||||
}
|
||||
|
||||
end:
|
||||
thd->restore_backup_open_tables_state(&backup);
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Recalculates activation times in the queue. There is one reason for
|
||||
that. Because the values (execute_at) by which the queue is ordered are
|
||||
@ -782,7 +646,7 @@ Event_queue::recalculate_activation_times(THD *thd)
|
||||
Event_queue::empty_queue()
|
||||
|
||||
NOTE
|
||||
Should be called with LOCK_event_queue locked
|
||||
Should be called with LOCK_event_queue locked
|
||||
*/
|
||||
|
||||
void
|
||||
|
Reference in New Issue
Block a user