1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-12 05:01:56 +03:00
Files
mariadb-columnstore-engine/cmapi/failover/heartbeater.py
mariadb-AlanMologorsky a079a2c944 MCOL-5496: Merge CMAPI code to engine repo.
[add] cmapi code to engine
2023-06-07 10:00:16 +03:00

122 lines
4.2 KiB
Python

import logging
import threading
import time
from socket import socket, SOCK_DGRAM
from struct import pack, unpack_from
class HeartBeater:
port = 9051
dieMsg = b'die!00'
areYouThereMsg = b'AYTM'
yesIAmMsg = b'YIAM'
def __init__(self, config, history):
self.config = config
self.die = False
self.history = history
self.sequenceNum = 0
self.responseThread = None
self.sock = None
self.sockMutex = threading.Lock()
self.logger = logging.getLogger('heartbeater')
def start(self):
self.initSockets()
self.die = False
self.responseThread = threading.Thread(
target=self.listenAndRespond, name='HeartBeater'
)
self.responseThread.start()
def stop(self):
self.die = True
# break out of the recv loop
sock = socket(type=SOCK_DGRAM)
sock.sendto(self.dieMsg, ('localhost', self.port))
time.sleep(1)
self.sock.close()
self.responseThread.join()
def initSockets(self):
self.sock = socket(type=SOCK_DGRAM)
self.sock.bind(('0.0.0.0', self.port))
def listenAndRespond(self):
self.logger.info('Starting the heartbeat listener.')
while not self.die:
try:
self._listenAndRespond()
except Exception:
self.logger.warning(
'Caught an exception while listening and responding.',
exc_info=True
)
time.sleep(1)
self.logger.info('Heartbeat listener exiting normally...')
def _listenAndRespond(self):
(data, remote) = self.sock.recvfrom(300)
if len(data) < 6:
return
(msg_type, seq) = unpack_from('4sH', data, 0)
if msg_type == self.areYouThereMsg:
self.logger.trace(f'Got "are you there?" from {remote[0]}')
name = self.config.who_am_I()
if name is None:
self.logger.warning(
'Heartbeater: got an "are you there?" msg from '
f'{remote[0]}, but this node is not in the list of '
'desired nodes for the cluster. '
'This node needs a config update.'
)
return
bname = name.encode('ascii')
if len(bname) > 255:
bname = bname[:255]
msg = pack(f'4sH{len(bname)}s', self.yesIAmMsg, seq, bname)
self.send(msg, remote[0])
self.logger.trace(f'Send "yes I Am" to {remote[0]}')
elif msg_type == self.yesIAmMsg:
if len(data) > 6:
name = data[6:].decode('ascii')
self.logger.trace(f'Got "yes I am" from {name}')
self.history.gotHeartbeat(name, seq)
def send(self, msg, destaddr):
self.sockMutex.acquire()
try:
self.sock.sendto(msg, (destaddr, self.port))
except Exception:
self.logger.warning(
f'Heartbeater.send(): caught error sending msg to {destaddr}',
exc_info=True
)
finally:
self.sockMutex.release()
def sendHeartbeats(self):
nodes = self.config.getDesiredNodes()
my_name = self.config.who_am_I()
msg = pack('4sH', self.areYouThereMsg, self.sequenceNum)
self.sockMutex.acquire()
for node in nodes:
if node == my_name:
continue
try:
self.logger.trace(f'Send "are you there" node {node}')
self.sock.sendto(msg, (node, self.port))
except Exception as e:
pass
# Suppressing these logs.
# In docker the whole dns entry gets removed when a container
# goes away.
# Ends up spamming the logs until the node is removed from
# the cluster via the rest endpoint, or the node comes back up.
# self.logger.warning("Heartbeater.sendHeartbeats():
# caught an exception sending heartbeat to {}: {}".
# format(node, e))
self.sockMutex.release()
self.sequenceNum = (self.sequenceNum + 1) % 65535
self.history.setCurrentTick(self.sequenceNum)