You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-06-12 05:01:56 +03:00
96 lines
3.4 KiB
Python
96 lines
3.4 KiB
Python
from array import array
|
|
from threading import Lock
|
|
|
|
# for tracking the history of heartbeat responses
|
|
|
|
class InvalidNode:
    """Empty marker type denoting an invalid node.

    It defines no attributes or behaviour and is not referenced by the
    other code visible in this module.
    NOTE(review): presumably consumed by importers of this module -- confirm.
    """
|
|
|
|
class HBHistory:
    """Per-node ring buffer of heartbeat response states.

    Each known node owns an ``array('b')`` of ``tickWindow`` entries; the
    entry for tick ``t`` lives at index ``t % tickWindow``. All access to
    the shared state is serialized through ``self.mutex``.
    """

    # consts to denote state of the responses
    NoResponse = 1
    GoodResponse = 2
    LateResponse = -1
    NewNode = 0  # not used by any method of this class

    # By default, keep a 600 heartbeat history for each node (10 mins @ 1hb/s)
    # and consider a response late if it arrives 3+ ticks late. 3 is an
    # arbitrary small value.
    def __init__(self, tickWindow=600, lateWindow=3):
        """Create an empty history.

        Args:
            tickWindow: number of ticks remembered per node (ring size).
            lateWindow: a response for tick ``t`` is late once
                ``t <= currentTick - lateWindow``.
        """
        # A map of heartbeat histories, one per node.
        # key = node (str), value = array of int.
        # History flushes each time the thread is restarted.
        self.nodeHistory = {}
        # Current tick resets to zero each time the thread is restarted.
        self.currentTick = 0
        self.lateWindow = lateWindow
        self.mutex = Lock()
        self.tickWindow = tickWindow

    def _initNode(self, node, defaultValue=GoodResponse):
        """Create (or overwrite) the history of `node`, filled with `defaultValue`.

        Callers are expected to hold ``self.mutex``.
        """
        # Array repetition yields tickWindow copies without a temporary list.
        self.nodeHistory[node] = array('b', [defaultValue]) * self.tickWindow

    def removeNode(self, node):
        """Forget the history of `node`; unknown nodes are ignored."""
        with self.mutex:
            self.nodeHistory.pop(node, None)

    def keepOnlyTheseNodes(self, nodes):
        """Drop the history of every node that is not listed in `nodes`."""
        with self.mutex:
            # Materialize the stale names first so the dict is not mutated
            # while it is being iterated.
            staleNodes = set(self.nodeHistory) - set(nodes)
            for node in staleNodes:
                del self.nodeHistory[node]

    def setCurrentTick(self, tick):
        """Advance to `tick` and mark its slot as NoResponse for every node.

        A later gotHeartbeat() for the same tick overwrites that marker.
        """
        with self.mutex:
            self.currentTick = tick
            if not self.nodeHistory:
                return
            slot = tick % self.tickWindow
            for pongs in self.nodeHistory.values():
                pongs[slot] = self.NoResponse

    def gotHeartbeat(self, node, tickID):
        """Record that `node` answered the heartbeat sent at tick `tickID`.

        The response is stored as LateResponse when
        ``tickID <= currentTick - lateWindow`` and as GoodResponse otherwise.
        An unknown node is first given an all-GoodResponse history.
        """
        with self.mutex:
            # currentTick is read under the lock so the late/good decision
            # cannot race with setCurrentTick().
            if tickID <= self.currentTick - self.lateWindow:
                status = self.LateResponse
            else:
                status = self.GoodResponse

            if node not in self.nodeHistory:
                self._initNode(node)
            self.nodeHistory[node][tickID % self.tickWindow] = status

    # defaultValue is used to init a fake history for a node this code is
    # learning about 'now'. If a node is inserted into the active list, we do
    # not want to remove it right away b/c it hasn't responded to any pings
    # yet. Likewise, if a node is inserted into the inactive list, we do not
    # want to activate it right away b/c it has responded to all pings sent so
    # far (0). TBD if we want to add logic to handle an 'init' value in the
    # history.
    def getNodeHistory(self, node, tickInterval, defaultValue=GoodResponse):
        """Return the settled responses of `node`, oldest first.

        The result covers the ticks from ``tickInterval + lateWindow`` ticks
        ago up to (not including) ``lateWindow`` ticks ago; the most recent
        ``lateWindow`` slots are skipped because responses for them are
        likely still being gathered.

        Args:
            node: node whose history is requested; an unknown node is
                created with a history filled with `defaultValue`.
            tickInterval: number of ticks to return. Values larger than
                ``tickWindow`` are clamped to ``tickWindow``.
            defaultValue: fill value used only when `node` is unknown.

        Returns:
            A new ``array('b')`` (a copy, safe to use without the lock).
        """
        with self.mutex:
            if node not in self.nodeHistory:
                self._initNode(node, defaultValue=defaultValue)
            history = self.nodeHistory[node]

            # The ring holds at most tickWindow samples; asking for more
            # would otherwise return duplicated entries.
            tickInterval = min(tickInterval, self.tickWindow)

            lastIndex = (self.currentTick - self.lateWindow) % self.tickWindow
            firstIndex = lastIndex - tickInterval
            if firstIndex < 0:
                # The requested range wraps around the start of the ring:
                # take the tail first, then the head.
                ret = history[firstIndex:]
                ret.extend(history[:lastIndex])
            else:
                ret = history[firstIndex:lastIndex]
            return ret
|