1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

WL #3337 (Events new architecture)

Final stroke, events should be loaded from disk on server startup.
Also check the validity of their bodies if possible during loading.
This commit is contained in:
andrey@lmy004.
2006-07-13 10:59:58 +02:00
parent 3f4e1f5c69
commit dd31d45789
22 changed files with 317 additions and 163 deletions

View File

@ -35,6 +35,14 @@
#define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__)
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
struct event_queue_param
{
THD *thd;
Event_queue *queue;
pthread_mutex_t LOCK_loaded;
pthread_cond_t COND_loaded;
};
/*
Compares the execute_at members of two Event_queue_element instances.
@ -64,6 +72,31 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
}
pthread_handler_t
event_queue_loader_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd= (THD *)((struct event_queue_param *) arg)->thd;
struct event_queue_param *param= (struct event_queue_param *) arg;
thd->thread_stack= (char *) &thd;
if (post_init_event_thread(thd))
goto end;
DBUG_ENTER("event_queue_loader_thread");
pthread_mutex_lock(&param->LOCK_loaded);
param->queue->load_events_from_db(thd);
pthread_cond_signal(&param->COND_loaded);
pthread_mutex_unlock(&param->LOCK_loaded);
end:
deinit_event_thread(thd);
DBUG_RETURN(0); // Against gcc warnings
}
/*
Constructor of class Event_queue.
@ -80,6 +113,8 @@ Event_queue::Event_queue()
mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
queue_loaded= FALSE;
}
@ -125,8 +160,11 @@ Event_queue::deinit_mutexes()
bool
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
{
int i= 0;
bool ret= FALSE;
THD *new_thd;
pthread_t th;
bool res;
struct event_queue_param *event_queue_param_value= NULL;
DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this=0x%lx", this));
@ -139,8 +177,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
NULL, EVENT_QUEUE_EXTENT))
{
sql_print_error("SCHEDULER: Can't initialize the execution queue");
ret= TRUE;
goto end;
goto err;
}
if (sizeof(my_time_t) != sizeof(time_t))
@ -148,13 +185,43 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler may not work correctly. Stopping.");
DBUG_ASSERT(0);
ret= TRUE;
goto end;
goto err;
}
end:
if (!(new_thd= new THD))
goto err;
pre_init_event_thread(new_thd);
event_queue_param_value= (struct event_queue_param *)
my_malloc(sizeof(struct event_queue_param), MYF(0));
event_queue_param_value->thd= new_thd;
event_queue_param_value->queue= this;
pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
pthread_mutex_lock(&event_queue_param_value->LOCK_loaded);
DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread,
(void*)event_queue_param_value)))
{
do {
pthread_cond_wait(&event_queue_param_value->COND_loaded,
&event_queue_param_value->LOCK_loaded);
} while (queue_loaded == FALSE);
}
pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded);
pthread_cond_destroy(&event_queue_param_value->COND_loaded);
my_free((char *)event_queue_param_value, MYF(0));
UNLOCK_QUEUE_DATA();
DBUG_RETURN(ret);
DBUG_RETURN(res);
err:
UNLOCK_QUEUE_DATA();
DBUG_RETURN(TRUE);
}
@ -498,7 +565,7 @@ Event_queue::load_events_from_db(THD *thd)
READ_RECORD read_record_info;
int ret= -1;
uint count= 0;
bool clean_the_queue= FALSE;
bool clean_the_queue= TRUE;
/* Compile the events on this root but only for syntax check, then discard */
MEM_ROOT boot_root;
@ -518,14 +585,12 @@ Event_queue::load_events_from_db(THD *thd)
if (!(et= new Event_queue_element))
{
DBUG_PRINT("info", ("Out of memory"));
clean_the_queue= TRUE;
break;
}
DBUG_PRINT("info", ("Loading event from row."));
if ((ret= et->load_from_row(table)))
{
clean_the_queue= TRUE;
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
"Table probably corrupted");
break;
@ -536,27 +601,6 @@ Event_queue::load_events_from_db(THD *thd)
delete et;
continue;
}
#if 0
init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
/* We load only on scheduler root just to check whether the body compiles */
switch (ret= et->compile(thd, &boot_root)) {
case EVEX_MICROSECOND_UNSUP:
et->free_sp();
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
goto end;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
et->dbname.str, et->name.str);
goto end;
default:
/* Free it, it will be compiled again on the worker thread */
et->free_sp();
break;
}
free_root(&boot_root, MYF(0));
/* let's find when to be executed */
if (et->compute_next_execution_time())
@ -565,11 +609,40 @@ Event_queue::load_events_from_db(THD *thd)
" Skipping", et->dbname.str, et->name.str);
continue;
}
#endif
{
Event_job_data temp_job_data;
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
temp_job_data.load_from_row(table);
/* We load only on scheduler root just to check whether the body compiles */
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
case EVEX_MICROSECOND_UNSUP:
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
"supported but found in mysql.event");
break;
case EVEX_COMPILE_ERROR:
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
et->dbname.str, et->name.str);
break;
default:
break;
}
thd->end_statement();
thd->cleanup_after_query();
}
if (ret)
{
delete et;
goto end;
}
DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
queue_insert_safe(&queue, (byte *) et);
count++;
}
clean_the_queue= FALSE;
end:
end_read_record(&read_record_info);
@ -585,10 +658,12 @@ end:
}
/* Force close to free memory */
thd->version--;
thd->version--;
close_thread_tables(thd);
queue_loaded= TRUE;
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
@ -713,6 +788,7 @@ Event_queue::empty_queue()
uint i;
DBUG_ENTER("Event_queue::empty_queue");
DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
/* empty the queue */
for (i= 0; i < queue.elements; ++i)
{