mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
WL#3337 (Event scheduler new architecture)
This patch makes the relationship between Event_scheduler and Event_queue unidirectional from the former to the latter. The change is that the conditional on which the scheduler sleeped has been moved to the Event_queue and the latter does not call anymore Event_scheduler::queue_changed(), which in turn has be removed. sql/event_queue.cc: Remove dependency of Event_queue on Event_scheduler but not vice versa. Event_scheduler polls whether there is time to execute an event. Removed notify_observers() as the way of calling has changed. Added Event_queue::cond_wait() similar to Event_scheduler::cond_wait(). sql/event_queue.h: init_queue() does not need anymore Event_scheduler object because the relationship is now one-way. Event_scheduler knows about Event_queue but not vice versa. get_top_execution_if_time() does by itself the waiting instead of returning abstime. This simplifies the code in Event_scheduler::run() get_top_execution_if_time() returns only if job_data != NULL or if the scheduler thread was killed. notify_observers() is no more used and therefore removed. Added Event_queue::cond_wait() because now there is waiting on a conditional variable in Event_queue too (like in Event_scheduler for ::stop()). sql/event_scheduler.cc: Change the relationship between Event_scheduler & Event_queue. Event_queue does not know anymore about Event_scheduler. When the scheduler calls get_top_element_if_time() it may fall asleep on a conditional of Event_queue, if either the queue is empty or it's still not time for activation. When the method returns it will return a non-null address, namely an object to be executed. If the return value is NULL, the thread was killed by a call to Event_scheduler::stop() (we assert this). sql/event_scheduler.h: Remove queue_changed() as it is obsoleted by making the relationship between Event_scheduler and Event_queue one-way, from the former to the latter. Event_queue now does not know about Event_scheduler. get_state() is changed to is_running(). The state enum should be private, as it is not needed to be seen from outside anymore. sql/events.cc: Event_queue does not need anymore a pointer to Event_scheduler.
This commit is contained in:
@ -18,7 +18,6 @@
|
||||
#include "event_queue.h"
|
||||
#include "event_data_objects.h"
|
||||
#include "event_db_repository.h"
|
||||
#include "event_scheduler.h"
|
||||
|
||||
|
||||
#define EVENT_QUEUE_INITIAL_SIZE 30
|
||||
@ -87,6 +86,7 @@ Event_queue::Event_queue()
|
||||
{
|
||||
mutex_last_unlocked_in_func= mutex_last_locked_in_func=
|
||||
mutex_last_attempted_lock_in_func= "";
|
||||
set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME);
|
||||
}
|
||||
|
||||
|
||||
@ -135,8 +135,7 @@ Event_queue::deinit_mutexes()
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
|
||||
Event_scheduler *sched)
|
||||
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
|
||||
{
|
||||
pthread_t th;
|
||||
bool res;
|
||||
@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
|
||||
|
||||
LOCK_QUEUE_DATA();
|
||||
db_repository= db_repo;
|
||||
scheduler= sched;
|
||||
|
||||
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
|
||||
0 /*max_on_top*/, event_queue_element_compare_q,
|
||||
@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
|
||||
DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
|
||||
queue_insert_safe(&queue, (byte *) new_element);
|
||||
dbug_dump_queue(thd->query_start());
|
||||
pthread_cond_broadcast(&COND_queue_state);
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
notify_observers();
|
||||
}
|
||||
|
||||
DBUG_RETURN(res);
|
||||
@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
|
||||
{
|
||||
DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element));
|
||||
queue_insert_safe(&queue, (byte *) new_element);
|
||||
pthread_cond_broadcast(&COND_queue_state);
|
||||
}
|
||||
|
||||
dbug_dump_queue(thd->query_start());
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
if (new_element)
|
||||
notify_observers();
|
||||
|
||||
end:
|
||||
DBUG_PRINT("info", ("res=%d", res));
|
||||
DBUG_RETURN(res);
|
||||
@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
|
||||
i++;
|
||||
}
|
||||
/*
|
||||
We don't call notify_observers() . If we remove the top event:
|
||||
We don't call pthread_cond_broadcast(&COND_queue_state);
|
||||
If we remove the top event:
|
||||
1. The queue is empty. The scheduler will wake up at some time and
|
||||
realize that the queue is empty. If create_event() comes inbetween
|
||||
it will signal the scheduler
|
||||
@ -421,24 +418,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Signals the observers (the main scheduler thread) that the
|
||||
state of the queue has been changed.
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::notify_observers()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_queue::notify_observers()
|
||||
{
|
||||
DBUG_ENTER("Event_queue::notify_observers");
|
||||
DBUG_PRINT("info", ("Signalling change of the queue"));
|
||||
scheduler->queue_changed();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Searches for an event in the queue
|
||||
|
||||
@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now)
|
||||
#endif
|
||||
}
|
||||
|
||||
static const char *queue_empty_msg= "Waiting on empty queue";
|
||||
static const char *queue_wait_msg= "Waiting for next activation";
|
||||
|
||||
/*
|
||||
Checks whether the top of the queue is elligible for execution and
|
||||
@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
|
||||
Event_job_data **job_data,
|
||||
struct timespec *abstime)
|
||||
Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
|
||||
{
|
||||
bool ret= FALSE;
|
||||
struct timespec top_time;
|
||||
struct timespec *abstime;
|
||||
*job_data= NULL;
|
||||
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
|
||||
DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now));
|
||||
abstime->tv_nsec= 0;
|
||||
|
||||
top_time.tv_nsec= 0;
|
||||
LOCK_QUEUE_DATA();
|
||||
do {
|
||||
for (;;)
|
||||
{
|
||||
int res;
|
||||
if (!queue.elements)
|
||||
Event_queue_element *top= NULL;
|
||||
|
||||
thd->end_time();
|
||||
time_t now= thd->query_start();
|
||||
abstime= NULL;
|
||||
|
||||
if (queue.elements)
|
||||
{
|
||||
abstime->tv_sec= 0;
|
||||
break;
|
||||
top= ((Event_queue_element*) queue_element(&queue, 0));
|
||||
top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
|
||||
|
||||
abstime= &top_time;
|
||||
}
|
||||
|
||||
Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0));
|
||||
|
||||
top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
|
||||
|
||||
if (top_time.tv_sec > now)
|
||||
if (!abstime || abstime->tv_sec > now)
|
||||
{
|
||||
abstime->tv_sec= top_time.tv_sec;
|
||||
DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
|
||||
abstime->tv_sec));
|
||||
break;
|
||||
const char *msg;
|
||||
if (abstime)
|
||||
{
|
||||
next_activation_at= top->execute_at;
|
||||
msg= queue_wait_msg;
|
||||
}
|
||||
else
|
||||
{
|
||||
set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME);
|
||||
msg= queue_wait_msg;
|
||||
}
|
||||
|
||||
cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__);
|
||||
if (thd->killed)
|
||||
{
|
||||
DBUG_PRINT("info", ("thd->killed=%d", thd->killed));
|
||||
goto end;
|
||||
}
|
||||
/*
|
||||
The queue could have been emptied. Therefore it's safe to start from
|
||||
the beginning. Moreover, this way we will get also the new top, if
|
||||
the element at the top has been changed.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("Ready for execution"));
|
||||
abstime->tv_sec= 0;
|
||||
if (!(*job_data= new Event_job_data()))
|
||||
{
|
||||
ret= TRUE;
|
||||
@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
|
||||
if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
|
||||
*job_data)))
|
||||
{
|
||||
DBUG_PRINT("error", ("Got %d from load_named_event", res));
|
||||
delete *job_data;
|
||||
*job_data= NULL;
|
||||
ret= TRUE;
|
||||
@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
|
||||
queue_replaced(&queue);
|
||||
|
||||
dbug_dump_queue(now);
|
||||
} while (0);
|
||||
break;
|
||||
}
|
||||
end:
|
||||
UNLOCK_QUEUE_DATA();
|
||||
|
||||
DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
|
||||
ret, *job_data, abstime->tv_sec));
|
||||
ret, *job_data, abstime? abstime->tv_sec:0));
|
||||
|
||||
if (*job_data)
|
||||
DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str,
|
||||
@ -864,6 +871,52 @@ Event_queue::unlock_data(const char *func, uint line)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Wrapper for pthread_cond_wait/timedwait
|
||||
|
||||
SYNOPSIS
|
||||
Event_queue::cond_wait()
|
||||
thd Thread (Could be NULL during shutdown procedure)
|
||||
msg Message for thd->proc_info
|
||||
abstime If not null then call pthread_cond_timedwait()
|
||||
func Which function is requesting cond_wait
|
||||
line On which line cond_wait is requested
|
||||
*/
|
||||
|
||||
void
|
||||
Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
|
||||
const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_queue::cond_wait");
|
||||
waiting_on_cond= TRUE;
|
||||
mutex_last_unlocked_at_line= line;
|
||||
mutex_queue_data_locked= FALSE;
|
||||
mutex_last_unlocked_in_func= func;
|
||||
|
||||
thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg);
|
||||
|
||||
DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
|
||||
if (!abstime)
|
||||
pthread_cond_wait(&COND_queue_state, &LOCK_event_queue);
|
||||
else
|
||||
pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime);
|
||||
|
||||
mutex_last_locked_in_func= func;
|
||||
mutex_last_locked_at_line= line;
|
||||
mutex_queue_data_locked= TRUE;
|
||||
waiting_on_cond= FALSE;
|
||||
|
||||
/*
|
||||
This will free the lock so we need to relock. Not the best thing to
|
||||
do but we need to obey cond_wait()
|
||||
*/
|
||||
thd->exit_cond("");
|
||||
LOCK_QUEUE_DATA();
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Dumps the internal status of the queue
|
||||
|
||||
@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd)
|
||||
protocol->store(&tmp_string);
|
||||
ret= protocol->write();
|
||||
|
||||
/* waiting on */
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs);
|
||||
int_string.set((longlong) waiting_on_cond, scs);
|
||||
protocol->store(&int_string);
|
||||
ret= protocol->write();
|
||||
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("next activation at"), scs);
|
||||
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
|
||||
tmp_string.alloced_length(),
|
||||
"%4d-%02d-%02d %02d:%02d:%02d",
|
||||
next_activation_at.year,
|
||||
next_activation_at.month,
|
||||
next_activation_at.day,
|
||||
next_activation_at.hour,
|
||||
next_activation_at.minute,
|
||||
next_activation_at.second
|
||||
));
|
||||
protocol->store(&tmp_string);
|
||||
ret= protocol->write();
|
||||
|
||||
#endif
|
||||
DBUG_RETURN(FALSE);
|
||||
}
|
||||
|
Reference in New Issue
Block a user