Merge pull request observ-vol-mgt#122 from KalmanMeth/rebase-stage-files
Rebase stage files
KalmanMeth authored Jul 10, 2024
2 parents c241b95 + 27fdadf commit 170ae55
Showing 4 changed files with 54 additions and 35 deletions.
42 changes: 41 additions & 1 deletion controller/common/configuration_api.py
@@ -41,7 +41,7 @@

from enum import Enum
from pydantic import BaseModel, ConfigDict
from typing import Optional
from typing import Optional, List


class StageType(Enum):
@@ -299,3 +299,43 @@ class ReduceSimple(BaseModel):
Configuration for simple reduce operations.
"""
model_config = ConfigDict(extra='forbid') # Configuration for the model


class BaseStageSchedule(BaseModel):
"""
Configuration for the order of stages that make up the pipeline.
"""
model_config = ConfigDict(extra='forbid') # Configuration for the model
name: str # Name of stage instance
follows: Optional[List[str]] = [] # List of stages that must precede the currently defined stage in the pipeline


class BaseStageParameters(BaseModel):
"""
Configuration for Stage parameters.
"""
model_config = ConfigDict(extra='forbid') # Configuration for the model
name: str # Name of stage instance
type: str # Type of stage instance (e.g. ingest, extract, insights, etc.)
subtype: Optional[str] = None # Subtype of stage instance (e.g. file_ingest vs promql ingest)
input_data: Optional[List[str]] = [] # List of input data names for the stage
output_data: Optional[List[str]] = [] # List of output data names for the stage
cache_directory: Optional[str] = None # Directory to store output data
config: Optional[dict] = {} # Stage-specific configuration parameters


class GlobalSettings(BaseModel):
"""
Configuration for global settings.
"""
model_config = ConfigDict(extra='forbid') # Configuration for the model
number_of_workers: Optional[int] = 0 # Number of processes to create to perform map_reduce operations in parallel


class PipelineDefinition(BaseModel):
"""
Configuration for pipeline definition.
"""
global_settings: Optional[dict] = {}
pipeline: List[BaseStageSchedule] # Order of stages that make up the pipeline
parameters: List[BaseStageParameters] # Specific parameters to configure each stage in the pipeline
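
Taken together, these models now define the full pipeline schema in common/configuration_api.py. Below is a minimal sketch (not part of this PR) of validating a configuration against them; the stage names and data names are hypothetical:

```
# Hypothetical example: validate a small pipeline definition against the
# models added to common/configuration_api.py in this commit.
from common.configuration_api import PipelineDefinition

configuration = {
    "global_settings": {"number_of_workers": 4},
    "pipeline": [
        {"name": "my_ingest"},
        {"name": "my_insights", "follows": ["my_ingest"]},
    ],
    "parameters": [
        {"name": "my_ingest", "type": "ingest", "subtype": "file_ingest",
         "output_data": ["signals"]},
        {"name": "my_insights", "type": "insights",
         "input_data": ["signals"], "output_data": ["insights"]},
    ],
}

# Unknown keys inside pipeline/parameters entries raise a pydantic
# ValidationError, since those models set ConfigDict(extra='forbid').
pipeline_def = PipelineDefinition(**configuration)
print(pipeline_def.pipeline[1].follows)  # ['my_ingest']
```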
9 changes: 9 additions & 0 deletions controller/docs/config.md
@@ -88,6 +88,15 @@ A single list as input and a single list as output.
These must be preregistered in the code base as valid map_reduce `compute` operations.
The `map` and `reduce` operations must likewise be preregistered in the code base as such operations.

The map_reduce operation may be performed in parallel across multiple processes.
This is enabled by specifying an additional configuration parameter under `global_settings`:

```
global_settings:
number_of_workers: 8
```
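
Because `GlobalSettings` is declared with `extra='forbid'`, a misspelled key under `global_settings` fails validation instead of being silently ignored. A small sketch of that behavior (the module path is the one used in this PR):

```
# Hypothetical check, not part of this PR: a typo in the settings key is
# rejected because GlobalSettings sets ConfigDict(extra='forbid').
from pydantic import ValidationError
from common.configuration_api import GlobalSettings

try:
    GlobalSettings(number_of_wokers=8)  # note the typo
except ValidationError as err:
    print(err)  # "Extra inputs are not permitted"
```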


## Ingest
An `ingest` type stage usually takes no input data and outputs a single list.

6 changes: 3 additions & 3 deletions controller/workflow_orchestration/pipeline.py
@@ -32,7 +32,7 @@
from map_reduce.reduce import reduce
from metadata_classification.metadata_classification import metadata_classification
from workflow_orchestration.map_reduce import MapReduceParameters, create_dummy_compute_stage
from workflow_orchestration.stage import StageParameters, PipelineDefinition, GlobalSettings
from workflow_orchestration.stage import StageParameters


logger = logging.getLogger(__name__)
@@ -73,7 +73,7 @@ def build_pipeline(self):

# verify that configuration is valid
# if not valid, the following line will throw an exception
pipeline_def = PipelineDefinition(**configuration)
pipeline_def = api.PipelineDefinition(**configuration)

# create stage structs for each of the stages
for stage_params in stages_parameters:
@@ -135,7 +135,7 @@ def build_pipeline(self):
for stage in stages_params_dict.values():
self.add_stage_to_schedule(stage)

global_settings = GlobalSettings(**pipeline_def.global_settings)
global_settings = api.GlobalSettings(**pipeline_def.global_settings)

# allocate process pool for map_reduce
number_of_workers = global_settings.number_of_workers
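
The diff is truncated before the allocation itself; the sketch below shows how `number_of_workers` might drive pool creation. The helper name and the treat-0-as-in-process behavior are assumptions, though 0 is the field's default:

```
# Hedged sketch (not from this PR): possible pool allocation for
# map_reduce. allocate_pool is a hypothetical helper.
import multiprocessing

def allocate_pool(number_of_workers):
    # GlobalSettings.number_of_workers defaults to 0; we read that as
    # "run map_reduce in-process, without a worker pool" (an assumption).
    if number_of_workers > 0:
        return multiprocessing.Pool(processes=number_of_workers)
    return None
```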
32 changes: 1 addition & 31 deletions controller/workflow_orchestration/stage.py
@@ -12,37 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pydantic import BaseModel, ConfigDict
from typing import Optional, List


# TODO: Move these items to configuration_api so they get included in the documentation
class BaseStageSchedule(BaseModel):
model_config = ConfigDict(extra='forbid')
name: str
follows: Optional[List[str]] = []


class BaseStageParameters(BaseModel):
model_config = ConfigDict(extra='forbid')
name: str
type: str
subtype: Optional[str] = None
input_data: Optional[List[str]] = []
output_data: Optional[List[str]] = []
cache_directory: Optional[str] = None
config: Optional[dict] = {}


class GlobalSettings(BaseModel):
model_config = ConfigDict(extra='forbid')
number_of_workers: Optional[int] = 0


class PipelineDefinition(BaseModel):
global_settings: Optional[dict] = {}
pipeline: List[BaseStageSchedule]
parameters: List[BaseStageParameters]
from common.configuration_api import BaseStageParameters


class StageParameters:
