diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index 8b9c94882..d5fffeae7 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -47,17 +47,7 @@ def toggle_cluster_state( switch_node_maintenance(maintainance_flag) update_revision_and_manager() - - # TODO: move this from multiple places to one - try: - broadcast_successful = broadcast_new_config(config) - except Exception as err: - raise CMAPIBasicError( - 'Error while distributing config file.' - ) from err - - if not broadcast_successful: - raise CMAPIBasicError('Config distribution isn\'t successful.') + broadcast_new_config(config) class ClusterHandler(): @@ -189,16 +179,7 @@ class ClusterHandler(): input_config_filename=config, output_config_filename=config ) - try: - broadcast_successful = broadcast_new_config(config) - except Exception as err: - raise CMAPIBasicError( - 'Error while distributing config file.' - ) from err - - if not broadcast_successful: - raise CMAPIBasicError('Config distribution isn\'t successful.') - + broadcast_new_config(config) logger.debug(f'Successfully finished adding node {node}.') return response @@ -242,17 +223,7 @@ class ClusterHandler(): update_revision_and_manager( input_config_filename=config, output_config_filename=config ) - try: - broadcast_successful = broadcast_new_config( - config, nodes=active_nodes - ) - except Exception as err: - raise CMAPIBasicError( - 'Error while distributing config file.' - ) from err - if not broadcast_successful: - raise CMAPIBasicError('Config distribution isn\'t successful.') - + broadcast_new_config(config, nodes=active_nodes) logger.debug(f'Successfully finished removing node {node}.') return response diff --git a/cmapi/cmapi_server/helpers.py b/cmapi/cmapi_server/helpers.py index 72b3c2783..b394a9bed 100644 --- a/cmapi/cmapi_server/helpers.py +++ b/cmapi/cmapi_server/helpers.py @@ -4,7 +4,6 @@ TODO: remove NodeConfig usage and move to arguments (eg. nc or root) """ import asyncio -import concurrent import configparser import datetime import logging @@ -18,6 +17,7 @@ from shutil import copyfile from typing import Tuple, Optional from urllib.parse import urlencode, urlunparse +import aiohttp import lxml.objectify import requests @@ -291,8 +291,8 @@ def broadcast_new_config( test_mode: bool = False, nodes: Optional[list] = None, timeout: Optional[int] = None -) -> bool: - """Send new config to nodes. Now in async way. +) -> None: + """Send new config to nodes in async way. :param cs_config_filename: Columnstore.xml path, defaults to DEFAULT_MCS_CONF_PATH @@ -309,9 +309,8 @@ def broadcast_new_config( :type nodes: Optional[list], optional :param timeout: timeout passing to gracefully stop DMLProc TODO: for next releases. Could affect all logic of broadcacting new config - :type timeout: int - :return: success state - :rtype: bool + :type timeout: Optional[int], optional + :raises CMAPIBasicError: If Broadcasting config to nodes failed with errors """ # TODO: move this from multiple places to one, eg to helpers @@ -343,72 +342,75 @@ def broadcast_new_config( if test_mode: body['test'] = True - failed_nodes = [] - success_nodes = [] + async def update_config(node: str, headers: dict, body: dict) -> None: + """Update remote node config asyncronously. - async def update_config(node, success_nodes, failed_nodes, headers, body): + :param node: node ip address or hostname + :type node: str + :param headers: headers for request + :type headers: dict + :param body: request body + :type body: dict + :raises CMAPIBasicError: If request failed by status code + :raises CMAPIBasicError: If request failed by some undefined error + :raises CMAPIBasicError: If request failed by timeout + :raises CMAPIBasicError: If undefined error happened + """ url = f'https://{node}:8640/cmapi/{version}/node/config' - # TODO: investigate about hardcoded 120 seconds timeout - # Check e1242eed47b61276ebc86136f124f6d974655515 in cmapi old - # repo to get more info. Patric made it because: - # "Made the timeout for a CS process restart 120s, since - # the container dispatcher waits up to 60s for SM to stop" - request_put = partial( - requests.put, url, verify=False, headers=headers, json=body, - timeout=120 - ) - success = False - executor = concurrent.futures.ThreadPoolExecutor() - loop = asyncio.get_event_loop() - # TODO: remove this retry, it cause retries and long waiting time - # for eg if some of mcs processes couldn't properly start/stop. - # Fix error handling, could be raising error instead of returning - # bool value - for retry in range(5): + async with aiohttp.ClientSession() as session: try: - r = await loop.run_in_executor(executor, request_put) - r.raise_for_status() - except requests.Timeout as e: - logging.warning( - f'Timeout while pushing new config to "{node}"' + async with session.put( + url, headers=headers, json=body, ssl=False, timeout=120 + ) as response: + response.raise_for_status() + logging.info(f'Node "{node}" config put successfull.') + except aiohttp.ClientResponseError as err: + message = ( + f'Node "{node}" config put failed with status: ' + f'"{err.status}" and message: {err.message}' ) - except requests.exceptions.RequestException as e: - logging.warning( - 'Error while pushing new config to "%s": %s"', node, str(e) + logging.error(message) + raise CMAPIBasicError(message) + except aiohttp.ClientError as err: + message = ( + f'Node "{node}" config put failed with ClientError: ' + f'{str(err)}' + ) + logging.error(message) + raise CMAPIBasicError(message) + except asyncio.TimeoutError: + message = f'Node "{node}" config put failed by Timeout: ' + logging.error(message) + raise CMAPIBasicError(message) + except Exception as err: + message = ( + f'Node "{node}" config put failed by undefined exception: ' + f'{str(err)}' ) - logging.debug('Response: %s', r.text) - except Exception as e: - logging.warning( - f'Got an unexpected error pushing new config to "{node}"', - exc_info=True - ) - else: - success_nodes.append(node) - success = True - break - if not success: - failed_nodes.append(node) + logging.error(message) + raise CMAPIBasicError(message) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) tasks = [ - update_config(node, success_nodes, failed_nodes, headers, body) + update_config(node, headers, body) for node in nodes ] - loop.run_until_complete(asyncio.wait(tasks)) + error_messages = [] + results = loop.run_until_complete( + asyncio.gather(*tasks, return_exceptions=True) + ) + for result in results: + if isinstance(result, Exception): + error_messages.append(result.message) loop.close() - - if len(success_nodes) > 0: - logging.info( - f'Successfully pushed new config file to {success_nodes}' + if error_messages: + errors_str = ', '.join(error_messages) + final_message = ( + f'Broadcasting config to nodes failed with errors: {errors_str}' ) - if len(failed_nodes) > 0: - logging.error( - f'Failed to push the new config to {failed_nodes}' - ) - return False - return True + raise CMAPIBasicError(final_message) # Might be more appropriate to put these in node_manipulation? diff --git a/cmapi/cmapi_server/test/test_txns.py b/cmapi/cmapi_server/test/test_txns.py index 01b2232db..4bd03f3a9 100644 --- a/cmapi/cmapi_server/test/test_txns.py +++ b/cmapi/cmapi_server/test/test_txns.py @@ -10,7 +10,7 @@ from mcs_node_control.models.node_config import NodeConfig from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error from cmapi_server.managers.certificate import CertificateManager from cmapi_server.test.unittest_global import ( - mcs_config_filename, cmapi_config_filename, tmp_mcs_config_filename, + mcs_config_filename, cmapi_config_filename, tmp_mcs_config_filename, tmp_cmapi_config_filename, ) @@ -115,7 +115,10 @@ class TestTransactions(unittest.TestCase): node_manipulation.add_node(myaddr, mcs_config_filename, mcs_config_filename) # Note, 1.2.3.4 is intentional -> doesn't exist, so shouldn't end up in the node list returned - print("\n\nNOTE! This is expected to pause here for ~10s, this isn't an error, yet.\n") + print( + "\n\nNOTE! This is expected to pause here for ~10s, " + "this isn't an error, yet.\n" + ) result = helpers.start_transaction( cmapi_config_filename, mcs_config_filename, optional_nodes = ['1.2.3.4'] @@ -123,17 +126,24 @@ class TestTransactions(unittest.TestCase): self.assertTrue(result[0]) self.assertEqual(len(result[2]), 1) self.assertEqual(result[2][0], myaddr) - success = helpers.broadcast_new_config( - mcs_config_filename, - cmapi_config_filename=cmapi_config_filename, - test_mode=True, - nodes = result[2] - ) - # not specifying nodes here to exercise the nodes = None path - helpers.commit_transaction( - result[1], cmapi_config_filename, mcs_config_filename - ) - self.assertTrue(success) + try: + helpers.broadcast_new_config( + mcs_config_filename, + cmapi_config_filename=cmapi_config_filename, + test_mode=True, + nodes = result[2] + ) + # not specifying nodes here to exercise the + # nodes = None path + helpers.commit_transaction( + result[1], cmapi_config_filename, mcs_config_filename + ) + except Exception as exc: + self.fail( + 'Unexpected broadcast_new_config failure with error: ' + f'{exc}' + ) + raise except: cherrypy.engine.exit() cherrypy.engine.block() diff --git a/cmapi/requirements.txt b/cmapi/requirements.txt index 217a819a4..40525dcb2 100644 --- a/cmapi/requirements.txt +++ b/cmapi/requirements.txt @@ -8,15 +8,15 @@ psutil==5.9.1 pyotp==2.6.0 requests==2.27.1 typer==0.15.1 - +aiohttp==3.11.12 # indirect dependencies -aiohttp==3.8.1 -aiosignal==1.2.0 +aiohappyeyeballs==2.4.4 +aiosignal==1.3.2 argcomplete==2.0.0 -async-timeout==4.0.2 +async-timeout==5.0.1 asynctest==0.13.0 -attrs==22.1.0 +attrs==25.1.0 boto==2.49.0 boto3==1.24.55 botocore==1.27.55 @@ -30,7 +30,7 @@ colorama==0.4.4 crcmod==1.7 docutils==0.16 fasteners==0.17.3 -frozenlist==1.3.1 +frozenlist==1.5.0 gcs-oauth2-boto-plugin==3.0 google-apitools==0.5.32 google-auth==2.10.0 @@ -49,10 +49,11 @@ markdown-it-py==3.0.0 mdurl==0.1.2 monotonic==1.6 more-itertools==8.12.0 -multidict==6.0.2 +multidict==6.1.0 oauth2client==4.1.3 orderedmultidict==1.0.1 portend==3.1.0 +propcache==0.2.1 pyasn1-modules==0.2.8 pyasn1==0.4.8 pycparser==2.21 @@ -74,6 +75,6 @@ six==1.16.0 tempora==5.0.1 typing_extensions==4.12.2 urllib3==1.26.8 -yarl==1.8.1 +yarl==1.18.3 zc.lockfile==2.0 zipp==3.7.0