forked from JRCSTU/CO2MPAS-TA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplot.py
executable file
·240 lines (187 loc) · 6.49 KB
/
plot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# -*- coding: utf-8 -*-
#
# Copyright 2015 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It contains plotting functions for models and/or output results.
"""
import importlib
import logging
import sys
import os.path as osp
import matplotlib.pyplot as plt
from co2mpas.dispatcher.utils.alg import stlp
log = logging.getLogger(__name__)
def get_model_paths(model_ids=None):
"""
Returns all CO2MPAS models import paths.
:param model_ids:
List of models to be returned
(e.g., ['co2mpas.physical.physical', 'engine', ...]).
.. note:: It it is not specified all models will be returned.
:type model_ids: list, None
:return:
CO2MPAS models import paths.
:rtype: list
"""
co2mpas_model = [
'batch.vehicle_processing_model',
# +
'report.report',
'io.load_inputs',
'io.write_outputs',
# +
] + [
'model.%s' % k for k in
[
'model',
'selector.selector',
'selector.co2_params.co2_params_selector'
] + ['physical.%s' % k for k in
[
'physical',
'cycle.cycle',
'cycle.NEDC.nedc_cycle',
'cycle.WLTP.wltp_cycle',
'vehicle.vehicle',
'wheels.wheels',
'final_drive.final_drive',
'clutch_tc.clutch_torque_converter',
'clutch_tc.clutch.clutch',
'clutch_tc.torque_converter.torque_converter',
'electrics.electrics',
'electrics.electrics_prediction.electrics_prediction',
'engine.engine',
'engine.co2_emission.co2_emission',
'engine.cold_start.cold_start',
'engine.start_stop.start_stop',
'engine.thermal.thermal',
'gear_box.gear_box',
'gear_box.mechanical.mechanical',
'gear_box.cvt.cvt_model',
'gear_box.thermal.thermal',
'gear_box.at_gear.at_gear',
'gear_box.at_gear.at_cmv',
'gear_box.at_gear.at_cmv_cold_hot',
'gear_box.at_gear.at_dt_va',
'gear_box.at_gear.at_dt_vap',
'gear_box.at_gear.at_dt_vat',
'gear_box.at_gear.at_dt_vatp',
'gear_box.at_gear.at_gspv',
'gear_box.at_gear.at_gspv_cold_hot',
]
]
]
co2mpas_model = {'co2mpas.%s' % k for k in co2mpas_model}
if not model_ids:
models = co2mpas_model
else:
model_ids = set(model_ids)
models = model_ids.intersection(co2mpas_model)
for model_id in model_ids - models:
models.update(k for k in co2mpas_model if model_id in k)
return sorted(models)
def plot_model_graphs(model_ids=None, view_in_browser=True,
depth=-1, output_folder=None, **kwargs):
"""
Plots the graph of CO2MPAS models.
:param model_ids:
List of models to be plotted
(e.g., ['co2mpas.physical.physical', 'engine', ...]).
.. note:: It it is not specified all models will be plotted.
:type model_ids: list|tuple
:param view_in_browser:
Open the rendered directed graph in the DOT language with the sys
default opener.
:type view_in_browser: bool, optional
:param output_folder:
Output folder.
:type output_folder: str
:param depth:
Max level of sub-dispatch plots. If `None` or negative, no limit.
:type depth: int, optional
:param kwargs:
Optional :func:`dispatcher.utils.drw.dsp2dot` keywords.
:type kwargs: dict
:return:
A list of directed graphs source code in the DOT language.
:rtype: list
"""
models_path = get_model_paths(model_ids=model_ids)
log.info('Plotting graph for models: %s', models_path)
dot_graphs = []
for model_path in models_path:
model_path = model_path.split('.')
module_path, object_name = '.'.join(model_path[:-1]), model_path[-1]
importlib.import_module(module_path)
module = sys.modules[module_path]
dsp = getattr(module, object_name)()
depth = -1 if depth is None else depth
filename = osp.join(output_folder, dsp.name) if output_folder else None
dot = dsp.plot(view=view_in_browser, depth=depth, filename=filename,
**kwargs)
dot_graphs.append(dot)
return dot_graphs
def plot_time_series(
dsp, x_id, *y_ids, title=None, x_label=None, y_label=None, **kwargs):
"""
Plot time series from the dsp.
:param dsp:
Co2mpas model.
:type dsp: co2mpas.dispatcher.Dispatcher
:param x_id:
Id of X axes.
:type x_id: str | tuple[str]
:param y_ids:
Ids of data to plot.
:type y_ids: tuple[dict | str | tuple[str]]
:param title:
Plot title.
:type title: str
:param x_label:
Label of X axes.
:type x_label: str
:param y_label:
Label of X axes.
:type y_label: str
:param kwargs:
Optional plot kwargs.
:type y_label: dict
"""
x_id = stlp(x_id)
x, x_id = dsp.get_node(*x_id)
if x_label is None:
x_label = dsp.get_node(*x_id, node_attr='description')[0][0]
if x_label:
plt.xlabel(x_label)
if title is not None:
plt.title(title)
for data in y_ids:
if not isinstance(data, dict):
data = {'id': data, 'x': x}
if 'id' in data:
y_id = stlp(data.pop('id'))
des = y_label is None or 'label' not in data
if des or 'y' not in data:
y, y_id = dsp.get_node(*y_id)
if des:
label = dsp.get_node(*y_id, node_attr='description')[0][0]
if y_label is None:
y_label = label
if 'label' not in data:
data['label'] = label or y_id[-1]
if 'y' not in data:
data['y'] = y
elif 'y' not in data:
data['y'] = dsp.get_node(*y_id)[0]
x = data.pop('x', x)
y = data.pop('y')
if x.shape[0] != y.shape[0]:
y = y.T
data.update(kwargs)
plt.plot(x, y, **data)
if y_label:
plt.ylabel(y_label)
plt.legend()