1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-10 01:22:48 +03:00
Files
mariadb-columnstore-engine/cmapi/mcs_node_control/test/test_dbrm_socket.py
Leonid Fedorov e2367a9495 CI from develop + cmapi [develop-23.02] (#2863)
* MCOL-5496: Merge CMAPI code to engine repo.

[add] cmapi code to engine

* MCOL-5496: Fix CI adding CMAPI steps.

[fix] deb packages deps commands
[add] several additional local variables
[fix] packages url
[fix] smoke step
[fix] mtr step
[fix] regression step
[add] cmapipython, cmapibuild, cmapitest and cmapilog steps
[fix] dockerfile step
[fix] build step to include cmapi package on repodata creating
[fix] multi_node_mtr step
[fix] pkg step
[add] cmapi steps to pipelines
[fix] cmapi/CMakeLists.txt to prevent cmake byte-compile .py files for rpm packages
[add] setup-repo.sh file
[fix] now use packages from the repos in tests steps

* Adding color 2 build script

* Build script and logging scripts from develop

* Remove test 222 from full regression, it is missing in 23.02 regression set

---------

Co-authored-by: mariadb-AlanMologorsky <alan.mologorsky@mariadb.com>
Co-authored-by: mariadb-RomanNavrotskiy <roman.navrotskiy@mariadb.com>
2023-06-09 17:36:17 +03:00

30 lines
1.1 KiB
Python

import io
import logging
import unittest
from mcs_node_control.models.dbrm_socket import MAGIC_BYTES, DBRMSocketHandler
# Configure the root logger at DEBUG level when this test module is imported,
# so log records emitted while the tests run are not filtered out.
logging.basicConfig(level='DEBUG')
class TestDBRMSocketHandler(unittest.TestCase):
    """Unit tests for ``DBRMSocketHandler`` magic-bytes handling."""

    def test_myreceive_to_magic(self):
        """Check what is left unread after ``_receive_magic()`` returns.

        Three byte streams are fed to the handler, each ending with the
        magic bytes followed by the same payload:

        * the magic bytes come first;
        * a single unrelated byte (``b'A'``) precedes the magic bytes;
        * the first three magic bytes precede the complete magic bytes.

        In every case, the bytes remaining in the stream once
        ``_receive_magic()`` has run must be exactly the payload.
        """
        payload = b'\x01\x00\x00\x00\x00'
        leading_chunks = (
            b'',               # nothing in front of the magic bytes
            b'A',              # one unknown byte in front
            MAGIC_BYTES[:3],   # a truncated copy of the magic in front
        )
        streams = [
            b''.join((chunk, MAGIC_BYTES, payload))
            for chunk in leading_chunks
        ]

        for sock_response in streams:
            with self.subTest(sock_response=sock_response):
                fake_socket = io.BytesIO(sock_response)
                # BytesIO has no recv(); alias it to read() so the stream
                # can stand in for the handler's socket (presumably the
                # handler reads through recv() -- confirm in dbrm_socket).
                fake_socket.recv = fake_socket.read

                handler = DBRMSocketHandler()
                # pylint: disable=protected-access
                handler._socket = fake_socket
                handler._receive_magic()

                leftover = fake_socket.read()
                self.assertEqual(leftover, payload)