import configparser
from contextlib import contextmanager
import grp
import logging
import pwd
import re
import socket
from os import chown, mkdir, replace
from pathlib import Path
from shutil import copyfile
from typing import Optional
from xml.dom import minidom   # to pick up pretty printing functionality

from lxml import etree

from cmapi_server.constants import (
    DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH,
    MCS_MODULE_FILE_PATH,
)
# from cmapi_server.managers.process import MCSProcessManager
from mcs_node_control.models.misc import get_dbroots_list, read_module_id
from mcs_node_control.models.network_ifaces import get_network_interfaces


module_logger = logging.getLogger()


class NodeConfig:
    """Class to operate with the configuration file.

    The class instance applies a new config or retrieves the current one.

    The ``config_filename`` / ``output_config_filename`` style parameters on
    the methods allow tests to override the input and output files.
    The output in this case may be a config file upgraded to version 1.
    """

    def get_current_config_root(
        self, config_filename: str = DEFAULT_MCS_CONF_PATH, upgrade=True
    ) -> etree.Element:
        """Parse the config file and return its root element.

        TODO: pretty the same function in misc.py - review

        :param config_filename: path of the XML config file to parse
        :param upgrade: when True, a config without ``ConfigRevision`` is
                        upgraded in memory (see :meth:`upgrade_config`)
        :rtype: lxml.Element
        """
        parser = etree.XMLParser(load_dtd=True)
        tree = etree.parse(config_filename, parser=parser)
        self.upgrade_config(tree=tree, upgrade=upgrade)
        return tree.getroot()

    def get_root_from_string(self, config_string: str) -> etree.Element:
        """Parse an XML string and return its (possibly upgraded) root.

        :param config_string: XML document as a string
        :rtype: lxml.Element
        """
        root = etree.fromstring(config_string)
        self.upgrade_config(root=root)
        return root

    def upgrade_from_v0(self, root):
        """Add the sections introduced by config revision 1 to ``root``.

        The element tree is modified in place; nothing is written to disk.

        :param root: root element of a config without ``ConfigRevision``
        """
        revision = etree.SubElement(root, 'ConfigRevision')
        revision.text = '1'
        cluster_manager = etree.SubElement(root, 'ClusterManager')
        cluster_manager.text = str(self.get_module_net_address(root=root))
        cluster_name = etree.SubElement(root, 'ClusterName')
        cluster_name.text = 'MyCluster'

        # Need to get the addresses/host names of all nodes.
        # Should all be listed as DBRM_worker nodes
        addrs = set()
        num = 1
        max_node = 1
        while True:
            node = root.find(f'./DBRM_Worker{num}/IPAddr')
            if node is None:
                break
            if node.text != '0.0.0.0':
                addrs.add(node.text)
                max_node = max(max_node, num)
            num += 1

        # NextNodeId can be derived from the max DBRM_worker entry with non-0
        # ip address
        next_node_id = etree.SubElement(root, 'NextNodeId')
        next_node_id.text = str(max_node + 1)

        # NextDBRootId is the max current dbroot in use + 1
        max_dbroot = 1
        for num in range(1, 100):
            if root.find(f'./SystemConfig/DBRoot{num}') is not None:
                max_dbroot = num
        next_dbroot_id = etree.SubElement(root, 'NextDBRootId')
        next_dbroot_id.text = str(max_dbroot + 1)

        # The current primary node is listed under DBRMControllerNode.
        # Might as well start with that.
        primary_node_addr = root.find('./DBRM_Controller/IPAddr').text

        # Put them all in the DesiredNodes and ActiveNodes sections
        desired_nodes = etree.SubElement(root, 'DesiredNodes')
        active_nodes = etree.SubElement(root, 'ActiveNodes')
        for addr in addrs:
            node = etree.SubElement(desired_nodes, 'Node')
            node.text = addr
            node = etree.SubElement(active_nodes, 'Node')
            node.text = addr

        # Add an empty InactiveNodes section and set the primary node addr
        etree.SubElement(root, 'InactiveNodes')
        primary_node = etree.SubElement(root, 'PrimaryNode')
        primary_node.text = primary_node_addr

        # Add Maintenance tag and set to False
        maintenance = etree.SubElement(root, 'Maintenance')
        maintenance.text = str(False).lower()

    def upgrade_config(self, tree=None, root=None, upgrade=True):
        """Add the parts that might be missing after an upgrade from an
        earlier version.

        .. note:: one or the other of ``tree`` / ``root`` must be specified;
                  ``root`` wins when both are given.

        :param tree: parsed element tree whose root should be upgraded
        :param root: root element to upgrade
        :param upgrade: when False, nothing is changed
        :raises ValueError: if neither ``tree`` nor ``root`` is given
        """
        if root is None and tree is not None:
            root = tree.getroot()
        if root is None:
            raise ValueError('upgrade_config() requires either tree or root.')

        rev_node = root.find('./ConfigRevision')
        if rev_node is None and upgrade:
            self.upgrade_from_v0(root)

        # as we add revisions, add add'l checks on rev_node.text here

    def write_config(self, tree, filename=DEFAULT_MCS_CONF_PATH):
        """Serialize ``tree`` and atomically replace ``filename`` with it.

        The document is first written to ``<filename>.cmapi.tmp`` and then
        moved over the target.

        :param tree: element (or tree) accepted by :meth:`to_string`
        :param filename: destination path, ``str`` or ``Path``
        """
        tmp_filename = f'{filename}.cmapi.tmp'
        # The serialized document carries no encoding declaration, which XML
        # readers interpret as UTF-8, so write it explicitly as UTF-8.
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            f.write(self.to_string(tree))
        replace(tmp_filename, filename)  # atomic replacement

    @contextmanager
    def modify_config(
        self,
        input_config_filename: str = DEFAULT_MCS_CONF_PATH,
        output_config_filename: Optional[str] = None,
    ):
        """Context manager to modify the config file.

        Yields the root element of the input config. If an exception is
        raised, the config file is not modified and the exception is
        re-raised. If ``output_config_filename`` is not provided, the input
        config file is overwritten.
        """
        try:
            c_root = self.get_current_config_root(input_config_filename)
            yield c_root
        except Exception as e:
            module_logger.error(
                f"modify_config(): Caught exception: '{str(e)}', "
                "config file not modified"
            )
            raise
        else:
            output_config_filename = (
                output_config_filename or input_config_filename
            )
            self.write_config(c_root, output_config_filename)

    def to_string(self, tree):
        """Return ``tree`` as a pretty-printed XML string.

        :rtype: str
        """
        # TODO: try to use lxml to do this to avoid the add'l dependency
        # NOTE(review): the indent literal is reproduced as it appears in the
        #               reviewed text, whose whitespace runs were collapsed;
        #               confirm the intended indent width.
        xmlstr = minidom.parseString(etree.tostring(tree)).toprettyxml(
            indent=" "
        )
        # fix annoying issue of extra newlines added by toprettyxml()
        xmlstr = '\n'.join(
            line.rstrip() for line in xmlstr.split('\n') if line.strip() != ""
        )
        return xmlstr

    def get_dbrm_conn_info(self, root=None):
        """Retrieve the current DBRM master IP and port.

        Reads the first ``DBRM_Controller`` child of ``root``.

        :return: ``{'IPAddr': ..., 'Port': ...}``; ``None`` if ``root`` is
                 None or has no ``DBRM_Controller`` child
        :rtype: dict
        """
        if root is None:
            return None
        addr = ''
        port = 0
        for el in root:
            if el.tag != 'DBRM_Controller':
                continue
            for subel in el:
                if subel.tag == 'IPAddr':
                    addr = subel.text
                elif subel.tag == 'Port':
                    port = subel.text
            return {'IPAddr': addr, 'Port': port}
        return None

    def apply_config(
        self, config_filename: str = DEFAULT_MCS_CONF_PATH,
        xml_string: str = None, sm_config_filename: str = None,
        sm_config_string: str = None
    ):
        """Apply the configuration given as an XML string.

        Creates missing dbroot directories, saves the current config as
        ``<name>.cmapi.save``, writes the new config, optionally does the
        same for the storagemanager config and finally rewrites the module
        file. Does nothing when ``xml_string`` is None.

        :param config_filename: Columnstore config path (string 4 testing)
        :param xml_string: new Columnstore config as an XML string
        :param sm_config_filename: storagemanager config path
        :param sm_config_string: new storagemanager config content
        :rtype: None
        """
        if xml_string is None:
            return

        # Parsing the current config raises early if it is unreadable; the
        # result itself is only referenced by the disabled code below.
        current_root = self.get_current_config_root(config_filename)
        parser = etree.XMLParser(load_dtd=True)
        new_root = etree.fromstring(xml_string, parser=parser)

        try:
            # We don't change module ids for non-single nodes.
            # if self.is_single_node(root=current_root):
            #     set_module_id(self.get_new_module_id(new_root))

            # make sure all of the dbroot directories exist on this node
            for dbroot in self.get_all_dbroots(new_root):
                try:
                    node = new_root.find(f'./SystemConfig/DBRoot{dbroot}')
                    mkdir(node.text, mode=0o755)

                    # if we are using the systemd dispatcher we need to change
                    # ownership on any created dirs to mysql:mysql
                    # TODO: remove conditional once container dispatcher will
                    #       use non-root by default
                    # TODO: what happened if we change ownership in container?
                    #       check the container installations works as expected
                    # from cmapi_server.managers.process import MCSProcessManager
                    # if MCSProcessManager.dispatcher_name == 'systemd':
                    uid = pwd.getpwnam('mysql').pw_uid
                    gid = grp.getgrnam('mysql').gr_gid
                    chown(node.text, uid, gid)
                except FileExistsError:
                    # directory is already there, nothing to create or chown
                    pass

            # Save current config
            config_file = Path(config_filename)
            config_dir = config_file.resolve().parent
            copyfile(
                config_file, f'{config_dir}/{config_file.name}.cmapi.save'
            )

            # Save new config
            self.write_config(tree=new_root, filename=config_filename)

            # Save current and new storagemanager config
            if sm_config_string and sm_config_filename:
                sm_config_path = Path(sm_config_filename)
                sm_config_dir = sm_config_path.resolve().parent
                copyfile(
                    sm_config_path,
                    f'{sm_config_dir}/{sm_config_path.name}.cmapi.save'
                )
                # get_current_sm_config() reads this file as UTF-8
                with open(
                    sm_config_filename, 'w', encoding='utf-8'
                ) as sm_config_fd:
                    sm_config_fd.write(sm_config_string)

            # TODO: review
            # figure out what to put in the 'module' file to make
            # the old oam library happy
            try:
                pm_num = self.get_current_pm_num(new_root)
                with open(MCS_MODULE_FILE_PATH, 'w') as module_file:
                    module_file.write(f'pm{pm_num}\n')
                module_logger.info(
                    f'Wrote "pm{pm_num}" to {MCS_MODULE_FILE_PATH}'
                )
            except Exception:
                # deliberately best-effort: log and carry on
                module_logger.error(
                    'Failed to get or set this node\'s pm number.\n'
                    'You may observe add\'l errors as a result.\n',
                    exc_info=True
                )
        except Exception:
            # Log and propagate to the caller
            module_logger.error(
                f'{self.apply_config.__name__} throws an exception. '
                'The original config must be restored by '
                'explicit ROLLBACK command or timeout.',
                exc_info=True
            )
            raise

    def in_active_nodes(self, root):
        """Check whether this host is listed under ``ActiveNodes``.

        :rtype: bool
        """
        my_names = set(self.get_network_addresses_and_names())
        return any(
            node.text in my_names
            for node in root.findall("./ActiveNodes/Node")
        )

    def get_current_pm_num(self, root):
        """Find this node in the Module* tags, return the module number.

        :raises Exception: if none of this host's addresses or names is
                           listed in ``SystemModuleConfig``
        :rtype: int
        """
        my_names = set(self.get_network_addresses_and_names())
        smc_node = root.find("./SystemModuleConfig")
        pm_count = int(smc_node.find("./ModuleCount3").text)
        for pm_num in range(1, pm_count + 1):
            ip_addr = smc_node.find(f"./ModuleIPAddr{pm_num}-1-3").text
            name = smc_node.find(f"./ModuleHostName{pm_num}-1-3").text
            if ip_addr in my_names:
                module_logger.info(f" -- Matching against ModuleIPAddr{pm_num}-1-3, which says {ip_addr}")
                return pm_num
            if name in my_names:
                module_logger.info(f" -- Matching against ModuleHostName{pm_num}-1-3, which says {name}")
                return pm_num
        raise Exception("Did not find my IP addresses or names in the SystemModuleConfig section")

    def rollback_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH):
        """Rollback the configuration.

        Moves the ``<name>.cmapi.save`` copy back over the configuration
        file. Does nothing if no such copy exists.

        :param config_filename: Columnstore config file path
        :rtype: None
        """
        # TODO: Rollback doesn't restart needed processes?
        config_file = Path(config_filename)
        config_dir = config_file.resolve().parent
        backup_path = f"{config_dir}/{config_file.name}.cmapi.save"
        if Path(backup_path).exists():
            replace(backup_path, config_file)  # atomic replacement

    def get_current_config(self, config_filename: str = DEFAULT_MCS_CONF_PATH):
        """Retrieve the current configuration as a string.

        Reads the config, upgrades it in memory if needed and serializes it.

        :rtype: string

        ..TODO: fix using self.get_current_config_root()
        """
        parser = etree.XMLParser(load_dtd=True)
        tree = etree.parse(config_filename, parser=parser)
        self.upgrade_config(tree=tree)
        # TODO: Unicode? UTF-8 may be?
        return etree.tostring(
            tree.getroot(), pretty_print=True, encoding='unicode'
        )

    def get_current_sm_config(
        self, config_filename: str = DEFAULT_SM_CONF_PATH
    ) -> str:
        """Retrieve the current SM configuration.

        :return: file content, or an empty string if the file is missing
        :rtype: str
        """
        func_name = 'get_current_sm_config'
        sm_config_path = Path(config_filename)
        try:
            return sm_config_path.read_text(encoding='utf-8')
        except FileNotFoundError:
            module_logger.error(f"{func_name} SM config {config_filename} not found.")
            return ''

    def s3_enabled(self, config_filename: str = DEFAULT_SM_CONF_PATH) -> bool:
        """Check whether SM is configured to use S3.

        Reads the SM config and checks if ``ObjectStorage.service`` is set
        to S3 (``LocalStorage`` is assumed when the option is absent). The
        additional XML settings that must be set too are checked as well,
        but a mismatch there is only logged.

        :rtype: bool
        """
        func_name = 's3_enabled'
        sm_config = configparser.ConfigParser()
        if not sm_config.read(config_filename):
            module_logger.error(f"{func_name} SM config {config_filename} not found.")
            return False

        # ConfigParser.get() raises instead of returning None when the
        # section/option is absent, so the default has to be a fallback.
        storage = sm_config.get(
            'ObjectStorage', 'service', fallback='LocalStorage'
        )
        if storage.lower() != 's3':
            return False

        config_root = self.get_current_config_root()

        def _text(path: str) -> str:
            # a missing or empty element is reported as '' so that it gets
            # logged as a mismatch below
            return config_root.findtext(path) or ''

        if _text('./Installation/DBRootStorageType').lower() != "storagemanager":
            module_logger.error(f"{func_name} DBRootStorageType.lower() != storagemanager")
        if _text('./StorageManager/Enabled').lower() != "y":
            module_logger.error(f"{func_name} StorageManager/Enabled.lower() != y")
        if _text('./SystemConfig/DataFilePlugin') != "libcloudio.so":
            module_logger.error(f"{func_name} SystemConfig/DataFilePlugin != libcloudio.so")

        return True

    def get_network_addresses(self):
        """Retrieve the network addresses of this host.

        Generator that yields the ``AF_INET`` and ``AF_INET6`` addresses of
        every interface returned by ``get_network_interfaces()``.

        :rtype: str
        """
        for ni in get_network_interfaces():
            for fam in (socket.AF_INET, socket.AF_INET6):
                addrs = ni.addresses.get(fam)
                if addrs is not None:
                    yield from addrs

    def get_network_addresses_and_names(self):
        """Retrieve the network addresses, hostnames, and aliases.

        Generator that yields each interface address followed by the
        hostname and aliases it resolves to, if any.

        :rtype: str
        """
        for addr in self.get_network_addresses():
            yield addr
            try:
                host, aliases, _ = socket.gethostbyaddr(addr)
            except Exception:
                # best-effort: an address that cannot be resolved simply
                # contributes no names
                continue
            yield host
            yield from aliases

    def is_primary_node(self, root=None):
        """Check if this node is the primary node.

        Reads the config and compares the DBRM_Controller IP or hostname
        with this node's IPs and hostnames.

        :rtype: bool
        """
        if root is None:
            root = self.get_current_config_root()

        primary_address = self.get_dbrm_conn_info(root)['IPAddr']
        return primary_address in self.get_network_addresses_and_names()

    def is_single_node(self, root=None):
        """Check if this node is the single node.

        Reads the config and compares the DBRM_Controller address with the
        predefined localhost addresses.

        :rtype: bool
        """
        if root is None:
            root = self.get_current_config_root()

        master_address = self.get_dbrm_conn_info(root)['IPAddr']
        return master_address in ['127.0.0.1', 'localhost', '::1']

    def get_new_module_id(self, new_root=None):
        """Retrieve the new module id.

        Reads the new XML config and searches for an IP that belongs to this
        host in SystemModuleConfig.ModuleIPAddrX-1-3. X is the new module id.

        :raises RuntimeError: if the current module has no address in the
                              config or no module matches this host
        :rtype: int
        """
        func_name = 'get_new_module_id'
        current_module_id = read_module_id()

        if new_root is None:
            module_logger.error(f'{func_name} Empty new XML tree root.')
            return current_module_id

        net_address = self.get_module_net_address(new_root, current_module_id)
        # Use getaddrinfo in case of IPv6
        if net_address is None:
            module_logger.error(
                f'{func_name} Columnstore.xml has unknown value in '
                f'SystemModuleConfig.ModuleIPAddr{current_module_id}-1-3.'
            )
            raise RuntimeError('net_address is None.')

        # enumerate the local interfaces once instead of once per lookup
        local_addresses = set(self.get_network_addresses())
        if socket.gethostbyname(net_address) in local_addresses:
            return current_module_id

        # Use getaddrinfo in case of IPv6
        # This fires for a added node when node id changes from 1 to something
        for module_entry in self.get_modules_addresses(new_root):
            if module_entry['addr'] is None:
                continue
            net_addr = socket.gethostbyname(module_entry['addr'])
            if net_addr in local_addresses:
                module_logger.debug(
                    f'{func_name} New module id {module_entry["id"]}'
                )
                return int(module_entry['id'])

        module_logger.error(
            f'{func_name} Cannot find new module id for the node.'
        )
        raise RuntimeError('Fail to find module id.')

    def get_module_net_address(self, root=None, module_id: int = None):
        """Retrieve the module network address.

        Reads the XML config and returns the IP or hostname from
        SystemModuleConfig.ModuleIPAddrX-1-3, X being ``module_id`` (the
        value of ``read_module_id()`` when not given).

        :return: the address text, or None if ``root`` is None or the
                 module is not found
        :rtype: string
        """
        func_name = 'get_module_net_address'
        if module_id is None:
            module_id = read_module_id()

        if root is None:
            module_logger.error(f'{func_name} Empty XML root.')
            return None

        wanted_tag = f'ModuleIPAddr{module_id}-1-3'
        for el in root:
            if el.tag != 'SystemModuleConfig':
                continue
            for subel in el:
                if subel.tag == wanted_tag:
                    module_logger.debug(
                        f'{func_name} Module {module_id} '
                        f'network address {subel.text}'
                    )
                    return subel.text

        module_logger.error(f'{func_name} Module {module_id} was not found.')
        return None

    def get_modules_addresses(self, root=None):
        """Retrieve the modules network addresses.

        Generator over the SystemModuleConfig.ModuleIPAddrX-1-3 entries of
        the XML config, X being a node id. Each item is a dict
        ``{'addr': <element text>, 'id': <X as a string>}``.

        :rtype: generator of dicts
        """
        func_name = 'get_modules_addresses'
        if root is None:
            module_logger.error(f'{func_name} Empty XML root.')
            return

        regex_string = 'ModuleIPAddr[0-9]+-1-3'
        found = False
        for el in root:
            if el.tag != 'SystemModuleConfig':
                continue
            for subel in el:
                module_ip_m = re.match(regex_string, subel.tag)
                if module_ip_m is None:
                    continue
                module_id = re.search('[0-9]+', module_ip_m.group(0)).group(0)
                module_logger.debug(
                    f'{func_name} Module {module_id} '
                    f'network address {subel.text}'
                )
                found = True
                yield {'addr': subel.text, 'id': module_id}

        # Only complain when nothing matched; no module id exists to report
        # in that case.
        if not found:
            module_logger.error(
                f'{func_name} No module addresses were found.'
            )

    def dbroots_to_create(self, root=None, module_id: int = None):
        """Generate dbroot ids if there are new dbroots to be created/renamed.

        Reads the XML config and yields the ModuleDBRootID<module_id>-N-3
        values that are not in the list returned by ``get_dbroots_list()``.

        :rtype: generator of ints
        """
        func_name = 'dbroots_to_create'
        if module_id is None:
            module_id = read_module_id()

        if root is None:
            module_logger.error(f'{func_name} Empty XML root.')
            return

        current_dbroot_list = get_dbroots_list()

        regex_string = f'ModuleDBRootID{module_id}-[0-9]+-3'
        for el in root:
            if el.tag != 'SystemModuleConfig':
                continue
            for subel in el:
                if re.match(regex_string, subel.tag) is None:
                    continue
                dbroot_id = int(subel.text)
                if dbroot_id not in current_dbroot_list:
                    module_logger.debug(
                        f'{func_name} Module {module_id} '
                        f'has dbroot {subel.text}'
                    )
                    yield dbroot_id

    def get_all_dbroots(self, root):
        """Return the dbroot ids of all modules listed in SystemModuleConfig.

        :rtype: list of strings (the element texts)
        """
        smc_node = root.find("./SystemModuleConfig")
        mod_count = int(smc_node.find("./ModuleCount3").text)
        dbroots = []
        for i in range(1, mod_count + 1):
            dbroot_count = int(
                smc_node.find(f"./ModuleDBRootCount{i}-3").text
            )
            for j in range(1, dbroot_count + 1):
                dbroots.append(
                    smc_node.find(f"./ModuleDBRootID{i}-{j}-3").text
                )
        return dbroots

    def get_read_only_nodes(self, root=None) -> list[str]:
        """Get names of read only nodes from config"""
        # Compare against None explicitly: the truth value of an lxml
        # element depends on whether it has children.
        if root is None:
            root = self.get_current_config_root()
        return [node.text for node in root.findall('./ReadOnlyNodes/Node')]

    def is_read_only(self, root=None) -> bool:
        """Checks if this node is in read-only mode"""
        if root is None:
            root = self.get_current_config_root()
        read_only_nodes = set(self.get_read_only_nodes(root))
        my_names = set(self.get_network_addresses_and_names())
        return bool(read_only_nodes.intersection(my_names))