This repository has been archived by the owner on Aug 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathort_operator.py
127 lines (94 loc) · 4.14 KB
/
ort_operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright (C) 2023 Haiko Schol
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import os
import kopf
import kubernetes
import yaml
# stages of an ORT run
ANALYZER = 'analyzer'
SCANNER = 'scanner'
REPORTER = 'reporter'

# job templates: every stage maps to the file '<stage>-job.yaml'
TEMPLATES = {stage: f'{stage}-job.yaml' for stage in (ANALYZER, SCANNER, REPORTER)}

# possible job states for each stage
PENDING = 'Pending'
CREATED = 'Created'
RUNNING = 'Running'
SUCCEEDED = 'Succeeded'
FAILED = 'Failed'
ABORTED = 'Aborted'
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
    """Adjust the operator settings when the operator starts.

    Sets ``settings.posting.level`` to ``logging.ERROR``.
    NOTE(review): per the kopf configuration docs this should restrict the
    log messages kopf posts as Kubernetes events to errors -- confirm.
    """
    posting = settings.posting
    posting.level = logging.ERROR
@kopf.on.create('ortruns')
def create_fn(name, namespace, spec, patch, **_):
    """Start a new ORT run when an ``ortruns`` object is created.

    Creates the persistent volume claim and the analyzer job for the run,
    then records the initial state of every stage in the object's status.

    Raises:
        kopf.PermanentError: if the spec has no (or an empty) ``repoUrl``.
    """
    repo_url = spec.get('repoUrl')
    if not repo_url:
        raise kopf.PermanentError('OrtRun needs a repoUrl')

    create_pvc(name, namespace)
    create_job(ANALYZER, name, namespace, repo_url)

    # Only the analyzer job exists at this point; the other stages wait.
    initial_states = (
        (ANALYZER, CREATED),
        (SCANNER, PENDING),
        (REPORTER, PENDING),
    )
    for stage, state in initial_states:
        patch.status[stage] = state
def is_modified(event, **_):
    """Return True if the event's ``type`` is ``'MODIFIED'``.

    Used as the ``when=`` filter of the job event handler; any extra keyword
    arguments are accepted and ignored.
    """
    event_type = event.get('type', '')
    return event_type == 'MODIFIED'
@kopf.on.event('batch', 'v1', 'jobs', annotations={'ortStage': kopf.PRESENT}, when=is_modified)
def handle_job_status_change(meta, body, logger, **_):
    """Mirror a stage job's state onto its run and advance the pipeline.

    Runs for modified jobs that carry an ``ortStage`` annotation. The stage
    state derived from the job status is written to the parent run. When the
    stage succeeded, the job for the following stage (analyzer -> scanner ->
    reporter) is created and marked as created. When the analyzer or scanner
    failed, the stages that would have followed are marked as aborted.
    """
    stage = meta.annotations['ortStage']
    stage_status = get_stage_status(body.get('status', {}))
    # NOTE(review): `body` is the object this event is about, i.e. a
    # batch/v1 job. Presumably a job spec carries no 'repoUrl', which would
    # make this '' for the scanner and reporter jobs -- confirm against the
    # job templates.
    repo_url = body.get('spec', {}).get('repoUrl', '')
    # Job names have the form '<stage>-<parent name>' (see create_job).
    _, parent_name = meta.name.split('-', 1)
    namespace = meta.namespace

    update_ortrun_status(parent_name, namespace, stage, stage_status)

    if stage_status == SUCCEEDED:
        # The reporter is the last stage and has no successor.
        following = {ANALYZER: SCANNER, SCANNER: REPORTER}.get(stage)
        if following:
            create_job(following, parent_name, namespace, repo_url)
            update_ortrun_status(parent_name, namespace, following, CREATED)
        return

    if stage_status != FAILED or stage == REPORTER:
        return

    # A failed analyzer or scanner means the reporter will never run ...
    update_ortrun_status(parent_name, namespace, REPORTER, ABORTED)
    # ... and a failed analyzer additionally means the scanner will not run.
    if stage == ANALYZER:
        update_ortrun_status(parent_name, namespace, SCANNER, ABORTED)
