1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

fix(cmapi): MCOL-5638 Improve Client Error Messaging and async broadcasting config.

* fix(cmapi): broadcast_new_config function using aiohttp and improved errors handling mechanics
* fix(cmapi): requirements updating aiohttp version and it's dependencies
* fix(cmapi, deps): unused imports
* fix(cmapi, tests): test_broadcast_new_config
This commit is contained in:
mariadb-AlanMologorsky
2025-02-07 14:36:11 +03:00
committed by Alan Mologorsky
parent 89c892ab2f
commit 55b09a71fc
4 changed files with 95 additions and 111 deletions

View File

@ -47,17 +47,7 @@ def toggle_cluster_state(
switch_node_maintenance(maintainance_flag)
update_revision_and_manager()
# TODO: move this from multiple places to one
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
broadcast_new_config(config)
class ClusterHandler():
@ -189,16 +179,7 @@ class ClusterHandler():
input_config_filename=config, output_config_filename=config
)
try:
broadcast_successful = broadcast_new_config(config)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
broadcast_new_config(config)
logger.debug(f'Successfully finished adding node {node}.')
return response
@ -242,17 +223,7 @@ class ClusterHandler():
update_revision_and_manager(
input_config_filename=config, output_config_filename=config
)
try:
broadcast_successful = broadcast_new_config(
config, nodes=active_nodes
)
except Exception as err:
raise CMAPIBasicError(
'Error while distributing config file.'
) from err
if not broadcast_successful:
raise CMAPIBasicError('Config distribution isn\'t successful.')
broadcast_new_config(config, nodes=active_nodes)
logger.debug(f'Successfully finished removing node {node}.')
return response

View File

