Skip to content

Commit

Permalink
BGDIINF_SB-2420: Fixed browser origin validation and CORS - #patch
Browse files Browse the repository at this point in the history
The browser doesn't add the origin header when the request is made from the
same origin. However it does always send the 'Sec-Fetch-Site' header telling
if the request is from the same origin, cross origin or from user.
See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Sec-Fetch-Site.
Unfortunately this 'Sec-Fetch-Site' header is not supported by Safari !
Moreover we cannot hack the web application to always set the Origin header as
most browser don't allow it.

So we need to check 2 headers: Sec-Fetch-Site and Origin with a fallback to the
Referer for Safari.

Although this is usually not an issue for shortlink as it is always used from a
different origin, the code has been corrected for completeness and consistency
with other services.

Also to have the correct CORS header in case of same origin we need to add the
correct allowed origin in CORS header which is the same as the request. In case
of cross-site then we use the Origin header as allowed origin in CORS. In case
where the Origin header is not allowed, we use the request domain as
Access-Control-Allow-Origin header.

In case of redirect, the `Access-Control-Allow-Origin` header was not set
if the origin was not from the allowed origin. Now this header in case of
redirect is always set to `*` which tell the client that all origins are
allowed for the redirect request.
  • Loading branch information
ltshb committed Jun 9, 2022
1 parent 2794fc7 commit d0efe5e
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .env.testing
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ALLOWED_DOMAINS=.*\.geo\.admin\.ch,.*\.bgdi\.ch,.*\.swisstopo\.cloud
ALLOWED_DOMAINS=.*\.geo\.admin\.ch,.*\.bgdi\.ch,http://localhost
AWS_ACCESS_KEY_ID=testing
AWS_SECRET_ACCESS_KEY=testing
AWS_SECURITY_TOKEN=testing
Expand Down
43 changes: 32 additions & 11 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
app.config.from_mapping({"TRAP_HTTP_EXCEPTIONS": True})


def is_domain_allowed(domain):
return re.match(ALLOWED_DOMAINS_PATTERN, domain) is not None


@app.before_request
# Add quick log of the routes used to all request.
# Important: this should be the first before_request method, to ensure
Expand All @@ -43,11 +47,26 @@ def validate_origin():
# any origin (anyone)
return

if 'Origin' not in request.headers:
logger.error('Origin header is not set')
# The Origin headers is automatically set by the browser and cannot be changed by the javascript
# application. Unfortunately this header is only set if the request comes from another origin.
# Sec-Fetch-Site header is set to `same-origin` by most of the browser except by Safari !
# The best protection would be to use the Sec-Fetch-Site and Origin header, however this is
# not supported by Safari. Therefore we added a fallback to the Referer header for Safari.
sec_fetch_site = request.headers.get('Sec-Fetch-Site', None)
origin = request.headers.get('Origin', None)
referrer = request.headers.get('Referer', None)

if origin is None and referrer is None and sec_fetch_site is None:
logger.error('Referer and/or Origin and/or Sec-Fetch-Site headers not set')
abort(403, 'Permission denied')
if origin is not None and not is_domain_allowed(origin):
logger.error('Origin=%s is not allowed', origin)
abort(403, 'Permission denied')
if not re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin']):
logger.error('Origin %s is not allowed', request.headers['Origin'])
if referrer is not None and not is_domain_allowed(referrer):
logger.error('Referer=%s is not allowed', referrer)
abort(403, 'Permission denied')
if sec_fetch_site is not None and sec_fetch_site != 'same-origin':
logger.error('Sec-Fetch-Site=%s is not allowed', sec_fetch_site)
abort(403, 'Permission denied')


Expand All @@ -66,13 +85,15 @@ def add_generic_cors_header(response):
if request.endpoint == 'checker':
return response

if (
'Origin' in request.headers and
re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin'])
):
# Don't add the allow origin if the origin is not allowed, otherwise that would give
# a hint to the user on how to missused this service
response.headers.set('Access-Control-Allow-Origin', request.headers['Origin'])
if request.endpoint == 'get_shortlink' and get_redirect_param(ignore_errors=True):
# redirect endpoint are allowed from all origins
response.headers['Access-Control-Allow-Origin'] = "*"
else:
response.headers['Access-Control-Allow-Origin'] = request.host_url
if 'Origin' in request.headers and is_domain_allowed(request.headers['Origin']):
response.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
response.headers['Vary'] = 'Origin'

