-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauto-openvas.py
executable file
·461 lines (324 loc) · 13.5 KB
/
auto-openvas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
#!/usr/bin/python3
# Copyright (c) 2020 by T.Magerl, GPLv3
""" AutoVAS primary logic """
import argparse
import os
from datetime import datetime
from time import sleep
from typing import Tuple, Iterator
import modules.syslog as log
from modules.JSONfile import JSONfile
from modules.Machine import Machine
from modules.OpenVAS import OpenVAS
from modules.nettools import crawl_net, get_mac
from modules.vcheck import vcheck
# Initialise the project's log module (modules.syslog) with the name 'AutoVAS'.
log.init('AutoVAS')
# Default log level; the __main__ block sets it to 6 for -v and 7 for -vv.
log.log_level = 5
def registered(ip: str, keep: bool = None) -> bool:
    """ make sure a Machine exists for `ip`; return True only if it was created by this call

    A newly created machine gets its `created` stamp from now().
    When `keep` is given (not None) it is stored on the machine, new or existing.
    """
    is_new = ip not in machine
    if is_new:
        machine[ip] = Machine(ip)
        machine[ip].data['created'] = now()
    if keep is not None:
        machine[ip].keep = keep
    return is_new
def import_targets(path: str) -> None:
    """ hand `path` to the file or the folder importer; log an error if it is neither """
    if os.path.isfile(path):
        import_target_file(path)
        return
    if os.path.isdir(path):
        import_target_folder(path)
        return
    log.err('nothing to import from `{0}`?'.format(path))
def import_target_file(path: str) -> None:
    """ read one job file and pass each job set in it to mk_machine()

    The file may hold a single job set (dict) or a list of job sets;
    any other content is ignored.
    """
    content = JSONfile(path).read()
    if isinstance(content, dict):
        jobs = [content]
    elif isinstance(content, list):
        jobs = content
    else:
        jobs = []
    for job in jobs:
        mk_machine(job, path)
def import_target_folder(path: str) -> None:
    """ crawl import folder (including sub folders) and load every job file

    Each file is handed to import_target_file(), so a file may contain a
    single job set or a list of job sets, exactly like a directly imported file.
    """
    # `folder` is the directory currently visited by os.walk; it must not reuse
    # the name of the `path` parameter, and it needs a proper separator before
    # the file name (plain `folder + filename` only works with a trailing slash).
    for folder, _dirs, files in os.walk(path):
        for filename in files:
            import_target_file(os.path.join(folder, filename))
def mk_machine(json_data: dict, source: str) -> None:
    """ create or update the machine described by one imported job set

    json_data: job set; `ip` and `mac` are mandatory, `skip` and `comment` optional
    source:    path of the job file, only used for the error message

    Imported machines are always flagged `keep`. An already known mac is not
    overwritten. A job set without the mandatory keys is logged and ignored.
    """
    try:
        ip, mac = json_data['ip'], json_data['mac']
    except (KeyError, TypeError):
        # KeyError: mandatory key missing; TypeError: job set is no mapping at all
        log.err('failed importing `{0}`'.format(source))
        return
    new = registered(ip, True)
    if not machine[ip].data['mac']:
        machine[ip].data['mac'] = mac.upper()
    if json_data.get('skip', False):
        machine[ip].skip = machine[ip].data['skip'] = True
    if 'comment' in json_data:
        # comment is optional: keep the current one if the job set has none
        machine[ip].data['comment'] = json_data['comment']
    if new:
        log.debug('config for {0} [{1}] imported'.format(machine[ip].data['ip'], machine[ip].data['mac']))
def days_diff(date1: datetime, date2: datetime = None) -> int:
    """ return whole days between 2 dates, independent of their order

    date2 defaults to the current UTC time as naive datetime (same clock as now()).
    Both dates have to be naive datetimes.
    """
    if date2 is None:
        # the former default None could not be compared with a datetime (TypeError)
        from datetime import timezone  # only `datetime` is imported at file level
        date2 = datetime.now(timezone.utc).replace(tzinfo=None)
    # abs() on the timedelta (not on .days) keeps partial days at 0 in both directions
    return abs(date1 - date2).days
