Skip to content

Commit

Permalink
Azure: support user delegation key (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimbleby authored Oct 7, 2021
1 parent 3222c23 commit 86bb44a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 36 deletions.
66 changes: 47 additions & 19 deletions storages/backends/azure_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import (
BlobClient, BlobSasPermissions, ContainerClient, ContentSettings,
BlobClient, BlobSasPermissions, BlobServiceClient, ContentSettings,
generate_blob_sas,
)
from django.core.exceptions import SuspiciousOperation
Expand Down Expand Up @@ -121,7 +121,10 @@ def _get_valid_path(s):
class AzureStorage(BaseStorage):
def __init__(self, **settings):
super().__init__(**settings)
self._service_client = None
self._client = None
self._user_delegation_key = None
self._user_delegation_key_expiry = datetime.utcnow()

def get_default_settings(self):
return {
Expand All @@ -144,36 +147,58 @@ def get_default_settings(self):
"token_credential": setting('AZURE_TOKEN_CREDENTIAL'),
}

def _container_client(self, custom_domain=None, connection_string=None):
if custom_domain is None:
account_domain = "blob.core.windows.net"
else:
account_domain = custom_domain
if connection_string is None:
connection_string = "{}://{}.{}".format(
self.azure_protocol,
self.account_name,
account_domain)
def _get_service_client(self):
if self.connection_string is not None:
return BlobServiceClient.from_connection_string(self.connection_string)

account_domain = self.custom_domain or "blob.core.windows.net"
account_url = "{}://{}.{}".format(
self.azure_protocol, self.account_name, account_domain
)
credential = None
if self.account_key:
credential = self.account_key
elif self.sas_token:
credential = self.sas_token
elif self.token_credential:
credential = self.token_credential
return ContainerClient(
connection_string,
self.azure_container,
credential=credential)
return BlobServiceClient(account_url, credential=credential)

@property
def service_client(self):
if self._service_client is None:
self._service_client = self._get_service_client()
return self._service_client

@property
def client(self):
if self._client is None:
self._client = self._container_client(
custom_domain=self.custom_domain,
connection_string=self.connection_string)
self._client = self.service_client.get_container_client(
self.azure_container
)
return self._client

def get_user_delegation_key(self, expiry):
# We'll only be able to get a user delegation key if we've authenticated with a
# token credential.
if self.token_credential is None:
return None

# Get a new key if we don't already have one, or if the one we have expires too
# soon.
if (
self._user_delegation_key is None
or expiry > self._user_delegation_key_expiry
):
now = datetime.utcnow()
key_expiry_time = now + timedelta(days=7)
self._user_delegation_key = self.service_client.get_user_delegation_key(
key_start_time=now, key_expiry_time=key_expiry_time
)
self._user_delegation_key_expiry = key_expiry_time

return self._user_delegation_key

@property
def azure_protocol(self):
if self.azure_ssl:
Expand Down Expand Up @@ -258,13 +283,16 @@ def url(self, name, expire=None):

credential = None
if expire:
expiry = self._expire_at(expire)
user_delegation_key = self.get_user_delegation_key(expiry)
sas_token = generate_blob_sas(
self.account_name,
self.azure_container,
name,
account_key=self.account_key,
user_delegation_key=user_delegation_key,
permission=BlobSasPermissions(read=True),
expiry=self._expire_at(expire))
expiry=expiry)
credential = sas_token

