Skip to content

Commit

Permalink
Merge pull request #47 from geoadmin/bug-BGDIINF_SB-2420-origin-header
Browse files Browse the repository at this point in the history
BGDIINF_SB-2420: Fixed browser origin validation and CORS - #patch
  • Loading branch information
ltshb authored Jun 9, 2022
2 parents 2794fc7 + d988ba5 commit a7e7379
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .env.testing
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ALLOWED_DOMAINS=.*\.geo\.admin\.ch,.*\.bgdi\.ch,.*\.swisstopo\.cloud
ALLOWED_DOMAINS=.*\.geo\.admin\.ch,.*\.bgdi\.ch,http://localhost
AWS_ACCESS_KEY_ID=testing
AWS_SECRET_ACCESS_KEY=testing
AWS_SECURITY_TOKEN=testing
Expand Down
51 changes: 40 additions & 11 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
app.config.from_mapping({"TRAP_HTTP_EXCEPTIONS": True})


def is_domain_allowed(domain):
return re.match(ALLOWED_DOMAINS_PATTERN, domain) is not None


@app.before_request
# Add quick log of the routes used to all request.
# Important: this should be the first before_request method, to ensure
Expand All @@ -43,13 +47,36 @@ def validate_origin():
# any origin (anyone)
return

if 'Origin' not in request.headers:
logger.error('Origin header is not set')
# The Origin headers is automatically set by the browser and cannot be changed by the javascript
# application. Unfortunately this header is only set if the request comes from another origin.
# Sec-Fetch-Site header is set to `same-origin` by most of the browser except by Safari !
# The best protection would be to use the Sec-Fetch-Site and Origin header, however this is
# not supported by Safari. Therefore we added a fallback to the Referer header for Safari.
sec_fetch_site = request.headers.get('Sec-Fetch-Site', None)
origin = request.headers.get('Origin', None)
referrer = request.headers.get('Referer', None)

if origin is not None:
if is_domain_allowed(origin):
return
logger.error('Origin=%s is not allowed', origin)
abort(403, 'Permission denied')
if not re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin']):
logger.error('Origin %s is not allowed', request.headers['Origin'])

if sec_fetch_site is not None:
if sec_fetch_site in ['same-origin', 'same-site']:
return
logger.error('Sec-Fetch-Site=%s is not allowed', sec_fetch_site)
abort(403, 'Permission denied')

if referrer is not None:
if is_domain_allowed(referrer):
return
logger.error('Referer=%s is not allowed', referrer)
abort(403, 'Permission denied')

logger.error('Referer and/or Origin and/or Sec-Fetch-Site headers not set')
abort(403, 'Permission denied')


@app.after_request
def add_charset(response):
Expand All @@ -66,13 +93,15 @@ def add_generic_cors_header(response):
if request.endpoint == 'checker':
return response

if (
'Origin' in request.headers and
re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin'])
):
# Don't add the allow origin if the origin is not allowed, otherwise that would give
# a hint to the user on how to missused this service
response.headers.set('Access-Control-Allow-Origin', request.headers['Origin'])
if request.endpoint == 'get_shortlink' and get_redirect_param(ignore_errors=True):
# redirect endpoint are allowed from all origins
response.headers['Access-Control-Allow-Origin'] = "*"
else:
response.headers['Access-Control-Allow-Origin'] = request.host_url
if 'Origin' in request.headers and is_domain_allowed(request.headers['Origin']):
response.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
response.headers['Vary'] = 'Origin'

# Always add the allowed methods.
response.headers.set(
'Access-Control-Allow-Methods', ', '.join(get_registered_method(app, request.url_rule))
Expand Down
6 changes: 4 additions & 2 deletions app/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ def get_registered_method(app, url_rule):
)


def get_redirect_param():
def get_redirect_param(ignore_errors=False):
try:
redirect = strtobool(request.args.get('redirect', 'true'))
except ValueError as error:
abort(400, f'Invalid "redirect" arg: {error}')
redirect = False
if not ignore_errors:
abort(400, f'Invalid "redirect" arg: {error}')
return redirect


