From 2b33fde06bece4ea77ea7f326bce1adb5a5b1c58 Mon Sep 17 00:00:00 2001 From: J Boddey Date: Thu, 23 May 2024 19:53:02 +0100 Subject: [PATCH] Add certificate endpoints (#451) * Adds version analytics event (#306) * Techdebt: adds state for testrun page (#392) * Fix tests (#397) * Fix tests * Update node version * 331379891: (feat) disable connection settings when testrun is in progress (#371) * 331379891: (feat) disable connection settings when testrun is in progress * 331379891: (fix) include more testrun results as progress * 331379891: (fix) fix spelling * 333349715: (fix) GAR 1.3 The disabled system settings panel contains a focusable element (#388) Co-authored-by: Volha Mardvilka --------- Co-authored-by: Volha Mardvilka * Disable device item if device in progress (#370) * Adds ga to track testrun initiation (#415) Adds ga ti track testrun initiation * Certs progress * Add list and upload cert endpoint * Add delete certificate endpoint * Update cert response codes * Upgrade dependency --------- Co-authored-by: Sofia Kurilova Co-authored-by: Olga Mardvilko Co-authored-by: Volha Mardvilka --- framework/python/src/api/api.py | 135 ++++++++++++++++++------- framework/python/src/common/session.py | 126 ++++++++++++++++++++++- framework/python/src/core/testrun.py | 3 +- framework/requirements.txt | 4 + 4 files changed, 230 insertions(+), 38 deletions(-) diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index a5a72f31f..f48632b87 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -12,7 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. """Provides Testrun data via REST API.""" -from fastapi import FastAPI, APIRouter, Response, Request, status +from fastapi import (FastAPI, + File, + Form, + APIRouter, + Response, + Request, + status, + UploadFile) from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from datetime import datetime @@ -96,12 +103,14 @@ def __init__(self, test_run): self.edit_device, methods=["POST"]) - self._router.add_api_route("/system/modules", - self.get_test_modules) - - # Profiles - self._router.add_api_route("/profiles/format", - self._get_profiles_format) + self._router.add_api_route("/system/config/certs", + self.get_certs) + self._router.add_api_route("/system/config/certs", + self.upload_cert, + methods=["POST"]) + self._router.add_api_route("/system/config/certs", + self.delete_cert, + methods=["DELETE"]) # Allow all origins to access the API origins = ["*"] @@ -134,9 +143,6 @@ def _start(self): def stop(self): LOGGER.info("Stopping API") - def get_session(self): - return self._session - async def get_sys_interfaces(self): addrs = psutil.net_if_addrs() ifaces = {} @@ -211,7 +217,6 @@ async def start_test_run(self, request: Request, response: Response): "A device with that MAC address could not be found") device.firmware = body_json["device"]["firmware"] - device.test_modules = body_json["device"]["test_modules"] # Check if config has been updated (device interface not default) if (self._test_run.get_session().get_device_interface() @@ -561,20 +566,6 @@ async def get_results(self, response: Response, response.status_code = 404 return self._generate_msg(False, "Test results could not be found") - async def get_test_modules(self): - - LOGGER.debug("Received request to list test modules") - - test_modules = [] - - for test_module in self._get_test_run().get_test_orc().get_test_modules(): - - # Only add module if it is an actual, enabled test module - if (test_module.enabled and test_module.enable_container): - test_modules.append(test_module.display_name) - - return test_modules - def _validate_device_json(self, json_obj): # Check all required properties are present @@ -600,17 +591,93 @@ def _validate_device_json(self, json_obj): return True - def _get_test_run(self): - return self._test_run + def get_certs(self): + LOGGER.debug("Received certs list request") - # Profiles - def _get_profiles_format(self, response: Response): + # Reload certs + self._session.load_certs() - # Check if Testrun was able to load the format originally - if self.get_session().get_profiles_format() is None: - response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return self._session.get_certs() + + async def upload_cert(self, + file: UploadFile, + response: Response): + + filename = file.filename + content_type = file.content_type + + LOGGER.debug("Received request to upload certificate") + LOGGER.debug(f"Filename: {filename}, content type: {content_type}") + + if content_type not in [ + "application/x-pem-file", + "application/x-x509-ca-cert" + ]: + response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg( False, - "Testrun could not load the risk assessment format") + "Failed to upload certificate. Is it in the correct format?" + ) - return self.get_session().get_profiles_format() + if len(filename) > 24: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg( + False, + "Invalid filename. Maximum file name length is 24 characters." + ) + + # Check if file already exists + if not self._session.check_cert_file_name( + filename + ): + response.status_code = status.HTTP_409_CONFLICT + return self._generate_msg( + False, + "A certificate with that file name already exists." + ) + + # Get file contents + contents = await file.read() + + # Pass to session to check and write + cert_obj = self._session.upload_cert(filename, + contents) + + # Return error if something went wrong + if cert_obj is None: + return self._generate_msg( + False, + "Failed to upload certificate. Is it in the correct format?" + ) + + response.status_code = status.HTTP_201_CREATED + + return cert_obj + + async def delete_cert(self, request: Request, response: Response): + + LOGGER.debug("Received delete certificate request") + + try: + req_raw = (await request.body()).decode("UTF-8") + req_json = json.loads(req_raw) + + if "name" not in req_json: + response.status_code = status.HTTP_400_BAD_REQUEST + return self._generate_msg(False, "Received a bad request") + + common_name = req_json.get("name") + + for cert in self._session.get_certs(): + if cert["name"] == common_name: + self._session.delete_cert(cert["filename"]) + return self._generate_msg(True, "Successfully delete the certificate") + + response.status_code = status.HTTP_404_NOT_FOUND + return self._generate_msg( + False, + "A certificate with that name could not be found") + + except Exception as e: + LOGGER.error("An error occurred whilst deleting a certificate") + LOGGER.debug(e) diff --git a/framework/python/src/common/session.py b/framework/python/src/common/session.py index bd9dc658a..bce74754e 100644 --- a/framework/python/src/common/session.py +++ b/framework/python/src/common/session.py @@ -19,6 +19,11 @@ import os from common import util, logger +# Certificate dependencies +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.backends import default_backend + NETWORK_KEY = 'network' DEVICE_INTF_KEY = 'device_intf' INTERNET_INTF_KEY = 'internet_intf' @@ -28,6 +33,8 @@ API_URL_KEY = 'api_url' API_PORT_KEY = 'api_port' MAX_DEVICE_REPORTS_KEY = 'max_device_reports' +CERTS_PATH = 'local/root_certs' +CONFIG_FILE_PATH = 'local/system.json' PROFILE_FORMAT_PATH = 'resources/risk_assessment.json' PROFILES_DIR = 'local/profiles' @@ -37,18 +44,34 @@ class TestrunSession(): """Represents the current session of Test Run.""" - def __init__(self, root_dir, config_file, version): + def __init__(self, root_dir, version): self._root_dir = root_dir self._status = 'Idle' + + # Target test device self._device = None + + # Start time of testing self._started = None self._finished = None + + # Current testing results self._results = [] + + # All historical reports self._module_reports = [] + + # Parameters specified when starting Testrun self._runtime_params = [] + + # All device configurations self._device_repository = [] + + # Number of tests to be run this session self._total_tests = 0 + + # Direct url for PDF report self._report_url = None # Profiles @@ -56,7 +79,7 @@ def __init__(self, root_dir, config_file, version): self._profile_format_json = None # System configuration - self._config_file = config_file + self._config_file = os.path.join(root_dir, CONFIG_FILE_PATH) self._config = self._get_default_config() # Loading methods @@ -64,6 +87,10 @@ def __init__(self, root_dir, config_file, version): self._load_config() self._load_profiles() + self._certs = [] + self.load_certs() + + # Fetch the timezone of the host system tz = util.run_command('cat /etc/timezone') # TODO: Check if timezone is fetched successfully self._timezone = tz[0] @@ -359,3 +386,98 @@ def to_json(self): def get_timezone(self): return self._timezone + + def upload_cert(self, filename, content): + + try: + # Parse bytes into x509 object + cert = x509.load_pem_x509_certificate(content, default_backend()) + + # Extract required properties + common_name = cert.subject.get_attributes_for_oid( + NameOID.COMMON_NAME)[0].value + issuer = cert.issuer.get_attributes_for_oid( + NameOID.ORGANIZATION_NAME)[0].value + + # Craft python dictionary with values + cert_obj = { + 'name': common_name, + 'organisation': issuer, + 'expires': cert.not_valid_after_utc, + 'filename': filename + } + + with open(os.path.join(CERTS_PATH, filename), 'wb') as f: + f.write(content) + + util.run_command(f'chown -R {util.get_host_user()} {CERTS_PATH}') + + return cert_obj + + except Exception as e: + LOGGER.error('An error occured whilst parsing a certificate') + LOGGER.debug(e) + return None + + def check_cert_file_name(self, name): + + if os.path.exists(os.path.join(CERTS_PATH, name)): + return False + + return True + + def load_certs(self): + + LOGGER.debug(f'Loading certificates from {CERTS_PATH}') + + self._certs = [] + + for cert_file in os.listdir(CERTS_PATH): + LOGGER.debug(f'Loading certificate {cert_file}') + try: + + # Open certificate file + with open( + os.path.join( + CERTS_PATH, cert_file), 'rb',) as f: + + # Parse bytes into x509 object + cert = x509.load_pem_x509_certificate(f.read(), default_backend()) + + # Extract required properties + common_name = cert.subject.get_attributes_for_oid( + NameOID.COMMON_NAME)[0].value + issuer = cert.issuer.get_attributes_for_oid( + NameOID.ORGANIZATION_NAME)[0].value + + # Craft python dictionary with values + cert_obj = { + 'name': common_name, + 'organisation': issuer, + 'expires': cert.not_valid_after_utc, + 'filename': cert_file + } + + # Add certificate to list + self._certs.append(cert_obj) + + LOGGER.debug(f'Successfully loaded {cert_file}') + except Exception as e: + LOGGER.error(f'An error occurred whilst loading {cert_file}') + LOGGER.debug(e) + + def delete_cert(self, filename): + + LOGGER.debug(f'Deleting certificate {filename}') + + try: + cert_file = os.path.join(CERTS_PATH, filename) + os.remove(cert_file) + return True + except Exception as e: + LOGGER.error('An error occurred whilst deleting the certificate') + LOGGER.debug(e) + return False + + def get_certs(self): + return self._certs diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index e7c77c517..c8973fc6a 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -60,7 +60,7 @@ DEVICE_TEST_MODULES = 'test_modules' MAX_DEVICE_REPORTS_KEY = 'max_device_reports' -VERSION = '1.2.2' +VERSION = '1.3' class Testrun: # pylint: disable=too-few-public-methods """Test Run controller. @@ -90,7 +90,6 @@ def __init__(self, # Create session self._session = TestrunSession(root_dir=root_dir, - config_file=self._config_file, version=self.get_version()) # Register runtime parameters diff --git a/framework/requirements.txt b/framework/requirements.txt index 0d931f0dd..bf6a9bb95 100644 --- a/framework/requirements.txt +++ b/framework/requirements.txt @@ -14,6 +14,7 @@ weasyprint==60.2 fastapi==0.109.1 psutil==5.9.8 uvicorn==0.27.0 +python-multipart==0.0.9 pydantic==2.7.1 # Requirements for testing @@ -22,3 +23,6 @@ pytest-timeout==2.2.0 # Requirements for the report markdown==3.5.2 + +# Requirements for the session +cryptography==42.0.7