Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

D assistant take 2 #355

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
28 changes: 19 additions & 9 deletions contentctl/actions/build.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import datetime
import json
import pathlib
import shutil

import uuid
from dataclasses import dataclass

from contentctl.input.director import DirectorOutputDto
from contentctl.objects.config import build
from contentctl.objects.lookup import CSVLookup, Lookup_Type
from contentctl.output.api_json_output import ApiJsonOutput
from contentctl.output.conf_output import ConfOutput
from contentctl.output.conf_writer import ConfWriter
from contentctl.output.api_json_output import ApiJsonOutput
from contentctl.output.data_source_writer import DataSourceWriter
from contentctl.objects.lookup import CSVLookup, Lookup_Type
import pathlib
import json
import datetime
import uuid

from contentctl.objects.config import build


@dataclass(frozen=True)
Expand Down Expand Up @@ -46,9 +44,21 @@
/ "data_sources.csv"
)

deprecation_lookup_fake_yml_path = (

Check failure on line 47 in contentctl/actions/build.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (F841)

contentctl/actions/build.py:47:13: F841 Local variable `deprecation_lookup_fake_yml_path` is assigned to but never used
input_dto.config.getPackageDirectoryPath()
/ "lookups"
/ "deprecated_content.yml"
)

DataSourceWriter.writeDataSourceCsv(
input_dto.director_output_dto.data_sources, data_sources_lookup_csv_path
)
deprecation_lookup_csv_path = (

Check failure on line 56 in contentctl/actions/build.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (F841)

contentctl/actions/build.py:56:13: F841 Local variable `deprecation_lookup_csv_path` is assigned to but never used
input_dto.config.getPackageDirectoryPath()
/ "lookups"
/ "deprecated_content.csv"
)

input_dto.director_output_dto.addContentToDictMappings(
CSVLookup.model_construct(
name="data_sources",
Expand Down
11 changes: 6 additions & 5 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import pathlib

from contentctl.input.director import Director, DirectorOutputDto
from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.lookup import FileBackedLookup
from contentctl.helper.splunk_app import SplunkApp
from contentctl.helper.utils import Utils
from contentctl.input.director import Director, DirectorOutputDto
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.config import validate
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp
from contentctl.objects.lookup import FileBackedLookup


class Validate:
Expand All @@ -27,6 +27,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
[],
[],
[],
[],
)

director = Director(director_output_dto)
Expand Down
35 changes: 15 additions & 20 deletions contentctl/helper/utils.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,34 @@
import os
import git
import shutil
import requests
import pathlib
import random
import shutil
import string
from math import ceil
from timeit import default_timer
import pathlib
from typing import TYPE_CHECKING, Tuple, Union

from typing import Union, Tuple
import git
import requests
import tqdm
from math import ceil

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.security_content_object import SecurityContentObject

from contentctl.objects.security_content_object import SecurityContentObject

TOTAL_BYTES = 0
ALWAYS_PULL = True


class Utils:
@staticmethod
def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]:
listOfFiles: list[pathlib.Path] = []
base_path = pathlib.Path(path)
if not base_path.exists():
return listOfFiles
for dirpath, dirnames, filenames in os.walk(path):
for file in filenames:
if file.endswith(".yml"):
listOfFiles.append(pathlib.Path(os.path.join(dirpath, file)))
def get_all_yml_files_from_directory(path: pathlib.Path) -> list[pathlib.Path]:
if not path.exists():
raise FileNotFoundError(
f"Trying to find content files in the directory {path}, but it does not exist. "
"It is not mandatory to have content in this directory, but it must exist"
)

return sorted(listOfFiles)
return sorted(pathlib.Path(yml_path) for yml_path in path.glob("**/*.yml"))