Expand Down
18 changes: 12 additions & 6 deletions tests/unit_tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ def setUp(self):
def tearDown(self):
self.table.delete()

def assertCors(self, response, expected_allowed_methods, check_origin=True): # pylint: disable=invalid-name
if check_origin:
self.assertIn('Access-Control-Allow-Origin', response.headers)
self.assertTrue(
re.match(ALLOWED_DOMAINS_PATTERN, response.headers['Access-Control-Allow-Origin'])
)
def assertCors(
self,
response,
expected_allowed_methods,
origin_pattern=ALLOWED_DOMAINS_PATTERN
): # pylint: disable=invalid-name
self.assertIn('Access-Control-Allow-Origin', response.headers)
self.assertIsNotNone(
re.match(origin_pattern, response.headers['Access-Control-Allow-Origin']),
msg=f"Access-Control-Allow-Origin={response.headers['Access-Control-Allow-Origin']}"
f" doesn't match {origin_pattern}"
)
self.assertIn('Access-Control-Allow-Methods', response.headers)
self.assertListEqual(
sorted(expected_allowed_methods),
Expand Down
185 changes: 148 additions & 37 deletions tests/unit_tests/test_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging.config
import re

from nose2.tools import params

from flask import url_for

from app.settings import SHORT_ID_SIZE
Expand Down Expand Up @@ -54,14 +56,16 @@ def test_create_shortlink_no_json(self):
self.assertEqual(415, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False,
'error': {
'code': 415,
'message': 'Input data missing or from wrong type, must be application/json'
}
},
response.json)
self.assertEqual(
{
'success': False,
'error': {
'code': 415,
'message': 'Input data missing or from wrong type, must be application/json'
}
},
response.json,
)

def test_create_shortlink_no_url(self):
response = self.app.post(
Expand All @@ -70,13 +74,15 @@ def test_create_shortlink_no_url(self):
self.assertEqual(400, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False,
'error': {
'code': 400, 'message': 'Url parameter missing from request'
}
},
response.json)
self.assertEqual(
{
'success': False,
'error': {
'code': 400, 'message': 'Url parameter missing from request'
}
},
response.json,
)

def test_create_shortlink_no_hostname(self):
wrong_url = "/test"
Expand Down Expand Up @@ -145,7 +151,7 @@ def test_redirect_shortlink_ok(self):
for short_id, url in self.uuid_to_url_dict.items():
response = self.app.get(url_for('get_shortlink', shortlink_id=short_id))
self.assertEqual(response.status_code, 301)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=', response.headers['Cache-Control'])
self.assertEqual(response.content_type, "text/html; charset=utf-8")
Expand All @@ -159,7 +165,7 @@ def test_redirect_shortlink_ok_with_query(self):
headers={"Origin": "www.example.com"}
)
self.assertEqual(response.status_code, 301)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=', response.headers['Cache-Control'])
self.assertEqual(response.content_type, "text/html; charset=utf-8")
Expand All @@ -181,7 +187,10 @@ def test_shortlink_fetch_nok_invalid_redirect_parameter(self):
}
}
self.assertEqual(response.status_code, 400)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'])
self.assertCors(
response,
['GET', 'HEAD', 'OPTIONS'],
)
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=3600', response.headers['Cache-Control'])
self.assertIn('application/json', response.content_type)
Expand All @@ -199,7 +208,7 @@ def test_redirect_shortlink_url_not_found(self):
}
}
self.assertEqual(response.status_code, 404)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'])
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")
self.assertIn('Cache-Control', response.headers)
self.assertIn('max-age=3600', response.headers['Cache-Control'])
self.assertIn('application/json', response.content_type)
Expand Down Expand Up @@ -256,26 +265,128 @@ def test_fetch_full_url_from_shortlink_url_not_found(self):
}
self.assertEqual(response.json, expected_json)

