Skip to content

Commit

Permalink
Merge pull request #47 from arblade:main
Browse files Browse the repository at this point in the history
Improving regex oring
  • Loading branch information
thomaspatzke authored Dec 17, 2024
2 parents 2685a3e + f514e76 commit 210a51f
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 34 deletions.
74 changes: 42 additions & 32 deletions sigma/backends/splunk/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,53 @@ class SplunkDeferredORRegularExpression(DeferredTextQueryExpression):
}

def __init__(self, state, field, arg) -> None:
SplunkDeferredORRegularExpression.add_field(field)
index_suffix = SplunkDeferredORRegularExpression.get_index_suffix(field)
self.template = (
'rex field={field} "(?<{field}Match'
+ index_suffix
+ '>{value})"\n| eval {field}Condition'
+ index_suffix
+ "=if(isnotnull({field}Match"
+ index_suffix
+ '), "true", "false")'
self.add_field(field)
field_condition = self.get_field_condition(field)
field_match = self.get_field_match(field)
self.template = 'rex field={{field}} "(?<{field_match}>{{value}})"\n| eval {field_condition}=if(isnotnull({field_match}), "true", "false")'.format(
field_match=field_match, field_condition=field_condition
)
return super().__init__(state, field, arg)

@staticmethod
def clean_field(field):
# splunk does not allow dots in regex group, so we need to clean variables
return re.sub(".*\\.", "", field)

@classmethod
def add_field(cls, field):
cls.field_counts[field] = (
cls.field_counts.get(field, 0) + 1
) # increment the field count

@classmethod
def get_index_suffix(cls, field):

index_suffix = cls.field_counts.get(field, 0)
def get_field_suffix(cls, field):
index_suffix = cls.field_counts.get(field, "")
if index_suffix == 1:
# return nothing for the first field use
return ""
return str(index_suffix)
index_suffix = ""
return index_suffix

@classmethod
def construct_field_variable(cls, field, variable):
cleaned_field = cls.clean_field(field)
index_suffix = cls.get_field_suffix(field)
return f"{cleaned_field}{variable}{index_suffix}"

@classmethod
def get_field_match(cls, field):
return cls.construct_field_variable(field, "Match")

@classmethod
def get_field_condition(cls, field):
return cls.construct_field_variable(field, "Condition")

@classmethod
def reset(cls):
cls.field_counts = {}


class SplunkDeferredCIDRExpression(DeferredTextQueryExpression):
template = 'where {op}cidrmatch("{value}", {field})'
class SplunkDeferredFieldRefExpression(DeferredTextQueryExpression):
template = "where {op}'{field}'='{value}'"
operators = {
True: "NOT ",
False: "",
Expand Down Expand Up @@ -107,6 +119,7 @@ class SplunkBackend(TextQueryBackend):
)
group_expression: ClassVar[str] = "({expr})"

bool_values = {True: "true", False: "false"}
or_token: ClassVar[str] = "OR"
and_token: ClassVar[str] = " "
not_token: ClassVar[str] = "NOT"
Expand All @@ -125,7 +138,7 @@ class SplunkBackend(TextQueryBackend):
re_escape_char: ClassVar[str] = "\\"
re_escape: ClassVar[Tuple[str]] = ('"',)

cidr_expression: ClassVar[str] = "{value}"
cidr_expression: ClassVar[str] = '{field}="{value}"'

compare_op_expression: ClassVar[str] = "{field}{operator}{value}"
compare_operators: ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = {
Expand All @@ -135,6 +148,7 @@ class SplunkBackend(TextQueryBackend):
SigmaCompareExpression.CompareOperators.GTE: ">=",
}

field_equals_field_expression: ClassVar[str] = "{field2}"
field_null_expression: ClassVar[str] = "NOT {field}=*"

convert_or_as_in: ClassVar[bool] = True
Expand Down Expand Up @@ -248,9 +262,7 @@ def convert_condition_field_eq_val_re(
).postprocess(None, cond)

cond_true = ConditionFieldEqualsValueExpression(
cond.field
+ "Condition"
+ str(SplunkDeferredORRegularExpression.get_index_suffix(cond.field)),
SplunkDeferredORRegularExpression.get_field_condition(cond.field),
SigmaString("true"),
)
# returning fieldX=true
Expand All @@ -259,19 +271,19 @@ def convert_condition_field_eq_val_re(
state, cond.field, super().convert_condition_field_eq_val_re(cond, state)
).postprocess(None, cond)

def convert_condition_field_eq_val_cidr(
def convert_condition_field_eq_field(
self,
cond: ConditionFieldEqualsValueExpression,
state: "sigma.conversion.state.ConversionState",
) -> SplunkDeferredCIDRExpression:
"""Defer CIDR network range matching to pipelined where cidrmatch command after main search expression."""
) -> SplunkDeferredFieldRefExpression:
"""Defer FieldRef matching to pipelined with `where` command after main search expression."""
if cond.parent_condition_chain_contains(ConditionOR):
raise SigmaFeatureNotSupportedByBackendError(
"ORing CIDR matching is not yet supported by Splunk backend",
"ORing FieldRef matching is not yet supported by Splunk backend",
source=cond.source,
)
return SplunkDeferredCIDRExpression(
state, cond.field, super().convert_condition_field_eq_val_cidr(cond, state)
return SplunkDeferredFieldRefExpression(
state, cond.field, super().convert_condition_field_eq_field(cond, state)
).postprocess(None, cond)

def finalize_query(
Expand Down Expand Up @@ -381,13 +393,11 @@ def finalize_query_data_model(
cim_fields = " ".join(
splunk_sysmon_process_creation_cim_mapping.values()
)

elif rule.logsource.category == "proxy":
data_model = "Web"
data_set = "Proxy"
cim_fields = " ".join(
splunk_web_proxy_cim_mapping.values()
)
cim_fields = " ".join(splunk_web_proxy_cim_mapping.values())

try:
data_model_set = state.processing_state["data_model_set"]
Expand Down
90 changes: 88 additions & 2 deletions tests/test_backend_splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from sigma.backends.splunk import SplunkBackend
from sigma.collection import SigmaCollection
from sigma.processing.pipeline import ProcessingPipeline
from sigma.pipelines.splunk import splunk_cim_data_model


Expand Down Expand Up @@ -224,6 +225,43 @@ def test_splunk_regex_query_explicit_or(splunk_backend: SplunkBackend):
)


def test_splunk_regex_query_explicit_or_with_nested_fields():

pipeline = ProcessingPipeline.from_yaml(
"""
name: Test
priority: 100
transformations:
- id: field_mapping
type: field_name_mapping
mapping:
fieldA: Event.EventData.fieldA
fieldB: Event.EventData.fieldB
"""
)
splunk_backend = SplunkBackend(pipeline)

collection = SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA|re: foo.*bar
sel2:
fieldB|re: boo.*foo
condition: sel1 or sel2
"""
)

assert splunk_backend.convert(collection) == [
'\n| rex field=Event.EventData.fieldA "(?<fieldAMatch>foo.*bar)"\n| eval fieldACondition=if(isnotnull(fieldAMatch), "true", "false")\n| rex field=Event.EventData.fieldB "(?<fieldBMatch>boo.*foo)"\n| eval fieldBCondition=if(isnotnull(fieldBMatch), "true", "false")\n| search fieldACondition="true" OR fieldBCondition="true"'
]


def test_splunk_single_regex_query(splunk_backend: SplunkBackend):
assert (
splunk_backend.convert(
Expand Down Expand Up @@ -264,12 +302,12 @@ def test_splunk_cidr_query(splunk_backend: SplunkBackend):
"""
)
)
== ['fieldB="foo" fieldC="bar"\n| where cidrmatch("192.168.0.0/16", fieldA)']
== ['fieldA="192.168.0.0/16" fieldB="foo" fieldC="bar"']
)