def get_args(argv: list = None) -> argparse.Namespace:
    """ build the argument parser and parse the command line

    argv: optional list of argument strings; None (default) lets argparse
          read sys.argv[1:], which is what the script itself relies on.

    The returned namespace has: scan (str or None), run, print, verify, v, vv
    (all bool, False unless the switch is given) and runcheck (always False).
    """
    p = argparse.ArgumentParser(description="AutoOpenVAS")
    p.add_argument('-scan', type=str, default=None, help="search (sub)net for IPs (e.g. 192.168.0.0/24)")
    # store_true switches default to False on their own, no set_defaults needed
    p.add_argument('-run', action='store_true', help='start one task')
    p.add_argument('-print', action='store_true', help='print results to console')
    p.add_argument('-verify', action='store_true', help='skip on unexpected mac')
    p.add_argument('-v', action='store_true', help='verbose')
    p.add_argument('-vv', action='store_true', help='very verbose')
    # `runcheck` has no switch; the default keeps the attribute on the namespace
    p.set_defaults(runcheck=False)
    return p.parse_args(argv)
def load_previous_data() -> None:
    """ restore previously saved data (ip, mac and time stamps)

    Reads the list of data sets from config['data_file'], merges each one into
    the data of the matching machine (created if unknown) and converts the
    stored time stamp strings back to datetime objects.
    """
    saved = JSONfile(config['data_file']).read()
    if not saved:
        return
    for dataset in saved:
        ip = dataset['ip']
        # link and severity are not restored; openvas_analysis() sets them again
        for volatile in ('link', 'severity'):
            dataset.pop(volatile, None)
        registered(ip)
        machine[ip].data = {**machine[ip].data, **dataset}
        for timeset in ('created', 'last_attempt', 'last_report'):
            stamp = machine[ip].data[timeset]
            # Only stored strings need parsing. A value that is already a
            # datetime (key absent from the saved set) cannot be sliced.
            if not stamp or not isinstance(stamp, str):
                continue
            # [:19] cuts everything after the seconds
            machine[ip].data[timeset] = datetime.strptime(stamp[:19], '%Y-%m-%d %H:%M:%S')
def openvas_analysis() -> None:
    """ pull reports, tasks, targets and results from OpenVAS into the machine objects

    Every ip returned by OpenVAS is registered first, so unknown machines are
    created on the fly. The link of a machine points to its report if there is
    one, otherwise to its task, otherwise to its target.
    """
    # reports: remember report id, its time stamp and the link to it
    for ip, report, stamp in ov.reports():
        registered(ip)
        m = machine[ip]
        if not m.report \
                or (report == m.report and stamp < m.data['last_report']) \
                or (report != m.report and stamp > m.data['last_report']):
            m.data['last_report'] = stamp
            m.report = report
            m.data['link'] = "{0}get_report&report_id={1}".format(config['base_url'], report)

    # tasks: mac, task id and running state; running machines are always kept
    for ip, mac, task, running in ov.tasks():
        registered(ip)
        m = machine[ip]
        m.data['mac'] = mac
        m.task = task
        m.running = running
        if running:
            m.keep = True
        if not m.data['link']:
            m.data['link'] = "{0}get_task&task_id={1}".format(config['base_url'], task)

    # targets: target id, link only as last resort
    for ip, target in ov.targets():
        registered(ip)
        m = machine[ip]
        m.target = target
        if not m.data['link']:
            m.data['link'] = "{0}get_target&target_id={1}".format(config['base_url'], target)

    # results: sum up the severity of all results not older than the last report
    for ip, severity, stamp in ov.results():
        registered(ip)
        m = machine[ip]
        if stamp >= m.data['last_report']:
            if m.data['severity'] == -1:
                # -1 marks "no severity yet", start summing at 0
                m.data['severity'] = 0
            m.data['severity'] += severity

    # attempt_sort: 0 for idle machines attempted after their last report, else 1
    for m in machine.values():
        pending = not m.running and m.data['last_attempt'] > m.data['last_report']
        m.attempt_sort = 0 if pending else 1
def openvas_populate() -> None:
    """ create the OpenVAS target and task for every kept, non-skipped machine lacking them

    The machine link is set to whatever was created last (target, then task).
    """
    for ip, m in machine.items():
        if not m.keep or m.skip:
            continue

        if not m.target:
            new_target = ov.mk_target(ip, m.data['mac'], config['default_portlist_id'],
                                      m.data['comment'])
            if new_target:
                m.target = new_target
                m.data['link'] = "{0}get_target&target_id={1}".format(config['base_url'], new_target)

        # a task needs a target, which may have been created just above
        if m.target and not m.task:
            new_task = ov.mk_task(ip, m.data['mac'], m.target,
                                  config['default_scan_config_id'], m.data['comment'])
            if new_task:
                m.task = new_task
                m.data['link'] = "{0}get_task&task_id={1}".format(config['base_url'], new_task)