def test_create_shortlink_no_origin_header(self):
response = self.app.post("/")
@params(
None,
{'Origin': 'www.example'},
{'Origin': ''},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'cross-site'
},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'same-site'
},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'same-origin'
},
{'Referer': 'http://www.example'},
{'Referer': ''},
)
def test_create_shortlink_origin_not_allowed(self, headers):
response = self.app.post("/", headers=headers)
self.assertEqual(403, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'], check_origin=False)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
self.assertEqual(
{
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
},
response.json,
)

@params(
{'Origin': 'map.geo.admin.ch'},
{
'Origin': 'map.geo.admin.ch', 'Sec-Fetch-Site': 'same-site'
},
{
'Origin': 's.geo.admin.ch', 'Sec-Fetch-Site': 'same-origin'
},
response.json)
{
'Origin': 'http://localhost', 'Sec-Fetch-Site': 'cross-site'
},
{'Sec-Fetch-Site': 'same-origin'},
{'Referer': 'https://map.geo.admin.ch'},
)
def test_create_shortlink_origin_allowed(self, headers):
url = "https://map.geo.admin.ch/test"
response = self.app.post(url_for('create_shortlink'), json={"url": url}, headers=headers)
self.assertEqual(response.status_code, 201)
self.assertCors(response, ['POST', 'OPTIONS'])
self.assertEqual(response.content_type, "application/json; charset=utf-8")
self.assertEqual(response.json.get('success'), True)

def test_create_shortlink_non_allowed_origin_header(self):
response = self.app.post("/", headers={"Origin": "big-bad-wolf.com"})
self.assertEqual(403, response.status_code)
self.assertCors(response, ['POST', 'OPTIONS'], check_origin=False)
self.assertIn('application/json', response.content_type)
self.assertEqual({
'success': False, 'error': {
'code': 403, 'message': 'Permission denied'
}
@params(
None,
{},
{'Origin': "www.example"},
{'Origin': 'map.geo.admin.ch'},
{'Origin': ''},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'cross-site'
},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'same-site'
},
response.json)
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'same-origin'
},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'none'
},
{
'Origin': 'map.geo.admin.ch', 'Sec-Fetch-Site': 'same-site'
},
{
'Origin': 's.geo.admin.ch', 'Sec-Fetch-Site': 'same-origin'
},
{
'Origin': 'http://localhost', 'Sec-Fetch-Site': 'cross-site'
},
{'Referer': 'http://www.example'},
{'Referer': 'https://map.geo.admin.ch'},
{'Referer': ''},
{'Sec-Fetch-Site': 'same-origin'},
{'Sec-Fetch-Site': 'same-site'},
{'Sec-Fetch-Site': 'none'},
{'Sec-Fetch-Site': 'cross-site'},
)
def test_get_shortlink_redirect_origin_allowed(self, headers):
short_id = next(iter(self.uuid_to_url_dict.keys()))
response = self.app.get(
url_for('get_shortlink', shortlink_id=short_id),
query_string={'redirect': 'true'},
headers=headers
)
self.assertEqual(response.status_code, 301)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")

response = self.app.get(url_for('get_shortlink', shortlink_id=short_id), headers=headers)
self.assertEqual(response.status_code, 301)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'], origin_pattern=r"^\*$")

@params(
{'Origin': 'map.geo.admin.ch'},
{
'Origin': 'map.geo.admin.ch', 'Sec-Fetch-Site': 'same-site'
},
{
'Origin': 's.geo.admin.ch', 'Sec-Fetch-Site': 'same-origin'
},
{
'Origin': 'http://localhost', 'Sec-Fetch-Site': 'cross-site'
},
{'Sec-Fetch-Site': 'same-origin'},
{'Sec-Fetch-Site': 'same-site'},
{'Referer': 'https://map.geo.admin.ch'},
)
def test_get_shortlink_origin_allowed(self, headers):
short_id = next(iter(self.uuid_to_url_dict.keys()))
response = self.app.get(
url_for('get_shortlink', shortlink_id=short_id),
query_string={'redirect': 'false'},
headers=headers
)
self.assertEqual(response.status_code, 200)
self.assertCors(response, ['GET', 'HEAD', 'OPTIONS'])

0 comments on commit a7e7379

Please sign in to comment.