mirror of
https://github.com/MariaDB/server.git
synced 2025-07-27 18:02:13 +03:00
Merge alik.:/mnt/raid/alik/MySQL/devel/5.1-monty
into alik.:/mnt/raid/alik/MySQL/devel/5.1-rt-merged mysql-test/mysql-test-run.pl: Auto merged sql/ha_ndbcluster.cc: Auto merged sql/handler.cc: Auto merged sql/log.cc: Auto merged sql/mysqld.cc: Auto merged sql/sp_head.cc: Auto merged sql/sql_base.cc: Auto merged sql/sql_class.h: Auto merged sql/sql_insert.cc: Auto merged sql/sql_parse.cc: Auto merged sql/sql_select.cc: Auto merged sql/sql_view.cc: Auto merged sql/table.cc: Auto merged server-tools/instance-manager/guardian.cc: Manual merged. server-tools/instance-manager/instance.cc: Manual merged. server-tools/instance-manager/mysql_connection.cc: Manual merged.
This commit is contained in:
@ -74,7 +74,7 @@ Guardian_thread::Guardian_thread(Thread_registry &thread_registry_arg,
|
||||
uint monitoring_interval_arg) :
|
||||
Guardian_thread_args(thread_registry_arg, instance_map_arg,
|
||||
monitoring_interval_arg),
|
||||
thread_info(pthread_self()), guarded_instances(0)
|
||||
thread_info(pthread_self(), TRUE), guarded_instances(0)
|
||||
{
|
||||
pthread_mutex_init(&LOCK_guardian, 0);
|
||||
pthread_cond_init(&COND_guardian, 0);
|
||||
@ -95,11 +95,11 @@ Guardian_thread::~Guardian_thread()
|
||||
}
|
||||
|
||||
|
||||
void Guardian_thread::request_shutdown(bool stop_instances_arg)
|
||||
void Guardian_thread::request_shutdown()
|
||||
{
|
||||
pthread_mutex_lock(&LOCK_guardian);
|
||||
/* stop instances or just clean up Guardian repository */
|
||||
stop_instances(stop_instances_arg);
|
||||
stop_instances();
|
||||
shutdown_requested= TRUE;
|
||||
pthread_mutex_unlock(&LOCK_guardian);
|
||||
}
|
||||
@ -154,11 +154,11 @@ void Guardian_thread::process_instance(Instance *instance,
|
||||
{
|
||||
/* Pid file not created yet, don't go to STARTED state yet */
|
||||
}
|
||||
else
|
||||
else if (current_node->state != STARTED)
|
||||
{
|
||||
/* clear status fields */
|
||||
log_info("guardian: instance %s is running, set state to STARTED",
|
||||
instance->options.instance_name.str);
|
||||
log_info("guardian: instance '%s' is running, set state to STARTED.",
|
||||
(const char *) instance->options.instance_name.str);
|
||||
current_node->restart_counter= 0;
|
||||
current_node->crash_moment= 0;
|
||||
current_node->state= STARTED;
|
||||
@ -168,8 +168,8 @@ void Guardian_thread::process_instance(Instance *instance,
|
||||
{
|
||||
switch (current_node->state) {
|
||||
case NOT_STARTED:
|
||||
log_info("guardian: starting instance %s",
|
||||
instance->options.instance_name.str);
|
||||
log_info("guardian: starting instance '%s'...",
|
||||
(const char *) instance->options.instance_name.str);
|
||||
|
||||
/* NOTE, set state to STARTING _before_ start() is called */
|
||||
current_node->state= STARTING;
|
||||
@ -193,8 +193,8 @@ void Guardian_thread::process_instance(Instance *instance,
|
||||
if (instance->is_crashed())
|
||||
{
|
||||
instance->start();
|
||||
log_info("guardian: starting instance %s",
|
||||
instance->options.instance_name.str);
|
||||
log_info("guardian: starting instance '%s'...",
|
||||
(const char *) instance->options.instance_name.str);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -211,8 +211,8 @@ void Guardian_thread::process_instance(Instance *instance,
|
||||
instance->start();
|
||||
current_node->last_checked= current_time;
|
||||
current_node->restart_counter++;
|
||||
log_info("guardian: restarting instance %s",
|
||||
instance->options.instance_name.str);
|
||||
log_info("guardian: restarting instance '%s'...",
|
||||
(const char *) instance->options.instance_name.str);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -250,6 +250,8 @@ void Guardian_thread::run()
|
||||
LIST *node;
|
||||
struct timespec timeout;
|
||||
|
||||
log_info("Guardian: started.");
|
||||
|
||||
thread_registry.register_thread(&thread_info);
|
||||
|
||||
my_thread_init();
|
||||
@ -277,12 +279,16 @@ void Guardian_thread::run()
|
||||
&LOCK_guardian, &timeout);
|
||||
}
|
||||
|
||||
log_info("Guardian: stopped.");
|
||||
|
||||
stopped= TRUE;
|
||||
pthread_mutex_unlock(&LOCK_guardian);
|
||||
/* now, when the Guardian is stopped we can stop the IM */
|
||||
thread_registry.unregister_thread(&thread_info);
|
||||
thread_registry.request_shutdown();
|
||||
my_thread_end();
|
||||
|
||||
log_info("Guardian: finished.");
|
||||
}
|
||||
|
||||
|
||||
@ -414,12 +420,11 @@ int Guardian_thread::stop_guard(Instance *instance)
|
||||
|
||||
SYNOPSYS
|
||||
stop_instances()
|
||||
stop_instances_arg whether we should stop instances at shutdown
|
||||
|
||||
DESCRIPTION
|
||||
Loops through the guarded_instances list and prepares them for shutdown.
|
||||
If stop_instances was requested, we need to issue a stop command and change
|
||||
the state accordingly. Otherwise we simply delete an entry.
|
||||
For each instance we issue a stop command and change the state
|
||||
accordingly.
|
||||
|
||||
NOTE
|
||||
Guardian object should be locked by the calling function.
|
||||
@ -429,42 +434,29 @@ int Guardian_thread::stop_guard(Instance *instance)
|
||||
1 - error occured
|
||||
*/
|
||||
|
||||
int Guardian_thread::stop_instances(bool stop_instances_arg)
|
||||
int Guardian_thread::stop_instances()
|
||||
{
|
||||
LIST *node;
|
||||
node= guarded_instances;
|
||||
while (node != NULL)
|
||||
{
|
||||
if (!stop_instances_arg)
|
||||
GUARD_NODE *current_node= (GUARD_NODE *) node->data;
|
||||
/*
|
||||
If instance is running or was running (and now probably hanging),
|
||||
request stop.
|
||||
*/
|
||||
if (current_node->instance->is_running() ||
|
||||
(current_node->state == STARTED))
|
||||
{
|
||||
/* just forget about an instance */
|
||||
guarded_instances= list_delete(guarded_instances, node);
|
||||
/*
|
||||
This should still work fine, as we have only removed the
|
||||
node from the list. The pointer to the next one is still valid
|
||||
*/
|
||||
node= node->next;
|
||||
current_node->state= STOPPING;
|
||||
current_node->last_checked= time(NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
GUARD_NODE *current_node= (GUARD_NODE *) node->data;
|
||||
/*
|
||||
If instance is running or was running (and now probably hanging),
|
||||
request stop.
|
||||
*/
|
||||
if (current_node->instance->is_running() ||
|
||||
(current_node->state == STARTED))
|
||||
{
|
||||
current_node->state= STOPPING;
|
||||
current_node->last_checked= time(NULL);
|
||||
}
|
||||
else
|
||||
/* otherwise remove it from the list */
|
||||
guarded_instances= list_delete(guarded_instances, node);
|
||||
/* But try to kill it anyway. Just in case */
|
||||
current_node->instance->kill_instance(SIGTERM);
|
||||
node= node->next;
|
||||
}
|
||||
/* otherwise remove it from the list */
|
||||
guarded_instances= list_delete(guarded_instances, node);
|
||||
/* But try to kill it anyway. Just in case */
|
||||
current_node->instance->kill_instance(SIGTERM);
|
||||
node= node->next;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ public:
|
||||
/* Initialize or refresh the list of guarded instances */
|
||||
int init();
|
||||
/* Request guardian shutdown. Stop instances if needed */
|
||||
void request_shutdown(bool stop_instances);
|
||||
void request_shutdown();
|
||||
/* Start instance protection */
|
||||
int guard(Instance *instance, bool nolock= FALSE);
|
||||
/* Stop instance protection */
|
||||
@ -123,7 +123,7 @@ public:
|
||||
|
||||
private:
|
||||
/* Prepares Guardian shutdown. Stops instances is needed */
|
||||
int stop_instances(bool stop_instances_arg);
|
||||
int stop_instances();
|
||||
/* check instance state and act accordingly */
|
||||
void process_instance(Instance *instance, GUARD_NODE *current_node,
|
||||
LIST **guarded_instances, LIST *elem);
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "mysql_manager_error.h"
|
||||
#include "portability.h"
|
||||
#include "priv.h"
|
||||
#include "thread_registry.h"
|
||||
|
||||
|
||||
const LEX_STRING
|
||||
@ -44,7 +45,8 @@ static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length;
|
||||
|
||||
|
||||
static void start_and_monitor_instance(Instance_options *old_instance_options,
|
||||
Instance_map *instance_map);
|
||||
Instance_map *instance_map,
|
||||
Thread_registry *thread_registry);
|
||||
|
||||
#ifndef __WIN__
|
||||
typedef pid_t My_process_info;
|
||||
@ -63,7 +65,8 @@ pthread_handler_t proxy(void *arg)
|
||||
{
|
||||
Instance *instance= (Instance *) arg;
|
||||
start_and_monitor_instance(&instance->options,
|
||||
instance->get_map());
|
||||
instance->get_map(),
|
||||
&instance->thread_registry);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -99,6 +102,7 @@ static int wait_process(My_process_info *pi)
|
||||
thread, but we don't know this one). Or we could use waitpid(), but
|
||||
couldn't use wait(), because it could return in any wait() in the program.
|
||||
*/
|
||||
|
||||
if (linuxthreads)
|
||||
wait(NULL); /* LinuxThreads were detected */
|
||||
else
|
||||
@ -165,8 +169,8 @@ static int start_process(Instance_options *instance_options,
|
||||
/* exec never returns */
|
||||
exit(1);
|
||||
case -1:
|
||||
log_info("cannot create a new process to start instance %s",
|
||||
instance_options->instance_name.str);
|
||||
log_info("cannot create a new process to start instance '%s'.",
|
||||
(const char *) instance_options->instance_name.str);
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
@ -239,11 +243,28 @@ static int start_process(Instance_options *instance_options,
|
||||
*/
|
||||
|
||||
static void start_and_monitor_instance(Instance_options *old_instance_options,
|
||||
Instance_map *instance_map)
|
||||
Instance_map *instance_map,
|
||||
Thread_registry *thread_registry)
|
||||
{
|
||||
Instance_name instance_name(&old_instance_options->instance_name);
|
||||
Instance *current_instance;
|
||||
My_process_info process_info;
|
||||
Thread_info thread_info(pthread_self(), FALSE);
|
||||
|
||||
log_info("Monitoring thread (instance: '%s'): started.",
|
||||
(const char *) instance_name.get_c_str());
|
||||
|
||||
if (!old_instance_options->nonguarded)
|
||||
{
|
||||
/*
|
||||
Register thread in Thread_registry to wait for it to stop on shutdown
|
||||
only if instance is nuarded. If instance is guarded, the thread will not
|
||||
finish, because nonguarded instances are not stopped on shutdown.
|
||||
*/
|
||||
|
||||
thread_registry->register_thread(&thread_info);
|
||||
my_thread_init();
|
||||
}
|
||||
|
||||
/*
|
||||
Lock instance map to guarantee that no instances are deleted during
|
||||
@ -256,7 +277,8 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
|
||||
are using is destroyed. (E.g. by "FLUSH INSTANCES")
|
||||
*/
|
||||
|
||||
log_info("starting instance %s", (const char *) instance_name.get_c_str());
|
||||
log_info("starting instance %s...",
|
||||
(const char *) instance_name.get_c_str());
|
||||
|
||||
if (start_process(old_instance_options, &process_info))
|
||||
{
|
||||
@ -279,7 +301,14 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
|
||||
|
||||
instance_map->unlock();
|
||||
|
||||
return;
|
||||
if (!old_instance_options->nonguarded)
|
||||
{
|
||||
thread_registry->unregister_thread(&thread_info);
|
||||
my_thread_end();
|
||||
}
|
||||
|
||||
log_info("Monitoring thread (instance: '%s'): finished.",
|
||||
(const char *) instance_name.get_c_str());
|
||||
}
|
||||
|
||||
|
||||
@ -311,9 +340,9 @@ void Instance::remove_pid()
|
||||
int pid;
|
||||
if ((pid= options.get_pid()) != 0) /* check the pidfile */
|
||||
if (options.unlink_pidfile()) /* remove stalled pidfile */
|
||||
log_error("cannot remove pidfile for instance %s, this might be "
|
||||
"since IM lacks permmissions or hasn't found the pidfile",
|
||||
options.instance_name.str);
|
||||
log_error("cannot remove pidfile for instance '%s', this might be "
|
||||
"since IM lacks permmissions or hasn't found the pidifle",
|
||||
(const char *) options.instance_name.str);
|
||||
}
|
||||
|
||||
|
||||
@ -342,10 +371,6 @@ int Instance::start()
|
||||
{
|
||||
remove_pid();
|
||||
|
||||
/*
|
||||
No need to monitor this thread in the Thread_registry, as all
|
||||
instances are to be stopped during shutdown.
|
||||
*/
|
||||
pthread_t proxy_thd_id;
|
||||
pthread_attr_t proxy_thd_attr;
|
||||
int rc;
|
||||
@ -403,7 +428,8 @@ void Instance::set_crash_flag_n_wake_all()
|
||||
|
||||
|
||||
|
||||
Instance::Instance(): crashed(FALSE), configured(FALSE)
|
||||
Instance::Instance(Thread_registry &thread_registry_arg):
|
||||
crashed(FALSE), configured(FALSE), thread_registry(thread_registry_arg)
|
||||
{
|
||||
pthread_mutex_init(&LOCK_instance, 0);
|
||||
pthread_cond_init(&COND_instance_stopped, 0);
|
||||
@ -467,9 +493,9 @@ bool Instance::is_running()
|
||||
We have successfully connected to the server using fake
|
||||
username/password. Write a warning to the logfile.
|
||||
*/
|
||||
log_info("The Instance Manager was able to log into you server \
|
||||
with faked compiled-in password while checking server status. \
|
||||
Looks like something is wrong.");
|
||||
log_info("The Instance Manager was able to log into you server "
|
||||
"with faked compiled-in password while checking server status. "
|
||||
"Looks like something is wrong.");
|
||||
pthread_mutex_unlock(&LOCK_instance);
|
||||
return_val= TRUE; /* server is alive */
|
||||
}
|
||||
@ -616,10 +642,10 @@ void Instance::kill_instance(int signum)
|
||||
/* Kill suceeded */
|
||||
if (signum == SIGKILL) /* really killed instance with SIGKILL */
|
||||
{
|
||||
log_error("The instance %s is being stopped forcibly. Normally" \
|
||||
"it should not happen. Probably the instance has been" \
|
||||
log_error("The instance '%s' is being stopped forcibly. Normally"
|
||||
"it should not happen. Probably the instance has been"
|
||||
"hanging. You should also check your IM setup",
|
||||
options.instance_name.str);
|
||||
(const char *) options.instance_name.str);
|
||||
/* After sucessful hard kill the pidfile need to be removed */
|
||||
options.unlink_pidfile();
|
||||
}
|
||||
|
@ -27,6 +27,7 @@
|
||||
#endif
|
||||
|
||||
class Instance_map;
|
||||
class Thread_registry;
|
||||
|
||||
|
||||
/*
|
||||
@ -87,7 +88,7 @@ public:
|
||||
static bool is_mysqld_compatible_name(const LEX_STRING *name);
|
||||
|
||||
public:
|
||||
Instance();
|
||||
Instance(Thread_registry &thread_registry_arg);
|
||||
|
||||
~Instance();
|
||||
int init(const LEX_STRING *name_arg);
|
||||
@ -120,6 +121,7 @@ public:
|
||||
public:
|
||||
enum { DEFAULT_SHUTDOWN_DELAY= 35 };
|
||||
Instance_options options;
|
||||
Thread_registry &thread_registry;
|
||||
|
||||
private:
|
||||
/* This attributes is a flag, specifies if the instance has been crashed. */
|
||||
|
@ -169,7 +169,7 @@ int Instance_map::process_one_option(const LEX_STRING *group,
|
||||
if (!(instance= (Instance *) hash_search(&hash, (byte *) group->str,
|
||||
group->length)))
|
||||
{
|
||||
if (!(instance= new Instance()))
|
||||
if (!(instance= new Instance(thread_registry)))
|
||||
return 1;
|
||||
|
||||
if (instance->init(group) || add_instance(instance))
|
||||
@ -213,8 +213,10 @@ int Instance_map::process_one_option(const LEX_STRING *group,
|
||||
}
|
||||
|
||||
|
||||
Instance_map::Instance_map(const char *default_mysqld_path_arg):
|
||||
mysqld_path(default_mysqld_path_arg)
|
||||
Instance_map::Instance_map(const char *default_mysqld_path_arg,
|
||||
Thread_registry &thread_registry_arg):
|
||||
mysqld_path(default_mysqld_path_arg),
|
||||
thread_registry(thread_registry_arg)
|
||||
{
|
||||
pthread_mutex_init(&LOCK_instance_map, 0);
|
||||
}
|
||||
@ -293,7 +295,9 @@ int Instance_map::flush_instances()
|
||||
get_instance_key, delete_instance, 0);
|
||||
|
||||
rc= load();
|
||||
guardian->init(); // TODO: check error status.
|
||||
/* don't init guardian if we failed to load instances */
|
||||
if (!rc)
|
||||
guardian->init(); // TODO: check error status.
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -331,7 +335,7 @@ int Instance_map::remove_instance(Instance *instance)
|
||||
int Instance_map::create_instance(const LEX_STRING *instance_name,
|
||||
const Named_value_arr *options)
|
||||
{
|
||||
Instance *instance= new Instance();
|
||||
Instance *instance= new Instance(thread_registry);
|
||||
|
||||
if (!instance)
|
||||
{
|
||||
|
@ -28,6 +28,7 @@
|
||||
class Guardian_thread;
|
||||
class Instance;
|
||||
class Named_value_arr;
|
||||
class Thread_registry;
|
||||
|
||||
extern int load_all_groups(char ***groups, const char *filename);
|
||||
extern void free_groups(char **groups);
|
||||
@ -104,7 +105,8 @@ public:
|
||||
int create_instance(const LEX_STRING *instance_name,
|
||||
const Named_value_arr *options);
|
||||
|
||||
Instance_map(const char *default_mysqld_path_arg);
|
||||
Instance_map(const char *default_mysqld_path_arg,
|
||||
Thread_registry &thread_registry_arg);
|
||||
~Instance_map();
|
||||
|
||||
/*
|
||||
@ -130,6 +132,8 @@ private:
|
||||
enum { START_HASH_SIZE = 16 };
|
||||
pthread_mutex_t LOCK_instance_map;
|
||||
HASH hash;
|
||||
|
||||
Thread_registry &thread_registry;
|
||||
};
|
||||
|
||||
#endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_INSTANCE_MAP_H */
|
||||
|
@ -87,7 +87,7 @@ private:
|
||||
Listener_thread::Listener_thread(const Listener_thread_args &args) :
|
||||
Listener_thread_args(args.thread_registry, args.user_map, args.instance_map)
|
||||
,total_connection_count(0)
|
||||
,thread_info(pthread_self())
|
||||
,thread_info(pthread_self(), TRUE)
|
||||
,num_sockets(0)
|
||||
{
|
||||
}
|
||||
@ -112,6 +112,8 @@ void Listener_thread::run()
|
||||
{
|
||||
int i, n= 0;
|
||||
|
||||
log_info("Listener_thread: started.");
|
||||
|
||||
#ifndef __WIN__
|
||||
/* we use this var to check whether we are running on LinuxThreads */
|
||||
pid_t thread_pid;
|
||||
@ -164,7 +166,7 @@ void Listener_thread::run()
|
||||
if (rc == 0 || rc == -1)
|
||||
{
|
||||
if (rc == -1 && errno != EINTR)
|
||||
log_error("Listener_thread::run(): select() failed, %s",
|
||||
log_error("Listener_thread: select() failed, %s",
|
||||
strerror(errno));
|
||||
continue;
|
||||
}
|
||||
@ -198,7 +200,7 @@ void Listener_thread::run()
|
||||
|
||||
/* III. Release all resources and exit */
|
||||
|
||||
log_info("Listener_thread::run(): shutdown requested, exiting...");
|
||||
log_info("Listener_thread: shutdown requested, exiting...");
|
||||
|
||||
for (i= 0; i < num_sockets; i++)
|
||||
close(sockets[i]);
|
||||
@ -209,6 +211,8 @@ void Listener_thread::run()
|
||||
|
||||
thread_registry.unregister_thread(&thread_info);
|
||||
my_thread_end();
|
||||
|
||||
log_info("Listener_thread: finished.");
|
||||
return;
|
||||
|
||||
err:
|
||||
@ -230,7 +234,7 @@ int Listener_thread::create_tcp_socket()
|
||||
int ip_socket= socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (ip_socket == INVALID_SOCKET)
|
||||
{
|
||||
log_error("Listener_thead::run(): socket(AF_INET) failed, %s",
|
||||
log_error("Listener_thead: socket(AF_INET) failed, %s",
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
@ -261,7 +265,7 @@ int Listener_thread::create_tcp_socket()
|
||||
if (bind(ip_socket, (struct sockaddr *) &ip_socket_address,
|
||||
sizeof(ip_socket_address)))
|
||||
{
|
||||
log_error("Listener_thread::run(): bind(ip socket) failed, '%s'",
|
||||
log_error("Listener_thread: bind(ip socket) failed, '%s'",
|
||||
strerror(errno));
|
||||
close(ip_socket);
|
||||
return -1;
|
||||
@ -269,7 +273,7 @@ int Listener_thread::create_tcp_socket()
|
||||
|
||||
if (listen(ip_socket, LISTEN_BACK_LOG_SIZE))
|
||||
{
|
||||
log_error("Listener_thread::run(): listen(ip socket) failed, %s",
|
||||
log_error("Listener_thread: listen(ip socket) failed, %s",
|
||||
strerror(errno));
|
||||
close(ip_socket);
|
||||
return -1;
|
||||
@ -283,7 +287,7 @@ int Listener_thread::create_tcp_socket()
|
||||
|
||||
FD_SET(ip_socket, &read_fds);
|
||||
sockets[num_sockets++]= ip_socket;
|
||||
log_info("accepting connections on ip socket");
|
||||
log_info("accepting connections on ip socket (port: %d)", (int) im_port);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -294,7 +298,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
|
||||
int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (unix_socket == INVALID_SOCKET)
|
||||
{
|
||||
log_error("Listener_thead::run(): socket(AF_UNIX) failed, %s",
|
||||
log_error("Listener_thead: socket(AF_UNIX) failed, %s",
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
@ -314,7 +318,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
|
||||
if (bind(unix_socket, (struct sockaddr *) &unix_socket_address,
|
||||
sizeof(unix_socket_address)))
|
||||
{
|
||||
log_error("Listener_thread::run(): bind(unix socket) failed, "
|
||||
log_error("Listener_thread: bind(unix socket) failed, "
|
||||
"socket file name is '%s', error '%s'",
|
||||
unix_socket_address.sun_path, strerror(errno));
|
||||
close(unix_socket);
|
||||
@ -325,7 +329,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
|
||||
|
||||
if (listen(unix_socket, LISTEN_BACK_LOG_SIZE))
|
||||
{
|
||||
log_error("Listener_thread::run(): listen(unix socket) failed, %s",
|
||||
log_error("Listener_thread: listen(unix socket) failed, %s",
|
||||
strerror(errno));
|
||||
close(unix_socket);
|
||||
return -1;
|
||||
@ -337,7 +341,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
|
||||
/* make sure that instances won't be listening our sockets */
|
||||
set_no_inherit(unix_socket);
|
||||
|
||||
log_info("accepting connections on unix socket %s",
|
||||
log_info("accepting connections on unix socket '%s'",
|
||||
unix_socket_address.sun_path);
|
||||
sockets[num_sockets++]= unix_socket;
|
||||
FD_SET(unix_socket, &read_fds);
|
||||
|
@ -52,14 +52,16 @@ static inline void log(FILE *file, const char *format, va_list args)
|
||||
struct tm bd_time; // broken-down time
|
||||
localtime_r(&now, &bd_time);
|
||||
|
||||
char buff_date[32];
|
||||
sprintf(buff_date, "%02d%02d%02d %2d:%02d:%02d\t",
|
||||
bd_time.tm_year % 100,
|
||||
bd_time.tm_mon + 1,
|
||||
bd_time.tm_mday,
|
||||
bd_time.tm_hour,
|
||||
bd_time.tm_min,
|
||||
bd_time.tm_sec);
|
||||
char buff_date[128];
|
||||
sprintf(buff_date, "[%d/%lu] [%02d/%02d/%02d %02d:%02d:%02d] ",
|
||||
(int) getpid(),
|
||||
(unsigned long) pthread_self(),
|
||||
bd_time.tm_year % 100,
|
||||
bd_time.tm_mon + 1,
|
||||
bd_time.tm_mday,
|
||||
bd_time.tm_hour,
|
||||
bd_time.tm_min,
|
||||
bd_time.tm_sec);
|
||||
/* Format the message */
|
||||
char buff_stack[256];
|
||||
|
||||
|
@ -120,6 +120,19 @@ int my_sigwait(const sigset_t *set, int *sig)
|
||||
#endif
|
||||
|
||||
|
||||
void stop_all(Guardian_thread *guardian, Thread_registry *registry)
|
||||
{
|
||||
/*
|
||||
Let guardian thread know that it should break it's processing cycle,
|
||||
once it wakes up.
|
||||
*/
|
||||
guardian->request_shutdown();
|
||||
/* wake guardian */
|
||||
pthread_cond_signal(&guardian->COND_guardian);
|
||||
/* stop all threads */
|
||||
registry->deliver_shutdown();
|
||||
}
|
||||
|
||||
/*
|
||||
manager - entry point to the main instance manager process: start
|
||||
listener thread, write pid file and enter into signal handling.
|
||||
@ -143,7 +156,8 @@ void manager()
|
||||
*/
|
||||
|
||||
User_map user_map;
|
||||
Instance_map instance_map(Options::Main::default_mysqld_path);
|
||||
Instance_map instance_map(Options::Main::default_mysqld_path,
|
||||
thread_registry);
|
||||
Guardian_thread guardian_thread(thread_registry,
|
||||
&instance_map,
|
||||
Options::Main::monitoring_interval);
|
||||
@ -251,7 +265,6 @@ void manager()
|
||||
|
||||
/* Load instances. */
|
||||
|
||||
|
||||
{
|
||||
instance_map.guardian->lock();
|
||||
instance_map.lock();
|
||||
@ -266,7 +279,8 @@ void manager()
|
||||
log_error("Cannot init instances repository. This might be caused by "
|
||||
"the wrong config file options. For instance, missing mysqld "
|
||||
"binary. Aborting.");
|
||||
return;
|
||||
stop_all(&guardian_thread, &thread_registry);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,6 +298,7 @@ void manager()
|
||||
if (rc)
|
||||
{
|
||||
log_error("manager(): set_stacksize_n_create_thread(listener) failed");
|
||||
stop_all(&guardian_thread, &thread_registry);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
@ -294,6 +309,8 @@ void manager()
|
||||
*/
|
||||
pthread_cond_signal(&guardian_thread.COND_guardian);
|
||||
|
||||
log_info("Main loop: started.");
|
||||
|
||||
while (!shutdown_complete)
|
||||
{
|
||||
int signo;
|
||||
@ -302,9 +319,24 @@ void manager()
|
||||
if ((status= my_sigwait(&mask, &signo)) != 0)
|
||||
{
|
||||
log_error("sigwait() failed");
|
||||
stop_all(&guardian_thread, &thread_registry);
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
The general idea in this loop is the following:
|
||||
- we are waiting for SIGINT, SIGTERM -- signals that mean we should
|
||||
shutdown;
|
||||
- as shutdown signal is caught, we stop Guardian thread (by calling
|
||||
Guardian_thread::request_shutdown());
|
||||
- as Guardian_thread is stopped, it sends SIGTERM to this thread
|
||||
(by calling Thread_registry::request_shutdown()), so that the
|
||||
my_sigwait() above returns;
|
||||
- as we catch the second SIGTERM, we send signals to all threads
|
||||
registered in Thread_registry (by calling
|
||||
Thread_registry::deliver_shutdown()) and waiting for threads to stop;
|
||||
*/
|
||||
|
||||
#ifndef __WIN__
|
||||
/*
|
||||
On some Darwin kernels SIGHUP is delivered along with most
|
||||
@ -321,10 +353,11 @@ void manager()
|
||||
else
|
||||
#endif
|
||||
{
|
||||
log_info("Main loop: got shutdown signal.");
|
||||
|
||||
if (!guardian_thread.is_stopped())
|
||||
{
|
||||
bool stop_instances= TRUE;
|
||||
guardian_thread.request_shutdown(stop_instances);
|
||||
guardian_thread.request_shutdown();
|
||||
pthread_cond_signal(&guardian_thread.COND_guardian);
|
||||
}
|
||||
else
|
||||
@ -335,6 +368,8 @@ void manager()
|
||||
}
|
||||
}
|
||||
|
||||
log_info("Main loop: finished.");
|
||||
|
||||
err:
|
||||
/* delete the pid file */
|
||||
my_delete(Options::Main::pid_file_name, MYF(0));
|
||||
|
@ -97,7 +97,7 @@ Mysql_connection_thread::Mysql_connection_thread(
|
||||
args.user_map,
|
||||
args.connection_id,
|
||||
args.instance_map)
|
||||
,thread_info(pthread_self())
|
||||
,thread_info(pthread_self(), TRUE)
|
||||
{
|
||||
thread_registry.register_thread(&thread_info);
|
||||
}
|
||||
@ -165,7 +165,7 @@ Mysql_connection_thread::~Mysql_connection_thread()
|
||||
|
||||
void Mysql_connection_thread::run()
|
||||
{
|
||||
log_info("accepted mysql connection %lu", connection_id);
|
||||
log_info("accepted mysql connection %lu", (unsigned long) connection_id);
|
||||
|
||||
my_thread_init();
|
||||
|
||||
@ -175,7 +175,8 @@ void Mysql_connection_thread::run()
|
||||
return;
|
||||
}
|
||||
|
||||
log_info("connection %lu is checked successfully", connection_id);
|
||||
log_info("connection %lu is checked successfully",
|
||||
(unsigned long) connection_id);
|
||||
|
||||
vio_keepalive(vio, TRUE);
|
||||
|
||||
@ -315,7 +316,7 @@ int Mysql_connection_thread::do_command()
|
||||
enum enum_server_command command= (enum enum_server_command)
|
||||
(uchar) *packet;
|
||||
log_info("connection %lu: packet_length=%lu, command=%d",
|
||||
connection_id, packet_length, command);
|
||||
(int) connection_id, (int) packet_length, (int) command);
|
||||
return dispatch_command(command, packet + 1, packet_length - 1);
|
||||
}
|
||||
}
|
||||
@ -325,27 +326,33 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
|
||||
{
|
||||
switch (command) {
|
||||
case COM_QUIT: // client exit
|
||||
log_info("query for connection %lu received quit command", connection_id);
|
||||
log_info("query for connection %lu received quit command",
|
||||
(unsigned long) connection_id);
|
||||
return 1;
|
||||
case COM_PING:
|
||||
log_info("query for connection %lu received ping command", connection_id);
|
||||
log_info("query for connection %lu received ping command",
|
||||
(unsigned long) connection_id);
|
||||
net_send_ok(&net, connection_id, NULL);
|
||||
break;
|
||||
case COM_QUERY:
|
||||
{
|
||||
log_info("query for connection %lu : ----\n%s\n-------------------------",
|
||||
connection_id,packet);
|
||||
(int) connection_id,
|
||||
(const char *) packet);
|
||||
if (Command *command= parse_command(&instance_map, packet))
|
||||
{
|
||||
int res= 0;
|
||||
log_info("query for connection %lu successfully parsed",connection_id);
|
||||
log_info("query for connection %lu successfully parsed",
|
||||
(unsigned long) connection_id);
|
||||
res= command->execute(&net, connection_id);
|
||||
delete command;
|
||||
if (!res)
|
||||
log_info("query for connection %lu executed ok",connection_id);
|
||||
log_info("query for connection %lu executed ok",
|
||||
(unsigned long) connection_id);
|
||||
else
|
||||
{
|
||||
log_info("query for connection %lu executed err=%d",connection_id,res);
|
||||
log_info("query for connection %lu executed err=%d",
|
||||
(unsigned long) connection_id, (int) res);
|
||||
net_send_error(&net, res);
|
||||
return 0;
|
||||
}
|
||||
@ -358,7 +365,8 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
|
||||
break;
|
||||
}
|
||||
default:
|
||||
log_info("query for connection %lu received unknown command",connection_id);
|
||||
log_info("query for connection %lu received unknown command",
|
||||
(unsigned long) connection_id);
|
||||
net_send_error(&net, ER_UNKNOWN_COM_ERROR);
|
||||
break;
|
||||
}
|
||||
|
@ -41,7 +41,6 @@
|
||||
static char win_dflt_config_file_name[FN_REFLEN];
|
||||
static char win_dflt_password_file_name[FN_REFLEN];
|
||||
static char win_dflt_pid_file_name[FN_REFLEN];
|
||||
static char win_dflt_socket_file_name[FN_REFLEN];
|
||||
|
||||
static char win_dflt_mysqld_path[FN_REFLEN];
|
||||
|
||||
@ -54,7 +53,6 @@ my_bool Options::Service::stand_alone;
|
||||
const char *Options::Main::config_file= win_dflt_config_file_name;
|
||||
const char *Options::Main::password_file_name= win_dflt_password_file_name;
|
||||
const char *Options::Main::pid_file_name= win_dflt_pid_file_name;
|
||||
const char *Options::Main::socket_file_name= win_dflt_socket_file_name;
|
||||
|
||||
const char *Options::Main::default_mysqld_path= win_dflt_mysqld_path;
|
||||
|
||||
@ -262,10 +260,12 @@ static struct my_option my_long_options[] =
|
||||
0, 0, GET_BOOL, NO_ARG, 0, 0, 1, 0, 0, 0 },
|
||||
#endif
|
||||
|
||||
#ifndef __WIN__
|
||||
{ "socket", OPT_SOCKET, "Socket file to use for connection.",
|
||||
(gptr *) &Options::Main::socket_file_name,
|
||||
(gptr *) &Options::Main::socket_file_name,
|
||||
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
||||
#endif
|
||||
|
||||
#ifdef __WIN__
|
||||
{ "standalone", OPT_STAND_ALONE, "Run the application in stand alone mode.",
|
||||
@ -550,8 +550,6 @@ static int setup_windows_defaults()
|
||||
strxmov(win_dflt_password_file_name, dir_name, im_name, DFLT_PASSWD_FILE_EXT,
|
||||
NullS);
|
||||
strxmov(win_dflt_pid_file_name, dir_name, im_name, DFLT_PID_FILE_EXT, NullS);
|
||||
strxmov(win_dflt_socket_file_name, dir_name, im_name, DFLT_SOCKET_FILE_EXT,
|
||||
NullS);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -50,7 +50,9 @@ struct Options
|
||||
static bool is_forced_default_file;
|
||||
|
||||
static const char *pid_file_name;
|
||||
#ifndef __WIN__
|
||||
static const char *socket_file_name;
|
||||
#endif
|
||||
static const char *password_file_name;
|
||||
static const char *default_mysqld_path;
|
||||
static uint monitoring_interval;
|
||||
|
@ -43,8 +43,10 @@ static void handle_signal(int __attribute__((unused)) sig_no)
|
||||
*/
|
||||
|
||||
Thread_info::Thread_info() {}
|
||||
Thread_info::Thread_info(pthread_t thread_id_arg) :
|
||||
thread_id(thread_id_arg) {}
|
||||
Thread_info::Thread_info(pthread_t thread_id_arg,
|
||||
bool send_signal_on_shutdown_arg) :
|
||||
thread_id(thread_id_arg),
|
||||
send_signal_on_shutdown(send_signal_on_shutdown_arg) {}
|
||||
|
||||
/*
|
||||
TODO: think about moving signal information (now it's shutdown_in_progress)
|
||||
@ -86,6 +88,9 @@ Thread_registry::~Thread_registry()
|
||||
|
||||
void Thread_registry::register_thread(Thread_info *info)
|
||||
{
|
||||
log_info("Thread_registry: registering thread %d...",
|
||||
(int) info->thread_id);
|
||||
|
||||
#ifndef __WIN__
|
||||
struct sigaction sa;
|
||||
sa.sa_handler= handle_signal;
|
||||
@ -112,11 +117,19 @@ void Thread_registry::register_thread(Thread_info *info)
|
||||
|
||||
void Thread_registry::unregister_thread(Thread_info *info)
|
||||
{
|
||||
log_info("Thread_registry: unregistering thread %d...",
|
||||
(int) info->thread_id);
|
||||
|
||||
pthread_mutex_lock(&LOCK_thread_registry);
|
||||
info->prev->next= info->next;
|
||||
info->next->prev= info->prev;
|
||||
|
||||
if (head.next == &head)
|
||||
{
|
||||
log_info("Thread_registry: thread registry is empty!");
|
||||
pthread_cond_signal(&COND_thread_registry_is_empty);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&LOCK_thread_registry);
|
||||
}
|
||||
|
||||
@ -181,11 +194,6 @@ int Thread_registry::cond_timedwait(Thread_info *info, pthread_cond_t *cond,
|
||||
|
||||
void Thread_registry::deliver_shutdown()
|
||||
{
|
||||
Thread_info *info;
|
||||
struct timespec shutdown_time;
|
||||
int error;
|
||||
set_timespec(shutdown_time, 1);
|
||||
|
||||
pthread_mutex_lock(&LOCK_thread_registry);
|
||||
shutdown_in_progress= TRUE;
|
||||
|
||||
@ -199,29 +207,14 @@ void Thread_registry::deliver_shutdown()
|
||||
process_alarm(THR_SERVER_ALARM);
|
||||
#endif
|
||||
|
||||
for (info= head.next; info != &head; info= info->next)
|
||||
{
|
||||
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
|
||||
/*
|
||||
sic: race condition here, the thread may not yet fall into
|
||||
pthread_cond_wait.
|
||||
*/
|
||||
if (info->current_cond)
|
||||
pthread_cond_signal(info->current_cond);
|
||||
}
|
||||
/*
|
||||
The common practice is to test predicate before pthread_cond_wait.
|
||||
I don't do that here because the predicate is practically always false
|
||||
before wait - is_shutdown's been just set, and the lock's still not
|
||||
released - the only case when the predicate is false is when no other
|
||||
threads exist.
|
||||
sic: race condition here, the thread may not yet fall into
|
||||
pthread_cond_wait.
|
||||
*/
|
||||
while (((error= pthread_cond_timedwait(&COND_thread_registry_is_empty,
|
||||
&LOCK_thread_registry,
|
||||
&shutdown_time)) != ETIMEDOUT &&
|
||||
error != ETIME) &&
|
||||
head.next != &head)
|
||||
;
|
||||
|
||||
interrupt_threads();
|
||||
|
||||
wait_for_threads_to_unregister();
|
||||
|
||||
/*
|
||||
If previous signals did not reach some threads, they must be sleeping
|
||||
@ -230,11 +223,28 @@ void Thread_registry::deliver_shutdown()
|
||||
so this time everybody should be informed (presumably each worker can
|
||||
get CPU during shutdown_time.)
|
||||
*/
|
||||
for (info= head.next; info != &head; info= info->next)
|
||||
|
||||
interrupt_threads();
|
||||
|
||||
/* Get the last chance to threads to stop. */
|
||||
|
||||
wait_for_threads_to_unregister();
|
||||
|
||||
/*
|
||||
Print out threads, that didn't stopped. Thread_registry destructor will
|
||||
probably abort the program if there is still any alive thread.
|
||||
*/
|
||||
|
||||
if (head.next != &head)
|
||||
{
|
||||
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
|
||||
if (info->current_cond)
|
||||
pthread_cond_signal(info->current_cond);
|
||||
log_info("Thread_registry: non-stopped threads:");
|
||||
|
||||
for (Thread_info *info= head.next; info != &head; info= info->next)
|
||||
log_info(" - %ld", (long int) info->thread_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
log_info("Thread_registry: all threads stopped.");
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&LOCK_thread_registry);
|
||||
@ -245,3 +255,46 @@ void Thread_registry::request_shutdown()
|
||||
{
|
||||
pthread_kill(sigwait_thread_pid, SIGTERM);
|
||||
}
|
||||
|
||||
|
||||
void Thread_registry::interrupt_threads()
|
||||
{
|
||||
for (Thread_info *info= head.next; info != &head; info= info->next)
|
||||
{
|
||||
if (!info->send_signal_on_shutdown)
|
||||
continue;
|
||||
|
||||
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
|
||||
if (info->current_cond)
|
||||
pthread_cond_signal(info->current_cond);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Thread_registry::wait_for_threads_to_unregister()
|
||||
{
|
||||
struct timespec shutdown_time;
|
||||
|
||||
set_timespec(shutdown_time, 1);
|
||||
|
||||
log_info("Thread_registry: joining threads...");
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (head.next == &head)
|
||||
{
|
||||
log_info("Thread_registry: emptied.");
|
||||
return;
|
||||
}
|
||||
|
||||
int error= pthread_cond_timedwait(&COND_thread_registry_is_empty,
|
||||
&LOCK_thread_registry,
|
||||
&shutdown_time);
|
||||
|
||||
if (error == ETIMEDOUT || error == ETIME)
|
||||
{
|
||||
log_info("Thread_registry: threads shutdown timed out.");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -67,13 +67,17 @@
|
||||
class Thread_info
|
||||
{
|
||||
public:
|
||||
Thread_info();
|
||||
Thread_info(pthread_t thread_id_arg);
|
||||
Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg);
|
||||
friend class Thread_registry;
|
||||
|
||||
private:
|
||||
Thread_info();
|
||||
|
||||
private:
|
||||
pthread_cond_t *current_cond;
|
||||
Thread_info *prev, *next;
|
||||
pthread_t thread_id;
|
||||
bool send_signal_on_shutdown;
|
||||
};
|
||||
|
||||
|
||||
@ -97,6 +101,10 @@ public:
|
||||
pthread_mutex_t *mutex);
|
||||
int cond_timedwait(Thread_info *info, pthread_cond_t *cond,
|
||||
pthread_mutex_t *mutex, struct timespec *wait_time);
|
||||
private:
|
||||
void interrupt_threads();
|
||||
void wait_for_threads_to_unregister();
|
||||
|
||||
private:
|
||||
Thread_info head;
|
||||
bool shutdown_in_progress;
|
||||
|
Reference in New Issue
Block a user