-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnetbox_serve.py
129 lines (101 loc) · 3.82 KB
/
netbox_serve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
import json
import os
import urllib.parse
import jinja2
import psycopg2
import psycopg2.extras
from flask import Flask, g, jsonify
from flask_basicauth import BasicAuth
psycopg2.extras.register_ipaddress()
app = Flask(__name__)
app.config.from_mapping(**os.environ)
basic_auth = BasicAuth(app)
def get_db():
db = getattr(g, '_database', None)
if db is None:
url = urllib.parse.urlparse(os.environ["DATABASE_URL"])
db = g._database = psycopg2.connect(
database=url.path[1:],
user=url.username,
password=url.password,
host=url.hostname,
port=url.port)
return db
@app.teardown_appcontext
def teardown_db(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
@app.route("/devices")
@basic_auth.required
def get_zone():
results = {}
with get_db().cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# Primary addresses
cur.execute("""\
SELECT DISTINCT
ipam_ipaddress.address as i_address,
dcim_device.name as d_name,
dcim_device.comments as d_comments
FROM
ipam_ipaddress
JOIN dcim_device ON ipam_ipaddress.id = dcim_device.primary_ip4_id
JOIN tenancy_tenant ON ipam_ipaddress.tenant_id = tenancy_tenant.id
WHERE
bool(ipam_ipaddress.status) AND
bool(dcim_device.status) AND
ipam_ipaddress.family = 4 AND
tenancy_tenant.slug = %s
ORDER BY
ipam_ipaddress.address ASC,
dcim_device.name ASC
""", (app.config["NETBOX_TENANT_SLUG"],))
for row in cur:
result = {"primary": row["i_address"].ip.compressed}
if row["d_comments"]:
for line in row["d_comments"].split("\n"):
line = line.strip()
if not line.startswith("`{") or not line.endswith("}`"):
continue
line = line[1:-1]
try:
obj = json.loads(line)
except:
continue
if not isinstance(obj, dict):
continue
if "cnames" in obj and isinstance(obj["cnames"], list) and all(isinstance(x, str) for x in obj["cnames"]):
result["cnames"] = obj["cnames"]
results[row["d_name"].lower()] = result
# Secondary addresses
cur.execute("""\
SELECT DISTINCT
ipam_ipaddress.address as i_address,
dcim_device.name as d_name,
dcim_device.comments as d_comments
FROM
ipam_ipaddress
JOIN dcim_interface ON ipam_ipaddress.interface_id = dcim_interface.id
JOIN dcim_device ON dcim_interface.device_id = dcim_device.id
JOIN tenancy_tenant ON ipam_ipaddress.tenant_id = tenancy_tenant.id
WHERE
bool(ipam_ipaddress.status) AND
bool(dcim_device.status) AND
ipam_ipaddress.family = 4 AND
tenancy_tenant.slug = %s AND
ipam_ipaddress.id != dcim_device.primary_ip4_id
ORDER BY
ipam_ipaddress.address ASC,
dcim_device.name ASC
""", (app.config["NETBOX_TENANT_SLUG"],))
for row in cur:
if row["d_name"] in results:
secondary_ips = results[row["d_name"]].setdefault("secondary_ips", [])
if row["i_address"] not in secondary_ips:
secondary_ips.append(row["i_address"].ip.compressed)
return jsonify(results)
def main():
app.run(debug=True)
if __name__ == "__main__":
main()