1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00
mysql-test/r/events.result:
  Auto merged
mysql-test/t/events.test:
  Auto merged
sql/event_priv.h:
  Auto merged
sql/sql_db.cc:
  Auto merged
sql/event.cc:
  manual merge
sql/event.h:
  manual merge
sql/event_executor.cc:
  manual merge
sql/event_timed.cc:
  manual merge
This commit is contained in:
unknown
2006-02-16 01:27:36 +01:00
9 changed files with 704 additions and 128 deletions

View File

@ -18,6 +18,11 @@
#include "event.h"
#include "sp.h"
#define WAIT_STATUS_READY 0
#define WAIT_STATUS_EMPTY_QUEUE 1
#define WAIT_STATUS_NEW_TOP_EVENT 2
#define WAIT_STATUS_STOP_EXECUTOR 3
/*
Make this define DBUG_FAULTY_THR to be able to put breakpoints inside
@ -294,6 +299,85 @@ init_event_thread(THD* thd)
}
/*
This function waits till the time next event in the queue should be
executed.
Returns
WAIT_STATUS_READY There is an event to be executed right now
WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped.
WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled
on top. Restart ticking.
WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0;
*/
static int
executor_wait_till_next_event_exec(THD *thd)
{
event_timed *et;
TIME time_now;
int t2sleep;
DBUG_ENTER("executor_wait_till_next_event_exec");
/*
now let's see how much time to sleep, we know there is at least 1
element in the queue.
*/
VOID(pthread_mutex_lock(&LOCK_event_arrays));
if (!evex_queue_num_elements(EVEX_EQ_NAME))
{
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_RETURN(1);
}
et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
DBUG_ASSERT(et);
if (et->status == MYSQL_EVENT_DISABLED)
{
DBUG_PRINT("evex main thread",("Now it is disabled-exec no more"));
if (et->dropped)
et->drop(thd);
delete et;
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
sql_print_information("Event found disabled, dropping.");
DBUG_RETURN(1);
}
DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
// set the internal clock of thd
thd->end_time();
my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start());
t2sleep= evex_time_diff(&et->execute_at, &time_now);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays"));
if (t2sleep > 0)
{
/*
We sleep t2sleep seconds but we check every second whether this thread
has been killed, or there is a new candidate
*/
while (t2sleep-- && !thd->killed && event_executor_running_global_var &&
evex_queue_num_elements(EVEX_EQ_NAME) &&
(evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
{
DBUG_PRINT("evex main thread",("will sleep a bit more"));
my_sleep(1000000);
}
}
int ret= 0;
if (!evex_queue_num_elements(EVEX_EQ_NAME))
ret= 1;
else if (evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) != et)
ret= 2;
if (thd->killed && event_executor_running_global_var)
ret= 3;
DBUG_RETURN(ret);
}
/*
The main scheduler thread. Inits the priority queue on start and
destroys it on thread shutdown. Forks child threads for every event
@ -313,9 +397,9 @@ pthread_handler_t
event_executor_main(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
ulonglong iter_num= 0;
uint i=0, j=0;
my_ulonglong cnt= 0;
TIME time_now;
DBUG_ENTER("event_executor_main");
DBUG_PRINT("event_executor_main", ("EVEX thread started"));
@ -330,15 +414,16 @@ event_executor_main(void *arg)
if (sizeof(my_time_t) != sizeof(time_t))
{
sql_print_error("sizeof(my_time_t) != sizeof(time_t) ."
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler will not work correctly. Stopping.");
DBUG_ASSERT(0);
goto err_no_thd;
}
//TODO Andrey: Check for NULL
if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
{
sql_print_error("Cannot create THD for event_executor_main");
sql_print_error("SCHEDULER: Cannot create THD for the main thread.");
goto err_no_thd;
}
thd->thread_stack = (char*)&thd; // remember where our stack is
@ -346,7 +431,7 @@ event_executor_main(void *arg)
pthread_detach_this_thread();
if (init_event_thread(thd))
goto err;
goto finish;
/*
make this thread visible it has no vio -> show processlist won't see it
@ -360,7 +445,7 @@ event_executor_main(void *arg)
thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
DBUG_PRINT("EVEX main thread", ("Initing events_queuey"));
DBUG_PRINT("EVEX main thread", ("Initing events_queue"));
/*
eventually manifest that we are running, not to crashe because of
@ -376,15 +461,14 @@ event_executor_main(void *arg)
thd->security_ctx->user= my_strdup("event_scheduler", MYF(0));
if (evex_load_events_from_db(thd))
goto err;
goto finish;
evex_main_thread_id= thd->thread_id;
sql_print_information("Scheduler thread started");
sql_print_information("SCHEDULER: Main thread started");
while (!thd->killed)
{
TIME time_now;
my_time_t now;
event_timed *et;
cnt++;
@ -393,7 +477,7 @@ event_executor_main(void *arg)
thd->proc_info = "Sleeping";
if (!event_executor_running_global_var)
{
sql_print_information("Scheduler asked to stop.");
sql_print_information("SCHEDULER: Asked to stop.");
break;
}
@ -402,62 +486,31 @@ event_executor_main(void *arg)
my_sleep(1000000);// sleep 1s
continue;
}
{
int t2sleep;
/*
now let's see how much time to sleep, we know there is at least 1
element in the queue.
*/
VOID(pthread_mutex_lock(&LOCK_event_arrays));
if (!evex_queue_num_elements(EVEX_EQ_NAME))
{
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
continue;
}
et= evex_queue_first_element(&EVEX_EQ_NAME, event_timed*);
if (et->status == MYSQL_EVENT_DISABLED)
{
DBUG_PRINT("evex main thread",("Now it is disabled-exec no more"));
if (et->dropped)
et->drop(thd);
delete et;
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
sql_print_information("Event found disabled, dropping.");
continue;
}
DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
time((time_t *)&now);
my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
t2sleep= evex_time_diff(&et->execute_at, &time_now);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays"));
if (t2sleep > 0)
{
/*
We sleep t2sleep seconds but we check every second whether this thread
has been killed, or there is a new candidate
*/
while (t2sleep-- && !thd->killed && event_executor_running_global_var &&
evex_queue_num_elements(EVEX_EQ_NAME) &&
(evex_queue_first_element(&EVEX_EQ_NAME, event_timed*) == et))
{
DBUG_PRINT("evex main thread",("will sleep a bit more"));
my_sleep(1000000);
}
}
if (!event_executor_running_global_var)
{
sql_print_information("Scheduler asked to stop.");
break;
}
restart_ticking:
switch (executor_wait_till_next_event_exec(thd)) {
case WAIT_STATUS_READY: // time to execute the event on top
DBUG_PRINT("evex main thread",("time to execute an event"));
break;
case WAIT_STATUS_EMPTY_QUEUE: // no more events
DBUG_PRINT("evex main thread",("no more events"));
continue;
break;
case WAIT_STATUS_NEW_TOP_EVENT: // new event on top in the queue
DBUG_PRINT("evex main thread",("restart ticking"));
goto restart_ticking;
case WAIT_STATUS_STOP_EXECUTOR:
sql_print_information("SCHEDULER: Asked to stop.");
goto finish;
break;
default:
DBUG_ASSERT(0);
}
VOID(pthread_mutex_lock(&LOCK_event_arrays));
thd->end_time();
my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start());
if (!evex_queue_num_elements(EVEX_EQ_NAME))
{
@ -479,14 +532,14 @@ event_executor_main(void *arg)
DBUG_PRINT("evex main thread",("it's right time"));
if (et->status == MYSQL_EVENT_ENABLED)
{
pthread_t th;
int fork_ret_code;
DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str,
TIME_to_ulonglong_datetime(&et->execute_at)));
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
{
sql_print_error("Error while computing time of %s.%s . "
sql_print_error("SCHEDULER: Error while computing time of %s.%s . "
"Disabling after execution.",
et->dbname.str, et->name.str);
et->status= MYSQL_EVENT_DISABLED;
@ -495,13 +548,23 @@ event_executor_main(void *arg)
TIME_to_ulonglong_datetime(&et->execute_at)));
et->update_fields(thd);
++iter_num;
DBUG_PRINT("info", (" Spawning a thread %d", iter_num));
#ifndef DBUG_FAULTY_THR
if (pthread_create(&th,&connection_attrib,event_executor_worker,(void*)et))
{
sql_print_error("Problem while trying to create a thread");
UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, err);
thread_safe_increment(workers_count, &LOCK_workers_count);
switch ((fork_ret_code= et->spawn_now(event_executor_worker))) {
case EVENT_EXEC_CANT_FORK:
thread_safe_decrement(workers_count, &LOCK_workers_count);
sql_print_error("SCHEDULER: Problem while trying to create a thread");
UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish);
case EVENT_EXEC_ALREADY_EXEC:
thread_safe_decrement(workers_count, &LOCK_workers_count);
sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
et->dbname.str, et->name.str);
break;
default:
DBUG_ASSERT(!fork_ret_code);
if (fork_ret_code)
thread_safe_decrement(workers_count, &LOCK_workers_count);
break;
}
#else
event_executor_worker((void *) et);
@ -511,22 +574,21 @@ event_executor_main(void *arg)
et->flags |= EVENT_EXEC_NO_MORE;
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1
else
evex_queue_first_updated(&EVEX_EQ_NAME);
}
DBUG_PRINT("evex main thread",("unlocking"));
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
}// while
finish:
err:
// First manifest that this thread does not work and then destroy
VOID(pthread_mutex_lock(&LOCK_evex_running));
evex_is_running= false;
evex_main_thread_id= 0;
VOID(pthread_mutex_unlock(&LOCK_evex_running));
sql_print_information("Event scheduler stopping. Waiting for worker threads to finish.");
/*
TODO: A better will be with a conditional variable
@ -535,21 +597,33 @@ err:
Read workers_count without lock, no need for locking.
In the worst case we have to wait 1sec more.
*/
while (workers_count)
my_sleep(1000000);// 1s
sql_print_information("SCHEDULER: Stopping. Waiting for worker threads to finish.");
while (1)
{
VOID(pthread_mutex_lock(&LOCK_workers_count));
if (!workers_count)
{
VOID(pthread_mutex_unlock(&LOCK_workers_count));
break;
}
VOID(pthread_mutex_unlock(&LOCK_workers_count));
my_sleep(1000000);// 1s
}
/*
LEX_STRINGs reside in the memory root and will be destroyed with it.
Hence no need of delete but only freeing of SP
First we free all objects ...
Lock because a DROP DATABASE could be running in parallel and it locks on these
*/
// First we free all objects ...
sql_print_information("SCHEDULER: Emptying the queue.");
VOID(pthread_mutex_lock(&LOCK_event_arrays));
for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i)
{
event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, event_timed*);
et->free_sp();
delete et;
}
// ... then we can thras the whole queue at once
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
// ... then we can thrash the whole queue at once
evex_queue_destroy(&EVEX_EQ_NAME);
thd->proc_info = "Clearing";
@ -573,7 +647,7 @@ err_no_thd:
VOID(pthread_mutex_unlock(&LOCK_evex_running));
free_root(&evex_mem_root, MYF(0));
sql_print_information("Event scheduler stopped.");
sql_print_information("SCHEDULER: Stopped.");
#ifndef DBUG_FAULTY_THR
my_thread_end();
@ -600,9 +674,6 @@ event_executor_worker(void *event_void)
MEM_ROOT worker_mem_root;
DBUG_ENTER("event_executor_worker");
VOID(pthread_mutex_lock(&LOCK_workers_count));
++workers_count;
VOID(pthread_mutex_unlock(&LOCK_workers_count));
init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
@ -611,7 +682,7 @@ event_executor_worker(void *event_void)
if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
{
sql_print_error("Cannot create a THD structure in a scheduler worker thread");
sql_print_error("SCHEDULER: Cannot create a THD structure in an worker.");
goto err_no_thd;
}
thd->thread_stack = (char*)&thd; // remember where our stack is
@ -653,14 +724,7 @@ event_executor_worker(void *event_void)
event->dbname.str, event->name.str,
event->definer.str);
}
if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED)
{
DBUG_PRINT("event_executor_worker",
("%s exec no more. to drop=%d",event->name.str, event->dropped));
if (event->dropped)
event->drop(thd);
delete event;
}
event->spawn_thread_finish(thd);
err:
@ -689,10 +753,7 @@ err:
err_no_thd:
free_root(&worker_mem_root, MYF(0));
VOID(pthread_mutex_lock(&LOCK_workers_count));
--workers_count;
VOID(pthread_mutex_unlock(&LOCK_workers_count));
thread_safe_decrement(workers_count, &LOCK_workers_count);
#ifndef DBUG_FAULTY_THR
my_thread_end();
@ -733,7 +794,7 @@ evex_load_events_from_db(THD *thd)
if ((ret= evex_open_event_table(thd, TL_READ, &table)))
{
sql_print_error("Table mysql.event is damaged.");
sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open.");
DBUG_RETURN(SP_OPEN_TABLE_FAILED);
}
@ -753,7 +814,7 @@ evex_load_events_from_db(THD *thd)
if ((ret= et->load_from_row(&evex_mem_root, table)))
{
sql_print_error("Error while loading from mysql.event. "
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
goto end;
}
@ -769,7 +830,7 @@ evex_load_events_from_db(THD *thd)
if ((ret= et->compile(thd, &evex_mem_root)))
{
sql_print_error("Error while compiling %s.%s. Aborting load.",
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
et->dbname.str, et->name.str);
goto end;
}
@ -777,8 +838,8 @@ evex_load_events_from_db(THD *thd)
// let's find when to be executed
if (et->compute_next_execution_time())
{
sql_print_error("Error while computing execution time of %s.%s. Skipping",
et->dbname.str, et->name.str);
sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
" Skipping", et->dbname.str, et->name.str);
continue;
}
@ -799,7 +860,7 @@ end:
thd->version--; // Force close to free memory
close_thread_tables(thd);
sql_print_information("Scheduler loaded %d event%s", count, (count == 1)?"":"s");
sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s");
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);