container_blob_url = self.client.get_blob_client(name).url
Expand Down
104 changes: 87 additions & 17 deletions tests/test_azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,47 +165,117 @@ def test_url_expire(self, generate_blob_sas_mocked):
fixed_time = utc.localize(datetime.datetime(2016, 11, 6, 4))
with mock.patch('storages.backends.azure_storage.datetime') as d_mocked:
d_mocked.utcnow.return_value = fixed_time
self.assertEquals(
self.assertEqual(
self.storage.url('some blob', 100),
'https://ret_foo.blob.core.windows.net/test/some%20blob')
generate_blob_sas_mocked.assert_called_once_with(
self.account_name,
self.container_name,
'some blob',
account_key=self.account_key,
user_delegation_key=None,
permission=mock.ANY,
expiry=fixed_time + timedelta(seconds=100))

@mock.patch('storages.backends.azure_storage.generate_blob_sas')
def test_url_expire_user_delegation_key(self, generate_blob_sas_mocked):
generate_blob_sas_mocked.return_value = 'foo_token'
blob_mock = mock.MagicMock()
blob_mock.url = 'https://ret_foo.blob.core.windows.net/test/some%20blob'
self.storage._client.get_blob_client.return_value = blob_mock
self.storage.account_name = self.account_name
service_client = mock.MagicMock()
self.storage._service_client = service_client
self.storage.token_credential = 'token_credential'

utc = pytz.timezone('UTC')
fixed_time = utc.localize(datetime.datetime(2016, 11, 6, 4))
with mock.patch('storages.backends.azure_storage.datetime') as d_mocked:
d_mocked.utcnow.return_value = fixed_time
service_client.get_user_delegation_key.return_value = 'user delegation key'
self.assertEqual(
self.storage.url('some blob', 100),
'https://ret_foo.blob.core.windows.net/test/some%20blob')
generate_blob_sas_mocked.assert_called_once_with(
self.account_name,
self.container_name,
'some blob',
account_key=self.account_key,
user_delegation_key='user delegation key',
permission=mock.ANY,
expiry=fixed_time + timedelta(seconds=100))

def test_container_client_default_params(self):
storage = azure_storage.AzureStorage()
storage.account_name = self.account_name
with mock.patch(
'storages.backends.azure_storage.ContainerClient',
autospec=True) as c_mocked:
self.assertIsNotNone(storage.client)
c_mocked.assert_called_once_with(
'storages.backends.azure_storage.BlobServiceClient',
autospec=True) as bsc_mocked:
client_mock = mock.MagicMock()
bsc_mocked.return_value.get_container_client.return_value = client_mock
self.assertEqual(storage.client, client_mock)
bsc_mocked.assert_called_once_with(
'https://test.blob.core.windows.net',
None,
credential=None)

def test_container_client_params(self):
def test_container_client_params_account_key(self):
storage = azure_storage.AzureStorage()
storage.account_name = 'foo_name'
storage.account_key = 'foo_key'
storage.sas_token = 'foo_token'
storage.azure_ssl = True
storage.custom_domain = 'foo_domain'
storage.connection_string = 'foo_conn'
storage.token_credential = 'foo_cred'
storage.account_key = 'foo_key'
with mock.patch(
'storages.backends.azure_storage.ContainerClient',
autospec=True) as c_mocked:
self.assertIsNotNone(storage.client)
c_mocked.assert_called_once_with(
'foo_conn',
None,
'storages.backends.azure_storage.BlobServiceClient',
autospec=True) as bsc_mocked:
client_mock = mock.MagicMock()
bsc_mocked.return_value.get_container_client.return_value = client_mock
self.assertEqual(storage.client, client_mock)
bsc_mocked.assert_called_once_with(
'https://foo_name.foo_domain',
credential='foo_key')

def test_container_client_params_sas_token(self):
storage = azure_storage.AzureStorage()
storage.account_name = 'foo_name'
storage.azure_ssl = False
storage.custom_domain = 'foo_domain'
storage.sas_token = 'foo_token'
with mock.patch(
'storages.backends.azure_storage.BlobServiceClient',
autospec=True) as bsc_mocked:
client_mock = mock.MagicMock()
bsc_mocked.return_value.get_container_client.return_value = client_mock
self.assertEqual(storage.client, client_mock)
bsc_mocked.assert_called_once_with(
'http://foo_name.foo_domain',
credential='foo_token')

def test_container_client_params_token_credential(self):
storage = azure_storage.AzureStorage()
storage.account_name = self.account_name
storage.token_credential = 'foo_cred'
with mock.patch(
'storages.backends.azure_storage.BlobServiceClient',
autospec=True) as bsc_mocked:
client_mock = mock.MagicMock()
bsc_mocked.return_value.get_container_client.return_value = client_mock
self.assertEqual(storage.client, client_mock)
bsc_mocked.assert_called_once_with(
'https://test.blob.core.windows.net',
credential='foo_cred')

def test_container_client_params_connection_string(self):
storage = azure_storage.AzureStorage()
storage.account_name = self.account_name
storage.connection_string = 'foo_conn'
with mock.patch(
'storages.backends.azure_storage.BlobServiceClient.from_connection_string',
spec=azure_storage.BlobServiceClient.from_connection_string) as bsc_mocked:
client_mock = mock.MagicMock()
bsc_mocked.return_value.get_container_client.return_value = client_mock
self.assertEqual(storage.client, client_mock)
bsc_mocked.assert_called_once_with('foo_conn')

# From boto3

def test_storage_save(self):
Expand Down

0 comments on commit 86bb44a

Please sign in to comment.