mirror of
https://github.com/MariaDB/server.git
synced 2026-01-06 05:22:24 +03:00
WL #3337 (Events new architecture)
Final stroke, events should be loaded from disk on server startup. Also check the validity of their bodies if possible during loading. sql/event_data_objects.cc: Remove Event_job_data::free_sp(), move the code to the destructor Change the way we change the security context Steal some code from sql_parse.cc sql/event_data_objects.h: Remove free_sp() Make compile() public, to be used when booting for verifying the integrity of mysql.event sql/event_queue.cc: Make the queue load events from disk on server boot. Compile and thus check for integrity the events. sql/event_queue.h: shift methods around. add queue_loaded boolean. sql/event_scheduler.cc: Rename init_event_thread() to pre_init_event_thread() and make it more generic. Add post_init_event_thread() Export these two as well as deinit_event_thread(). Now it is quite easy to write code to spawn a new event thread whenever needed. sql/event_scheduler.h: export pre_init_event_thread(), post_init_event_thread() and deinit_event_thread() to simplify writing of thread functions. sql/events.cc: Events::init() returns only one error code, then make it bool sql/events.h: Events::init() returns only one error code, then make it bool sql/mysqld.cc: Check the return code of Events::init() sql/sp_head.cc: Add trace info sql/sql_class.cc: Reorganize thd::change_security_context() to load main_security_ctx sql/sql_class.h: Reorganize thd::change_security_context() to load main_security_ctx sql/sql_lex.cc: Initialize lex->spname sql/sql_yacc.yy: Add a comment
This commit is contained in:
@@ -35,6 +35,14 @@
|
||||
#define LOCK_QUEUE_DATA() lock_data(SCHED_FUNC, __LINE__)
|
||||
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
|
||||
|
||||
struct event_queue_param
|
||||
{
|
||||
THD *thd;
|
||||
Event_queue *queue;
|
||||
pthread_mutex_t LOCK_loaded;
|
||||
pthread_cond_t COND_loaded;
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
Compares the execute_at members of two Event_queue_element instances.
|
||||
@@ -64,6 +72,31 @@ event_queue_element_compare_q(void *vptr, byte* a, byte *b)
|
||||
}
|
||||
|
||||
|
||||
pthread_handler_t
|
||||
event_queue_loader_thread(void *arg)
|
||||
{
|
||||
/* needs to be first for thread_stack */
|
||||
THD *thd= (THD *)((struct event_queue_param *) arg)->thd;
|
||||
struct event_queue_param *param= (struct event_queue_param *) arg;
|
||||
thd->thread_stack= (char *) &thd;
|
||||
|
||||
if (post_init_event_thread(thd))
|
||||
goto end;
|
||||
|
||||
DBUG_ENTER("event_queue_loader_thread");
|
||||
|
||||
pthread_mutex_lock(¶m->LOCK_loaded);
|
||||
param->queue->load_events_from_db(thd);
|
||||
pthread_cond_signal(¶m->COND_loaded);
|
||||
pthread_mutex_unlock(¶m->LOCK_loaded);
|
||||
|
||||
end:
|
||||
deinit_event_thread(thd);
|
||||
|
||||
DBUG_RETURN(0); // Against gcc warnings
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Constructor of class Event_queue.
|
||||
|
||||
@@ -80,6 +113,8 @@ Event_queue::Event_queue()
|
||||
mutex_last_attempted_lock_in_func= "";
|
||||
|
||||
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
|
||||
|
||||
queue_loaded= FALSE;
|
||||
}
|
||||
|
||||
|
||||
@@ -125,8 +160,11 @@ Event_queue::deinit_mutexes()
|
||||
bool
|
||||
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
|
||||
{
|
||||
int i= 0;
|
||||
bool ret= FALSE;
|
||||
THD *new_thd;
|
||||
pthread_t th;
|
||||
bool res;
|
||||
struct event_queue_param *event_queue_param_value= NULL;
|
||||
|
||||
DBUG_ENTER("Event_queue::init_queue");
|
||||
DBUG_PRINT("enter", ("this=0x%lx", this));
|
||||
|
||||
@@ -139,8 +177,7 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
|
||||
NULL, EVENT_QUEUE_EXTENT))
|
||||
{
|
||||
sql_print_error("SCHEDULER: Can't initialize the execution queue");
|
||||
ret= TRUE;
|
||||
goto end;
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (sizeof(my_time_t) != sizeof(time_t))
|
||||
@@ -148,13 +185,43 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
|
||||
sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
|
||||
"The scheduler may not work correctly. Stopping.");
|
||||
DBUG_ASSERT(0);
|
||||
ret= TRUE;
|
||||
goto end;
|
||||
goto err;
|
||||
}
|
||||
|
||||
end:
|
||||
if (!(new_thd= new THD))
|
||||
goto err;
|
||||
|
||||
pre_init_event_thread(new_thd);
|
||||
|
||||
event_queue_param_value= (struct event_queue_param *)
|
||||
my_malloc(sizeof(struct event_queue_param), MYF(0));
|
||||
event_queue_param_value->thd= new_thd;
|
||||
event_queue_param_value->queue= this;
|
||||
pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
|
||||
pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
|
||||
|
||||
pthread_mutex_lock(&event_queue_param_value->LOCK_loaded);
|
||||
DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
|
||||
if (!(res= pthread_create(&th, &connection_attrib, event_queue_loader_thread,
|
||||
(void*)event_queue_param_value)))
|
||||
{
|
||||
do {
|
||||
pthread_cond_wait(&event_queue_param_value->COND_loaded,
|
||||
&event_queue_param_value->LOCK_loaded);
|
||||
} while (queue_loaded == FALSE);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
|
||||
pthread_mutex_destroy(&event_queue_param_value->LOCK_loaded);
|
||||
pthread_cond_destroy(&event_queue_param_value->COND_loaded);
|
||||
my_free((char *)event_queue_param_value, MYF(0));
|
||||
|
||||
UNLOCK_QUEUE_DATA();
|
||||
DBUG_RETURN(ret);
|
||||
DBUG_RETURN(res);
|
||||
|
||||
err:
|
||||
UNLOCK_QUEUE_DATA();
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
|
||||
|
||||
@@ -498,7 +565,7 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
READ_RECORD read_record_info;
|
||||
int ret= -1;
|
||||
uint count= 0;
|
||||
bool clean_the_queue= FALSE;
|
||||
bool clean_the_queue= TRUE;
|
||||
/* Compile the events on this root but only for syntax check, then discard */
|
||||
MEM_ROOT boot_root;
|
||||
|
||||
@@ -518,14 +585,12 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
if (!(et= new Event_queue_element))
|
||||
{
|
||||
DBUG_PRINT("info", ("Out of memory"));
|
||||
clean_the_queue= TRUE;
|
||||
break;
|
||||
}
|
||||
DBUG_PRINT("info", ("Loading event from row."));
|
||||
|
||||
if ((ret= et->load_from_row(table)))
|
||||
{
|
||||
clean_the_queue= TRUE;
|
||||
sql_print_error("SCHEDULER: Error while loading from mysql.event. "
|
||||
"Table probably corrupted");
|
||||
break;
|
||||
@@ -536,27 +601,6 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
delete et;
|
||||
continue;
|
||||
}
|
||||
#if 0
|
||||
init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
|
||||
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
|
||||
|
||||
/* We load only on scheduler root just to check whether the body compiles */
|
||||
switch (ret= et->compile(thd, &boot_root)) {
|
||||
case EVEX_MICROSECOND_UNSUP:
|
||||
et->free_sp();
|
||||
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
|
||||
"supported but found in mysql.event");
|
||||
goto end;
|
||||
case EVEX_COMPILE_ERROR:
|
||||
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
|
||||
et->dbname.str, et->name.str);
|
||||
goto end;
|
||||
default:
|
||||
/* Free it, it will be compiled again on the worker thread */
|
||||
et->free_sp();
|
||||
break;
|
||||
}
|
||||
free_root(&boot_root, MYF(0));
|
||||
|
||||
/* let's find when to be executed */
|
||||
if (et->compute_next_execution_time())
|
||||
@@ -565,11 +609,40 @@ Event_queue::load_events_from_db(THD *thd)
|
||||
" Skipping", et->dbname.str, et->name.str);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
{
|
||||
Event_job_data temp_job_data;
|
||||
DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
|
||||
|
||||
temp_job_data.load_from_row(table);
|
||||
|
||||
/* We load only on scheduler root just to check whether the body compiles */
|
||||
switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
|
||||
case EVEX_MICROSECOND_UNSUP:
|
||||
sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
|
||||
"supported but found in mysql.event");
|
||||
break;
|
||||
case EVEX_COMPILE_ERROR:
|
||||
sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.",
|
||||
et->dbname.str, et->name.str);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
thd->end_statement();
|
||||
thd->cleanup_after_query();
|
||||
}
|
||||
if (ret)
|
||||
{
|
||||
delete et;
|
||||
goto end;
|
||||
}
|
||||
|
||||
DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
|
||||
queue_insert_safe(&queue, (byte *) et);
|
||||
count++;
|
||||
}
|
||||
clean_the_queue= FALSE;
|
||||
end:
|
||||
end_read_record(&read_record_info);
|
||||
|
||||
@@ -585,10 +658,12 @@ end:
|
||||
}
|
||||
|
||||
/* Force close to free memory */
|
||||
thd->version--;
|
||||
thd->version--;
|
||||
|
||||
close_thread_tables(thd);
|
||||
|
||||
queue_loaded= TRUE;
|
||||
|
||||
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
@@ -713,6 +788,7 @@ Event_queue::empty_queue()
|
||||
uint i;
|
||||
DBUG_ENTER("Event_queue::empty_queue");
|
||||
DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
|
||||
sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
|
||||
/* empty the queue */
|
||||
for (i= 0; i < queue.elements; ++i)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user