
cve: upgrade elasticsearch and elasticsearch-dsl to 8.13.0 (PROJQUAY-6962) (#3418)

* upgrade elasticsearch and elasticsearch-dsl to 8.13.0
* update the tests and the Elasticsearch logs model to handle the new major version
* pin elastic-transport according to the requirements.txt check test
Marcus Kok authored 2024-12-02 09:50:42 -05:00, committed by GitHub
parent 2901c3067f
commit fb95c90574
6 changed files with 72 additions and 19 deletions
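
The heart of the change is the client construction: in elasticsearch-py 8.x each host dict must carry an explicit "scheme", the use_ssl flag is gone, and connection_class is replaced by node_class from the new elastic-transport package. A minimal sketch of the before/after, with placeholder host values rather than the module's actual configuration:

    from elastic_transport import RequestsHttpNode
    from elasticsearch_dsl.connections import connections

    # 7.x style, removed by this commit:
    #   connections.create_connection(hosts=[{"host": "es.local", "port": 9200}],
    #                                 use_ssl=True, connection_class=RequestsHttpConnection)

    # 8.x style, as introduced below:
    client = connections.create_connection(
        hosts=[{"host": "es.local", "port": 9200, "scheme": "https"}],  # scheme now required
        verify_certs=True,
        node_class=RequestsHttpNode,  # requests-backed node from elastic-transport
    )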

data/logs_model/elastic_logs.py

@@ -3,8 +3,12 @@ import os
 import re
 from datetime import datetime, timedelta
 
-from elasticsearch import RequestsHttpConnection
-from elasticsearch.exceptions import AuthorizationException, NotFoundError
+from elastic_transport import RequestsHttpNode, SerializationError
+from elasticsearch.exceptions import (
+    AuthorizationException,
+    NotFoundError,
+    UnsupportedProductError,
+)
 from elasticsearch_dsl import Date, Document, Index, Integer, Ip, Keyword, Object, Text
 from elasticsearch_dsl.connections import connections
 from requests_aws4auth import AWS4Auth
@@ -75,7 +79,10 @@ class LogEntry(Document):
     def create_or_update_template(cls):
         assert cls._index and cls._index_prefix
         index_template = cls._index.as_template(cls._index_prefix)
-        index_template.save(using=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS)
+        try:
+            index_template.save(using=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS)
+        except SerializationError:
+            pass
 
     def save(self, **kwargs):
         # We group the logs based on year, month and day as different indexes, so that
@@ -127,6 +134,7 @@ class ElasticsearchLogs(object):
         """
         if not self._initialized:
             http_auth = None
+            scheme = "https" if self._use_ssl else "http"
             if self._access_key and self._secret_key and self._aws_region:
                 http_auth = AWS4Auth(self._access_key, self._secret_key, self._aws_region, "es")
             elif self._access_key and self._secret_key:
@@ -135,11 +143,10 @@ class ElasticsearchLogs(object):
                 logger.warning("Connecting to Elasticsearch without HTTP auth")
 
             self._client = connections.create_connection(
-                hosts=[{"host": self._host, "port": self._port}],
+                hosts=[{"host": self._host, "port": self._port, "scheme": scheme}],
                 http_auth=http_auth,
-                use_ssl=self._use_ssl,
                 verify_certs=True,
-                connection_class=RequestsHttpConnection,
+                node_class=RequestsHttpNode,
                 timeout=ELASTICSEARCH_DEFAULT_CONNECTION_TIMEOUT,
             )
@@ -149,17 +156,16 @@ class ElasticsearchLogs(object):
             # This only needs to be done once to initialize the index template
             connections.create_connection(
                 alias=ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS,
-                hosts=[{"host": self._host, "port": self._port}],
+                hosts=[{"host": self._host, "port": self._port, "scheme": scheme}],
                 http_auth=http_auth,
-                use_ssl=self._use_ssl,
                 verify_certs=True,
-                connection_class=RequestsHttpConnection,
+                node_class=RequestsHttpNode,
                 timeout=ELASTICSEARCH_TEMPLATE_CONNECTION_TIMEOUT,
             )
 
             try:
                 force_template_update = ELASTICSEARCH_FORCE_INDEX_TEMPLATE_UPDATE.lower() == "true"
-                self._client.indices.get_template(self._index_prefix)
+                self._client.indices.get_template(name=self._index_prefix)
                 LogEntry.init(
                     self._index_prefix,
                     self._index_settings,
@@ -167,6 +173,8 @@ class ElasticsearchLogs(object):
                 )
             except NotFoundError:
                 LogEntry.init(self._index_prefix, self._index_settings, skip_template_init=False)
+            except SerializationError:
+                LogEntry.init(self._index_prefix, self._index_settings, skip_template_init=False)
             finally:
                 try:
                     connections.remove_connection(ELASTICSEARCH_TEMPLATE_CONNECTION_ALIAS)
@@ -187,9 +195,11 @@ class ElasticsearchLogs(object):
     def index_exists(self, index):
         try:
-            return index in self._client.indices.get(index)
+            return index in self._client.indices.get(index=index)
         except NotFoundError:
             return False
+        except SerializationError:
+            return False
 
     @staticmethod
     def _valid_index_prefix(prefix):
@@ -229,20 +239,25 @@ class ElasticsearchLogs(object):
     def list_indices(self):
         self._initialize()
 
         try:
-            return list(self._client.indices.get(self._index_prefix + "*").keys())
+            return list(self._client.indices.get(index=(self._index_prefix + "*")).keys())
         except NotFoundError as nfe:
             logger.exception("`%s` indices not found: %s", self._index_prefix, nfe.info)
             return []
         except AuthorizationException as ae:
             logger.exception("Unauthorized for indices `%s`: %s", self._index_prefix, ae.info)
             return None
+        except SerializationError as se:
+            logger.exception(
+                "Serialization error for indices `%s`: %s", self._index_prefix, se.info
+            )
+            return None
 
     def delete_index(self, index):
         self._initialize()
         assert self._valid_index_name(index)
         try:
-            self._client.indices.delete(index)
+            self._client.indices.delete(index=index)
             return index
         except NotFoundError as nfe:
             logger.exception("`%s` indices not found: %s", index, nfe.info)
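
A second mechanical change running through this file: the 8.x client no longer accepts positional API parameters, so indices.get(index), indices.delete(index), and indices.get_template(prefix) all move to keyword form. A minimal sketch, with client construction elided and index names illustrative:

    # 7.x allowed positional parameters:
    #   client.indices.get("logentry_*")
    #   client.indices.delete("logentry_2018-03-08")
    # 8.x requires keyword arguments:
    indices = client.indices.get(index="logentry_*")
    client.indices.delete(index="logentry_2018-03-08")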

data/logs_model/logs_producer/elasticsearch_logs_producer.py

@@ -1,6 +1,6 @@
 import logging
 
-from elasticsearch.exceptions import ElasticsearchException
+from elasticsearch.exceptions import ApiError
 
 from data.logs_model.logs_producer import LogSendException
 from data.logs_model.logs_producer.interface import LogProducerInterface
@@ -18,7 +18,7 @@ class ElasticsearchLogsProducer(LogProducerInterface):
     def send(self, logentry):
         try:
             logentry.save()
-        except ElasticsearchException as ex:
+        except ApiError as ex:
             logger.exception("ElasticsearchLogsProducer error sending log to Elasticsearch: %s", ex)
             raise LogSendException(
                 "ElasticsearchLogsProducer error sending log to Elasticsearch: %s" % ex

data/logs_model/test/fake_elasticsearch.py

@@ -8,6 +8,8 @@ from datetime import datetime
 import dateutil.parser
 from httmock import HTTMock, urlmatch
 
+from data.logs_model.test.test_elasticsearch import add_elastic_headers
+
 FAKE_ES_HOST = "fakees"
 
 EMPTY_RESULT = {
@@ -42,6 +44,7 @@ def fake_elasticsearch(allow_wildcard=True):
         return value
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_template/(.+)", method="GET")
     def get_template(url, request):
         template_name = url[len("/_template/") :]
@@ -50,12 +53,14 @@ def fake_elasticsearch(allow_wildcard=True):
         return {"status_code": 404}
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_template/(.+)", method="PUT")
     def put_template(url, request):
         template_name = url[len("/_template/") :]
         templates[template_name] = True
         return {"status_code": 201}
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_doc", method="POST")
     def post_doc(url, request):
         index_name, _ = url.path[1:].split("/")
@@ -75,6 +80,7 @@ def fake_elasticsearch(allow_wildcard=True):
             ),
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)$", method="DELETE")
     def index_delete(url, request):
         index_name_or_pattern = url.path[1:]
@@ -96,6 +102,7 @@ def fake_elasticsearch(allow_wildcard=True):
             "content": {"acknowledged": True},
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)$", method="GET")
     def index_lookup(url, request):
         index_name_or_pattern = url.path[1:]
@@ -184,6 +191,7 @@ def fake_elasticsearch(allow_wildcard=True):
         return found, found_index or (index_name_or_pattern.find("*") >= 0)
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_count$", method="POST")
     def count_docs(url, request):
         request = json.loads(request.body)
@@ -203,6 +211,7 @@ def fake_elasticsearch(allow_wildcard=True):
             "content": json.dumps({"count": len(found)}),
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_search/scroll$", method="POST")
     def lookup_scroll(url, request):
         request_obj = json.loads(request.body)
@@ -220,6 +229,7 @@ def fake_elasticsearch(allow_wildcard=True):
             "status_code": 404,
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/_search/scroll$", method="DELETE")
     def delete_scroll(url, request):
         request = json.loads(request.body)
@@ -230,6 +240,7 @@ def fake_elasticsearch(allow_wildcard=True):
             "status_code": 404,
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST, path=r"/([^/]+)/_search$", method="POST")
     def lookup_docs(url, request):
         query_params = parse_query(url.query)
@@ -383,6 +394,7 @@ def fake_elasticsearch(allow_wildcard=True):
             "content": json.dumps(final_result),
         }
 
+    @add_elastic_headers
     @urlmatch(netloc=FAKE_ES_HOST)
     def catchall_handler(url, request):
         print(
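
Background for the add_elastic_headers decorator applied throughout this file: since client 7.14 the Python client verifies that responses carry the X-Elastic-Product: Elasticsearch header and raises UnsupportedProductError otherwise, so every mocked reply has to set it. A standalone sketch of the same idea with httmock, using a hypothetical handler name:

    from httmock import response, urlmatch

    @urlmatch(netloc="fakees")
    def fake_es(url, request):
        # Without this header the 8.x client rejects the reply
        # with UnsupportedProductError.
        return response(200, '{"acknowledged": true}',
                        {"X-Elastic-Product": "Elasticsearch"})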

data/logs_model/test/mock_elasticsearch.py

@@ -315,11 +315,12 @@ SCROLL_REQUESTS = [
             "query": {
                 "range": {"datetime": {"lt": "2018-04-02T00:00:00", "gte": "2018-03-08T00:00:00"}}
             },
+            "size": 1,
         },
     ],
     [{"scroll": "5m", "scroll_id": _scroll_id}],
     [{"scroll": "5m", "scroll_id": _scroll_id}],
-    [{"scroll_id": [_scroll_id]}],
+    [{"scroll_id": _scroll_id}],
 ]
 
 SCROLL_RESPONSES = [SCROLL_CREATE, SCROLL_GET, SCROLL_GET_2, SCROLL_DELETE]
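
These fixtures reflect the 8.x request shapes: the page size now travels in the search body (hence the added "size": 1) and clear_scroll sends a bare scroll_id string rather than a list. Roughly the client calls being mocked, sketched against an assumed 8.x client `es` with an illustrative index name and query:

    page = es.search(index="logentry_*", scroll="5m", size=1,
                     query={"match_all": {}})  # size is serialized into the body
    scroll_id = page["_scroll_id"]
    page = es.scroll(scroll="5m", scroll_id=scroll_id)  # fetch the next page
    es.clear_scroll(scroll_id=scroll_id)  # body: {"scroll_id": "..."}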

data/logs_model/test/test_elasticsearch.py

@@ -26,6 +26,8 @@ FAKE_AWS_ACCESS_KEY = None
 FAKE_AWS_SECRET_KEY = None
 FAKE_AWS_REGION = None
+DEFAULT_SEARCH_SCROLL_SIZE = 1
+
 
 @pytest.fixture()
 def logs_model_config():
@@ -142,6 +144,21 @@ def parse_query(query):
     return {s.split("=")[0]: s.split("=")[1] for s in query.split("&") if s != ""}
 
 
+def add_elastic_headers(response_fn):
+    # Adding special headers to the mocked response
+    # to prevent an UnsupportedProductError exception from occurring
+    # during testing
+    def wrapper(*args, **kwargs):
+        response = response_fn(*args, **kwargs)
+        if isinstance(response, dict):
+            headers = response.get("headers", {})
+            headers["X-Elastic-Product"] = "Elasticsearch"
+            response["headers"] = headers
+        return response
+
+    return wrapper
+
+
 @pytest.fixture()
 def mock_elasticsearch():
     mock = Mock()
@@ -156,6 +173,7 @@ def mock_elasticsearch():
     mock.list_indices.side_effect = NotImplementedError
 
     @urlmatch(netloc=r".*", path=r".*")
+    @add_elastic_headers
     def default(url, req):
         raise Exception(
             "\nurl={}\nmethod={}\nreq.url={}\nheaders={}\nbody={}".format(
@@ -164,14 +182,17 @@ def mock_elasticsearch():
         )
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/_template/.*")
+    @add_elastic_headers
     def template(url, req):
         return mock.template(url.query.split("/")[-1], req.body)
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_(\*|[0-9\-]+)")
+    @add_elastic_headers
     def list_indices(url, req):
         return mock.list_indices()
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_[0-9\-]*/_doc")
+    @add_elastic_headers
     def index(url, req):
         index = url.path.split("/")[1]
         body = json.loads(req.body)
@@ -179,10 +200,12 @@ def mock_elasticsearch():
         return mock.index(index, body)
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_([0-9\-]*|\*)/_count")
+    @add_elastic_headers
     def count(_, req):
         return mock.count(json.loads(req.body))
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/_search/scroll")
+    @add_elastic_headers
     def scroll(url, req):
         if req.method == "DELETE":
             return mock.scroll_delete(json.loads(req.body))
@@ -192,11 +215,12 @@ def mock_elasticsearch():
         raise NotImplementedError()
 
     @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r"/logentry_(\*|[0-9\-]*)/_search")
+    @add_elastic_headers
     def search(url, req):
         if "scroll" in url.query:
             query = parse_query(url.query)
             window_size = query["scroll"]
-            maximum_result_size = int(query["size"])
+            maximum_result_size = int(query.get("size", DEFAULT_SEARCH_SCROLL_SIZE))
             return mock.search_scroll_create(window_size, maximum_result_size, json.loads(req.body))
         elif b"aggs" in req.body:
             return mock.search_aggs(json.loads(req.body))
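
The query.get("size", DEFAULT_SEARCH_SCROLL_SIZE) fallback exists because the 8.x client stops putting size in the URL query string and serializes it into the JSON body instead, so the mocked scroll-create URL may no longer carry it. An illustrative before/after of the mocked request line, with hypothetical paths:

    # 7.x: POST /logentry_*/_search?scroll=5m&size=1
    # 8.x: POST /logentry_*/_search?scroll=5m   with body {"size": 1, ...}
    query = parse_query(url.query)  # under 8.x: {"scroll": "5m"}
    maximum_result_size = int(query.get("size", DEFAULT_SEARCH_SCROLL_SIZE))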

requirements.txt

@@ -26,8 +26,9 @@ debtcollector==1.22.0
 decorator==5.1.1
 Deprecated==1.2.14
 dumb-init==1.2.5.post1
-elasticsearch==7.6.0
-elasticsearch-dsl==7.0.0
+elastic-transport==8.15.1
+elasticsearch==8.13.0
+elasticsearch-dsl==8.13.0
 Flask==2.3.2
 Flask-Login==0.6.3
 Flask-Mail==0.9.1