import logging
import time
import threading

from .heartbeater import HeartBeater
from .config import Config
from .heartbeat_history import HBHistory
from .agent_comm import AgentComm
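
# A minimal usage sketch (assumption: the real entry point elsewhere in the failover
# package constructs and owns this object; 'myAgent' is a hypothetical agent instance):
#
#     monitor = NodeMonitor(agent=myAgent, samplingInterval=30)
#     monitor.start()
#     ...              # run until failover monitoring is no longer needed
#     monitor.stop()
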
class NodeMonitor:
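    """Watches cluster membership and heartbeat health for the failover logic.

    Each node runs one NodeMonitor.  It sends heartbeats, tracks which peers
    respond, and, when this node is the actor of its cohort, asks the agent to
    deactivate dead nodes, reactivate recovered ones, and move the primary
    node if the current primary is down.
    """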

    def __init__(
        self, agent=None, config=None, samplingInterval=30,
        flakyNodeThreshold=0.5
    ):
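        """
        samplingInterval: number of monitor iterations (the loop sleeps ~1s per
            pass, so this is roughly a window in seconds) a node must miss or
            answer every heartbeat before it is deactivated or reactivated.
        flakyNodeThreshold: reserved for future use; not applied yet.
        """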
        self._agentComm = AgentComm(agent)
        self._die = False
        self._inStandby = False
        self._testMode = False  # TODO: remove
        self._hbHistory = HBHistory()
        self._logger = logging.getLogger('node_monitor')
        self._runner = None
        # start pessimistically; _pickNewActor() updates this once active nodes are known
        self._isActorOfCohort = False
        if config is not None:
            self._config = config
        else:
            self._config = Config()
        self._hb = HeartBeater(self._config, self._hbHistory)
        self.samplingInterval = samplingInterval
        # not used yet, KI-V-SS for V1 [old comment from Patrick]
        self.flakyNodeThreshold = flakyNodeThreshold
        self.myName = self._config.who_am_I()
        #self._logger.info("Using {} as my name".format(self.myName))

    def __del__(self):
        self.stop()

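    # start() spawns the agent communication, the heartbeat responder, and the monitor
    # loop in a background thread; stop() tears them down and joins the thread.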
    def start(self):
        self._agentComm.start()
        self._hb.start()
        self._die = False
        self._runner = threading.Thread(
            target=self.monitor, name='NodeMonitor'
        )
        self._runner.start()

    def stop(self):
        self._die = True
        self._agentComm.die()
        if not self._testMode:
            self._hb.stop()
        # guard against stop() being called before start() (e.g. from __del__)
        if self._runner is not None:
            self._runner.join()

    def _removeRemovedNodes(self, desiredNodes):
        self._hbHistory.keepOnlyTheseNodes(desiredNodes)

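    # Actor election is deterministic and needs no extra coordination: every node
    # evaluates the same node list, and the node listed first treats itself as the
    # actor of the cohort (assuming getAllNodes() returns names in a stable order).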
    def _pickNewActor(self, nodes):
        if not nodes:
            return
        if self.myName == nodes[0]:
            self._isActorOfCohort = True
        else:
            self._isActorOfCohort = False

    def _chooseNewPrimaryNode(self):
        self._agentComm.movePrimaryNode()

    def monitor(self):
        while not self._die:
            try:
                self._logger.info('Starting the monitor logic')
                self._monitor()
            except Exception:
                self._logger.error(
                    'monitor() caught an exception.',
                    exc_info=True
                )
            if not self._die:
                time.sleep(1)
        self._logger.info("node monitor logic exiting normally...")

    def _monitor(self):
        """
        This works like the main loop of a game.
        1) check current state
        2) identify the differences
        3) update based on the differences
        """

        (desiredNodes, activeNodes, inactiveNodes) = self._config.getAllNodes()
        self._pickNewActor(activeNodes)

        logged_idleness_msg = False
        logged_active_msg = False
        inStandbyMode = False
        while not self._die:
            # these things would normally go at the end of the loop; doing it here
            # to reduce line count & chance of missing something as we add more code
            oldActiveNodes = activeNodes
            wasActorOfCohort = self._isActorOfCohort
            self._logger.trace(
                f'Previous actor of cohort state is {wasActorOfCohort}'
            )
            time.sleep(1)

            # get config updates
            (desiredNodes, activeNodes, inactiveNodes) = self._config.getAllNodes()
            self.myName = self._config.who_am_I()
            self.primaryNode = self._config.getPrimaryNode()

            # remove nodes from history that have been removed from the cluster
            self._removeRemovedNodes(desiredNodes)

            # if there are fewer than 3 nodes in the cluster, do nothing
            if len(desiredNodes) < 3:
                if not logged_idleness_msg:
                    self._logger.info(
                        'Failover support is inactive; '
                        'requires at least 3 nodes and a shared storage system'
                    )
                    logged_idleness_msg = True
                    logged_active_msg = False
            elif not logged_active_msg:
                self._logger.info(
                    'Failover support is active, '
                    f'monitoring nodes {desiredNodes}'
                )
                logged_active_msg = True
                logged_idleness_msg = False

            # nothing to do in this case
            if len(desiredNodes) == 1:
                continue

            # has this node been reactivated?
            if self.myName in activeNodes:
                #TODO: remove useless flag or use it in future releases
                self._inStandby = False
            # has it been deactivated?
            else:
                self._logger.trace('Node not in active nodes, do nothing.')
                self._inStandby = True
                continue  # wait to be activated

            # send heartbeats
            self._hb.sendHeartbeats()

            # decide if action is necessary based on config changes

            # get the list of nodes no longer responding to heartbeats
            # V1: only remove a node that hasn't responded to any pings in the sampling period
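            # Note (assumption about the HBHistory API): the third argument to
            # getNodeHistory() appears to be the value used to pad samples missing
            # from the window, so an active node with a short history is padded
            # with GoodResponse and is not deactivated prematurely.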
            deactivateSet = set()
            for node in activeNodes:
                if node == self.myName:
                    continue
                history = self._hbHistory.getNodeHistory(node, self.samplingInterval, HBHistory.GoodResponse)
                self._logger.trace(f'Got history "{history}" for node {node}')
                noResponses = [ x for x in history if x == HBHistory.NoResponse ]
                if len(noResponses) == self.samplingInterval:
                    deactivateSet.add(node)

            # get the list of nodes that have started responding
            # reactivate live nodes that have begun responding to heartbeats
            # V1: only reactivate a node if we have good responses for the whole sampling period
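            # Symmetrically (same assumption as above), inactive nodes are padded with
            # NoResponse, so a node is only reactivated after a full window of real,
            # good responses.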
            activateSet = set()
            for node in inactiveNodes:
                history = self._hbHistory.getNodeHistory(node, self.samplingInterval, HBHistory.NoResponse)
                goodResponses = [ x for x in history if x == HBHistory.GoodResponse ]
                if len(goodResponses) == self.samplingInterval:
                    activateSet.add(node)

            # effectiveActiveNodeList can be described as activeNodes after pending config changes
            # have been applied. Another way to view it is that it reflects current reality, whereas
            # the config file reflects a fixed point in the recent past.
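            # Example: activeNodes = ['n1', 'n2', 'n3'], deactivateSet = {'n3'},
            # activateSet = {'n4'}  ->  effectiveActiveNodeList = ['n1', 'n2', 'n4']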
            effectiveActiveNodeList = sorted((set(activeNodes) - deactivateSet) | activateSet)

            # if there was a change to the list of active nodes,
            # decide if this node is the effective actor in the cohort.
            if effectiveActiveNodeList != activeNodes:
                self._pickNewActor(effectiveActiveNodeList)
                self._logger.trace(
                    f'Effective list changed, actor state is {self._isActorOfCohort}'
                )
            elif oldActiveNodes != activeNodes:
                self._pickNewActor(activeNodes)
                self._logger.trace(
                    f'Active list changed, actor state is {self._isActorOfCohort}'
                )

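            # Worked example: with len(desiredNodes) == 5, standby is entered once 2 or
            # fewer nodes remain active (2/5 <= 0.5); at least int(5/2) + 1 == 3 nodes
            # must be active (or effectively active) before the cohort acts again.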
            # if we are in a cohort that has <= 50% of the desired nodes, enter standby
            if len(activeNodes)/len(desiredNodes) <= 0.5 and len(effectiveActiveNodeList)/len(desiredNodes) <= 0.5:
                if not inStandbyMode:
                    msg = "Only {} out of {} nodes are active. At least {} are required. Entering standby mode to protect the system."\
                        .format(len(activeNodes), len(desiredNodes), int(len(desiredNodes)/2) + 1)
                    self._agentComm.raiseAlarm(msg)
                    self._logger.critical(msg)
                    self._agentComm.enterStandbyMode()
                    inStandbyMode = True
                continue
            elif inStandbyMode and len(effectiveActiveNodeList)/len(desiredNodes) > 0.5:
                self._logger.info("Exiting standby mode, waiting for config update")
                inStandbyMode = False

            # (wasActorOfCohort and not isActorOfCohort) indicates that a new Actor has come online.
            # To hand over the crown, perform one last act as Actor to add it back to the cluster
            # and synchronize its config file.

            # if not the actor, nothing else for this node to do
            if not self._isActorOfCohort and not wasActorOfCohort:
                continue

            # as of here, this node is the actor of its quorum

            if len(deactivateSet) > 0:
                self._agentComm.deactivateNodes(list(deactivateSet))

            if len(activateSet) > 0:
                self._agentComm.activateNodes(activateSet)

            # if the primary node is in the list to be deactivated, or it's already on the inactive list,
            # choose a new primary node. The deadNode list is a sanity check for cases like the cluster
            # starting with the primary node already in inactive-nodes.
            deadNodeList = list(deactivateSet) + inactiveNodes
            if self.primaryNode in deadNodeList:
                self._chooseNewPrimaryNode()

    # methods for testing
    def turnOffHBResponder(self):
        self.stop()