Skip to content

Commit

Permalink
Add certificate endpoints (#451)
Browse files Browse the repository at this point in the history
* Adds version analytics event (#306)

* Techdebt: adds state for testrun page (#392)

* Fix tests (#397)

* Fix tests

* Update node version

* 331379891: (feat) disable connection settings when testrun is in progress (#371)

* 331379891: (feat) disable connection settings when testrun is in progress

* 331379891: (fix) include more testrun results as progress

* 331379891: (fix) fix spelling

* 333349715: (fix) GAR 1.3 The disabled system settings panel contains a focusable element (#388)

Co-authored-by: Volha Mardvilka <mardvilka@google.com>

---------

Co-authored-by: Volha Mardvilka <mardvilka@google.com>

* Disable device item if device in progress (#370)

* Adds ga to track testrun initiation (#415)

Adds ga ti track testrun initiation

* Certs progress

* Add list and upload cert endpoint

* Add delete certificate endpoint

* Update cert response codes

* Upgrade dependency

---------

Co-authored-by: Sofia Kurilova <sonnycactus@gmail.com>
Co-authored-by: Olga Mardvilko <omardvilko@mail.ru>
Co-authored-by: Volha Mardvilka <mardvilka@google.com>
  • Loading branch information
4 people authored May 23, 2024
1 parent 916b293 commit 2b33fde
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 38 deletions.
135 changes: 101 additions & 34 deletions framework/python/src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides Testrun data via REST API."""
from fastapi import FastAPI, APIRouter, Response, Request, status
from fastapi import (FastAPI,
File,
Form,
APIRouter,
Response,
Request,
status,
UploadFile)
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
Expand Down Expand Up @@ -96,12 +103,14 @@ def __init__(self, test_run):
self.edit_device,
methods=["POST"])

self._router.add_api_route("/system/modules",
self.get_test_modules)

# Profiles
self._router.add_api_route("/profiles/format",
self._get_profiles_format)
self._router.add_api_route("/system/config/certs",
self.get_certs)
self._router.add_api_route("/system/config/certs",
self.upload_cert,
methods=["POST"])
self._router.add_api_route("/system/config/certs",
self.delete_cert,
methods=["DELETE"])

# Allow all origins to access the API
origins = ["*"]
Expand Down Expand Up @@ -134,9 +143,6 @@ def _start(self):
def stop(self):
LOGGER.info("Stopping API")

def get_session(self):
return self._session

async def get_sys_interfaces(self):
addrs = psutil.net_if_addrs()
ifaces = {}
Expand Down Expand Up @@ -211,7 +217,6 @@ async def start_test_run(self, request: Request, response: Response):
"A device with that MAC address could not be found")

device.firmware = body_json["device"]["firmware"]
device.test_modules = body_json["device"]["test_modules"]

# Check if config has been updated (device interface not default)
if (self._test_run.get_session().get_device_interface()
Expand Down Expand Up @@ -561,20 +566,6 @@ async def get_results(self, response: Response,
response.status_code = 404
return self._generate_msg(False, "Test results could not be found")

async def get_test_modules(self):

LOGGER.debug("Received request to list test modules")

test_modules = []

for test_module in self._get_test_run().get_test_orc().get_test_modules():

# Only add module if it is an actual, enabled test module
if (test_module.enabled and test_module.enable_container):
test_modules.append(test_module.display_name)

return test_modules

def _validate_device_json(self, json_obj):

# Check all required properties are present
Expand All @@ -600,17 +591,93 @@ def _validate_device_json(self, json_obj):

return True

def _get_test_run(self):
return self._test_run
def get_certs(self):
LOGGER.debug("Received certs list request")

# Profiles
def _get_profiles_format(self, response: Response):
# Reload certs
self._session.load_certs()

# Check if Testrun was able to load the format originally
if self.get_session().get_profiles_format() is None:
response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
return self._session.get_certs()

async def upload_cert(self,
file: UploadFile,
response: Response):

filename = file.filename
content_type = file.content_type

LOGGER.debug("Received request to upload certificate")
LOGGER.debug(f"Filename: {filename}, content type: {content_type}")

if content_type not in [
"application/x-pem-file",
"application/x-x509-ca-cert"
]:
response.status_code = status.HTTP_400_BAD_REQUEST
return self._generate_msg(
False,
"Testrun could not load the risk assessment format")
"Failed to upload certificate. Is it in the correct format?"
)

return self.get_session().get_profiles_format()
if len(filename) > 24:
response.status_code = status.HTTP_400_BAD_REQUEST
return self._generate_msg(
False,
"Invalid filename. Maximum file name length is 24 characters."
)

# Check if file already exists
if not self._session.check_cert_file_name(
filename
):
response.status_code = status.HTTP_409_CONFLICT
return self._generate_msg(
False,
"A certificate with that file name already exists."
)

# Get file contents
contents = await file.read()

# Pass to session to check and write
cert_obj = self._session.upload_cert(filename,
contents)

# Return error if something went wrong
if cert_obj is None:
return self._generate_msg(
False,
"Failed to upload certificate. Is it in the correct format?"
)

response.status_code = status.HTTP_201_CREATED

return cert_obj

async def delete_cert(self, request: Request, response: Response):

LOGGER.debug("Received delete certificate request")

try:
req_raw = (await request.body()).decode("UTF-8")
req_json = json.loads(req_raw)

if "name" not in req_json:
response.status_code = status.HTTP_400_BAD_REQUEST
return self._generate_msg(False, "Received a bad request")

common_name = req_json.get("name")

for cert in self._session.get_certs():
if cert["name"] == common_name:
self._session.delete_cert(cert["filename"])
return self._generate_msg(True, "Successfully delete the certificate")

response.status_code = status.HTTP_404_NOT_FOUND
return self._generate_msg(
False,
"A certificate with that name could not be found")

except Exception as e:
LOGGER.error("An error occurred whilst deleting a certificate")
LOGGER.debug(e)
126 changes: 124 additions & 2 deletions framework/python/src/common/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import os
from common import util, logger

# Certificate dependencies
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend

NETWORK_KEY = 'network'
DEVICE_INTF_KEY = 'device_intf'
INTERNET_INTF_KEY = 'internet_intf'
Expand All @@ -28,6 +33,8 @@
API_URL_KEY = 'api_url'
API_PORT_KEY = 'api_port'
MAX_DEVICE_REPORTS_KEY = 'max_device_reports'
CERTS_PATH = 'local/root_certs'
CONFIG_FILE_PATH = 'local/system.json'

PROFILE_FORMAT_PATH = 'resources/risk_assessment.json'
PROFILES_DIR = 'local/profiles'
Expand All @@ -37,33 +44,53 @@
class TestrunSession():
"""Represents the current session of Test Run."""

def __init__(self, root_dir, config_file, version):
def __init__(self, root_dir, version):
self._root_dir = root_dir

self._status = 'Idle'

# Target test device
self._device = None

# Start time of testing
self._started = None
self._finished = None

# Current testing results
self._results = []

# All historical reports
self._module_reports = []

# Parameters specified when starting Testrun
self._runtime_params = []

# All device configurations
self._device_repository = []

# Number of tests to be run this session
self._total_tests = 0

# Direct url for PDF report
self._report_url = None

# Profiles
self._profiles = []
self._profile_format_json = None

# System configuration
self._config_file = config_file
self._config_file = os.path.join(root_dir, CONFIG_FILE_PATH)
self._config = self._get_default_config()

# Loading methods
self._load_version(default_version=version)
self._load_config()
self._load_profiles()

self._certs = []
self.load_certs()

# Fetch the timezone of the host system
tz = util.run_command('cat /etc/timezone')
# TODO: Check if timezone is fetched successfully
self._timezone = tz[0]
Expand Down Expand Up @@ -359,3 +386,98 @@ def to_json(self):

def get_timezone(self):
return self._timezone

def upload_cert(self, filename, content):

try:
# Parse bytes into x509 object
cert = x509.load_pem_x509_certificate(content, default_backend())

# Extract required properties
common_name = cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value
issuer = cert.issuer.get_attributes_for_oid(
NameOID.ORGANIZATION_NAME)[0].value

# Craft python dictionary with values
cert_obj = {
'name': common_name,
'organisation': issuer,
'expires': cert.not_valid_after_utc,
'filename': filename
}

with open(os.path.join(CERTS_PATH, filename), 'wb') as f:
f.write(content)

util.run_command(f'chown -R {util.get_host_user()} {CERTS_PATH}')

return cert_obj

except Exception as e:
LOGGER.error('An error occured whilst parsing a certificate')
LOGGER.debug(e)
return None

def check_cert_file_name(self, name):

if os.path.exists(os.path.join(CERTS_PATH, name)):
return False

return True

def load_certs(self):

LOGGER.debug(f'Loading certificates from {CERTS_PATH}')

self._certs = []

for cert_file in os.listdir(CERTS_PATH):
LOGGER.debug(f'Loading certificate {cert_file}')
try:

# Open certificate file
with open(
os.path.join(
CERTS_PATH, cert_file), 'rb',) as f:

# Parse bytes into x509 object
cert = x509.load_pem_x509_certificate(f.read(), default_backend())

# Extract required properties
common_name = cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value
issuer = cert.issuer.get_attributes_for_oid(
NameOID.ORGANIZATION_NAME)[0].value

# Craft python dictionary with values
cert_obj = {
'name': common_name,
'organisation': issuer,
'expires': cert.not_valid_after_utc,
'filename': cert_file
}

# Add certificate to list
self._certs.append(cert_obj)

LOGGER.debug(f'Successfully loaded {cert_file}')
except Exception as e:
LOGGER.error(f'An error occurred whilst loading {cert_file}')
LOGGER.debug(e)

def delete_cert(self, filename):

LOGGER.debug(f'Deleting certificate {filename}')

try:
cert_file = os.path.join(CERTS_PATH, filename)
os.remove(cert_file)
return True
except Exception as e:
LOGGER.error('An error occurred whilst deleting the certificate')
LOGGER.debug(e)
return False

def get_certs(self):
return self._certs
3 changes: 1 addition & 2 deletions framework/python/src/core/testrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
DEVICE_TEST_MODULES = 'test_modules'
MAX_DEVICE_REPORTS_KEY = 'max_device_reports'

VERSION = '1.2.2'
VERSION = '1.3'

class Testrun: # pylint: disable=too-few-public-methods
"""Test Run controller.
Expand Down Expand Up @@ -90,7 +90,6 @@ def __init__(self,

# Create session
self._session = TestrunSession(root_dir=root_dir,
config_file=self._config_file,
version=self.get_version())

# Register runtime parameters
Expand Down
4 changes: 4 additions & 0 deletions framework/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ weasyprint==60.2
fastapi==0.109.1
psutil==5.9.8
uvicorn==0.27.0
python-multipart==0.0.9
pydantic==2.7.1

# Requirements for testing
Expand All @@ -22,3 +23,6 @@ pytest-timeout==2.2.0

# Requirements for the report
markdown==3.5.2

# Requirements for the session
cryptography==42.0.7

0 comments on commit 2b33fde

Please sign in to comment.