You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			122 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
 | 
						|
import threading
 | 
						|
import time
 | 
						|
from socket import socket, SOCK_DGRAM
 | 
						|
from struct import pack, unpack_from
 | 
						|
 | 
						|
 | 
						|
class HeartBeater:
    """Send and answer UDP heartbeat messages between cluster nodes.

    Every datagram starts with a 6-byte header packed with the ``struct``
    format ``'4sH'`` (native byte order): a 4-byte message type followed
    by an unsigned 16-bit sequence number.  A "yes I am" reply carries the
    responder's node name as ASCII bytes right after the header.
    """

    # UDP port used both for listening and as the destination port.
    port = 9051
    # Six bytes long so that it passes the minimum-length check in
    # _listenAndRespond(); its type matches no handler, so its only effect
    # is to wake the listener up so that it re-checks ``self.die``.
    dieMsg = b'die!00'
    areYouThereMsg = b'AYTM'
    yesIAmMsg = b'YIAM'

    def __init__(self, config, history):
        """Store collaborators; no socket or thread is created here.

        :param config: object providing ``who_am_I()`` and
            ``getDesiredNodes()``.
        :param history: object providing ``gotHeartbeat(name, seq)`` and
            ``setCurrentTick(seq)``.
        """
        self.config = config
        self.die = False
        self.history = history
        self.sequenceNum = 0
        self.responseThread = None
        self.sock = None
        # Serializes sendto() calls made through send()/sendHeartbeats().
        self.sockMutex = threading.Lock()
        self.logger = logging.getLogger('heartbeater')

    def start(self):
        """Bind the UDP socket and launch the listener thread."""
        self.initSockets()
        self.die = False
        self.responseThread = threading.Thread(
            target=self.listenAndRespond, name='HeartBeater'
        )
        self.responseThread.start()

    def stop(self):
        """Ask the listener thread to exit, then close the socket.

        Calling this before start() only sets the ``die`` flag: there is
        no listener to wake up and no socket to close.
        """
        self.die = True
        listener = self.responseThread
        running = listener is not None and listener.is_alive()
        if running:
            # break out of the recv loop
            # The temporary socket is closed as soon as the wake-up
            # datagram is sent, even if sendto() raises.
            with socket(type=SOCK_DGRAM) as wakeup:
                wakeup.sendto(self.dieMsg, ('localhost', self.port))
            # Give the listener up to one second to notice the message
            # before its socket is closed underneath it.
            listener.join(timeout=1)
        if self.sock is not None:
            self.sock.close()
        if running:
            listener.join()

    def initSockets(self):
        """Create the UDP socket and bind it to ``port`` on all interfaces.

        ``self.sock`` is only replaced once the bind has succeeded; on
        failure the new socket is closed and the error is re-raised.
        """
        new_sock = socket(type=SOCK_DGRAM)
        try:
            new_sock.bind(('0.0.0.0', self.port))
        except Exception:
            new_sock.close()
            raise
        self.sock = new_sock

    def listenAndRespond(self):
        """Listener thread body: handle datagrams until ``die`` is set.

        Any exception raised while handling one datagram is logged and
        followed by a one second pause, so a persistent error (such as a
        closed socket) does not turn into a busy loop.
        """
        self.logger.info('Starting the heartbeat listener.')
        while not self.die:
            try:
                self._listenAndRespond()
            except Exception:
                self.logger.warning(
                    'Caught an exception while listening and responding.',
                    exc_info=True
                )
                time.sleep(1)
        self.logger.info('Heartbeat listener exiting normally...')

    def _listenAndRespond(self):
        """Receive one datagram and react to it.

        * "are you there?" -> reply with "yes I am" plus this node's name,
          echoing the sequence number of the request.
        * "yes I am"       -> record the heartbeat in ``history``.
        * anything else (including the die message) is ignored.
        """
        (data, remote) = self.sock.recvfrom(300)
        # Too short to contain the 6-byte header.
        if len(data) < 6:
            return
        (msg_type, seq) = unpack_from('4sH', data, 0)

        if msg_type == self.areYouThereMsg:
            self.logger.trace(f'Got "are you there?" from {remote[0]}')
            name = self.config.who_am_I()
            if name is None:
                self.logger.warning(
                    'Heartbeater: got an "are you there?" msg from '
                    f'{remote[0]}, but this node is not in the list of '
                    'desired nodes for the cluster. '
                    'This node needs a config update.'
                )
                return
            # Cap the name at 255 bytes so the reply stays well inside
            # the 300-byte receive buffer used above.
            bname = name.encode('ascii')[:255]
            msg = pack(f'4sH{len(bname)}s', self.yesIAmMsg, seq, bname)
            self.send(msg, remote[0])
            self.logger.trace(f'Send "yes I Am" to {remote[0]}')
            return

        # A "yes I am" without a name after the header is ignored.
        if msg_type == self.yesIAmMsg and len(data) > 6:
            name = data[6:].decode('ascii')
            self.logger.trace(f'Got "yes I am" from {name}')
            self.history.gotHeartbeat(name, seq)

    def send(self, msg, destaddr):
        """Send ``msg`` to ``destaddr`` on the heartbeat port.

        Errors are logged, never raised.
        """
        with self.sockMutex:
            try:
                self.sock.sendto(msg, (destaddr, self.port))
            except Exception:
                self.logger.warning(
                    f'Heartbeater.send(): caught error sending msg to {destaddr}',
                    exc_info=True
                )

    def sendHeartbeats(self):
        """Send "are you there?" to every desired node except this one.

        Afterwards the sequence number is advanced and published to
        ``history`` as the current tick.
        """
        nodes = self.config.getDesiredNodes()
        my_name = self.config.who_am_I()
        msg = pack('4sH', self.areYouThereMsg, self.sequenceNum)
        # ``with`` guarantees the lock is released even if iterating
        # ``nodes`` raises.
        with self.sockMutex:
            for node in nodes:
                if node == my_name:
                    continue
                try:
                    self.logger.trace(f'Send "are you there" node {node}')
                    self.sock.sendto(msg, (node, self.port))
                except Exception:
                    # Deliberately not logged.
                    # In docker the whole dns entry gets removed when a
                    # container goes away.  Logging here ends up spamming
                    # the logs until the node is removed from the cluster
                    # via the rest endpoint, or the node comes back up.
                    pass
        # Wraps back to 0 after 65534, so 65535 itself is never used.
        self.sequenceNum = (self.sequenceNum + 1) % 65535
        self.history.setCurrentTick(self.sequenceNum)
 |