Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add security rules as a resource #304

Merged
merged 15 commits into from
Dec 10, 2024
233 changes: 233 additions & 0 deletions datadog_sync/model/security_monitoring_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""This is the model for the security monitoring rules"""

# Unless explicitly stated otherwise all files in this repository are licensed
# under the 3-clause BSD style license (see LICENSE).
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

from __future__ import annotations
import copy
import json
from typing import TYPE_CHECKING, Optional, List, Dict, Tuple, cast

from datadog_sync.utils.base_resource import BaseResource, ResourceConfig
from datadog_sync.utils.custom_client import PaginationConfig
from datadog_sync.utils.resource_utils import (
CustomClientHTTPError,
SkipResource,
check_diff,
)


if TYPE_CHECKING:
from datadog_sync.utils.custom_client import CustomClient


class SecurityMonitoringRules(BaseResource):
"""Security Monitoring Rules inherits from BaseResource"""

resource_type = "security_monitoring_rules"
resource_config = ResourceConfig(
base_path="/api/v2/security_monitoring/rules",
excluded_attributes=[
"createdAt",
"creationAuthorId",
"updateAuthorId",
"updatedAt",
"isPartner",
"isBeta",
"isDeleted",
"isDeprecated",
"defaultTags",
"version",
],
non_nullable_attr=[],
null_values={},
)
# maximum page_size for this endpoint is 100 according to public api doc
pagination_config = PaginationConfig(
page_size=100,
page_number_param="page[number]",
page_size_param="page[size]",
remaining_func=lambda *args: 1,
)
destination_rules = {}
# can't even enable or disable immutable rules
immutable_rule_names = [
"Impossible travel event leads to permission enumeration",
]
errors_to_skip = [
"Invalid rule configuration",
]

async def get_resources(self, client: CustomClient) -> List[Dict]:
self.destination_rules = await self.get_destination_rules()
resp = await client.paginated_request(client.get)(
self.resource_config.base_path, pagination_config=self.pagination_config
)

return resp

async def import_resource(self, _id: Optional[str] = None, resource: Optional[Dict] = None) -> Tuple[str, Dict]:
if _id:
source_client = self.config.source_client
resource = (await source_client.get(self.resource_config.base_path + f"/{_id}"))["data"]

resource = cast(dict, resource)

return resource["id"], resource

async def pre_resource_action_hook(self, _id, resource: Dict) -> None:
matching_destination_rule = self.destination_rules.get(resource["name"], None)
if resource.get("isDefault", False) and not matching_destination_rule:
raise SkipResource(_id, self.resource_type, "Default rule does not exist at destination")
if resource["name"] in self.immutable_rule_names:
raise SkipResource(_id, self.resource_type, "This rule is immutable")
if matching_destination_rule and matching_destination_rule.get("isDeprecated", False):
raise SkipResource(_id, self.resource_type, "Cannot update deprecated rules")

async def pre_apply_hook(self) -> None:
self.destination_rules = await self.get_destination_rules()

async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
# this method uses rule name for matching default rules
rule_name = resource["name"]

# rule does not exist at the destination, so create it
if rule_name not in self.destination_rules and not resource["isDefault"]:
destination_client = self.config.destination_client
self.handle_special_case_attr(resource)
try:
resp = await destination_client.post(self.resource_config.base_path, resource)
return _id, resp
except CustomClientHTTPError as err:
if err.status_code == 400:
preamble = "400 Bad Request - "
error_json_no_preamble = err.args[0][len(preamble) :]
error_obj = json.loads(error_json_no_preamble)
errors = error_obj["errors"]
for error_message in errors:
if error_message in self.errors_to_skip:
raise SkipResource(_id, self.resource_type, err.args[0])
raise err

# Skip any default rules that do no exist at the destination
matching_destination_rule = self.destination_rules.get(rule_name, None)
if not matching_destination_rule:
raise SkipResource(_id, self.resource_type, "Default rule does not exist at destination")

# if they're different then run an update
rule_copy = copy.deepcopy(resource)
rule_copy.update(matching_destination_rule)
if check_diff(self.resource_config, resource, rule_copy):
self.config.state.destination[self.resource_type][_id] = rule_copy
return await self.update_resource(_id, resource)

# do nothing if they're the same
return _id, rule_copy

async def update_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
# Skip any default rules that do no exist at the destination
matching_destination_rule = self.destination_rules.get(resource["name"], None)
if not matching_destination_rule:
raise SkipResource(_id, self.resource_type, "Default rule does not exist at destination")

if resource["isDefault"] != matching_destination_rule["isDefault"]:
raise SkipResource(_id, self.resource_type, "Default status differs between source and destination")

if matching_destination_rule.get("isDeprecated", False):
raise SkipResource(_id, self.resource_type, "Cannot update deprecated rules")

if resource["name"] in self.immutable_rule_names:
raise SkipResource(_id, self.resource_type, "This rule is immutable")

# set the version correctly
resource["version"] = matching_destination_rule["version"]

# only certain fields can be updated on default rules
if (
resource.get("isDefault", False)
or resource.get("isPartner", False)
or resource.get("partnerIntegrationId", None)
):
self.limit_resource(resource)

destination_client = self.config.destination_client
resp = await destination_client.put(
self.resource_config.base_path + f"/{self.config.state.destination[self.resource_type][_id]['id']}",
resource,
)

return _id, resp

async def delete_resource(self, _id: str) -> None:
destination_client = self.config.destination_client
destination_resource = self.config.state.destination[self.resource_type][_id]

if destination_resource["name"] in self.immutable_rule_names:
raise SkipResource(_id, self.resource_type, "This rule is immutable")

if destination_resource.get("isDefault", False):
raise SkipResource(_id, self.resource_type, "Default rule cannot be deleted")

if destination_resource.get("isPartner", False):
raise SkipResource(_id, self.resource_type, "Cannot delete partner rules")

await destination_client.delete(self.resource_config.base_path + f"/{destination_resource['id']}")

def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
pass

@staticmethod
def limit_resource(resource):
"""Default and partner security rules have some fields that cannot be updated we need to remove them"""
for field in [
"message",
"name",
"hasExtendedTitle",
"cases",
"complianceSignalOptions",
"filters",
"options",
"queries",
"referenceTables",
"thirdPartyCases",
"type",
]:
resource.pop(field, None)
resource.pop("isDefault", None)
resource.pop("isPartner", None)
resource.pop("partnerIntegrationId", None)

@staticmethod
def handle_special_case_attr(resource):
"""Handle default ComplianceSignal attributes"""
if "complianceSignalOptions" in resource:
default_activation_status = resource["complianceSignalOptions"].get("defaultActivationStatus", None)
user_activation_status = resource["complianceSignalOptions"].get("userActivationStatus", None)
if not user_activation_status:
resource["complianceSignalOptions"]["userActivationStatus"] = default_activation_status
resource["complianceSignalOptions"].pop("defaultActivationStatus")

default_group_by_fields = resource["complianceSignalOptions"].get("defaultGroupByFields", None)
user_group_by_fields = resource["complianceSignalOptions"].get("userGroupByFields", None)
if not user_group_by_fields:
resource["complianceSignalOptions"]["userGroupByFields"] = default_group_by_fields
resource["complianceSignalOptions"].pop("defaultGroupByFields")

async def get_destination_rules(self):
"""Get the existing rules from the destination"""
destination_client = self.config.destination_client
destination_rules = {}
try:
destination_rules_resp = await destination_client.paginated_request(destination_client.get)(
self.resource_config.base_path
)
except CustomClientHTTPError as err:
self.config.logger.error("error retrieving rules: %s", err)
return destination_rules

for rule in destination_rules_resp:
destination_rules[rule["name"]] = rule

return destination_rules
1 change: 1 addition & 0 deletions datadog_sync/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from datadog_sync.model.powerpacks import Powerpacks
from datadog_sync.model.restriction_policies import RestrictionPolicies
from datadog_sync.model.roles import Roles
from datadog_sync.model.security_monitoring_rules import SecurityMonitoringRules
from datadog_sync.model.sensitive_data_scanner_groups import SensitiveDataScannerGroups
from datadog_sync.model.sensitive_data_scanner_groups_order import SensitiveDataScannerGroupsOrder
from datadog_sync.model.sensitive_data_scanner_rules import SensitiveDataScannerRules
Expand Down
25 changes: 16 additions & 9 deletions datadog_sync/utils/resources_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,9 @@ async def _apply_resource_cb(self, q_item: List) -> None:
self.worker.counter.increment_success()

except SkipResource as e:
self.config.logger.info(str(e), resource_type=resource_type, _id=_id)
self.config.logger.info(f"skipping resource: {str(e)}", resource_type=resource_type, _id=_id)
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:up_to_date"]
)
await r_class._send_action_metrics(Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:unknown"])
except ResourceConnectionError:
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(
Expand Down Expand Up @@ -216,7 +214,13 @@ async def _diffs_worker_cb(self, q_item: List) -> None:

if not r_class.filter(resource):
return
await r_class._pre_resource_action_hook(_id, resource)

try:
await r_class._pre_resource_action_hook(_id, resource)
except SkipResource as e:
self.config.logger.warning(f"skipping resource: resource_type:{resource_type} id:{_id}")
self.config.logger.debug(str(e))
return

try:
r_class.connect_resources(_id, resource)
Expand All @@ -233,9 +237,7 @@ async def _diffs_worker_cb(self, q_item: List) -> None:
if diff:
self.config.logger.info("diff: \n {}".format(pformat(diff)), resource_type=resource_type, _id=_id)
else:
self.config.logger.info(
"to be created: \n {}".format(pformat(resource)), resource_type=resource_type, _id=_id
)
self.config.logger.info(f"to be created: {resource_type} {_id}")

async def import_resources(self) -> None:
await self.import_resources_without_saving()
Expand Down Expand Up @@ -294,7 +296,7 @@ async def _import_resource(self, q_item: List) -> None:
except SkipResource as e:
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SKIPPED.value)
self.config.logger.warning(f"skip importing resource: resource_type:{resource_type} id:{_id}")
self.config.logger.info(f"skipping resource: {str(e)}", resource_type=resource_type, _id=_id)
self.config.logger.debug(str(e))
except Exception as e:
self.worker.counter.increment_failure()
Expand Down Expand Up @@ -327,6 +329,11 @@ async def _cleanup_worker(self, q_item: List) -> None:
await r_class._delete_resource(_id)
self.worker.counter.increment_success()
await r_class._send_action_metrics("delete", _id, Status.SUCCESS.value)
except SkipResource as e:
self.worker.counter.increment_skipped()
await r_class._send_action_metrics("delete", _id, Status.SKIPPED.value, tags=["reason:unknown"])
self.config.logger.info(f"skipping resource: {str(e)}", resource_type=resource_type, _id=_id)
self.config.logger.info(f"skip deleting resource: {str(e)}", resource_type=resource_type, _id=_id)
except Exception as e:
self.worker.counter.increment_failure()
await r_class._send_action_metrics("delete", _id, Status.FAILURE.value)
Expand Down
23 changes: 18 additions & 5 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ class BaseResourcesTestClass:
filter = ""
force_missing_deps = False

@staticmethod
def compute_cleanup_changes(resource_count, num_of_skips):
"""By default we just return the resource count"""
return resource_count

@staticmethod
def compute_import_changes(resource_count, num_of_skips):
"""By default we just return the resource count"""
return resource_count

@pytest.fixture(autouse=True, scope="class")
def setup(self, tmpdir_factory):
my_tmpdir = tmpdir_factory.mktemp("tmp")
Expand Down Expand Up @@ -65,7 +75,8 @@ def test_resource_import(self, runner, caplog):
assert 0 == ret.exit_code

num_resources_to_add = len(RESOURCE_TO_ADD_RE.findall(caplog.text))
assert num_resources_to_add == len(source_resources)
num_resources_skipped = len(RESOURCE_SKIPPED_RE.findall(caplog.text))
assert len(source_resources) == self.compute_import_changes(num_resources_to_add, num_resources_skipped)

def test_resource_sync(self, runner, caplog):
caplog.set_level(logging.DEBUG)
Expand All @@ -88,7 +99,8 @@ def test_resource_sync(self, runner, caplog):

def test_resource_update_sync(self, runner, caplog):
caplog.set_level(logging.DEBUG)
source_resources, _ = open_resources(self.resource_type)
# source_resources, _ = open_resources(self.resource_type)
source_resources, destination_resources = open_resources(self.resource_type)

# update fields and save the file.
for resource in source_resources.values():
Expand Down Expand Up @@ -147,7 +159,7 @@ def test_resource_update_sync(self, runner, caplog):
)
assert 0 == ret.exit_code
assert "to be deleted" not in caplog.text
assert "to be added" not in caplog.text
assert "to be created" not in caplog.text
assert "diff:" not in caplog.text

# Assert number of synced and imported resources match
Expand All @@ -168,7 +180,7 @@ def test_no_resource_diffs(self, runner, caplog):
)

assert "to be deleted" not in caplog.text
assert "to be added" not in caplog.text
assert "to be created" not in caplog.text
assert "diff:" not in caplog.text
assert 0 == ret.exit_code

Expand Down Expand Up @@ -211,7 +223,8 @@ def test_resource_cleanup(self, runner, caplog):

num_resources_skipped = len(RESOURCE_SKIPPED_RE.findall(caplog.text))
source_resources, destination_resources = open_resources(self.resource_type)
assert len(source_resources) == (len(destination_resources) + num_resources_skipped)

assert len(source_resources) == self.compute_cleanup_changes(len(destination_resources), num_resources_skipped)


def save_source_resources(resource_type, resources):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2024-12-06T16:42:12.921945-05:00
Loading
Loading