1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-12 05:01:56 +03:00
Files
mariadb-columnstore-engine/cmapi/failover/heartbeat_history.py
mariadb-AlanMologorsky a079a2c944 MCOL-5496: Merge CMAPI code to engine repo.
[add] cmapi code to engine
2023-06-07 10:00:16 +03:00

96 lines
3.4 KiB
Python

from array import array
from threading import Lock
# for tracking the history of heartbeat responses
class InvalidNode:
    """Empty marker class; it defines no attributes or behaviour.

    NOTE(review): nothing in this file uses it, and it does not derive from
    ``Exception``, so it cannot be raised as-is -- confirm the intended use
    against callers before changing its base class.
    """
class HBHistory:
    """Per-node ring buffer of heartbeat response states, guarded by a lock.

    Each known node owns an ``array('b')`` of ``tickWindow`` slots; the slot
    for a tick is ``tick % tickWindow``.  All public methods take
    ``self.mutex`` for the duration of their access to the shared state, and
    release it even when an exception is raised.
    """

    # consts to denote state of the responses
    NoResponse = 1
    GoodResponse = 2
    LateResponse = -1
    NewNode = 0

    # By default, keep a 600 heartbeat history for each node (10 mins @ 1hb/s)
    # and consider a response late if it arrives 3+ ticks late. 3 is an
    # arbitrary small value.
    def __init__(self, tickWindow=600, lateWindow=3):
        """Create an empty history.

        tickWindow -- number of slots kept per node (ring buffer length).
        lateWindow -- a response for tick T is late once the current tick
                      has reached T + lateWindow.
        """
        # a dict of heartbeats for each node. key = str, value = array of
        # int; history flushes each time the thread is restarted
        self.nodeHistory = {}
        # current tick resets to zero each time the thread is restarted
        self.currentTick = 0
        self.lateWindow = lateWindow
        self.tickWindow = tickWindow
        self.mutex = Lock()

    def _initNode(self, node, defaultValue = GoodResponse):
        """Give `node` a fresh history with every slot set to defaultValue.

        Does not take the lock itself; callers must already hold self.mutex
        (the Lock is not re-entrant).
        """
        self.nodeHistory[node] = array('b', [defaultValue]) * self.tickWindow

    def removeNode(self, node):
        """Forget the history of `node`; a no-op if the node is unknown."""
        with self.mutex:
            self.nodeHistory.pop(node, None)

    def keepOnlyTheseNodes(self, nodes):
        """Drop the history of every node that is not listed in `nodes`."""
        nodesToKeep = set(nodes)
        with self.mutex:
            # Iterate over a snapshot of the keys: the dict is mutated
            # inside the loop.
            for node in list(self.nodeHistory):
                if node not in nodesToKeep:
                    del self.nodeHistory[node]

    def setCurrentTick(self, tick):
        """Advance to `tick` and mark its slot NoResponse for every node.

        The slot is overwritten later by gotHeartbeat() if the node answers.
        """
        with self.mutex:
            self.currentTick = tick
            slot = tick % self.tickWindow
            for pongs in self.nodeHistory.values():
                pongs[slot] = self.NoResponse

    def gotHeartbeat(self, node, tickID):
        """Record that `node` answered the ping sent at tick `tickID`.

        The response is stored as LateResponse when it is lateWindow or more
        ticks behind the current tick, and as GoodResponse otherwise.  A
        node seen for the first time gets a history initialised with
        GoodResponse.
        """
        with self.mutex:
            # Classify under the lock so that currentTick cannot change
            # between the comparison and the write.
            if tickID <= self.currentTick - self.lateWindow:
                status = self.LateResponse
            else:
                status = self.GoodResponse
            if node not in self.nodeHistory:
                self._initNode(node)
            self.nodeHistory[node][tickID % self.tickWindow] = status

    # defaultValue is used to init a fake history for a node this code is
    # learning about 'now'. If a node is inserted into the active list, we do
    # not want to remove it right away b/c it hasn't responded to any pings
    # yet. Likewise, if a node is inserted into the inactive list, we do not
    # want to activate it right away b/c it has responded to all pings sent
    # so far (0). TBD if we want to add logic to handle an 'init' value in
    # the history.
    def getNodeHistory(self, node, tickInterval, defaultValue = GoodResponse):
        """Return a copy of the settled part of `node`'s recent history.

        The result is an array('b') with the responses from
        tickInterval + lateWindow ticks ago up to (not including) lateWindow
        ticks ago, oldest first.  The most recent lateWindow ticks are left
        out because responses for them may still be arriving.

        An unknown node is registered first, with every slot set to
        defaultValue.  tickInterval is expected to be at most tickWindow;
        larger values are not handled specially and repeat entries.
        """
        with self.mutex:
            if node not in self.nodeHistory:
                self._initNode(node, defaultValue = defaultValue)

            # Python's % is non-negative for a positive tickWindow, so
            # lastIndex is always a valid slot index.
            lastIndex = (self.currentTick - self.lateWindow) % self.tickWindow
            firstIndex = lastIndex - tickInterval
            history = self.nodeHistory[node]
            if firstIndex < 0:
                # The window wraps around the start of the ring buffer: take
                # the tail of the array, then the head up to lastIndex.
                ret = history[firstIndex:]
                ret.extend(history[:lastIndex])
            else:
                ret = history[firstIndex:lastIndex]
        return ret