def test_splunk_cidr_or(splunk_backend: SplunkBackend):
with pytest.raises(SigmaFeatureNotSupportedByBackendError, match="ORing CIDR"):
assert (
splunk_backend.convert(
SigmaCollection.from_yaml(
"""
Expand All @@ -289,6 +327,54 @@ def test_splunk_cidr_or(splunk_backend: SplunkBackend):
"""
)
)
== ['fieldA="192.168.0.0/16" OR fieldA="10.0.0.0/8" fieldB="foo" fieldC="bar"']
)


def test_splunk_fieldref_query(splunk_backend: SplunkBackend):
assert (
splunk_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref: fieldD
fieldB: foo
fieldC: bar
condition: sel
"""
)
)
== ["fieldB=\"foo\" fieldC=\"bar\"\n| where 'fieldA'='fieldD'"]
)


def test_splunk_fieldref_or(splunk_backend: SplunkBackend):
with pytest.raises(SigmaFeatureNotSupportedByBackendError, match="ORing FieldRef"):
splunk_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref:
- fieldD
- fieldE
fieldB: foo
fieldC: bar
condition: sel
"""
)
)


def test_splunk_fields_output(splunk_backend: SplunkBackend):
Expand Down

0 comments on commit 210a51f

Please sign in to comment.