# Always add the allowed methods.
response.headers.set(
'Access-Control-Allow-Methods', ', '.join(get_registered_method(app, request.url_rule))
Expand Down
6 changes: 4 additions & 2 deletions app/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ def get_registered_method(app, url_rule):
)


def get_redirect_param():
def get_redirect_param(ignore_errors=False):
try:
redirect = strtobool(request.args.get('redirect', 'true'))
except ValueError as error:
abort(400, f'Invalid "redirect" arg: {error}')
redirect = False
if not ignore_errors:
abort(400, f'Invalid "redirect" arg: {error}')
return redirect


Expand Down
18 changes: 12 additions & 6 deletions tests/unit_tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ def setUp(self):
def tearDown(self):
self.table.delete()

def assertCors(self, response, expected_allowed_methods, check_origin=True): # pylint: disable=invalid-name
if check_origin:
self.assertIn('Access-Control-Allow-Origin', response.headers)
self.assertTrue(
re.match(ALLOWED_DOMAINS_PATTERN, response.headers['Access-Control-Allow-Origin'])
)
def assertCors(
self,
response,
expected_allowed_methods,
origin_pattern=ALLOWED_DOMAINS_PATTERN
): # pylint: disable=invalid-name
self.assertIn('Access-Control-Allow-Origin', response.headers)
self.assertIsNotNone(
re.match(origin_pattern, response.headers['Access-Control-Allow-Origin']),
msg=f"Access-Control-Allow-Origin={response.headers['Access-Control-Allow-Origin']}"
f" doesn't match {origin_pattern}"
)
self.assertIn('Access-Control-Allow-Methods', response.headers)
self.assertListEqual(
sorted(expected_allowed_methods),
Expand Down
77 changes: 44 additions & 33 deletions tests/unit_tests/test_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ def test_create_shortlink_no_json(self):
self.assertEqual(415, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False,
'error': {
'code': 415,
'message': 'Input data missing or from wrong type, must be application/json'
}
},
response.json)
self.assertEqual(
{
'success': False,
'error': {
'code': 415,
'message': 'Input data missing or from wrong type, must be application/json'
}
},
response.json,
)

def test_create_shortlink_no_url(self):
response = self.app.post(
Expand All @@ -70,13 +72,15 @@ def test_create_shortlink_no_url(self):
self.assertEqual(400, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False,
'error': {
'code': 400, 'message': 'Url parameter missing from request'
}
},
response.json)
self.assertEqual(
{
'success': False,
'error': {
'code': 400, 'message': 'Url parameter missing from request'
}
},
response.json,
)

def test_create_shortlink_no_hostname(self):
wrong_url = "/test"
Expand Down Expand Up @@ -145,7 +149,7 @@ def test_redirect_shortlink_ok(self):
for short_id, url in self.uuid_to_url_dict.items():
response = self.app.get(url_for('get_shortlink', shortlink_id=short_id))
self.assertEqual(response.status_code, 301)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=', response.headers['Cache-Control'])
self.assertEqual(response.content_type, "text/html; charset=utf-8")
Expand All @@ -159,7 +163,7 @@ def test_redirect_shortlink_ok_with_query(self):
headers={"Origin": "www.example.com"}
)
self.assertEqual(response.status_code, 301)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=', response.headers['Cache-Control'])
self.assertEqual(response.content_type, "text/html; charset=utf-8")
Expand All @@ -181,7 +185,10 @@ def test_shortlink_fetch_nok_invalid_redirect_parameter(self):
}
}
self.assertEqual(response.status_code, 400)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'])
self.assertCors(
response,
['GET', 'HEAD', 'OPTIONS'],
)
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=3600', response.headers['Cache-Control'])
self.assertIn('application/json', response.content_type)
Expand All @@ -199,7 +206,7 @@ def test_redirect_shortlink_url_not_found(self):
}
}
self.assertEqual(response.status_code, 404)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'])
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=3600', response.headers['Cache-Control'])
self.assertIn('application/json', response.content_type)
Expand Down Expand Up @@ -259,23 +266,27 @@ def test_fetch_full_url_from_shortlink_url_not_found(self):
def test_create_shortlink_no_origin_header(self):
response = self.app.post("/")
self.assertEqual(403, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
},
response.json)
self.assertEqual(
{
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
},
response.json,
)

def test_create_shortlink_non_allowed_origin_header(self):
response = self.app.post("/", headers={"Origin": "big-bad-wolf.com"})
self.assertEqual(403, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
},
response.json)
self.assertEqual(
{
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
},
response.json,
)

0 comments on commit d0efe5e

Please sign in to comment.