Merge pull request observ-vol-mgt#125 from KalmanMeth/trim-signals
Trim signals
KalmanMeth authored Jul 21, 2024
2 parents 2d853f3 + a28e480 commit bc643a6
Showing 15 changed files with 123 additions and 19 deletions.
@@ -25,7 +25,7 @@ parameters:
config:
sampling_frequency: 0.0333
resample_rate: "5s"
features_json_file: "feature_extraction/tsfel_conf/limited_features.json"
features_json_file: "extract/tsfel_conf/limited_features.json"
- name: generate_insights
type: insights
subtype:
6 changes: 4 additions & 2 deletions controller/common/configuration_api.py
@@ -51,7 +51,7 @@ class StageType(Enum):
"""
INGEST = "ingest" # `ingest`: ingest data from various sources into the controller
# `extract`: performs feature extraction on the signals
FEATURES_EXTRACTION = "extract"
EXTRACT = "extract"
INSIGHTS = "insights" # `insights`: generates insights (analytics)
# `config_generator`: Generates and apply processor configurations
CONFIG_GENERATOR = "config_generator"
@@ -132,6 +132,7 @@ class ExtractSubType(Enum):
Enumerates different subtypes for metadata extraction.
"""
PIPELINE_EXTRACT_TSFEL = "tsfel"
PIPELINE_EXTRACT_TRIM = "trim" # trim time series


class ConfigGeneratorSubType(Enum):
@@ -227,9 +228,10 @@ class FeatureExtractionTsfel(BaseModel):
"""
model_config = ConfigDict(extra='forbid') # Configuration for the model
features_json_file: Optional[str] = \
"feature_extraction/tsfel_conf/limited_statistical.json" # JSON file for features
"extract/tsfel_conf/limited_statistical.json" # JSON file for features
resample_rate: Optional[str] = "30s" # Resampling rate
sampling_frequency: Optional[float] = (1/30) # Sampling frequency
trim: Optional[bool] = False


class GenerateInsights(BaseModel):
@@ -34,6 +34,8 @@ parameters:
name: extract_in_parallel
type: extract
subtype: tsfel
config:
trim: True
reduce_function:
name: reduce1
type: reduce
65 changes: 61 additions & 4 deletions controller/docs/config.md
@@ -46,8 +46,68 @@ parameters:

A stage may follow multiple other stages, and may receive input from multiple earlier stages.
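For example, a stage that consumes the outputs of two earlier stages simply lists both of them in its `input_data` (the stage and data names below are illustrative only, not taken from the code base):
```
- name: combine_results
  type: insights                 # placeholder; any registered stage type could appear here
  input_data: [extracted_signals, classified_signals]
  output_data: [insights]
```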

# Data Types

## Time Series
Time series data is typically provided using the Prometheus data model, with the following fields:
```
{
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"__name__": "commit_latency_ms",
"instance": "02:00:06:ff:fe:4b:b5:ac",
"plugin": "ceph",
"label": "ceph",
"snapshotId": "T9Arslj9-1Y3UYv6biDGDizVqzI",
"job": "prometheus"
},
"values": [
[1715252491.0, 0.0],
[1715252492.0, 0.0],
....
]
}
.....
]
}
}
```

The `metric` field contains various metadata, including the `__name__` of the metric being reported; other fields are optional.
The time series data are contained in the `values` field as a list of `<timestamp, value>` ordered pairs.
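
For illustration only (the function below is hypothetical and not part of the code base), such a response can be walked as follows:
```
import json

def iter_series(response_text):
    """Yield (metadata, samples) pairs from a Prometheus matrix response."""
    response = json.loads(response_text)
    for result in response["data"]["result"]:
        metadata = result["metric"]   # labels, including __name__
        samples = result["values"]    # list of [timestamp, value] pairs
        yield metadata, samples
```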

## Signal
`Signal` is an internal data type used to store time-series data.
It stores the metadata provided in the `metric` field and the time-series provided in the `values` field.

## Signals
`Signals` is an internal data type used to store multiple objects of type `Signal`.
It has some internally defined metadata plus a list of `Signal` structures.
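
The exact definitions live in `common/signal.py`; as a rough sketch only (any field or default not used elsewhere in this change is an assumption), they can be pictured as:
```
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Signal:
    type: str                           # kind of signal, e.g. a metric
    metadata: dict                      # the `metric` labels of the time series
    time_series: Optional[list] = None  # the `values` samples (may be absent, e.g. after trim)

@dataclass
class Signals:
    metadata: dict = field(default_factory=dict)         # collection-level metadata
    signals: List[Signal] = field(default_factory=list)  # the contained Signal objects

    def append(self, signal: Signal):
        if self.signals is None:        # tolerate construction with signals=None
            self.signals = []
        self.signals.append(signal)
```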

# Details about some types of stages

## Ingest
An `ingest`-type stage typically reads data from some external source.
The details of the external source (file location, url, security parameters) are provided in the `config` section of the stage parameters.
An `ingest` stage is expected to have no `input_data` (`input_data: []`).
An `ingest` type stage outputs a list containing a single element (of type `Signals`).
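
As a purely illustrative sketch (the subtype and config keys are placeholders, not taken from the code base), an ingest stage definition has the same shape as any other stage:
```
- name: ingest_metrics
  type: ingest
  subtype: some_ingest_subtype   # placeholder; must be a registered ingest subtype
  config:
    url: "..."                   # placeholder source parameters (file location, url, credentials)
  input_data: []
  output_data: [ingested_signals]
```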

## Extract
An `extract`-type stage typically performs some kind of transformation or metadata generation on `Signals`.
The `input_data` should contain a single `Signals` element and the `output_data` should contain a single `Signals` element.
For example:
```commandline
- name: feature_extraction_tsfel
type: extract
subtype: tsfel
input_data: [classified_signals]
output_data: [extracted_signals]
```
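
The `trim` subtype added in this change keeps each signal's metadata but discards its time-series data; a stage using it could be configured along these lines (the stage and data names are illustrative):
```
- name: trim_signals
  type: extract
  subtype: trim
  input_data: [extracted_signals]
  output_data: [trimmed_signals]
```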

## Map Reduce
A map_reduce stage takes its input, splits it into a number of pieces,
and then runs a computation (possibly in parallel) on each piece.
@@ -84,7 +144,7 @@ The config of a map_reduce stage looks like the following:
```

All map_reduce `compute` operations are of the same structure.
A single list as input and a single list as output.
A single list as input (of type Signals) and a single list as output (of type Signals).
These must be preregistered in the code base as valid map_reduce `compute` operations.
The `map` and `reduce` operations must likewise be preregistered in the code base as such operations.
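
Conceptually (the function names below are illustrative; the actual operations are looked up by their registered names), a map_reduce stage behaves like:
```
def run_map_reduce(map_fn, compute_fn, reduce_fn, signals):
    # map: split one Signals object into several smaller Signals objects
    pieces = map_fn(signals)
    # compute: each invocation takes a single list (one Signals) and returns a single list
    partial_results = [compute_fn([piece]) for piece in pieces]
    # reduce: merge the per-piece outputs back into a single list containing one Signals
    return reduce_fn(partial_results)
```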

@@ -97,6 +157,3 @@ global_settings:
```


## Ingest
An `ingest` type stage usually takes no input data and outputs a single list.

File renamed without changes.
@@ -18,17 +18,21 @@
logger = logging.getLogger(__name__)


def feature_extraction(subtype, config, input_data):
def extract(subtype, config, input_data):
if len(input_data) != 1:
raise "feature_extraction configuration should have one input"
raise "extract configuration should have one input"
signals_list = input_data[0]
# switch based on the configuration feature_extraction type
# switch based on the configuration extract type
# verify config parameters conform to structure
if subtype == api.ExtractSubType.PIPELINE_EXTRACT_TSFEL.value:
tsfel_config = api.FeatureExtractionTsfel(**config)
logger.debug("using tsfel feature_extraction")
from feature_extraction.feature_extraction_tsfel import extract
from extract.feature_extraction_tsfel import extract
extracted_signals = extract(tsfel_config, signals_list)
elif subtype == api.ExtractSubType.PIPELINE_EXTRACT_TRIM.value:
logger.debug("using trim_time_series")
from extract.trim_time_series import extract
extracted_signals = extract(None, signals_list)
else:
raise "unsupported feature_extraction configuration"
raise "unsupported extract configuration"
return [extracted_signals]
@@ -61,10 +61,14 @@ def extract(tsfel_config, signals):
verbose = 1 if logging.getLogger().getEffectiveLevel() == logging.DEBUG else 0

# features extraction
for index, signal in enumerate(signals):
for index, signal in enumerate(signals.signals):
# extract features from the signal
extracted_signal = extract_signal(
signal, features_json_file, resample_rate, sampling_frequency, verbose)
extracted_signals.append(extracted_signal)

if tsfel_config.trim:
from extract.trim_time_series import extract as extract_trim
extracted_signals = extract_trim(None, extracted_signals)

return extracted_signals
File renamed without changes.
30 changes: 30 additions & 0 deletions controller/extract/trim_time_series.py
@@ -0,0 +1,30 @@
# Copyright 2024 IBM, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from common.signal import Signals, Signal

logger = logging.getLogger(__name__)


# Take each Signal in Signals, save its metadata, but discard the time-series data
def extract(config, signals):
extracted_signals = Signals(metadata=signals.metadata, signals=None)

for index, signal in enumerate(signals.signals):
new_signal = Signal(signal.type, signal.metadata)
extracted_signals.append(new_signal)

return extracted_signals
2 changes: 1 addition & 1 deletion controller/map_reduce/map.py
@@ -24,7 +24,7 @@ def _map(subtype, config, input_data):
logger.debug(f"map config = {config}")
logger.debug(f"len(input_data) = {len(input_data)}")
if len(input_data) != 1:
raise "feature_extraction configuration should have one input"
raise "extract configuration should have one input"

signals = input_data[0]
# switch based on the configuration ingest type
1 change: 1 addition & 0 deletions controller/map_reduce/simple_reduce.py
@@ -21,6 +21,7 @@ def reduce(config, input_data):
sublist = item[0].signals
output_list.extend(sublist)

# TODO: figure out what is the proper thing to do for the combined metadata
new_signals = Signals(input_data[0][0].metadata, output_list)

return [new_signals]
14 changes: 9 additions & 5 deletions controller/workflow_orchestration/pipeline.py
@@ -18,14 +18,15 @@
import logging
import os
import pickle
import time

import common.configuration_api as api
from common.conf import get_configuration
from multiprocessing import Pool

from config_generator.config_generator import config_generator
from encode.encode import encode
from feature_extraction.feature_extraction import feature_extraction
from extract.extract import extract
from ingest.ingest import ingest
from insights.insights import generate_insights
from map_reduce.map import _map
@@ -165,7 +166,7 @@ def run_stage_wrapper(self, args):
self.extracted_signals = output_data[0]
elif stage.base_stage.type == api.StageType.METADATA_CLASSIFICATION.value:
self.classified_signals = output_data[0]
elif stage.base_stage.type == api.StageType.FEATURES_EXTRACTION.value:
elif stage.base_stage.type == api.StageType.EXTRACT.value:
self.extracted_signals = output_data[0]
elif stage.base_stage.type == api.StageType.INSIGHTS.value:
self.signals_to_keep, self.signals_to_reduce, self.text_insights = output_data[
@@ -331,13 +332,14 @@ def run_stage(args):
if found:
return signals_out
logger.info(f"running stage: {stage.base_stage.name}, len(input_data) = {len(input_data)}")
start_time = time.time()
logger.debug(f"stage = {stage}, input = {input_data}")
if stage.base_stage.type == api.StageType.INGEST.value:
output_data = ingest(stage.base_stage.subtype, stage.base_stage.config)
elif stage.base_stage.type == api.StageType.METADATA_CLASSIFICATION.value:
output_data = metadata_classification(stage.base_stage.subtype, stage.base_stage.config, input_data)
elif stage.base_stage.type == api.StageType.FEATURES_EXTRACTION.value:
output_data = feature_extraction(stage.base_stage.subtype, stage.base_stage.config, input_data)
elif stage.base_stage.type == api.StageType.EXTRACT.value:
output_data = extract(stage.base_stage.subtype, stage.base_stage.config, input_data)
elif stage.base_stage.type == api.StageType.INSIGHTS.value:
output_data = generate_insights(stage.base_stage.subtype, stage.base_stage.config, input_data)
elif stage.base_stage.type == api.StageType.CONFIG_GENERATOR.value:
@@ -349,7 +351,9 @@
else:
raise Exception(f"stage type not implemented: {stage.base_stage.type}")
stage.set_latest_output_data(output_data)
logger.info(f"finished stage: {stage.base_stage.name}")
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"finished stage: {stage.base_stage.name}; elapsed time = {elapsed_time}")
if stage.base_stage.cache_directory is not None:
# save data in cache directory
cache_output_data(stage.base_stage.cache_directory, input_data[0], output_data)
