import logging
import os
import re
import selectors
import tempfile
import uuid
from subprocess import PIPE, Popen, run, CalledProcessError

import cherrypy
import furl

from cmapi_server.constants import (
    CMAPI_PYTHON_BIN, CMAPI_PYTHON_BINARY_DEPS_PATH, CMAPI_PYTHON_DEPS_PATH
)
from cmapi_server.controllers.endpoints import raise_422_error

module_logger = logging.getLogger('cmapi_server')


def response_error(text):
    raise_422_error(module_logger, 'load_s3data', text)


class S3DataLoadController:
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.validate_api_key()  # pylint: disable=no-member
    def load_s3data(self):
"""
Handler for /cluster/load_s3data (POST, PUT)
Invokes cpimport with passed params
This is internal columnstore engine handler
Not targeted for manual usage
Waits for json dictionary params in request
bucket - S3 bucket with table data
table - table name to load data into
filename - name of file in S3 with table data
key - S3 secret key
secret - S3 secret
region - S3 region
database - db name to load data into
"""
        def checkShellParamsAreOK(param, paramname):
            """Check shell params for dangerous symbols.

            As these params will be passed to a shell, we should check
            that no shell injection is possible.

            An AWS Access Key ID is 20 alphanumeric characters,
            like 022QF06E7MXBSH9DHM02.
            An AWS Secret Access Key is 40 alphanumeric-slash-plus
            characters, like kWcrlUX5JEDGM/LtmEENI/aVmYvHNif5zB+d9+ct.
            AWS bucket names are alphanumeric-dot-underscore,
            like log-delivery-march-2020.com.
            AWS region names, table names and file names also do not
            allow dangerous symbols, so just raise an error when
            injection-dangerous symbols appear in a param.
            """
            dangerous_symbols = ' #&|;\n\r`$'
            for symbol in dangerous_symbols:
                if symbol in param:
                    response_error(
                        f'S3 configuration parameters wrong: {paramname} '
                        f'cannot contain "{symbol}"'
                    )

        def getKey(keyname, request_body, skip_check=False, required=True):
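            # NOTE: an empty value is treated the same as a missing one;
            # optional params (required=False) may come back as None.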
            value = request_body.get(keyname, None)
            if not value and required:
                response_error(
                    f'Some S3 configuration parameters missing: {keyname} '
                    'not provided'
                )
            if not skip_check:
                checkShellParamsAreOK(value, keyname)
            return value

        def prepare_aws(bucket, filename, secret, key, region):
            """Prepare aws_cli popen object.

            Invoke aws_cli download, and return proc for further
            use with cpimport.

            :param bucket: bucket name
            :type bucket: str
            :param filename: filename in bucket
            :type filename: str
            :param secret: aws secret
            :type secret: str
            :param key: aws key
            :type key: str
            :param region: aws region
            :type region: str
            :return: popen aws_cli object
            :rtype: subprocess.Popen
            """
            my_env = os.environ.copy()
            my_env['AWS_ACCESS_KEY_ID'] = key
            my_env['AWS_SECRET_ACCESS_KEY'] = secret
            my_env['PYTHONPATH'] = CMAPI_PYTHON_DEPS_PATH
            aws_cli_binary = os.path.join(CMAPI_PYTHON_BINARY_DEPS_PATH, 'aws')
            s3_url = furl.furl(bucket).add(path=filename).url
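            # furl joins the bucket URL and the file name, e.g. (with
            # hypothetical names) 's3://my-bucket' + 'data.csv' ->
            # 's3://my-bucket/data.csv'. The '-' destination below makes
            # 'aws s3 cp' stream the object to stdout, so it can be piped
            # straight into cpimport.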
            aws_command_line = [
                CMAPI_PYTHON_BIN, aws_cli_binary,
                "s3", "cp", "--source-region", region, s3_url, "-"
            ]
            module_logger.debug(
                f'AWS commandline: {" ".join(aws_command_line)}')
            try:
                aws_proc = Popen(
                    aws_command_line, env=my_env, stdout=PIPE,
                    stderr=PIPE, shell=False, encoding='utf-8'
                )
            except OSError as exc:
                # Popen raises OSError (e.g. FileNotFoundError), not
                # CalledProcessError, when the binary cannot be started.
                response_error(str(exc))
            return aws_proc

        def prepare_google_storage(
            bucket, filename, secret, key, temporary_config
        ):
            """Prepare gsutil popen object.

            Invoke gsutil download, and return proc for further use
            with cpimport.

            :param bucket: bucket name
            :type bucket: str
            :param filename: filename in bucket
            :type filename: str
            :param secret: gsutil secret
            :type secret: str
            :param key: gsutil key
            :type key: str
            :param temporary_config: temp config for gsutil
            :type temporary_config: str
            :return: popen gsutil object
            :rtype: subprocess.Popen
            """
            project_id = 'project_id'
            gs_cli_binary = os.path.join(
                CMAPI_PYTHON_BINARY_DEPS_PATH, 'gsutil'
            )
            commandline = (
                f'/usr/bin/bash -c '
                f'\'echo -e "{key}\n{secret}\n{project_id}"\' | '
                f'{CMAPI_PYTHON_BIN} {gs_cli_binary} '
                f'config -a -o {temporary_config}'
            )
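            # Feeds the key, secret and a placeholder project id to
            # 'gsutil config', which writes them into the temporary boto
            # config file; BOTO_CONFIG below points all later gsutil
            # invocations at that file.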
            module_logger.debug(
                f'gsutil config commandline: '
                f'{commandline.encode("unicode_escape").decode("utf-8")}'
            )
            my_env = os.environ.copy()
            my_env['PYTHONPATH'] = CMAPI_PYTHON_DEPS_PATH
            my_env['BOTO_CONFIG'] = temporary_config
            try:
                p = run(
                    commandline, capture_output=True,
                    shell=True, encoding='utf-8', check=True, env=my_env
                )
            except CalledProcessError as exc:
                response_error(exc.stderr.split('\n')[0])
            try:
                check_commandline = [
                    CMAPI_PYTHON_BIN, gs_cli_binary, 'version', '-l'
                ]
                p = run(
                    check_commandline, capture_output=True,
                    shell=False, encoding='utf-8', check=True, env=my_env
                )
                module_logger.debug(
                    f'gsutil config check commandline: '
                    f'{" ".join(check_commandline)}'
                )
                module_logger.debug(f'gsutil config: {p.stdout}')
            except CalledProcessError as exc:
                response_error(exc.stderr.split('\n')[0])
            gs_url = furl.furl(bucket).add(path=filename).url
            gs_command_line = [
                CMAPI_PYTHON_BIN, gs_cli_binary, 'cat', gs_url
            ]
            module_logger.debug(
                f'gsutil cat commandline: {" ".join(gs_command_line)}'
            )
            try:
                gs_process = Popen(
                    gs_command_line, env=my_env, stdout=PIPE, stderr=PIPE,
                    shell=False, encoding='utf-8'
                )
            except OSError as exc:
                # As above: Popen raises OSError, not CalledProcessError.
                response_error(str(exc))
            return gs_process

        module_logger.debug('LOAD S3 Data')
        request = cherrypy.request
        request_body = request.json
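        # Example request body (all values hypothetical, for illustration):
        #     {
        #         "bucket": "s3://my-bucket",
        #         "table": "orders",
        #         "filename": "orders.csv",
        #         "key": "022QF06E7MXBSH9DHM02",
        #         "secret": "kWcrlUX5JEDGM/LtmEENI/aVmYvHNif5zB+d9+ct",
        #         "region": "us-east-1",
        #         "database": "mydb",
        #         "terminated_by": "|"
        #     }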
        bucket = getKey('bucket', request_body)
        if bucket.startswith('s3://'):
            storage = 'aws'
        elif bucket.startswith('gs://'):
            storage = 'gs'
        else:
            error = (
                'Incorrect bucket. Should start with s3:// for AWS S3 '
                'or gs:// for Google Storage'
            )
            response_error(error)
        table = getKey('table', request_body)
        filename = getKey('filename', request_body)
        key = getKey('key', request_body)
        secret = getKey('secret', request_body)
        region = getKey('region', request_body, required=(storage == 'aws'))
        database = getKey('database', request_body)
        terminated_by = getKey('terminated_by', request_body, skip_check=True)
        enclosed_by = getKey(
            'enclosed_by', request_body, skip_check=True, required=False
        )
        escaped_by = getKey(
            'escaped_by', request_body, skip_check=True, required=False
        )
        if storage == 'aws':
            download_proc = prepare_aws(bucket, filename, secret, key, region)
        elif storage == 'gs':
            temporary_config = os.path.join(
                tempfile.gettempdir(), '.boto.' + str(uuid.uuid4())
            )
            download_proc = prepare_google_storage(
                bucket, filename, secret, key, temporary_config
            )
        else:
            response_error('Unknown storage detected. Internal error')
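        # cpimport is given no input file, so it reads the table data from
        # stdin; the downloader's stdout is wired into it below. '-s' sets
        # the field delimiter, '-C' the escape character and '-E' the
        # enclosing character.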
        cpimport_command_line = [
            'cpimport', database, table, '-s', terminated_by
        ]
        if escaped_by:
            cpimport_command_line += ['-C', escaped_by]
        if enclosed_by:
            cpimport_command_line += ['-E', enclosed_by]
        module_logger.debug(
            f'cpimport command line: {" ".join(cpimport_command_line)}'
        )
        cpimport_proc = Popen(
            cpimport_command_line, shell=False, stdin=download_proc.stdout,
            stdout=PIPE, stderr=PIPE, encoding='utf-8'
        )
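        # Drain all three pipes with a non-blocking selector loop: reading
        # each stream as data arrives avoids a deadlock where one process
        # stalls because a full pipe buffer is never consumed.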
        selector = selectors.DefaultSelector()
        for stream in [
            download_proc.stderr, cpimport_proc.stderr, cpimport_proc.stdout
        ]:
            os.set_blocking(stream.fileno(), False)
        selector.register(
            download_proc.stderr, selectors.EVENT_READ, data='downloader_error'
        )
        selector.register(
            cpimport_proc.stderr, selectors.EVENT_READ, data='cpimport_error'
        )
        selector.register(
            cpimport_proc.stdout, selectors.EVENT_READ, data='cpimport_output'
        )
        downloader_error = ''
        cpimport_error = ''
        cpimport_output = ''
        while True:
            events = selector.select()
            for selector_key, mask in events:
                name = selector_key.data
                line = selector_key.fileobj.readline().rstrip()
                if name == 'downloader_error' and line:
                    downloader_error += line + '\n'
                if name == 'cpimport_error' and line:
                    cpimport_error += line + '\n'
                if name == 'cpimport_output' and line:
                    cpimport_output += line + '\n'
            if downloader_error:
                response_error(downloader_error)
            if cpimport_error:
                response_error(cpimport_error)
            cpimport_status = cpimport_proc.poll()
            download_status = download_proc.poll()
            if cpimport_status is not None \
                    and download_status is not None:
                break
        # Clean up the temporary gsutil config created for Google Storage.
        if storage == 'gs' and os.path.exists(temporary_config):
            os.remove(temporary_config)
        if downloader_error:
            response_error(downloader_error)
        if cpimport_error:
            response_error(cpimport_error)
        module_logger.debug(f'LOAD S3 Data stdout: {cpimport_output}')
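        # On success cpimport prints a summary line such as (hypothetical
        # counts): '1000 rows processed and 1000 rows inserted.'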
        pattern = r'([0-9]+) rows processed and ([0-9]+) rows inserted'
        match = re.search(pattern, cpimport_output)
        if not match:
            return {
                'success': False,
                'inserted': 0,
                'processed': 0
            }
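        # NOTE: on success 'inserted' and 'processed' are returned as the
        # string groups matched from the cpimport output, not as integers.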
        return {
            'success': True,
            'inserted': match.group(2),
            'processed': match.group(1)
        }