mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Fix for bug#22740 Events: Decouple Event_queue from Event_db_repository
This patch implements the idea of the bug report by making Event_queue unaware of Event_db_repository by making a higher level class - Events, which is aware of most of all classes, responsible for passing all data needed for adding/updating/deleting an event to/from the queue. Introduces few new classes : - Event_worker_thread - Event_queue_element_for_exec sql/event_data_objects.cc: Introduced a new class Event_queue_element_for_exec According to Konstantin it should be named Event_name and hold only two LEX_STRINGs but `dropped` is not saved on disk and will require additional logic in Event_worker_thread class, after loading to compute whether the event should be dropped or not. It's easier just to pass this flag around. Removed Event_queue_element::drop(). This method was a source of a race condition. At the place where the event should be dropped we call Events::drop_event() which is the only code-flow for dropping. In addition, because ::drop_event() holds Events::LOCK_metadata there is no source of race conditions. Before this patch dropping from ::drop() wasn't under LOCK_metadata and races were possible. Because Events::open_event_table was removed as a method, provisionally events_event_db_repository was exported from events.cc till a solution is build where Event_queue_element does not access directly mysql.event. sql/event_data_objects.h: New class Event_queue_element_for_exec added which is returned from Event_queue::get_top_if_time() and passed through Event_scheduler to Event_worker_thread. There by using the (db)name Event_job_data is instanciated and executed. Dropped Event_queue_element::drop() thd was moved out of Event_job_data as it is now part of Event_queue_element_for_exec sql/event_queue.cc: Removed dependency of Event_queue on Event_db_repository. The instantiation of Event_job_data was moved to class Event_worker_thread In place is a return of an object of Event_queue_element_for_exec is used later for instantiating Event_job_data. The `dropped` flag of Event_queue_element is passed over Event_queue_element_for_exec to the code in Event_worker_thread. sql/event_queue.h: Removed dependency of Event_queue on Event_db_repository Removed dependency on Event_scheduler sql/event_scheduler.cc: Added class Event_worker_thread, which is used during the execution of an event. It has a static init() method to get a pointer to Event_db_repository to be used for instantiation of Event_job_data object. This object it then executed. sql/event_scheduler.h: Added class Event_worker_thread, which is used during the execution of an event. sql/events.cc: Removed Events::open_event_table() because it was a product of a bad architecture. sql/events.h: Removed friend definition, unneeded. Fixed Events::drop_event() to have the previous signature without bool only_from_disk sql/sql_parse.cc: Fix call
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
#include "mysql_priv.h"
|
||||
#include "event_queue.h"
|
||||
#include "event_data_objects.h"
|
||||
#include "event_db_repository.h"
|
||||
|
||||
|
||||
#define EVENT_QUEUE_INITIAL_SIZE 30
|
||||
@@ -136,16 +135,14 @@ Event_queue::deinit_mutexes()
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
|
||||
Event_queue::init_queue(THD *thd)
|
||||
{
|
||||
bool res;
|
||||
struct event_queue_param *event_queue_param_value= NULL;
|
||||
|
||||
DBUG_ENTER("Event_queue::init_queue");
|
||||
DBUG_PRINT("enter", ("this: 0x%lx", (long) this));
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
db_repository= db_repo;
|
||||
|
||||
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
|
||||
0 /*max_on_top*/, event_queue_element_compare_q,
|
||||
@@ -162,12 +159,8 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
|
||||
goto err;
|
||||
}
|
||||
|
||||
res= load_events_from_db(thd);
|
||||
UNLOCK_QUEUE_DATA();
|
||||
if (res)
|
||||
deinit_queue();
|
||||
|
||||
DBUG_RETURN(res);
|
||||
DBUG_RETURN(FALSE);
|
||||
|
||||
err:
|
||||
UNLOCK_QUEUE_DATA();
|
||||
@@ -204,37 +197,29 @@ Event_queue::deinit_queue()
|
||||
Event_queue::create_event()
|
||||
dbname The schema of the new event
|
||||
name The name of the new event
|
||||
|
||||
RETURN VALUE
|
||||
OP_OK OK or scheduler not working
|
||||
OP_LOAD_ERROR Error during loading from disk
|
||||
*/
|
||||
|
||||
int
|
||||
Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
|
||||
void
|
||||
Event_queue::create_event(THD *thd, Event_queue_element *new_element)
|
||||
{
|
||||
int res;
|
||||
Event_queue_element *new_element;
|
||||
DBUG_ENTER("Event_queue::create_event");
|
||||
DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd, dbname.str, name.str));
|
||||
DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd,
|
||||
new_element->dbname.str, new_element->name.str));
|
||||
|
||||
new_element= new Event_queue_element();
|
||||
res= db_repository->load_named_event(thd, dbname, name, new_element);
|
||||
if (res || new_element->status == Event_queue_element::DISABLED)
|
||||
if (new_element->status == Event_queue_element::DISABLED)
|
||||
delete new_element;
|
||||
else
|
||||
{
|
||||
new_element->compute_next_execution_time();
|
||||
DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element));
|
||||
queue_insert_safe(&queue, (byte *) new_element);
|
||||
dbug_dump_queue(thd->query_start());
|
||||
pthread_cond_broadcast(&COND_queue_state);
|
||||
UNLOCK_QUEUE_DATA();
|
||||
}
|
||||
|
||||
DBUG_RETURN(res);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@@ -248,32 +233,16 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
|
||||
name Name of the event
|
||||
new_schema New schema, in case of RENAME TO, otherwise NULL
|
||||
new_name New name, in case of RENAME TO, otherwise NULL
|
||||
|
||||
RETURN VALUE
|
||||
OP_OK OK or scheduler not working
|
||||
OP_LOAD_ERROR Error during loading from disk
|
||||
*/
|
||||
|
||||
int
|
||||
void
|
||||
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
|
||||
LEX_STRING *new_schema, LEX_STRING *new_name)
|
||||
Event_queue_element *new_element)
|
||||
{
|
||||
int res;
|
||||
Event_queue_element *new_element;
|
||||
|
||||
DBUG_ENTER("Event_queue::update_event");
|
||||
DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str));
|
||||
|
||||
new_element= new Event_queue_element();
|
||||
|
||||
res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
|
||||
new_name ? *new_name:name, new_element);
|
||||
if (res)
|
||||
{
|
||||
delete new_element;
|
||||
goto end;
|
||||
}
|
||||
else if (new_element->status == Event_queue_element::DISABLED)
|
||||
if (new_element->status == Event_queue_element::DISABLED)
|
||||
{
|
||||
DBUG_PRINT("info", ("The event is disabled."));
|
||||
/*
|
||||
@@ -300,9 +269,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
|
||||
dbug_dump_queue(thd->query_start());
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
end:
|
||||
DBUG_PRINT("info", ("res=%d", res));
|
||||
DBUG_RETURN(res);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@@ -453,133 +420,6 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Loads all ENABLED events from mysql.event into the prioritized
|
||||
queue. Called during scheduler main thread initialization. Compiles
|
||||
the events. Creates Event_queue_element instances for every ENABLED event
|
||||
from mysql.event.
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::load_events_from_db()
|
||||
thd - Thread context. Used for memory allocation in some cases.
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
!0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
|
||||
EVEX_COMPILE_ERROR) - in all these cases mysql.event was
|
||||
tampered.
|
||||
|
||||
NOTES
|
||||
Reports the error to the console
|
||||
*/
|
||||
|
||||
int
|
||||
Event_queue::load_events_from_db(THD *thd)
|
||||
{
|
||||
TABLE *table;
|
||||
READ_RECORD read_record_info;
|
||||
int ret= -1;
|
||||
uint count= 0;
|
||||
bool clean_the_queue= TRUE;
|
||||
|
||||
DBUG_ENTER("Event_queue::load_events_from_db");
|
||||
DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
|
||||
|
||||
if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
|
||||
{
|
||||
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
|
||||
DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
|
||||
}
|
||||
|
||||
init_read_record(&read_record_info, thd, table ,NULL,1,0);
|
||||
while (!(read_record_info.read_record(&read_record_info)))
|
||||
{
|
||||
Event_queue_element *et;
|
||||
if (!(et= new Event_queue_element))
|
||||
{
|
||||
DBUG_PRINT("info", ("Out of memory"));
|
||||
break;
|
||||
}
|
||||
DBUG_PRINT("info", ("Loading event from row."));
|
||||
|
||||
if ((ret= et->load_from_row(table)))
|
||||
{
|
||||
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
|
||||
"Table probably corrupted");
|
||||
break;
|
||||
}
|
||||
if (et->status != Event_queue_element::ENABLED)
|
||||
{
|
||||
DBUG_PRINT("info",("%s is disabled",et->name.str));
|
||||
delete et;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* let's find when to be executed */
|
||||
if (et->compute_next_execution_time())
|
||||
{
|
||||
sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
|
||||
" Skipping", et->dbname.str, et->name.str);
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
Event_job_data temp_job_data;
|
||||
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
|
||||
|
||||
temp_job_data.load_from_row(table);
|
||||
|
||||
/*
|
||||
We load only on scheduler root just to check whether the body
|
||||
compiles.
|
||||
*/
|
||||
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
|
||||
case EVEX_MICROSECOND_UNSUP:
|
||||
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
|
||||
"supported but found in mysql.event");
|
||||
break;
|
||||
case EVEX_COMPILE_ERROR:
|
||||
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
|
||||
et->dbname.str, et->name.str);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
thd->end_statement();
|
||||
thd->cleanup_after_query();
|
||||
}
|
||||
if (ret)
|
||||
{
|
||||
delete et;
|
||||
goto end;
|
||||
}
|
||||
|
||||
queue_insert_safe(&queue, (byte *) et);
|
||||
count++;
|
||||
}
|
||||
clean_the_queue= FALSE;
|
||||
end:
|
||||
end_read_record(&read_record_info);
|
||||
|
||||
if (clean_the_queue)
|
||||
{
|
||||
empty_queue();
|
||||
ret= -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ret= 0;
|
||||
sql_print_information("SCHEDULER: Loaded %d event%s", count,
|
||||
(count == 1)?"":"s");
|
||||
}
|
||||
|
||||
close_thread_tables(thd);
|
||||
|
||||
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Recalculates activation times in the queue. There is one reason for
|
||||
that. Because the values (execute_at) by which the queue is ordered are
|
||||
@@ -629,7 +469,7 @@ Event_queue::empty_queue()
|
||||
{
|
||||
uint i;
|
||||
DBUG_ENTER("Event_queue::empty_queue");
|
||||
DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
|
||||
DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements));
|
||||
sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
|
||||
/* empty the queue */
|
||||
for (i= 0; i < queue.elements; ++i)
|
||||
@@ -690,31 +530,27 @@ static const char *queue_wait_msg= "Waiting for next activation";
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::get_top_for_execution_if_time()
|
||||
thd [in] Thread
|
||||
job_data [out] The object to execute
|
||||
thd [in] Thread
|
||||
event_name [out] The object to execute
|
||||
|
||||
RETURN VALUE
|
||||
FALSE No error. If *job_data==NULL then top not elligible for execution.
|
||||
Could be that there is no top.
|
||||
TRUE Error
|
||||
|
||||
FALSE No error. event_name != NULL
|
||||
TRUE Serious error
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
|
||||
Event_queue::get_top_for_execution_if_time(THD *thd,
|
||||
Event_queue_element_for_exec **event_name)
|
||||
{
|
||||
bool ret= FALSE;
|
||||
struct timespec top_time;
|
||||
Event_queue_element *top= NULL;
|
||||
bool to_free= FALSE;
|
||||
bool to_drop= FALSE;
|
||||
*job_data= NULL;
|
||||
*event_name= NULL;
|
||||
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
for (;;)
|
||||
{
|
||||
int res;
|
||||
Event_queue_element *top= NULL;
|
||||
|
||||
/* Break loop if thd has been killed */
|
||||
if (thd->killed)
|
||||
@@ -753,39 +589,30 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
|
||||
continue;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("Ready for execution"));
|
||||
if (!(*job_data= new Event_job_data()))
|
||||
if (!(*event_name= new Event_queue_element_for_exec()) ||
|
||||
(*event_name)->init(top->dbname, top->name))
|
||||
{
|
||||
ret= TRUE;
|
||||
break;
|
||||
}
|
||||
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
|
||||
*job_data)))
|
||||
{
|
||||
DBUG_PRINT("error", ("Got %d from load_named_event", res));
|
||||
delete *job_data;
|
||||
*job_data= NULL;
|
||||
ret= TRUE;
|
||||
break;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("Ready for execution"));
|
||||
top->mark_last_executed(thd);
|
||||
if (top->compute_next_execution_time())
|
||||
top->status= Event_queue_element::DISABLED;
|
||||
DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));
|
||||
|
||||
(*job_data)->execution_count= top->execution_count;
|
||||
top->execution_count++;
|
||||
(*event_name)->dropped= top->dropped;
|
||||
|
||||
top->update_timing_fields(thd);
|
||||
if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
|
||||
(top->status == Event_queue_element::DISABLED))
|
||||
if (top->status == Event_queue_element::DISABLED)
|
||||
{
|
||||
DBUG_PRINT("info", ("removing from the queue"));
|
||||
sql_print_information("SCHEDULER: Last execution of %s.%s. %s",
|
||||
top->dbname.str, top->name.str,
|
||||
top->dropped? "Dropping.":"");
|
||||
to_free= TRUE;
|
||||
to_drop= top->dropped;
|
||||
delete top;
|
||||
queue_remove(&queue, 0);
|
||||
}
|
||||
else
|
||||
@@ -796,19 +623,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
|
||||
}
|
||||
end:
|
||||
UNLOCK_QUEUE_DATA();
|
||||
if (to_drop)
|
||||
{
|
||||
DBUG_PRINT("info", ("Dropping from disk"));
|
||||
top->drop(thd);
|
||||
}
|
||||
if (to_free)
|
||||
delete top;
|
||||
|
||||
DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *job_data));
|
||||
DBUG_PRINT("info", ("returning %d et_new: 0x%lx ",
|
||||
ret, (long) *event_name));
|
||||
|
||||
if (*job_data)
|
||||
DBUG_PRINT("info", ("db: %s name: %s definer=%s", (*job_data)->dbname.str,
|
||||
(*job_data)->name.str, (*job_data)->definer.str));
|
||||
if (*event_name)
|
||||
DBUG_PRINT("info", ("db: %s name: %s",
|
||||
(*event_name)->dbname.str, (*event_name)->name.str));
|
||||
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
Reference in New Issue
Block a user