mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
267 lines
8.7 KiB
Python
267 lines
8.7 KiB
Python
# this class handles the comm with the agent; whatever it will be
|
|
|
|
import datetime
|
|
import logging
|
|
import threading
|
|
import time
|
|
|
|
|
|
logger = logging.getLogger('agent_comm')
|
|
|
|
|
|
# First an agent base class
|
|
class AgentBase:
|
|
|
|
def activateNodes(self, nodes):
|
|
print("AgentBase: Got activateNodes({})".format(nodes))
|
|
|
|
def deactivateNodes(self, nodes):
|
|
print("AgentBase: Got deactivateNodes({})".format(nodes))
|
|
|
|
def movePrimaryNode(self, placeholder):
|
|
print("AgentBase: Got movePrimaryNode()")
|
|
|
|
def enterStandbyMode(self):
|
|
print("AgentBase: Got enterStandbyMode()")
|
|
|
|
def getNodeHealth(self):
|
|
print("AgentBase: Got getNodeHealth()")
|
|
return 0
|
|
|
|
def raiseAlarm(self, msg):
|
|
print("AgentBase: Got raiseAlarm({})".format(msg))
|
|
|
|
def startTransaction(self, extra_nodes = [], remove_nodes = []):
|
|
print(f"AgentBase: Got startTransaction, extra_nodes={extra_nodes}, remove_nodes={remove_nodes}")
|
|
return 0
|
|
|
|
def commitTransaction(self, txnid, nodes):
|
|
print("AgentBase: Got commitTransaction")
|
|
|
|
def rollbackTransaction(self, txnid, nodes):
|
|
print("AgentBase: Got abortTransaction")
|
|
|
|
|
|
|
|
class OpAndArgs:
|
|
name = None # a callable in AgentBase
|
|
args = None # a tuple containing the args for the callable
|
|
|
|
def __init__(self, name, *args):
|
|
self.name = name
|
|
self.args = args
|
|
|
|
def __str__(self):
|
|
return f"{str(self.name.__qualname__)}{str(self.args)}"
|
|
|
|
def __hash__(self):
|
|
return hash((self.name.__qualname__, str(self.args)))
|
|
|
|
def __eq__(self, other):
|
|
return self.name == other.name and self.args == other.args
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def run(self):
|
|
self.name(*self.args)
|
|
|
|
|
|
# The AgentComm class
|
|
# Doesn't do anything but pass along events to the Agent yet
|
|
# TODO: implement an event queue and a thread to pluck events and issue them
|
|
# to the agent. Done?
|
|
# TODO: de-dup events as they come in from the node monitor,
|
|
# add to the event queue\
|
|
# TODO: rewrite using builtin Queue class
|
|
class AgentComm:
|
|
|
|
def __init__(self, agent = None):
|
|
if agent is None:
|
|
self._agent = AgentBase()
|
|
else:
|
|
self._agent = agent
|
|
|
|
# deduper contains queue contents, events in progress, and finished
|
|
# events up to 10s after they finished
|
|
self._deduper = {}
|
|
self._die = False
|
|
self._queue = []
|
|
self._mutex = threading.Lock()
|
|
self._thread = None
|
|
|
|
def __del__(self):
|
|
self.die()
|
|
|
|
def start(self):
|
|
self._die = False
|
|
self._thread = threading.Thread(target=self._runner, name='AgentComm')
|
|
self._thread.start()
|
|
|
|
# TODO: rename to stop
|
|
def die(self):
|
|
self._die = True
|
|
self._thread.join()
|
|
|
|
# returns (len-of-event-queue, len-of-deduper)
|
|
def getQueueSize(self):
|
|
self._mutex.acquire()
|
|
ret = (len(self._queue), len(self._deduper))
|
|
self._mutex.release()
|
|
return ret
|
|
|
|
def activateNodes(self, nodes):
|
|
self._addEvent(self._agent.activateNodes, (nodes))
|
|
|
|
def deactivateNodes(self, nodes):
|
|
self._addEvent(self._agent.deactivateNodes, (nodes))
|
|
|
|
def movePrimaryNode(self):
|
|
self._addEvent(self._agent.movePrimaryNode, ())
|
|
|
|
def enterStandbyMode(self):
|
|
# The other events are moot if this node has to enter standby mode
|
|
self._mutex.acquire()
|
|
op = OpAndArgs(self._agent.enterStandbyMode, ())
|
|
self._queue = [ op ]
|
|
self._deduper = { op : datetime.datetime.now() }
|
|
self._mutex.release()
|
|
|
|
def getNodeHealth(self):
|
|
return self._agent.getNodeHealth()
|
|
|
|
def raiseAlarm(self, msg):
|
|
self._agent.raiseAlarm(msg)
|
|
|
|
def _addEvent(self, name, args):
|
|
"""Interface to the event queue."""
|
|
op = OpAndArgs(name, args)
|
|
|
|
self._mutex.acquire()
|
|
if op not in self._deduper:
|
|
self._deduper[op] = None
|
|
self._queue.append(op)
|
|
self._mutex.release()
|
|
|
|
def _getEvents(self):
|
|
"""
|
|
This gets all queued events at once and prunes events older than
|
|
10 seconds from the deduper.
|
|
"""
|
|
self._mutex.acquire()
|
|
ret = self._queue
|
|
self._queue = []
|
|
|
|
# prune events that finished more than 10 secs ago from the deduper
|
|
tenSecsAgo = datetime.datetime.now() - datetime.timedelta(seconds = 10)
|
|
for (op, finishTime) in list(self._deduper.items()):
|
|
if finishTime is not None and finishTime < tenSecsAgo:
|
|
del self._deduper[op]
|
|
|
|
self._mutex.release()
|
|
return ret
|
|
|
|
def _requeueEvents(self, events):
|
|
self._mutex.acquire()
|
|
# events has commands issued before what is currently in _queue
|
|
events.extend(self._queue)
|
|
self._queue = events
|
|
self._mutex.release()
|
|
|
|
def _markEventsFinished(self, events):
|
|
self._mutex.acquire()
|
|
now = datetime.datetime.now()
|
|
for event in events:
|
|
self._deduper[event] = now
|
|
self._mutex.release()
|
|
|
|
def _runner(self):
|
|
while not self._die:
|
|
try:
|
|
self.__runner()
|
|
except Exception:
|
|
logger.error(
|
|
'AgentComm.runner(): got an unrecognised exception.',
|
|
exc_info=True
|
|
)
|
|
if not self._die:
|
|
time.sleep(1)
|
|
logger.info('AgentComm.runner() exiting normally...')
|
|
|
|
def __runner(self):
|
|
while not self._die:
|
|
events = self._getEvents()
|
|
logger.trace(f'Get events from queue "{events}".')
|
|
if len(events) == 0:
|
|
time.sleep(5)
|
|
continue
|
|
|
|
nextPollTime = datetime.datetime.now() + datetime.timedelta(seconds = 5)
|
|
|
|
nodes_added = set()
|
|
nodes_removed = set()
|
|
|
|
# scan the list of events, put together the extra_nodes and remove_nodes parameters to
|
|
# startTransaction(). Note, we could consolidate the activate / deactivate calls here,
|
|
# but that's a minor optimization not worth doing yet.
|
|
needs_transaction = False
|
|
for event in events: # TODO: combine with loop below.
|
|
#print(f"got event: {event}")
|
|
|
|
# determine whether we need a transaction at all.
|
|
# List the fcns that require a txn here.
|
|
if not needs_transaction and event.name in (
|
|
self._agent.activateNodes,
|
|
self._agent.deactivateNodes,
|
|
self._agent.movePrimaryNode):
|
|
needs_transaction = True
|
|
|
|
if event.name == self._agent.activateNodes:
|
|
nodes = event.args[0]
|
|
for node in nodes:
|
|
nodes_added.add(node)
|
|
elif event.name == self._agent.deactivateNodes:
|
|
nodes = event.args[0]
|
|
for node in nodes:
|
|
nodes_removed.add(node)
|
|
|
|
if needs_transaction:
|
|
logger.debug(
|
|
'Failover starts transaction to run upcoming event.'
|
|
)
|
|
(txn_id, nodes) = self._agent.startTransaction(
|
|
extra_nodes=list(nodes_added),
|
|
remove_nodes=list(nodes_removed)
|
|
)
|
|
|
|
# The problem with this is that it's all-or-nothing
|
|
# It would be preferable to commit what has been done up to the point of failure
|
|
# and discard the event that failed.
|
|
# If the problem is with the event itself, then it may keep happening and block all
|
|
# progress.
|
|
try:
|
|
for event in events:
|
|
#print(f"Running {event}")
|
|
event.run()
|
|
except Exception as e:
|
|
logger.error(
|
|
'AgentComm.runner(): got an unrecognised exception.',
|
|
exc_info=True
|
|
)
|
|
if needs_transaction:
|
|
logger.warning(
|
|
f'Aborting transaction {txn_id}',
|
|
exc_info=True
|
|
)
|
|
self._agent.rollbackTransaction(txn_id, nodes=nodes)
|
|
# on failure, requeue the events in this batch to pick them up
|
|
# again on the next iteration
|
|
self._requeueEvents(events)
|
|
else:
|
|
if needs_transaction:
|
|
self._agent.commitTransaction(txn_id, nodes = nodes)
|
|
self._markEventsFinished(events)
|
|
finishTime = datetime.datetime.now()
|
|
if nextPollTime > finishTime:
|
|
time.sleep((nextPollTime - finishTime).seconds)
|