def load_config() -> dict:
    """ get config + add app path

    Reads `auto-openvas.conf` from the folder of this script and merges it over
    the defaults (`self.path`, `keep_days`, `run_limit`); values from the file
    win. `base_url` is always derived from `openvas_ip` and `openvas_web_port`,
    which therefore have to be present in the file (KeyError otherwise).
    """
    app_path = os.path.dirname(os.path.realpath(__file__))
    defaults = {
        'self.path': app_path,
        'keep_days': 31,
        # run_limit is a default like keep_days, so the config file can change it
        'run_limit': 3,
    }
    configfile = JSONfile(os.path.join(app_path, 'auto-openvas.conf'))
    r = {**defaults, **configfile.read()}
    r['base_url'] = "https://{0}:{1}/omp?cmd=".format(r['openvas_ip'], r['openvas_web_port'])
    return r
def save_results() -> None:
    """ export the data of all machines flagged `keep` to config['data_file']

    The list is handed to JSONfile.write() with `last_report` as sort key;
    a failed write is logged as critical.
    """
    export = [machine[ip].data for ip in machine if machine[ip].keep]
    datafile = JSONfile(config['data_file'])
    if not datafile.write(export, list_sort_key='last_report'):
        # report the file that was actually written (was: config['cron_job_file'])
        log.crit("failed writing to `{0}`".format(config['data_file']))
def scan_net(subnet: str) -> None:
    """ crawl `subnet` and register every IP found, flagged `keep`

    Comment and mac of a machine are only filled in when still empty.
    """
    for ip, mac, comment in crawl_net(subnet):
        if registered(ip, True):
            log.debug('found {1} ({0}, {2})'.format(ip, mac, comment))
        m = machine[ip]
        if m.data['comment'] == '':
            m.data['comment'] = comment
        if not m.data['mac']:
            m.data['mac'] = mac
        # TODO : mac conflict
def now() -> datetime:
    """ return the current UTC time as naive datetime, cut to whole seconds """
    # datetime.utcnow() is deprecated; this yields the same naive UTC value
    from datetime import timezone  # only `datetime` is imported at file level
    return datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)
def clean_up():
    """ remove OpenVAS task and target of machines without a recent report

    Only machines that are neither running nor flagged `keep` are looked at.
    Task/target are removed when the last report is more than
    config['clean_up_threshold_days'] days old while the last attempt is less
    than that many days old. Failed removals are logged.
    """
    # NOTE(review): local time here, while now() stamps are UTC - confirm this is intended
    reference = datetime.now()
    for ip in machine:
        m = machine[ip]
        if m.running or m.keep:
            continue
        days_since_report = (reference - m.data['last_report']).days
        days_since_attempt = (reference - m.data['last_attempt']).days
        if not (days_since_report > config['clean_up_threshold_days']
                and days_since_attempt < config['clean_up_threshold_days']):
            continue
        if m.task and not ov.rm('task', m.task):
            log.err("[{0}]: failed removing task ({1})".format(m.data['mac'], ip))
        if m.target and not ov.rm('target', m.target):
            log.err("[{0}]: failed removing target ({1})".format(m.data['mac'], ip))
def test_order(sort_key: str) -> Iterator[Tuple[str, str]]:
    """ return list of (ip, data[sort_key]) for all test candidates, ascending by that value

    Machines not flagged `keep` or flagged `skip` are left out (logged on debug level).
    """
    candidates = []
    for ip, m in machine.items():
        if not m.keep or m.skip:
            log.debug('excluded {0} from run-test-candidates.'.format(ip))
            continue
        # TODO optional min duration between tests of single machines
        candidates.append((ip, m.data[sort_key]))
    return sorted(candidates, key=lambda pair: pair[1])
def task_running(ip: str) -> bool:
    """ confirm that a task for `ip` is running

    Asks OpenVAS for its tasks. On a running task for `ip` the machine is
    flagged running and True is returned. In every other case (task not
    running, no task for `ip`) the result is False, never None.
    """
    for check_ip, _mac, _task, running in ov.tasks():
        if check_ip == ip and running:
            machine[ip].running = True
            return True
    return False
