1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
mariadb-AlanMologorsky a079a2c944 MCOL-5496: Merge CMAPI code to engine repo.
[add] cmapi code to engine
2023-06-07 10:00:16 +03:00

267 lines
8.7 KiB
Python

# this class handles the comm with the agent; whatever it will be
import datetime
import logging
import threading
import time
logger = logging.getLogger('agent_comm')
# First an agent base class
class AgentBase:
def activateNodes(self, nodes):
print("AgentBase: Got activateNodes({})".format(nodes))
def deactivateNodes(self, nodes):
print("AgentBase: Got deactivateNodes({})".format(nodes))
def movePrimaryNode(self, placeholder):
print("AgentBase: Got movePrimaryNode()")
def enterStandbyMode(self):
print("AgentBase: Got enterStandbyMode()")
def getNodeHealth(self):
print("AgentBase: Got getNodeHealth()")
return 0
def raiseAlarm(self, msg):
print("AgentBase: Got raiseAlarm({})".format(msg))
def startTransaction(self, extra_nodes = [], remove_nodes = []):
print(f"AgentBase: Got startTransaction, extra_nodes={extra_nodes}, remove_nodes={remove_nodes}")
return 0
def commitTransaction(self, txnid, nodes):
print("AgentBase: Got commitTransaction")
def rollbackTransaction(self, txnid, nodes):
print("AgentBase: Got abortTransaction")
class OpAndArgs:
name = None # a callable in AgentBase
args = None # a tuple containing the args for the callable
def __init__(self, name, *args):
self.name = name
self.args = args
def __str__(self):
return f"{str(self.name.__qualname__)}{str(self.args)}"
def __hash__(self):
return hash((self.name.__qualname__, str(self.args)))
def __eq__(self, other):
return self.name == other.name and self.args == other.args
def __ne__(self, other):
return not self.__eq__(other)
def run(self):
self.name(*self.args)
# The AgentComm class
# Doesn't do anything but pass along events to the Agent yet
# TODO: implement an event queue and a thread to pluck events and issue them
# to the agent. Done?
# TODO: de-dup events as they come in from the node monitor,
# add to the event queue\
# TODO: rewrite using builtin Queue class
class AgentComm:
def __init__(self, agent = None):
if agent is None:
self._agent = AgentBase()
else:
self._agent = agent
# deduper contains queue contents, events in progress, and finished
# events up to 10s after they finished
self._deduper = {}
self._die = False
self._queue = []
self._mutex = threading.Lock()
self._thread = None
def __del__(self):
self.die()
def start(self):
self._die = False
self._thread = threading.Thread(target=self._runner, name='AgentComm')
self._thread.start()
# TODO: rename to stop
def die(self):
self._die = True
self._thread.join()
# returns (len-of-event-queue, len-of-deduper)
def getQueueSize(self):
self._mutex.acquire()
ret = (len(self._queue), len(self._deduper))
self._mutex.release()
return ret
def activateNodes(self, nodes):
self._addEvent(self._agent.activateNodes, (nodes))
def deactivateNodes(self, nodes):
self._addEvent(self._agent.deactivateNodes, (nodes))
def movePrimaryNode(self):
self._addEvent(self._agent.movePrimaryNode, ())
def enterStandbyMode(self):
# The other events are moot if this node has to enter standby mode
self._mutex.acquire()
op = OpAndArgs(self._agent.enterStandbyMode, ())
self._queue = [ op ]
self._deduper = { op : datetime.datetime.now() }
self._mutex.release()
def getNodeHealth(self):
return self._agent.getNodeHealth()
def raiseAlarm(self, msg):
self._agent.raiseAlarm(msg)
def _addEvent(self, name, args):
"""Interface to the event queue."""
op = OpAndArgs(name, args)
self._mutex.acquire()
if op not in self._deduper:
self._deduper[op] = None
self._queue.append(op)
self._mutex.release()
def _getEvents(self):
"""
This gets all queued events at once and prunes events older than
10 seconds from the deduper.
"""
self._mutex.acquire()
ret = self._queue
self._queue = []
# prune events that finished more than 10 secs ago from the deduper
tenSecsAgo = datetime.datetime.now() - datetime.timedelta(seconds = 10)
for (op, finishTime) in list(self._deduper.items()):
if finishTime is not None and finishTime < tenSecsAgo:
del self._deduper[op]
self._mutex.release()
return ret
def _requeueEvents(self, events):
self._mutex.acquire()
# events has commands issued before what is currently in _queue
events.extend(self._queue)
self._queue = events
self._mutex.release()
def _markEventsFinished(self, events):
self._mutex.acquire()
now = datetime.datetime.now()
for event in events:
self._deduper[event] = now
self._mutex.release()
def _runner(self):
while not self._die:
try:
self.__runner()
except Exception:
logger.error(
'AgentComm.runner(): got an unrecognised exception.',
exc_info=True
)
if not self._die:
time.sleep(1)
logger.info('AgentComm.runner() exiting normally...')
def __runner(self):
while not self._die:
events = self._getEvents()
logger.trace(f'Get events from queue "{events}".')
if len(events) == 0:
time.sleep(5)
continue
nextPollTime = datetime.datetime.now() + datetime.timedelta(seconds = 5)
nodes_added = set()
nodes_removed = set()
# scan the list of events, put together the extra_nodes and remove_nodes parameters to
# startTransaction(). Note, we could consolidate the activate / deactivate calls here,
# but that's a minor optimization not worth doing yet.
needs_transaction = False
for event in events: # TODO: combine with loop below.
#print(f"got event: {event}")
# determine whether we need a transaction at all.
# List the fcns that require a txn here.
if not needs_transaction and event.name in (
self._agent.activateNodes,
self._agent.deactivateNodes,
self._agent.movePrimaryNode):
needs_transaction = True
if event.name == self._agent.activateNodes:
nodes = event.args[0]
for node in nodes:
nodes_added.add(node)
elif event.name == self._agent.deactivateNodes:
nodes = event.args[0]
for node in nodes:
nodes_removed.add(node)
if needs_transaction:
logger.debug(
'Failover starts transaction to run upcoming event.'
)
(txn_id, nodes) = self._agent.startTransaction(
extra_nodes=list(nodes_added),
remove_nodes=list(nodes_removed)
)
# The problem with this is that it's all-or-nothing
# It would be preferable to commit what has been done up to the point of failure
# and discard the event that failed.
# If the problem is with the event itself, then it may keep happening and block all
# progress.
try:
for event in events:
#print(f"Running {event}")
event.run()
except Exception as e:
logger.error(
'AgentComm.runner(): got an unrecognised exception.',
exc_info=True
)
if needs_transaction:
logger.warning(
f'Aborting transaction {txn_id}',
exc_info=True
)
self._agent.rollbackTransaction(txn_id, nodes=nodes)
# on failure, requeue the events in this batch to pick them up
# again on the next iteration
self._requeueEvents(events)
else:
if needs_transaction:
self._agent.commitTransaction(txn_id, nodes = nodes)
self._markEventsFinished(events)
finishTime = datetime.datetime.now()
if nextPollTime > finishTime:
time.sleep((nextPollTime - finishTime).seconds)