From d0efe5ea2d319abbb5a8c5acd5f9a6aeea3c916d Mon Sep 17 00:00:00 2001 From: Brice Schaffner Date: Thu, 9 Jun 2022 08:21:33 +0200 Subject: [PATCH] BGDIINF_SB-2420: Fixed browser origin validation and CORS - #patch The browser doesn't add the origin header when the request is made from the same origin. However it does always send the 'Sec-Fetch-Site' header telling if the request is from the same origin, cross origin or from user. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Sec-Fetch-Site. Unfortunately this 'Sec-Fetch-Site' header is not supported by Safari ! Moreover we cannot hack the web application to always set the Origin header as most browser don't allow it. So we need to check 2 headers: Sec-Fetch-Site and Origin with a fallback to the Referer for Safari. Although this is usually not an issue for shortlink as it is always used from a different origin, the code has been corrected for completeness and consistency with other services. Also to have the correct CORS header in case of same origin we need to add the correct allowed origin in CORS header which is the same as the request. In case of cross-site then we use the Origin header as allowed origin in CORS. In case where the Origin header is not allowed, we use the request domain as Access-Control-Allow-Origin header. In case of redirect, the `Access-Control-Allow-Origin` header was not set if the origin was not from the allowed origin. Now this header in case of redirect is always set to `*` which tell the client that all origins are allowed for the redirect request. --- .env.testing | 2 +- app/__init__.py | 43 +++++++++++++----- app/helpers/utils.py | 6 ++- tests/unit_tests/base.py | 18 +++++--- tests/unit_tests/test_routes.py | 77 +++++++++++++++++++-------------- 5 files changed, 93 insertions(+), 53 deletions(-) diff --git a/.env.testing b/.env.testing index 578cfd8..bc16a53 100644 --- a/.env.testing +++ b/.env.testing @@ -1,4 +1,4 @@ -ALLOWED_DOMAINS=.*\.geo\.admin\.ch,.*\.bgdi\.ch,.*\.swisstopo\.cloud +ALLOWED_DOMAINS=.*\.geo\.admin\.ch,.*\.bgdi\.ch,http://localhost AWS_ACCESS_KEY_ID=testing AWS_SECRET_ACCESS_KEY=testing AWS_SECURITY_TOKEN=testing diff --git a/app/__init__.py b/app/__init__.py index 223d3de..5c1f63f 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -25,6 +25,10 @@ app.config.from_mapping({"TRAP_HTTP_EXCEPTIONS": True}) +def is_domain_allowed(domain): + return re.match(ALLOWED_DOMAINS_PATTERN, domain) is not None + + @app.before_request # Add quick log of the routes used to all request. # Important: this should be the first before_request method, to ensure @@ -43,11 +47,26 @@ def validate_origin(): # any origin (anyone) return - if 'Origin' not in request.headers: - logger.error('Origin header is not set') + # The Origin headers is automatically set by the browser and cannot be changed by the javascript + # application. Unfortunately this header is only set if the request comes from another origin. + # Sec-Fetch-Site header is set to `same-origin` by most of the browser except by Safari ! + # The best protection would be to use the Sec-Fetch-Site and Origin header, however this is + # not supported by Safari. Therefore we added a fallback to the Referer header for Safari. + sec_fetch_site = request.headers.get('Sec-Fetch-Site', None) + origin = request.headers.get('Origin', None) + referrer = request.headers.get('Referer', None) + + if origin is None and referrer is None and sec_fetch_site is None: + logger.error('Referer and/or Origin and/or Sec-Fetch-Site headers not set') + abort(403, 'Permission denied') + if origin is not None and not is_domain_allowed(origin): + logger.error('Origin=%s is not allowed', origin) abort(403, 'Permission denied') - if not re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin']): - logger.error('Origin %s is not allowed', request.headers['Origin']) + if referrer is not None and not is_domain_allowed(referrer): + logger.error('Referer=%s is not allowed', referrer) + abort(403, 'Permission denied') + if sec_fetch_site is not None and sec_fetch_site != 'same-origin': + logger.error('Sec-Fetch-Site=%s is not allowed', sec_fetch_site) abort(403, 'Permission denied') @@ -66,13 +85,15 @@ def add_generic_cors_header(response): if request.endpoint == 'checker': return response - if ( - 'Origin' in request.headers and - re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin']) - ): - # Don't add the allow origin if the origin is not allowed, otherwise that would give - # a hint to the user on how to missused this service - response.headers.set('Access-Control-Allow-Origin', request.headers['Origin']) + if request.endpoint == 'get_shortlink' and get_redirect_param(ignore_errors=True): + # redirect endpoint are allowed from all origins + response.headers['Access-Control-Allow-Origin'] = "*" + else: + response.headers['Access-Control-Allow-Origin'] = request.host_url + if 'Origin' in request.headers and is_domain_allowed(request.headers['Origin']): + response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] + response.headers['Vary'] = 'Origin' + # Always add the allowed methods. response.headers.set( 'Access-Control-Allow-Methods', ', '.join(get_registered_method(app, request.url_rule)) diff --git a/app/helpers/utils.py b/app/helpers/utils.py index 5fe39c0..8428ed0 100644 --- a/app/helpers/utils.py +++ b/app/helpers/utils.py @@ -59,11 +59,13 @@ def get_registered_method(app, url_rule): ) -def get_redirect_param(): +def get_redirect_param(ignore_errors=False): try: redirect = strtobool(request.args.get('redirect', 'true')) except ValueError as error: - abort(400, f'Invalid "redirect" arg: {error}') + redirect = False + if not ignore_errors: + abort(400, f'Invalid "redirect" arg: {error}') return redirect diff --git a/tests/unit_tests/base.py b/tests/unit_tests/base.py index d3738f2..a2454e2 100644 --- a/tests/unit_tests/base.py +++ b/tests/unit_tests/base.py @@ -83,12 +83,18 @@ def setUp(self): def tearDown(self): self.table.delete() - def assertCors(self, response, expected_allowed_methods, check_origin=True): # pylint: disable=invalid-name - if check_origin: - self.assertIn('Access-Control-Allow-Origin', response.headers) - self.assertTrue( - re.match(ALLOWED_DOMAINS_PATTERN, response.headers['Access-Control-Allow-Origin']) - ) + def assertCors( + self, + response, + expected_allowed_methods, + origin_pattern=ALLOWED_DOMAINS_PATTERN + ): # pylint: disable=invalid-name + self.assertIn('Access-Control-Allow-Origin', response.headers) + self.assertIsNotNone( + re.match(origin_pattern, response.headers['Access-Control-Allow-Origin']), + msg=f"Access-Control-Allow-Origin={response.headers['Access-Control-Allow-Origin']}" + f" doesn't match {origin_pattern}" + ) self.assertIn('Access-Control-Allow-Methods', response.headers) self.assertListEqual( sorted(expected_allowed_methods), diff --git a/tests/unit_tests/test_routes.py b/tests/unit_tests/test_routes.py index 4fb7205..1c88517 100644 --- a/tests/unit_tests/test_routes.py +++ b/tests/unit_tests/test_routes.py @@ -54,14 +54,16 @@ def test_create_shortlink_no_json(self): self.assertEqual(415, response.status_code) self.assertCors(response, ['POST', 'OPTIONS']) self.assertIn('application/json', response.content_type) - self.assertEqual({ - 'success': False, - 'error': { - 'code': 415, - 'message': 'Input data missing or from wrong type, must be application/json' - } - }, - response.json) + self.assertEqual( + { + 'success': False, + 'error': { + 'code': 415, + 'message': 'Input data missing or from wrong type, must be application/json' + } + }, + response.json, + ) def test_create_shortlink_no_url(self): response = self.app.post( @@ -70,13 +72,15 @@ def test_create_shortlink_no_url(self): self.assertEqual(400, response.status_code) self.assertCors(response, ['POST', 'OPTIONS']) self.assertIn('application/json', response.content_type) - self.assertEqual({ - 'success': False, - 'error': { - 'code': 400, 'message': 'Url parameter missing from request' - } - }, - response.json) + self.assertEqual( + { + 'success': False, + 'error': { + 'code': 400, 'message': 'Url parameter missing from request' + } + }, + response.json, + ) def test_create_shortlink_no_hostname(self): wrong_url = "/test" @@ -145,7 +149,7 @@ def test_redirect_shortlink_ok(self): for short_id, url in self.uuid_to_url_dict.items(): response = self.app.get(url_for('get_shortlink', shortlink_id=short_id)) self.assertEqual(response.status_code, 301) - self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], check_origin=False) + self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$") self.assertIn('Cache-Control', response.headers) self.assertIn('max-age=', response.headers['Cache-Control']) self.assertEqual(response.content_type, "text/html; charset=utf-8") @@ -159,7 +163,7 @@ def test_redirect_shortlink_ok_with_query(self): headers={"Origin": "www.example.com"} ) self.assertEqual(response.status_code, 301) - self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], check_origin=False) + self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$") self.assertIn('Cache-Control', response.headers) self.assertIn('max-age=', response.headers['Cache-Control']) self.assertEqual(response.content_type, "text/html; charset=utf-8") @@ -181,7 +185,10 @@ def test_shortlink_fetch_nok_invalid_redirect_parameter(self): } } self.assertEqual(response.status_code, 400) - self.assertCors(response, ['GET', 'HEAD', 'OPTIONS']) + self.assertCors( + response, + ['GET', 'HEAD', 'OPTIONS'], + ) self.assertIn('Cache-Control', response.headers) self.assertIn('max-age=3600', response.headers['Cache-Control']) self.assertIn('application/json', response.content_type) @@ -199,7 +206,7 @@ def test_redirect_shortlink_url_not_found(self): } } self.assertEqual(response.status_code, 404) - self.assertCors(response, ['GET', 'HEAD', 'OPTIONS']) + self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$") self.assertIn('Cache-Control', response.headers) self.assertIn('max-age=3600', response.headers['Cache-Control']) self.assertIn('application/json', response.content_type) @@ -259,23 +266,27 @@ def test_fetch_full_url_from_shortlink_url_not_found(self): def test_create_shortlink_no_origin_header(self): response = self.app.post("/") self.assertEqual(403, response.status_code) - self.assertCors(response, ['POST', 'OPTIONS'], check_origin=False) + self.assertCors(response, ['POST', 'OPTIONS']) self.assertIn('application/json', response.content_type) - self.assertEqual({ - 'success': False, 'error': { - 'code': 403, 'message': 'Permission denied' - } - }, - response.json) + self.assertEqual( + { + 'success': False, 'error': { + 'code': 403, 'message': 'Permission denied' + } + }, + response.json, + ) def test_create_shortlink_non_allowed_origin_header(self): response = self.app.post("/", headers={"Origin": "big-bad-wolf.com"}) self.assertEqual(403, response.status_code) - self.assertCors(response, ['POST', 'OPTIONS'], check_origin=False) + self.assertCors(response, ['POST', 'OPTIONS']) self.assertIn('application/json', response.content_type) - self.assertEqual({ - 'success': False, 'error': { - 'code': 403, 'message': 'Permission denied' - } - }, - response.json) + self.assertEqual( + { + 'success': False, 'error': { + 'code': 403, 'message': 'Permission denied' + } + }, + response.json, + )