@ -4,7 +4,6 @@ TODO: remove NodeConfig usage and move to arguments (eg. nc or root)
"""
import asyncio
import concurrent
import configparser
import datetime
import logging
@ -18,6 +17,7 @@ from shutil import copyfile
from typing import Tuple, Optional
from urllib.parse import urlencode, urlunparse
import aiohttp
import lxml.objectify
import requests
@ -291,8 +291,8 @@ def broadcast_new_config(
test_mode: bool = False,
nodes: Optional[list] = None,
timeout: Optional[int] = None
) -> bool:
"""Send new config to nodes. Now in async way.
) -> None:
"""Send new config to nodes in async way.
:param cs_config_filename: Columnstore.xml path,
defaults to DEFAULT_MCS_CONF_PATH
@ -309,9 +309,8 @@ def broadcast_new_config(
:type nodes: Optional[list], optional
:param timeout: timeout passing to gracefully stop DMLProc TODO: for next
releases. Could affect all logic of broadcacting new config
:type timeout: int
:return: success state
:rtype: bool
:type timeout: Optional[int], optional
:raises CMAPIBasicError: If Broadcasting config to nodes failed with errors
"""
# TODO: move this from multiple places to one, eg to helpers
@ -343,72 +342,75 @@ def broadcast_new_config(
if test_mode:
body['test'] = True
failed_nodes = []
success_nodes = []
async def update_config(node: str, headers: dict, body: dict) -> None:
"""Update remote node config asyncronously.
async def update_config(node, success_nodes, failed_nodes, headers, body):
:param node: node ip address or hostname
:type node: str
:param headers: headers for request
:type headers: dict
:param body: request body
:type body: dict
:raises CMAPIBasicError: If request failed by status code
:raises CMAPIBasicError: If request failed by some undefined error
:raises CMAPIBasicError: If request failed by timeout
:raises CMAPIBasicError: If undefined error happened
"""
url = f'https://{node}:8640/cmapi/{version}/node/config'
# TODO: investigate about hardcoded 120 seconds timeout
# Check e1242eed47b61276ebc86136f124f6d974655515 in cmapi old
# repo to get more info. Patric made it because:
# "Made the timeout for a CS process restart 120s, since
# the container dispatcher waits up to 60s for SM to stop"
request_put = partial(
requests.put, url, verify=False, headers=headers, json=body,
timeout=120
)
success = False
executor = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
# TODO: remove this retry, it cause retries and long waiting time
# for eg if some of mcs processes couldn't properly start/stop.
# Fix error handling, could be raising error instead of returning
# bool value
for retry in range(5):
async with aiohttp.ClientSession() as session:
try:
r = await loop.run_in_executor(executor, request_put)
r.raise_for_status()
except requests.Timeout as e:
logging.warning(
f'Timeout while pushing new config to "{node}"'
async with session.put(
url, headers=headers, json=body, ssl=False, timeout=120
) as response:
response.raise_for_status()
logging.info(f'Node "{node}" config put successfull.')
except aiohttp.ClientResponseError as err:
message = (
f'Node "{node}" config put failed with status: '
f'"{err.status}" and message: {err.message}'
)
except requests.exceptions.RequestException as e:
logging.warning(
'Error while pushing new config to "%s": %s"', node, str(e)
logging.error(message)
raise CMAPIBasicError(message)
except aiohttp.ClientError as err:
message = (
f'Node "{node}" config put failed with ClientError: '
f'{str(err)}'
)
logging.error(message)
raise CMAPIBasicError(message)
except asyncio.TimeoutError:
message = f'Node "{node}" config put failed by Timeout: '
logging.error(message)
raise CMAPIBasicError(message)
except Exception as err:
message = (
f'Node "{node}" config put failed by undefined exception: '
f'{str(err)}'
)
logging.debug('Response: %s', r.text)
except Exception as e:
logging.warning(
f'Got an unexpected error pushing new config to "{node}"',
exc_info=True
)
else:
success_nodes.append(node)
success = True
break
if not success:
failed_nodes.append(node)
logging.error(message)
raise CMAPIBasicError(message)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
update_config(node, success_nodes, failed_nodes, headers, body)
update_config(node, headers, body)
for node in nodes
]
loop.run_until_complete(asyncio.wait(tasks))
error_messages = []
results = loop.run_until_complete(
asyncio.gather(*tasks, return_exceptions=True)
)
for result in results:
if isinstance(result, Exception):
error_messages.append(result.message)
loop.close()
if len(success_nodes) > 0:
logging.info(
f'Successfully pushed new config file to {success_nodes}'
if error_messages:
errors_str = ', '.join(error_messages)
final_message = (
f'Broadcasting config to nodes failed with errors: {errors_str}'
)
if len(failed_nodes) > 0:
logging.error(
f'Failed to push the new config to {failed_nodes}'
)
return False
return True
raise CMAPIBasicError(final_message)
# Might be more appropriate to put these in node_manipulation?

View File

@ -10,7 +10,7 @@ from mcs_node_control.models.node_config import NodeConfig
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
from cmapi_server.managers.certificate import CertificateManager
from cmapi_server.test.unittest_global import (
mcs_config_filename, cmapi_config_filename, tmp_mcs_config_filename,
mcs_config_filename, cmapi_config_filename, tmp_mcs_config_filename,
tmp_cmapi_config_filename,
)
@ -115,7 +115,10 @@ class TestTransactions(unittest.TestCase):
node_manipulation.add_node(myaddr, mcs_config_filename, mcs_config_filename)
# Note, 1.2.3.4 is intentional -> doesn't exist, so shouldn't end up in the node list returned
print("\n\nNOTE! This is expected to pause here for ~10s, this isn't an error, yet.\n")
print(
"\n\nNOTE! This is expected to pause here for ~10s, "
"this isn't an error, yet.\n"
)
result = helpers.start_transaction(
cmapi_config_filename, mcs_config_filename,
optional_nodes = ['1.2.3.4']
@ -123,17 +126,24 @@ class TestTransactions(unittest.TestCase):
self.assertTrue(result[0])
self.assertEqual(len(result[2]), 1)
self.assertEqual(result[2][0], myaddr)
success = helpers.broadcast_new_config(
mcs_config_filename,
cmapi_config_filename=cmapi_config_filename,
test_mode=True,
nodes = result[2]
)
# not specifying nodes here to exercise the nodes = None path
helpers.commit_transaction(
result[1], cmapi_config_filename, mcs_config_filename
)
self.assertTrue(success)
try:
helpers.broadcast_new_config(
mcs_config_filename,
cmapi_config_filename=cmapi_config_filename,
test_mode=True,
nodes = result[2]
)
# not specifying nodes here to exercise the
# nodes = None path
helpers.commit_transaction(
result[1], cmapi_config_filename, mcs_config_filename
)
except Exception as exc:
self.fail(
'Unexpected broadcast_new_config failure with error: '
f'{exc}'
)
raise
except:
cherrypy.engine.exit()
cherrypy.engine.block()