mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
WL #3337 (Event scheduler new architecture)
Cut Nr. 8. All tests pass. Separated Event_scheduler into Event_queue and Event_scheduler. Added new Event_scheduler_ng which is the new scheduler and is used system-wide. Will be moved to the event_scheduler.cc in the future. Using Event_timed in Event_queue as well as cloned during execution. Next step is to have Event_worker_data which will be used during execution and will take ::compile()/::execute() out of Event_timed. mysql-test/r/events.result: update result mysql-test/r/events_bugs.result: update result mysql-test/r/ps_1general.result: update result mysql-test/r/skip_name_resolve.result: update result mysql-test/r/sp-threads.result: update result mysql-test/r/sp_notembedded.result: update result mysql-test/r/status.result: update result mysql-test/t/events_stress.test: Make event_stress a bit longer sql/Makefile.am: Add new event_scheduler_ng.h/cc . These are only to be in the experimental clone. Later their content will be moved to event_scheduler.h/cc sql/event_data_objects.cc: Allocate strings memory on own memory root, instead on the schedulers. Thus don't "leak" memory. This should fix bug#18683 memory leak in event scheduler sql/event_data_objects.h: add mem_root add THD - this is only temporal, will be moved to class Event_job_data once Event_job_data is responsible for the execution. sql/event_db_repository.cc: Remove unused code. Cosmetic changes sql/event_queue.cc: Now use the Event_scheduler_ng (NextGen) sql/event_queue.h: Now use the Event_scheduler_ng (NextGen) sql/event_scheduler.cc: This file is no more used, but will be soon. sql/event_scheduler.h: This file is no more used but will be soon sql/events.cc: Now use the Event_scheduler_ng (NextGen) sql/events.h: Now use the Event_scheduler_ng (NextGen) sql/mysqld.cc: Make it again possible to kill the scheduler thread sql/set_var.cc: Now use the Event_scheduler_ng (NextGen) sql/share/errmsg.txt: Shorten the message. sql/sql_show.cc: Loading is on a own root, then don't use thd->mem_root
This commit is contained in:
@ -16,7 +16,7 @@
|
||||
|
||||
#include "mysql_priv.h"
|
||||
#include "events.h"
|
||||
#include "event_scheduler.h"
|
||||
#include "event_scheduler_ng.h"
|
||||
#include "event_queue.h"
|
||||
#include "event_data_objects.h"
|
||||
#include "event_db_repository.h"
|
||||
@ -35,10 +35,6 @@
|
||||
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
|
||||
|
||||
|
||||
Event_scheduler*
|
||||
Event_queue::singleton= NULL;
|
||||
|
||||
|
||||
/*
|
||||
Compares the execute_at members of 2 Event_timed instances.
|
||||
Used as callback for the prioritized queue when shifting
|
||||
@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
|
||||
goto end;
|
||||
}
|
||||
|
||||
/* We need to load the event on scheduler_root */
|
||||
if (!(res= db_repository->
|
||||
load_named_event(thd, et->dbname, et->name, &et_new)))
|
||||
{
|
||||
DBUG_PRINT("info", ("new event in the queue %p", et_new));
|
||||
queue_insert_safe(&queue, (byte *) et_new);
|
||||
on_queue_change();
|
||||
}
|
||||
@ -130,7 +126,7 @@ end:
|
||||
Updates an event from the scheduler queue
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::update_event()
|
||||
Event_queue::update_event()
|
||||
thd Thread
|
||||
et The event to replace(add) into the queue
|
||||
new_schema New schema
|
||||
@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
|
||||
et->dbname= *new_schema;
|
||||
et->name= *new_name;
|
||||
}
|
||||
/*
|
||||
We need to load the event (it's strings but on the object itself)
|
||||
on scheduler_root. et_new could be NULL :
|
||||
1. Error occured
|
||||
2. If the replace is DISABLED, we don't load it into the queue.
|
||||
*/
|
||||
|
||||
if (!(res= db_repository->
|
||||
load_named_event(thd, et->dbname, et->name, &et_new)))
|
||||
{
|
||||
DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
|
||||
queue_insert_safe(&queue, (byte *) et_new);
|
||||
on_queue_change();
|
||||
}
|
||||
@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
|
||||
|
||||
|
||||
/*
|
||||
Drops an event from the scheduler queue
|
||||
Drops an event from the queue
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::drop_event()
|
||||
@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
Searches for an event in the scheduler queue
|
||||
Searches for an event in the queue
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::find_event()
|
||||
@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
|
||||
comparator The function to use for comparing
|
||||
|
||||
RETURN VALUE
|
||||
-1 Scheduler not working
|
||||
>=0 Number of dropped events
|
||||
|
||||
NOTE
|
||||
@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
|
||||
db The schema name
|
||||
|
||||
RETURN VALUE
|
||||
-1 Scheduler not working
|
||||
>=0 Number of dropped events
|
||||
*/
|
||||
|
||||
@ -459,8 +447,7 @@ void
|
||||
Event_queue::lock_data(const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_queue::lock_mutex");
|
||||
DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
|
||||
&LOCK_event_queue, func, line));
|
||||
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
|
||||
pthread_mutex_lock(&LOCK_event_queue);
|
||||
mutex_last_locked_in_func= func;
|
||||
mutex_last_locked_at_line= line;
|
||||
@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line)
|
||||
void
|
||||
Event_queue::unlock_data(const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_queue::UNLOCK_mutex");
|
||||
DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
|
||||
&LOCK_event_queue, func, line));
|
||||
DBUG_ENTER("Event_queue::unlock_mutex");
|
||||
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
|
||||
mutex_last_unlocked_at_line= line;
|
||||
mutex_queue_data_locked= FALSE;
|
||||
mutex_last_unlocked_in_func= func;
|
||||
@ -510,7 +496,7 @@ Event_queue::events_count()
|
||||
LOCK_QUEUE_DATA();
|
||||
n= queue.elements;
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
DBUG_PRINT("info", ("n=%u", n));
|
||||
DBUG_RETURN(n);
|
||||
}
|
||||
|
||||
@ -529,7 +515,7 @@ uint
|
||||
Event_queue::events_count_no_lock()
|
||||
{
|
||||
uint n;
|
||||
DBUG_ENTER("Event_scheduler::events_count_no_lock");
|
||||
DBUG_ENTER("Event_queue::events_count_no_lock");
|
||||
|
||||
n= queue.elements;
|
||||
|
||||
@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
}
|
||||
DBUG_PRINT("info", ("Loading event from row."));
|
||||
|
||||
if ((ret= et->load_from_row(&scheduler_root, table)))
|
||||
if ((ret= et->load_from_row(table)))
|
||||
{
|
||||
clean_the_queue= TRUE;
|
||||
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
|
||||
@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd)
|
||||
void
|
||||
Event_queue::init_mutexes()
|
||||
{
|
||||
pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST);
|
||||
pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
|
||||
}
|
||||
|
||||
|
||||
@ -743,13 +729,13 @@ Event_queue::init_mutexes()
|
||||
Destroys mutexes.
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::destroy_mutexes()
|
||||
Event_queue::deinit_mutexes()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_queue::destroy_mutexes()
|
||||
Event_queue::deinit_mutexes()
|
||||
{
|
||||
pthread_mutex_destroy(&singleton->LOCK_event_queue);
|
||||
pthread_mutex_destroy(&LOCK_event_queue);
|
||||
}
|
||||
|
||||
|
||||
@ -765,8 +751,8 @@ void
|
||||
Event_queue::on_queue_change()
|
||||
{
|
||||
DBUG_ENTER("Event_queue::on_queue_change");
|
||||
DBUG_PRINT("info", ("Sending COND_new_work"));
|
||||
singleton->queue_changed();
|
||||
DBUG_PRINT("info", ("Signalling change of the queue"));
|
||||
scheduler->queue_changed();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo)
|
||||
{
|
||||
int i= 0;
|
||||
bool ret= FALSE;
|
||||
DBUG_ENTER("Event_scheduler::init");
|
||||
DBUG_ENTER("Event_queue::init");
|
||||
DBUG_PRINT("enter", ("this=%p", this));
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
db_repository= db_repo;
|
||||
/* init memory root */
|
||||
init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
|
||||
|
||||
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
|
||||
event_timed_compare_q, NULL, 30 /*auto_extent*/))
|
||||
@ -824,8 +808,8 @@ Event_queue::deinit()
|
||||
DBUG_ENTER("Event_queue::deinit");
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
empty_queue();
|
||||
delete_queue(&queue);
|
||||
free_root(&scheduler_root, MYF(0));
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
@ -835,7 +819,7 @@ Event_queue::deinit()
|
||||
void
|
||||
Event_queue::recalculate_queue(THD *thd)
|
||||
{
|
||||
int i;
|
||||
uint i;
|
||||
for (i= 0; i < queue.elements; i++)
|
||||
{
|
||||
((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
|
||||
@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd)
|
||||
void
|
||||
Event_queue::empty_queue()
|
||||
{
|
||||
int i;
|
||||
uint i;
|
||||
/* empty the queue */
|
||||
for (i= 0; i < events_count_no_lock(); ++i)
|
||||
{
|
||||
Event_timed *et= (Event_timed *) queue_element(&queue, i);
|
||||
et->free_sp();
|
||||
delete et;
|
||||
}
|
||||
resize_queue(&queue, 0);
|
||||
}
|
||||
|
||||
|
||||
Event_timed*
|
||||
Event_queue::get_top()
|
||||
{
|
||||
return (Event_timed *)queue_top(&queue);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Event_queue::remove_top()
|
||||
{
|
||||
queue_remove(&queue, 0);// 0 is top, internally 1
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Event_queue::top_changed()
|
||||
{
|
||||
queue_replaced(&queue);
|
||||
}
|
||||
|
||||
|
||||
Event_timed *
|
||||
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
|
||||
struct timespec *abstime)
|
||||
{
|
||||
struct timespec top_time;
|
||||
Event_timed *et_new= NULL;
|
||||
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
|
||||
DBUG_PRINT("enter", ("thd=%p now=%d", thd, now));
|
||||
abstime->tv_nsec= 0;
|
||||
LOCK_QUEUE_DATA();
|
||||
do {
|
||||
int res;
|
||||
Event_timed *et= NULL;
|
||||
if (!queue.elements)
|
||||
{
|
||||
abstime->tv_sec= 0;
|
||||
break;
|
||||
}
|
||||
int i;
|
||||
DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
|
||||
for (i = 0; i < queue.elements; i++)
|
||||
{
|
||||
et= ((Event_timed*)queue_element(&queue, i));
|
||||
DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
|
||||
DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
|
||||
" expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
|
||||
TIME_to_ulonglong_datetime(&et->execute_at),
|
||||
TIME_to_ulonglong_datetime(&et->starts),
|
||||
TIME_to_ulonglong_datetime(&et->ends),
|
||||
et->expression, sec_since_epoch_TIME(&et->execute_at), now,
|
||||
(int)(sec_since_epoch_TIME(&et->execute_at) - now),
|
||||
sec_since_epoch_TIME(&et->execute_at) <= now));
|
||||
}
|
||||
et= ((Event_timed*)queue_element(&queue, 0));
|
||||
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
|
||||
|
||||
if (top_time.tv_sec <= now)
|
||||
{
|
||||
DBUG_PRINT("info", ("Ready for execution"));
|
||||
abstime->tv_sec= 0;
|
||||
if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
|
||||
&et_new)))
|
||||
{
|
||||
DBUG_ASSERT(0);
|
||||
break;
|
||||
}
|
||||
|
||||
et->mark_last_executed(thd);
|
||||
if (et->compute_next_execution_time())
|
||||
et->status= Event_timed::DISABLED;
|
||||
DBUG_PRINT("info", ("event's status is %d", et->status));
|
||||
|
||||
et->update_fields(thd);
|
||||
if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
|
||||
(et->status == Event_timed::DISABLED))
|
||||
{
|
||||
DBUG_PRINT("info", ("removing from the queue"));
|
||||
if (et->dropped)
|
||||
et->drop(thd);
|
||||
delete et;
|
||||
queue_remove(&queue, 0);
|
||||
}
|
||||
else
|
||||
queue_replaced(&queue);
|
||||
}
|
||||
else
|
||||
{
|
||||
abstime->tv_sec= top_time.tv_sec;
|
||||
DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
|
||||
abstime->tv_sec));
|
||||
}
|
||||
} while (0);
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new,
|
||||
abstime->tv_sec));
|
||||
if (et_new)
|
||||
DBUG_PRINT("info", ("db=%s name=%s definer=%s "
|
||||
"et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str,
|
||||
et_new->definer.str,
|
||||
TIME_to_ulonglong_datetime(&et_new->execute_at)));
|
||||
DBUG_RETURN(et_new);
|
||||
}
|
||||
|
Reference in New Issue
Block a user