@staticmethod
def get_security_content_files_from_directory(
Expand Down Expand Up @@ -490,3 +484,4 @@ def getPercent(numerator: float, denominator: float, decimal_places: int) -> str
ratio = numerator / denominator
percent = ratio * 100
return Utils.getFixedWidth(percent, decimal_places) + "%"
return Utils.getFixedWidth(percent, decimal_places) + "%"
164 changes: 61 additions & 103 deletions contentctl/input/director.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from uuid import UUID

from pydantic import ValidationError
from pydantic import TypeAdapter, ValidationError

from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
Expand All @@ -16,10 +15,18 @@
from contentctl.objects.dashboard import Dashboard
from contentctl.objects.data_source import DataSource
from contentctl.objects.deployment import Deployment
from contentctl.objects.deprecated_security_content_object import (
DeprecatedSecurityContentObject,
)
from contentctl.objects.detection import Detection
from contentctl.objects.enums import SecurityContentType
from contentctl.objects.investigation import Investigation
from contentctl.objects.lookup import Lookup, LookupAdapter
from contentctl.objects.lookup import (
CSVLookup,
KVStoreLookup,
Lookup,
LookupAdapter,
MlModel,
)
from contentctl.objects.macro import Macro
from contentctl.objects.playbook import Playbook
from contentctl.objects.security_content_object import SecurityContentObject
Expand All @@ -42,6 +49,7 @@ class DirectorOutputDto:
lookups: list[Lookup]
deployments: list[Deployment]
dashboards: list[Dashboard]
deprecated: list[DeprecatedSecurityContentObject]

data_sources: list[DataSource]
name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
Expand Down Expand Up @@ -82,9 +90,10 @@ def addContentToDictMappings(self, content: SecurityContentObject):
self.detections.append(content)
elif isinstance(content, Dashboard):
self.dashboards.append(content)

elif isinstance(content, DataSource):
self.data_sources.append(content)
elif isinstance(content, DeprecatedSecurityContentObject):
self.deprecated.append(content)
else:
raise Exception(f"Unknown security content type: {type(content)}")

Expand All @@ -101,40 +110,46 @@ def __init__(self, output_dto: DirectorOutputDto) -> None:

def execute(self, input_dto: validate) -> None:
self.input_dto = input_dto
self.createSecurityContent(SecurityContentType.deployments)
self.createSecurityContent(SecurityContentType.lookups)
self.createSecurityContent(SecurityContentType.macros)
self.createSecurityContent(SecurityContentType.stories)
self.createSecurityContent(SecurityContentType.baselines)
self.createSecurityContent(SecurityContentType.investigations)
self.createSecurityContent(SecurityContentType.data_sources)
self.createSecurityContent(SecurityContentType.playbooks)
self.createSecurityContent(SecurityContentType.detections)
self.createSecurityContent(SecurityContentType.dashboards)
self.createSecurityContent(Deployment)
self.createSecurityContent(LookupAdapter)
self.createSecurityContent(Macro)
self.createSecurityContent(Story)
self.createSecurityContent(Baseline)
self.createSecurityContent(Investigation)
self.createSecurityContent(DataSource)
self.createSecurityContent(Playbook)
self.createSecurityContent(Detection)
self.createSecurityContent(Dashboard)
self.createSecurityContent(DeprecatedSecurityContentObject)
self.validateDeprecation()

def validateDeprecation(self):
data = YmlReader.load_file(
self.input_dto.path / "deprecated" / "deprecated_detection_mapping.yml"
)
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import (
DeprecationDocumentationFile,
)

def createSecurityContent(self, contentType: SecurityContentType) -> None:
if contentType in [
SecurityContentType.deployments,
SecurityContentType.lookups,
SecurityContentType.macros,
SecurityContentType.stories,
SecurityContentType.baselines,
SecurityContentType.investigations,
SecurityContentType.playbooks,
SecurityContentType.detections,
SecurityContentType.data_sources,
SecurityContentType.dashboards,
]:
files = Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, str(contentType.name))
)
security_content_files = [f for f in files]
else:
raise (
Exception(
f"Cannot createSecurityContent for unknown product {contentType}."
)
)
mapping = DeprecationDocumentationFile.model_validate(
data, context={"output_dto": self.output_dto, "config": self.input_dto}
)

for detection in mapping.detections:
detection.enforceDeprecationRequirement(self.input_dto)

def createSecurityContent(
self,
contentType: type[SecurityContentObject]
| TypeAdapter[CSVLookup | KVStoreLookup | MlModel],
) -> None:
files = Utils.get_all_yml_files_from_directory(
self.input_dto.path / contentType.containing_folder() # type: ignore
)

# convert this generator to a list so that we can
# calculate progress as we iterate over the files
security_content_files = [f for f in files]

validation_errors: list[tuple[Path, ValueError]] = []

Expand All @@ -144,80 +159,23 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
for index, file in enumerate(security_content_files):
progress_percent = ((index + 1) / len(security_content_files)) * 100
try:
type_string = contentType.name.upper()
type_string = contentType.__name__.upper() # type: ignore
modelDict = YmlReader.load_file(file)

if contentType == SecurityContentType.lookups:
lookup = LookupAdapter.validate_python(
modelDict,
context={
"output_dto": self.output_dto,
"config": self.input_dto,
},
)
# lookup = Lookup.model_validate(modelDict, context={"output_dto":self.output_dto, "config":self.input_dto})
self.output_dto.addContentToDictMappings(lookup)

elif contentType == SecurityContentType.macros:
macro = Macro.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(macro)

elif contentType == SecurityContentType.deployments:
deployment = Deployment.model_validate(
if contentType != LookupAdapter:
content = contentType.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(deployment)

elif contentType == SecurityContentType.playbooks:
playbook = Playbook.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(playbook)

elif contentType == SecurityContentType.baselines:
baseline = Baseline.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(baseline)

elif contentType == SecurityContentType.investigations:
investigation = Investigation.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(investigation)

elif contentType == SecurityContentType.stories:
story = Story.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(story)

elif contentType == SecurityContentType.detections:
detection = Detection.model_validate(
else:
content = contentType.validate_python(
modelDict,
context={
"output_dto": self.output_dto,
"app": self.input_dto.app,
"config": self.input_dto,
},
)
self.output_dto.addContentToDictMappings(detection)

elif contentType == SecurityContentType.dashboards:
dashboard = Dashboard.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(dashboard)

elif contentType == SecurityContentType.data_sources:
data_source = DataSource.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.addContentToDictMappings(data_source)

else:
raise Exception(f"Unsupported type: [{contentType}]")
self.output_dto.addContentToDictMappings(content)

if (
sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()
Expand All @@ -236,7 +194,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
validation_errors.append((relative_path, e))

print(
f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...",
f"\r{f'{contentType.__name__.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...",
end="",
flush=True,
)
Expand Down
Loading
Loading