Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/sync/outgoing: add dry run #13244

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Meta:
"user_delete_action",
"group_delete_action",
"default_group_email_domain",
"dry_run",
]
extra_kwargs = {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider
from authentik.lib.sync.outgoing import HTTP_CONFLICT
from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient
from authentik.lib.sync.outgoing.base import SAFE_METHODS, BaseOutgoingSyncClient
from authentik.lib.sync.outgoing.exceptions import (
BadRequestSyncException,
DryRunRejected,
NotFoundSyncException,
ObjectExistsSyncException,
StopSync,
Expand Down Expand Up @@ -43,6 +44,8 @@
self.domains.append(domain_name)

def _request(self, request: HttpRequest):
if self.provider.dry_run and request.method.upper() not in SAFE_METHODS:
raise DryRunRejected(request.uri, request.method, request.body)

Check warning on line 48 in authentik/enterprise/providers/google_workspace/clients/base.py

View check run for this annotation

Codecov / codecov/patch

authentik/enterprise/providers/google_workspace/clients/base.py#L48

Added line #L48 was not covered by tests
try:
response = request.execute()
except GoogleAuthError as exc:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 5.0.12 on 2025-02-24 19:43

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
(
"authentik_providers_google_workspace",
"0003_googleworkspaceprovidergroup_attributes_and_more",
),
]

operations = [
migrations.AddField(
model_name="googleworkspaceprovider",
name="dry_run",
field=models.BooleanField(
default=False,
help_text="When enabled, provider will not modify or create objects in the remote system.",
),
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Meta:
"filter_group",
"user_delete_action",
"group_delete_action",
"dry_run",
]
extra_kwargs = {}

Expand Down
56 changes: 41 additions & 15 deletions authentik/enterprise/providers/microsoft_entra/clients/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import asdict
from typing import Any

import httpx
from azure.core.exceptions import (
ClientAuthenticationError,
ServiceRequestError,
Expand All @@ -12,6 +13,7 @@
from django.db.models import Model
from django.http import HttpResponseBadRequest, HttpResponseNotFound
from kiota_abstractions.api_error import APIError
from kiota_abstractions.request_information import RequestInformation
from kiota_authentication_azure.azure_identity_authentication_provider import (
AzureIdentityAuthenticationProvider,
)
Expand All @@ -21,34 +23,40 @@
from msgraph.graph_request_adapter import GraphRequestAdapter, options
from msgraph.graph_service_client import GraphServiceClient
from msgraph_core import GraphClientFactory
from opentelemetry import trace

from authentik.enterprise.providers.microsoft_entra.models import MicrosoftEntraProvider
from authentik.events.utils import sanitize_item
from authentik.lib.sync.outgoing import HTTP_CONFLICT
from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient
from authentik.lib.sync.outgoing.base import SAFE_METHODS, BaseOutgoingSyncClient
from authentik.lib.sync.outgoing.exceptions import (
BadRequestSyncException,
DryRunRejected,
NotFoundSyncException,
ObjectExistsSyncException,
StopSync,
TransientSyncException,
)


def get_request_adapter(
credentials: ClientSecretCredential, scopes: list[str] | None = None
) -> GraphRequestAdapter:
if scopes:
auth_provider = AzureIdentityAuthenticationProvider(credentials=credentials, scopes=scopes)
else:
auth_provider = AzureIdentityAuthenticationProvider(credentials=credentials)
class AuthentikRequestAdapter(GraphRequestAdapter):
def __init__(self, auth_provider, provider: MicrosoftEntraProvider, client=None):
super().__init__(auth_provider, client)
self._provider = provider

return GraphRequestAdapter(
auth_provider=auth_provider,
client=GraphClientFactory.create_with_default_middleware(
options=options, client=KiotaClientFactory.get_default_client()
),
)
async def get_http_response_message(
self,
request_info: RequestInformation,
parent_span: trace.Span,
claims: str = "",
) -> httpx.Response:
if self._provider.dry_run and request_info.http_method.value.upper() not in SAFE_METHODS:
raise DryRunRejected(
url=request_info.url,
method=request_info.http_method.value,
body=request_info.content.decode("utf-8"),
)
return await super().get_http_response_message(request_info, parent_span, claims=claims)


class MicrosoftEntraSyncClient[TModel: Model, TConnection: Model, TSchema: dict](
Expand All @@ -63,9 +71,27 @@
self.credentials = provider.microsoft_credentials()
self.__prefetch_domains()

def get_request_adapter(
self, credentials: ClientSecretCredential, scopes: list[str] | None = None
) -> AuthentikRequestAdapter:
if scopes:
auth_provider = AzureIdentityAuthenticationProvider(

Check warning on line 78 in authentik/enterprise/providers/microsoft_entra/clients/base.py

View check run for this annotation

Codecov / codecov/patch

authentik/enterprise/providers/microsoft_entra/clients/base.py#L78

Added line #L78 was not covered by tests
credentials=credentials, scopes=scopes
)
else:
auth_provider = AzureIdentityAuthenticationProvider(credentials=credentials)

return AuthentikRequestAdapter(
auth_provider=auth_provider,
provider=self.provider,
client=GraphClientFactory.create_with_default_middleware(
options=options, client=KiotaClientFactory.get_default_client()
),
)

@property
def client(self):
return GraphServiceClient(request_adapter=get_request_adapter(**self.credentials))
return GraphServiceClient(request_adapter=self.get_request_adapter(**self.credentials))

def _request[T](self, request: Coroutine[Any, Any, T]) -> T:
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 5.0.12 on 2025-02-24 19:43

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
(
"authentik_providers_microsoft_entra",
"0002_microsoftentraprovidergroup_attributes_and_more",
),
]

operations = [
migrations.AddField(
model_name="microsoftentraprovider",
name="dry_run",
field=models.BooleanField(
default=False,
help_text="When enabled, provider will not modify or create objects in the remote system.",
),
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class MicrosoftEntraUserTests(APITestCase):

@apply_blueprint("system/providers-microsoft-entra.yaml")
def setUp(self) -> None:

# Delete all users and groups as the mocked HTTP responses only return one ID
# which will cause errors with multiple users
Tenant.objects.update(avatars="none")
Expand Down Expand Up @@ -97,6 +96,38 @@ def test_user_create(self):
self.assertFalse(Event.objects.filter(action=EventAction.SYSTEM_EXCEPTION).exists())
user_create.assert_called_once()

def test_user_create_dry_run(self):
"""Test user creation (dry run)"""
self.provider.dry_run = True
self.provider.save()
uid = generate_id()
with (
patch(
"authentik.enterprise.providers.microsoft_entra.models.MicrosoftEntraProvider.microsoft_credentials",
MagicMock(return_value={"credentials": self.creds}),
),
patch(
"msgraph.generated.organization.organization_request_builder.OrganizationRequestBuilder.get",
AsyncMock(
return_value=OrganizationCollectionResponse(
value=[
Organization(verified_domains=[VerifiedDomain(name="goauthentik.io")])
]
)
),
),
):
user = User.objects.create(
username=uid,
name=f"{uid} {uid}",
email=f"{uid}@goauthentik.io",
)
microsoft_user = MicrosoftEntraProviderUser.objects.filter(
provider=self.provider, user=user
).first()
self.assertIsNone(microsoft_user)
self.assertFalse(Event.objects.filter(action=EventAction.SYSTEM_EXCEPTION).exists())

def test_user_not_created(self):
"""Test without property mappings, no group is created"""
self.provider.property_mappings.clear()
Expand Down
2 changes: 2 additions & 0 deletions authentik/lib/sync/outgoing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class SyncObjectSerializer(PassiveSerializer):
)
)
sync_object_id = CharField()
override_dry_run = BooleanField(default=False)


class SyncObjectResultSerializer(PassiveSerializer):
Expand Down Expand Up @@ -98,6 +99,7 @@ def sync_object(self, request: Request, pk: int) -> Response:
page=1,
provider_pk=provider.pk,
pk=params.validated_data["sync_object_id"],
override_dry_run=params.validated_data["override_dry_run"],
).get()
return Response(SyncObjectResultSerializer(instance={"messages": res}).data)

Expand Down
8 changes: 8 additions & 0 deletions authentik/lib/sync/outgoing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ class Direction(StrEnum):
remove = "remove"


SAFE_METHODS = [
"GET",
"HEAD",
"OPTIONS",
"TRACE",
]


class BaseOutgoingSyncClient[
TModel: "Model", TConnection: "Model", TSchema: dict, TProvider: "OutgoingSyncProvider"
]:
Expand Down
16 changes: 16 additions & 0 deletions authentik/lib/sync/outgoing/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@ class BadRequestSyncException(BaseSyncException):
"""Exception when invalid data was sent to the remote system"""


class DryRunRejected(BaseSyncException):
"""When dry_run is enabled and a provider dropped a mutating request"""

def __init__(self, url: str, method: str, body: dict):
super().__init__()
self.url = url
self.method = method
self.body = body

def __repr__(self):
return self.__str__()

def __str__(self):
return f"Dry-run rejected request: {self.method} {self.url}"


class StopSync(BaseSyncException):
"""Exception raised when a configuration error should stop the sync process"""

Expand Down
13 changes: 11 additions & 2 deletions authentik/lib/sync/outgoing/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Any, Self

import pglock
from django.db import connection
from django.db import connection, models
from django.db.models import Model, QuerySet, TextChoices
from django.utils.translation import gettext_lazy as _

from authentik.core.models import Group, User
from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient
Expand All @@ -18,6 +19,14 @@ class OutgoingSyncDeleteAction(TextChoices):


class OutgoingSyncProvider(Model):
"""Base abstract models for providers implementing outgoing sync"""

dry_run = models.BooleanField(
default=False,
help_text=_(
"When enabled, provider will not modify or create objects in the remote system."
),
)

class Meta:
abstract = True
Expand All @@ -32,7 +41,7 @@ def get_object_qs[T: User | Group](self, type: type[T]) -> QuerySet[T]:

@property
def sync_lock(self) -> pglock.advisory:
"""Postgres lock for syncing SCIM to prevent multiple parallel syncs happening"""
"""Postgres lock for syncing to prevent multiple parallel syncs happening"""
return pglock.advisory(
lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
timeout=0,
Expand Down
33 changes: 30 additions & 3 deletions authentik/lib/sync/outgoing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from authentik.lib.sync.outgoing.base import Direction
from authentik.lib.sync.outgoing.exceptions import (
BadRequestSyncException,
DryRunRejected,
StopSync,
TransientSyncException,
)
Expand Down Expand Up @@ -105,7 +106,9 @@
return
task.set_status(TaskStatus.SUCCESSFUL, *messages)

def sync_objects(self, object_type: str, page: int, provider_pk: int, **filter):
def sync_objects(
self, object_type: str, page: int, provider_pk: int, override_dry_run=False, **filter
):
_object_type = path_to_class(object_type)
self.logger = get_logger().bind(
provider_type=class_to_path(self._provider_model),
Expand All @@ -116,6 +119,10 @@
provider = self._provider_model.objects.filter(pk=provider_pk).first()
if not provider:
return messages
# Override dry run mode if requested, however don't save the provider
# so that scheduled sync tasks still run in dry_run mode
if override_dry_run:
provider.dry_run = False

Check warning on line 125 in authentik/lib/sync/outgoing/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/lib/sync/outgoing/tasks.py#L125

Added line #L125 was not covered by tests
try:
client = provider.client_for_model(_object_type)
except TransientSyncException:
Expand All @@ -132,6 +139,22 @@
except SkipObjectException:
self.logger.debug("skipping object due to SkipObject", obj=obj)
continue
except DryRunRejected as exc:
messages.append(
asdict(
LogEvent(
_("Dropping mutating request due to dry run"),
log_level="info",
logger=f"{provider._meta.verbose_name}@{object_type}",
attributes={
"obj": sanitize_item(obj),
"method": exc.method,
"url": exc.url,
"body": exc.body,
},
)
)
)
except BadRequestSyncException as exc:
self.logger.warning("failed to sync object", exc=exc, obj=obj)
messages.append(
Expand Down Expand Up @@ -231,8 +254,10 @@
raise Retry() from exc
except SkipObjectException:
continue
except DryRunRejected as exc:
self.logger.info("Rejected dry-run event", exc=exc)
except StopSync as exc:
self.logger.warning(exc, provider_pk=provider.pk)
self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)

def sync_signal_m2m(self, group_pk: str, action: str, pk_set: list[int]):
self.logger = get_logger().bind(
Expand Down Expand Up @@ -263,5 +288,7 @@
raise Retry() from exc
except SkipObjectException:
continue
except DryRunRejected as exc:
self.logger.info("Rejected dry-run event", exc=exc)

Check warning on line 292 in authentik/lib/sync/outgoing/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/lib/sync/outgoing/tasks.py#L291-L292

Added lines #L291 - L292 were not covered by tests
except StopSync as exc:
self.logger.warning(exc, provider_pk=provider.pk)
self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)

Check warning on line 294 in authentik/lib/sync/outgoing/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/lib/sync/outgoing/tasks.py#L294

Added line #L294 was not covered by tests
Loading
Loading