Skip to content

Commit

Permalink
Merge pull request #34 from dynatrace-oss/32-enable-to-define-bulk-at…
Browse files Browse the repository at this point in the history
…tribute-extraction-rules

32 enable to define bulk attribute extraction rules
  • Loading branch information
azimnicki authored Jul 7, 2023
2 parents d8a072c + 781e66d commit a315317
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 16 deletions.
21 changes: 20 additions & 1 deletion docs/log_processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,27 @@ attribute_extraction_jmespath_expression: Optional[dict] # --> JMESPATH expressi
# Example to map the 'eventTime' value in a JSON log, to 'timestamp'
# (recognized field in Dynatrace for event timestamp):
# - timestamp: eventTime
ttribute_extraction_from_top_level_json: Optional[dict] # --> valid only for json_stream processing with array of log entries inside. Adds as attributes the defined JSON keys to
attribute_extraction_from_top_level_json: Optional[dict] # --> valid only for json_stream processing with array of log entries inside. Adds as attributes the defined JSON keys to
# all the log entries
attribute_mapping_from_json_keys: Optional[dict] # --> (Experimental) Allows to define which original JSON keys should be converted into log attributes
# and whether a custom prefix/postfix should be appended to them.
# It is especially useful when processing rule is used for different JSON schemas.
#
# Set of JSON keys for further processing is configured by either one of the mandatory keys 'include'/'exclude'
# Adding prefix or postfix to the keys is optional and can be configured by corresponding keys 'prefix'/'postfix'
#
# Notes:
# - As of now only top level attributes of JSON could be selected for the mapping.
# - If defined, logic is applied after grok expressions.
#
# Example:
# Map the values of all keys except for 'exec_time' and 'process_time' in a JSON log.
# Additionally add custom prefix and postfix so that final attributes would match pattern 'my.*_mapped'
#
# attribute_mapping_from_json_keys:
# exclude: ['exec_time', 'process_time']
# prefix: 'my.'
# postfix: '_mapped'
```

You can find an example custom processing rule under `config/log-processing-rules.yaml` used to process [VPC DNS Query logs](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver-query-logs.html) from AWS.
43 changes: 32 additions & 11 deletions src/log/processing/log_processing_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def parse_date_from_string(date_string: str):

return datetime


@dataclass(frozen=True)
class LogProcessingRule:
name: str
Expand All @@ -49,17 +50,18 @@ class LogProcessingRule:
log_format: str
# if json_stream, we may want to filter out specific objects from a string
# containing a specific key/value pair
filter_json_objects_key: Optional[str]
filter_json_objects_value: Optional[str]
filter_json_objects_key: Optional[str] = None
filter_json_objects_value: Optional[str] = None
# if json or json_stream, a key may contain the list of log entries
log_entries_key: Optional[str]
annotations: Optional[dict]
requester: Optional[List[str]]
attribute_extraction_from_key_name: Optional[dict]
attribute_extraction_grok_expression: Optional[str]
attribute_extraction_jmespath_expression: Optional[dict]
log_entries_key: Optional[str] = None
annotations: Optional[dict] = None
requester: Optional[List[str]] = None
attribute_extraction_from_key_name: Optional[dict] = None
attribute_extraction_grok_expression: Optional[str] = None
attribute_extraction_jmespath_expression: Optional[dict] = None
# if json_stream with log entry list, we may want to inherit attributes from top level json
attribute_extraction_from_top_level_json: Optional[dict]
attribute_extraction_from_top_level_json: Optional[dict] = None
attribute_mapping_from_json_keys: Optional[dict] = None
known_key_path_pattern_regex: re.Pattern = field(init=False)
attribute_extraction_from_key_name_regex: re.Pattern = field(init=False)
attribute_extraction_grok_object: Grok = field(init=False)
Expand All @@ -85,9 +87,14 @@ def validate(self):
# validate optional dicts
for i in [self.annotations, self.attribute_extraction_from_key_name,
self.attribute_extraction_jmespath_expression,
self.attribute_extraction_from_top_level_json]:
self.attribute_extraction_from_top_level_json,
self.attribute_mapping_from_json_keys]:
if not (isinstance(i, dict) or i is None):
raise ValueError(f"{i} is not a dict.")
if self.attribute_mapping_from_json_keys is not None:
if not (('include' in self.attribute_mapping_from_json_keys) ^
('exclude' in self.attribute_mapping_from_json_keys)):
raise ValueError(f"{self.attribute_mapping_from_json_keys} should define exactly one of 'include' or 'exclude'")

# validate attribute extraction from top level json
if (self.attribute_extraction_from_top_level_json and self.log_format != "json_stream" and
Expand Down Expand Up @@ -206,12 +213,26 @@ def get_extracted_log_attributes(self, message) -> dict:
else:
logger.warning('No matches for JMESPATH expression %s', v)

if self.attribute_mapping_from_json_keys is not None:
_prefix = self.attribute_mapping_from_json_keys.get('prefix')
_postfix = self.attribute_mapping_from_json_keys.get('postfix')
_include = self.attribute_mapping_from_json_keys.get('include')
_exclude = self.attribute_mapping_from_json_keys.get('exclude')

_attributes_dict = {
_prefix + k + _postfix: v for k, v in json_message.items()
if (_include and k in _include) or
(_exclude and k not in _exclude)
}

attributes_dict.update(_attributes_dict)

# Check if timestamp needs to be translated to ISO format
if "timestamp_to_transform" in attributes_dict:
attributes_dict['timestamp'] = parse_date_from_string(
attributes_dict['timestamp_to_transform'])
attributes_dict.pop('timestamp_to_transform')

# Check if aws.log_group exists to extract aws.service and aws.resource.id
if "aws.log_group" in json_message and "aws.log_stream" in json_message:
attributes_dict.update(get_attributes_from_cloudwatch_logs_data(
Expand Down
15 changes: 11 additions & 4 deletions src/log/processing/log_processing_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def create_log_processing_rule(rule_dict):
optional_attributes = ['log_entries_key', 'annotations', 'requester',
'attribute_extraction_from_key_name', 'attribute_extraction_grok_expression',
'attribute_extraction_jmespath_expression', 'filter_json_objects_key',
'filter_json_objects_value', 'attribute_extraction_from_top_level_json']
'filter_json_objects_value', 'attribute_extraction_from_top_level_json',
'attribute_mapping_from_json_keys']

for attribute in required_attributes:
if attribute not in rule_dict:
Expand Down Expand Up @@ -90,7 +91,13 @@ def create_log_processing_rule(rule_dict):
'attribute_extraction_jmespath_expression'],
attribute_extraction_from_top_level_json=rule_dict[
'attribute_extraction_from_top_level_json'],
skip_header_lines=rule_dict.get('skip_header_lines',0)
attribute_mapping_from_json_keys={
'prefix': '',
'postfix': '',
**rule_dict['attribute_mapping_from_json_keys']
}
if rule_dict.get('attribute_mapping_from_json_keys') else None,
skip_header_lines=rule_dict.get('skip_header_lines', 0)
)
except ValueError as ex:
raise InvalidLogProcessingRuleFile(
Expand Down Expand Up @@ -191,7 +198,7 @@ def load_custom_rules_from_aws_appconfig():

def load_custom_rules_from_local_file():
'''
Loads custom log processing rules from a local file config/log-processing-rules.yaml. Returns a tuple
Loads custom log processing rules from a local file config/log-processing-rules.yaml. Returns a tuple
containing a dict with the rules and the Configuration-Version number.
'''

Expand Down Expand Up @@ -221,7 +228,7 @@ def __str__(self):

def load_built_in_rules():
'''
Load built-in log processing rules.
Load built-in log processing rules.
'''
return load_rules_from_dir(BUILT_IN_PROCESSING_RULES_PATH)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import unittest

from log.processing import LogProcessingRule


class TestAttributeMappingFromJsonProcessingRule(unittest.TestCase):
    """Tests for the 'attribute_mapping_from_json_keys' option of LogProcessingRule."""

    # Shared fixture: a flat JSON log message with four top-level keys.
    input_message = {
        'test_attribute_1': 'test_value_1',
        'test_attribute_2': 'test_value_2',
        'test_attribute_3': 'test_value_3',
        'test_attribute_4': 'test_value_4',
    }

    def test_include_parameter(self):
        """Only keys listed under 'include' are mapped, with prefix/postfix applied."""
        rule = create_log_processing_rule({
            'include': ['test_attribute_1', 'test_attribute_3'],
            'postfix': '_mapped',
            'prefix': 'my_',
        })

        expected = {
            'my_test_attribute_1_mapped': 'test_value_1',
            'my_test_attribute_3_mapped': 'test_value_3',
        }
        self.assertEqual(rule.get_extracted_log_attributes(self.input_message), expected)

    def test_exclude_parameter(self):
        """Keys listed under 'exclude' are dropped; all remaining keys are mapped."""
        rule = create_log_processing_rule({
            'exclude': ['test_attribute_1', 'test_attribute_3'],
            'postfix': '_mapped',
            'prefix': 'my_',
        })

        expected = {
            'my_test_attribute_2_mapped': 'test_value_2',
            'my_test_attribute_4_mapped': 'test_value_4',
        }
        self.assertEqual(rule.get_extracted_log_attributes(self.input_message), expected)

    def test_rule_validation(self):
        """Supplying both 'include' and 'exclude' must be rejected at construction time."""
        conflicting_mapping = {
            'include': ['test_attribute_1', 'test_attribute_3'],
            'exclude': ['test_attribute_2', 'test_attribute_4'],
            'postfix': '_mapped',
            'prefix': 'my_',
        }
        with self.assertRaises(ValueError):
            create_log_processing_rule(conflicting_mapping)


def create_log_processing_rule(attribute_mapping):
    """Build a minimal LogProcessingRule carrying only the given JSON-key mapping.

    The four positional arguments fill the rule's required fields with
    placeholder values; every other optional feature is left at its default.
    """
    required_positionals = ('name', 'source', 'pattern', 'json')
    return LogProcessingRule(
        *required_positionals,
        attribute_mapping_from_json_keys=attribute_mapping,
        skip_header_lines=0,
    )


# Allow running this test module directly (python <module>.py) in addition
# to discovery-based runners.
if __name__ == '__main__':
    unittest.main()

0 comments on commit a315317

Please sign in to comment.