Skip to content

Commit

Permalink
Merge pull request #47 from geoadmin/bug-BGDIINF_SB-2442-fix-origin-c…
Browse files Browse the repository at this point in the history
…heck

BGDIINF_SB-2442: Fixed origin check - #patch
  • Loading branch information
BFLB authored Mar 2, 2023
2 parents aa6d309 + 2aa1606 commit 6e6bb13
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 24 deletions.
46 changes: 36 additions & 10 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
app.config.from_object(settings)


def is_domain_allowed(domain):
return re.match(ALLOWED_DOMAINS_PATTERN, domain) is not None


# NOTE it is better to have this method registered first (before validate_origin) otherwise
# the route might not be logged if another method reject the request.
@app.before_request
Expand All @@ -34,12 +38,35 @@ def log_route():
# Reject request from non allowed origins
@app.before_request
def validate_origin():
if 'Origin' not in request.headers:
logger.error('Origin header is not set')
abort(403, 'Not allowed')
if not re.match(ALLOWED_DOMAINS_PATTERN, request.headers['Origin']):
logger.error('Origin=%s is not allowed', request.headers['Origin'])
abort(403, 'Not allowed')
# The Origin header is automatically set by the browser and cannot be changed by the javascript
# application. Unfortunately this header is only set if the request comes from another origin.
# Sec-Fetch-Site header is set to `same-origin` by most of the browsers except by Safari!
# The best protection would be to use the Sec-Fetch-Site and Origin header, however this is
# not supported by Safari. Therefore we added a fallback to the Referer header for Safari.
sec_fetch_site = request.headers.get('Sec-Fetch-Site', None)
origin = request.headers.get('Origin', None)
referrer = request.headers.get('Referer', None)

if origin is not None:
if is_domain_allowed(origin):
return
logger.error('Origin=%s does not match %s', origin, ALLOWED_DOMAINS_PATTERN)
abort(403, 'Permission denied')

if sec_fetch_site is not None:
if sec_fetch_site in ['same-origin', 'same-site']:
return
logger.error('Sec-Fetch-Site=%s is not allowed', sec_fetch_site)
abort(403, 'Permission denied')

if referrer is not None:
if is_domain_allowed(referrer):
return
logger.error('Referer=%s does not match %s', referrer, ALLOWED_DOMAINS_PATTERN)
abort(403, 'Permission denied')

logger.error('Referer and/or Origin and/or Sec-Fetch-Site headers not set')
abort(403, 'Permission denied')


# Add CORS Headers to all request
Expand All @@ -64,10 +91,9 @@ def log_response(response):
request.path,
response.status,
extra={
'response':
{
"status_code": response.status_code, "headers": dict(response.headers.items())
},
'response': {
"status_code": response.status_code, "headers": dict(response.headers.items())
},
"duration": time.time() - g.get('request_started', time.time())
}
)
Expand Down
3 changes: 2 additions & 1 deletion app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
raise RuntimeError("Environment variable $ALLOWED_DOMAINS was not set")

ALLOWED_DOMAINS = ALLOWED_DOMAINS_STRING.split(',')
ALLOWED_DOMAINS_PATTERN = f"({'|'.join(ALLOWED_DOMAINS)})"

# Proxy settings
FORWARED_ALLOW_IPS = os.getenv('FORWARED_ALLOW_IPS', '*')
Expand All @@ -32,4 +33,4 @@
if WSGI_WORKERS <= 0:
from multiprocessing import cpu_count
WSGI_WORKERS = (cpu_count() * 2) + 1
WSGI_TIMEOUT = int(os.getenv('WSGI_TIMEOUT', '3'))
WSGI_TIMEOUT = int(os.getenv('WSGI_TIMEOUT', '3'))
11 changes: 5 additions & 6 deletions app/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@
# the tag is directly related to the commit or has an additional
# suffix 'v[0-9]+\.[0-9]+\.[0-9]+-beta.[0-9]-[0-9]+-gHASH' denoting
# the 'distance' to the latest tag
with subprocess.Popen(
["git", "describe", "--tags"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
) as proc:
with subprocess.Popen(["git", "describe", "--tags"], stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
stdout, stderr = proc.communicate()
GIT_VERSION = stdout.decode('utf-8').strip()
if GIT_VERSION == '':
# If theres no git tag found in the history we simply use the short
# version of the latest git commit hash
with subprocess.Popen(
["git", "rev-parse", "--short", "HEAD"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
) as proc:
with subprocess.Popen(["git", "rev-parse", "--short", "HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
stdout, stderr = proc.communicate()
APP_VERSION = f"v_{stdout.decode('utf-8').strip()}"
else:
Expand Down
50 changes: 43 additions & 7 deletions tests/unit_tests/test_route.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import unittest

from nose2.tools import params

from flask import url_for

from app import app
Expand All @@ -16,13 +18,7 @@ def setUp(self):
self.context.push()
self.app = app.test_client()
self.app.testing = True
self.origin_headers = {
"allowed": {
"Origin": "some_random_domain"
}, "bad": {
"Origin": "big-bad-wolf.com"
}
}
self.origin_headers = {"allowed": {"Origin": "some_random_domain"}}

def test_checker(self):
response = self.app.get(url_for('checker'), headers=self.origin_headers["allowed"])
Expand All @@ -35,3 +31,43 @@ def test_checker_non_allowed_origin(self):
self.assertEqual(response.status_code, 403)
self.assertEqual(response.content_type, "application/json")
self.assertEqual(response.json["error"]["message"], "Not allowed")

@params(
None,
{'Origin': 'www.example'},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'cross-site'
},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'same-site'
},
{
'Origin': 'www.example', 'Sec-Fetch-Site': 'same-origin'
},
{'Referer': 'http://www.example'},
)
def test_feedback_non_allowed_origin(self, headers):
response = self.app.get(url_for('checker'), headers=headers)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.content_type, "application/json")
self.assertEqual(response.json["error"]["message"], "Not allowed")


@params(
{'Origin': 'map.geo.admin.ch'},
{
'Origin': 'map.geo.admin.ch', 'Sec-Fetch-Site': 'same-site'
},
{
'Origin': 'public.geo.admin.ch', 'Sec-Fetch-Site': 'same-origin'
},
{
'Origin': 'http://localhost', 'Sec-Fetch-Site': 'cross-site'
},
{'Sec-Fetch-Site': 'same-origin'},
{'Referer': 'https://map.geo.admin.ch'},
)
def test_feedback_allowed_origin(self, headers):
response = self.app.get(url_for('checker'), headers=headers)
self.assertEqual(response.status_code, 200)
self.assertCors(response, check_origin=False)

0 comments on commit 6e6bb13

Please sign in to comment.