def get_stage_status(job_status):
    """Translate the status mapping of a job into a stage state.

    The 'failed', 'succeeded' and 'active' counters are checked in that
    order; the first one greater than zero decides the result. A missing
    counter is treated as zero. If none is positive the stage is pending.

    NOTE(review): a positive 'failed' counter wins even when 'succeeded' or
    'active' is positive too -- confirm this is intended for jobs whose pods
    may be retried.
    """
    counters_by_priority = (
        ('failed', FAILED),
        ('succeeded', SUCCEEDED),
        ('active', RUNNING),
    )
    for counter, state in counters_by_priority:
        if job_status.get(counter, 0) > 0:
            return state
    return PENDING
def _load_manifest(template_name, **params):
    """Render a YAML template stored next to this module into a manifest.

    The template file is read as UTF-8, its ``{placeholders}`` are filled in
    with ``str.format(**params)``, the result is parsed with
    ``yaml.safe_load`` and passed to ``kopf.adopt`` before being returned.

    Args:
        template_name: File name of the template, relative to the directory
            of this module.
        **params: Values for the placeholders used in the template.

    Returns:
        The parsed manifest after it was passed to ``kopf.adopt``.
    """
    path = os.path.join(os.path.dirname(__file__), template_name)
    # Explicit encoding: do not depend on the locale of the operator's
    # container for reading files that ship with the operator.
    with open(path, 'rt', encoding='utf-8') as f:
        tmpl = f.read()
    # NOTE(review): values are substituted textually before the YAML is
    # parsed, so they are not escaped. `repo_url` comes from the spec of a
    # user-created object (see create_fn) -- consider validating it or
    # setting the values on the parsed manifest instead.
    manifest = yaml.safe_load(tmpl.format(**params))
    # No explicit owner is given, so kopf has to determine it itself;
    # presumably that is the object of the handler currently running --
    # see the kopf docs on kopf.adopt.
    kopf.adopt(manifest)
    return manifest


def create_job(stage, parent_name, namespace, repo_url):
    """Create the job that runs `stage` for the run `parent_name`.

    The manifest is rendered from the template registered for the stage in
    TEMPLATES; the job is named '<stage>-<parent_name>'.

    Args:
        stage: Key into TEMPLATES selecting the job template.
        parent_name: Name of the run the job belongs to.
        namespace: Namespace to create the job in.
        repo_url: Value for the template's ``repo_url`` placeholder.

    Returns:
        The result of ``BatchV1Api.create_namespaced_job``.

    Raises:
        KeyError: if `stage` has no entry in TEMPLATES.
    """
    manifest = _load_manifest(
        TEMPLATES[stage],
        name=f'{stage}-{parent_name}',
        parent_name=parent_name,
        repo_url=repo_url,
    )
    api = kubernetes.client.BatchV1Api()
    return api.create_namespaced_job(namespace, manifest)


def create_pvc(parent_name, namespace):
    """Create the persistent volume claim for the run `parent_name`.

    The manifest is rendered from the 'pvc.yaml' template.

    Args:
        parent_name: Name of the run the claim belongs to.
        namespace: Namespace to create the claim in.

    Returns:
        The result of
        ``CoreV1Api.create_namespaced_persistent_volume_claim``.
    """
    manifest = _load_manifest('pvc.yaml', parent_name=parent_name)
    api = kubernetes.client.CoreV1Api()
    return api.create_namespaced_persistent_volume_claim(namespace, manifest)
def update_ortrun_status(name, namespace, stage, status):
    """Set the state of one stage in the status of the run `name`.

    Patches the 'ortruns' custom object (group 'inocybe.io', version 'v1')
    with ``{'status': {stage: status}}``.

    Args:
        name: Name of the run to patch.
        namespace: Namespace of the run.
        stage: Stage whose state is being recorded.
        status: New state of that stage.

    Raises:
        kopf.PermanentError: if the API call fails. The original
            ``ApiException`` is attached as ``__cause__`` so the HTTP status
            and reason are not lost.
    """
    api = kubernetes.client.CustomObjectsApi()
    status_patch = {'status': {stage: status}}
    try:
        api.patch_namespaced_custom_object('inocybe.io', 'v1', namespace, 'ortruns', name, status_patch)
    except kubernetes.client.exceptions.ApiException as err:
        raise kopf.PermanentError(
            f'updating status of run "{name}" for stage "{stage}" to "{status}" failed'
        ) from err