Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exception handling to timestamp parsing #598

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions framework/python/src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,22 @@ async def upload_cert(self, file: UploadFile, response: Response):
try:
# Pass to session to check and write
cert_obj = self._session.upload_cert(filename, contents)
except ValueError:
response.status_code = status.HTTP_409_CONFLICT
return self._generate_msg(False,
"A certificate with that name already exists.")
except IOError:
LOGGER.error("An error occurred whilst uploading the certificate")

except ValueError as e:

# Returned when duplicate common name detected
if str(e) == "A certificate with that name already exists":
response.status_code = status.HTTP_409_CONFLICT
return self._generate_msg(
False, "A certificate with that common name already exists."
)

# Returned when unable to load PEM file
else:
response.status_code = status.HTTP_400_BAD_REQUEST
return self._generate_msg(
False,
"Failed to upload certificate. Is it in the correct format?")

# Return error if something went wrong
if cert_obj is None:
Expand Down
72 changes: 32 additions & 40 deletions framework/python/src/common/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,52 +598,44 @@ def upload_cert(self, filename, content):

now = datetime.datetime.now(pytz.utc)

try:
# Parse bytes into x509 object
cert = x509.load_pem_x509_certificate(content, default_backend())

# Extract required properties
common_name = cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value

# Check if any existing certificates have the same common name
for cur_cert in self._certs:
if common_name == cur_cert['name']:
raise ValueError('A certificate with that name already exists')

issuer = cert.issuer.get_attributes_for_oid(
NameOID.ORGANIZATION_NAME)[0].value

status = 'Valid'
if now > cert.not_valid_after_utc:
status = 'Expired'

# Craft python dictionary with values
cert_obj = {
'name': common_name,
'status': status,
'organisation': issuer,
'expires': cert.not_valid_after_utc,
'filename': filename
}

with open(os.path.join(CERTS_PATH, filename), 'wb') as f:
f.write(content)
# Parse bytes into x509 object
cert = x509.load_pem_x509_certificate(content, default_backend())

# Extract required properties
common_name = cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value

# Check if any existing certificates have the same common name
for cur_cert in self._certs:
if common_name == cur_cert['name']:
raise ValueError('A certificate with that name already exists')

issuer = cert.issuer.get_attributes_for_oid(
NameOID.ORGANIZATION_NAME)[0].value

status = 'Valid'
if now > cert.not_valid_after_utc:
status = 'Expired'

# Craft python dictionary with values
cert_obj = {
'name': common_name,
'status': status,
'organisation': issuer,
'expires': cert.not_valid_after_utc,
'filename': filename
}

util.run_command(f'chown -R {util.get_host_user()} {CERTS_PATH}')
with open(os.path.join(CERTS_PATH, filename), 'wb') as f:
f.write(content)

return cert_obj
util.run_command(f'chown -R {util.get_host_user()} {CERTS_PATH}')

except ValueError as e:
LOGGER.error(e)
raise
except Exception as e:
LOGGER.error('An error occured whilst parsing a certificate')
LOGGER.debug(e)
return None
return cert_obj

def check_cert_file_name(self, name):

# Check for duplicate file name
if os.path.exists(os.path.join(CERTS_PATH, name)):
return False

Expand Down
10 changes: 9 additions & 1 deletion framework/python/src/test_orc/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,18 @@ def _find_oldest_test(self, completed_tests_dir):
oldest_timestamp = None
oldest_directory = None
for completed_test in os.listdir(completed_tests_dir):
timestamp = datetime.strptime(str(completed_test), "%Y-%m-%dT%H:%M:%S")
try:
timestamp = datetime.strptime(str(completed_test), "%Y-%m-%dT%H:%M:%S")

# Occurs when time does not match format
except ValueError as e:
LOGGER.error(e)
continue

if oldest_timestamp is None or timestamp < oldest_timestamp:
oldest_timestamp = timestamp
oldest_directory = completed_test

if oldest_directory:
return oldest_timestamp, os.path.join(completed_tests_dir,
oldest_directory)
Expand Down