def tasks_running() -> int:
    """ return the number of machines currently flagged as running """
    return sum(1 for ip in machine if machine[ip].running)
def openvas_run_task() -> None:
    """ try to start one OV task until success, candidates ordered by last_attempt

    Nothing is started once config['run_limit'] tasks are running. Otherwise the
    candidates from test_order('last_attempt') are tried one after another
    until a started task is confirmed running. With -verify a candidate is
    skipped when its current mac differs from the stored one. An error is
    logged if no candidate could be started.
    """
    # >= : with exactly run_limit running tasks the limit is already reached
    if tasks_running() >= config['run_limit']:
        log.info('limit of {0} running tasks reached, no new task.'.format(config['run_limit']))
        return
    for ip, _ in test_order('last_attempt'):
        m = machine[ip]
        if m.running:
            log.info('[{0}]: task already running'.format(m.data['mac']))
            continue
        # stamped before the checks, so a failing candidate moves to the end of the order
        m.data['last_attempt'] = now()
        if args.verify:
            real_mac = get_mac(ip)
            if real_mac and not m.data['mac'] == real_mac:
                log.warn('[{0}]: ERROR: address conflict {1} ({2})'.format(m.data['mac'], real_mac, ip))
                continue
        if not ov.run_task(m.task):
            log.warn('[{0}]: FAILED starting OpenVAS task'.format(m.data['mac']))
            continue
        log.notice('[{0}]: started OpenVAS task'.format(m.data['mac']))
        # wait two minutes, then check that the task is (still) running
        sleep(60 * 2)
        if task_running(ip):
            log.debug('[{0}]: running task verified'.format(m.data['mac']))
            m.keep = True
            break
        log.warn('[{0}]: task was aborted'.format(m.data['mac']))
    else:
        # loop ended without break: no candidate left
        log.err('FAILED to start any OpenVAS task.')
def print_console() -> None:
    """ print a table with one line per machine, followed by the number of running tasks

    `#` stands for "no value": mac not verified (no -verify), no report yet
    (last_report still 1970-01-01 00:00:00), empty comment, severity -1.
    """
    never = datetime(1970, 1, 1)
    print('MAC\t\t\t IP\t\t Verified\t Severity\t Last report\t\t\t Comment')
    for ip in machine:
        m = machine[ip]

        if args.verify:
            real_mac = get_mac(ip)
            verification = '+\t' if m.data['mac'] == real_mac else '-\t'
        else:
            verification = '#\t'

        if m.running:
            last_report = 'running\t'
        elif m.data['last_report'] == never:
            last_report = '#\t\t'
        else:
            last_report = m.data['last_report']

        comment = m.data['comment'] if m.data['comment'] != '' else '#'
        severity = m.data['severity'] if m.data['severity'] != -1 else '#'

        print(m.data['mac'], '\t',
              ip, '\t',
              verification, '\t',
              severity, '\t\t',
              last_report, '\t\t',
              comment
              )
    print('{0} running tasks'.format(tasks_running()))
# Script entry point. `args`, `config`, `ov` and `machine` are module-level
# names that the functions above read as globals, so they must be set before
# any of those functions is called.
if __name__ == '__main__':
    args = get_args()

    # log level: -vv wins over -v, default stays at the value set at file top
    if args.vv:
        log.log_level = 7
    elif args.v:
        log.log_level = 6

    # NOTE(review): presumably keeps log output off the terminal while the
    # table is printed - confirm in modules.syslog
    if args.print:
        log.use_tty = False

    log.debug('initiated...')

    # NOTE(review): presumably compares against the upstream VERSION file - see modules.vcheck
    vcheck('https://raw.githubusercontent.com/TMagerl/AutoOpenVAS/master/VERSION')

    config = load_config()

    ov = OpenVAS('admin', config['passwd'], config['openvas_ip'],
                 config['openvas_omp_port'], config['openvas_web_port'])

    # registry of all known machines: ip -> Machine
    machine = {}

    # fill the registry: saved state first, then job files, then (optional) net scan
    load_previous_data()

    import_targets(config['job_source'])

    if args.scan:
        scan_net(args.scan)

    # sync with OpenVAS: read its state, then create missing targets/tasks
    openvas_analysis()

    openvas_populate()

    if args.run:
        openvas_run_task()

    clean_up()

    save_results()

    if args.print:
        print_console()

    log.notice('{0} running tasks'.format(tasks_running()))
    log.debug('done.')

    raise SystemExit(0)