catalog.py
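"""Build intake catalog entries for datasets.

For each data file, extract the time range, bounding box, level values,
and size; write the rows to a version-stamped CSV; and register that CSV
in the top-level intake YAML catalog (c3s.yaml).
"""
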
import os
from collections import OrderedDict
from datetime import datetime

import numpy as np
import pandas as pd
import yaml
from roocs_utils.project_utils import DatasetMapper
from roocs_utils.xarray_utils.xarray_utils import get_coord_by_type, open_xr_dataset

from catalog_maker import CONFIG
from catalog_maker.utils import create_dir


def get_time_info(ds, var_id):
    # Extract the time range of the variable; datasets without a time
    # coordinate report an undefined range. The dataset is closed by the
    # caller once all metadata has been read.
    try:
        all_times = list(ds[var_id].time.values)
    except AttributeError:
        return "undefined", "undefined"
    if not all_times:
        return "undefined", "undefined"

    start_time = all_times[0].isoformat(timespec="seconds")
    end_time = all_times[-1].isoformat(timespec="seconds")

    # guard against dates in the 4000s, which are outside the expected range
    if start_time.startswith("4") or end_time.startswith("4"):
        raise Exception(
            f"Time is not in expected range. Time range is {start_time} to {end_time}"
        )

    return start_time, end_time


def get_coord_info(coord):
    # numpy's min/max return NaN when any value is NaN, so fall back to
    # xarray's reductions, which skip NaNs by default.
    data = coord.values
    mn, mx = data.min(), data.max()
    if np.isnan(mn) or np.isnan(mx):
        mn, mx = float(coord.min()), float(coord.max())
    return mn, mx


def get_bbox(ds):
    lat = get_coord_by_type(ds, "latitude", ignore_aux_coords=False)
    lon = get_coord_by_type(ds, "longitude", ignore_aux_coords=False)
    min_y, max_y = get_coord_info(lat)
    min_x, max_x = get_coord_info(lon)

    if min_y < -90 or max_y > 90:
        raise Exception(
            f"Latitude is not within expected bounds. The minimum and maximum are {min_y}, {max_y}"
        )
    if min_x < -360 or max_x > 360:
        raise Exception(
            f"Longitude is not within expected bounds. The minimum and maximum are {min_x}, {max_x}"
        )

    # "min_x, min_y, max_x, max_y", e.g. "0.00, -90.00, 359.75, 90.00"
    bbox = f"{min_x:.2f}, {min_y:.2f}, {max_x:.2f}, {max_y:.2f}"
    return bbox


def get_level_info(ds):
    # Return the sorted level values as a space-separated string,
    # e.g. "250.00 500.00 850.00", or an empty string when the dataset
    # has no level coordinate. atleast_1d makes single-level coordinates
    # (0-d or size-1 arrays) formattable like any other.
    coord = get_coord_by_type(ds, "level", ignore_aux_coords=False)
    if coord is None:
        return ""
    levels = np.sort(np.atleast_1d(coord.values))
    return " ".join(f"{lev:.2f}" for lev in levels)


def get_size_data(fpath):
    # file size in bytes, plus gigabytes rounded to two decimal places
    size = os.path.getsize(fpath)
    size_gb = round(size / 1e9, 2)
    return size, size_gb


def get_files(ds_id):
    fpaths = DatasetMapper(ds_id).files
    if len(fpaths) < 1:
        raise FileNotFoundError(f"No files were found for dataset: {ds_id}")
    return fpaths


def build_dict(ds_id, fpath, proj_dict):
    # map the dot-separated dataset-id components onto the project's facet names
    comps = ds_id.split(".")
    facet_rule = proj_dict["facet_rule"]
    facets = dict(zip(facet_rule, comps))
    var_id = facets.get("variable") or facets.get("variable_id")

    ds = open_xr_dataset(fpath)
    size, size_gb = get_size_data(fpath)
    start_time, end_time = get_time_info(ds, var_id)
    bbox = get_bbox(ds)
    level = get_level_info(ds)
    # close the dataset only after all metadata has been read
    ds.close()

    d = OrderedDict()
    d["ds_id"] = ds_id
    d["path"] = "/".join(comps[1:]) + "/" + os.path.basename(fpath)
    d["size"] = size
    # d["size_gb"] = size_gb
    d.update(facets)
    d["start_time"] = start_time
    d["end_time"] = end_time
    d["bbox"] = bbox
    d["level"] = level
    return d
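
# A catalog row produced by build_dict looks like the sketch below; all
# values are hypothetical, and the facet keys depend on the project's
# "facet_rule" in CONFIG:
#
#   {"ds_id": "...", "path": ".../file.nc", "size": 1234567,
#    <one key per facet>, "start_time": "2015-01-16T12:00:00",
#    "end_time": "2100-12-16T12:00:00",
#    "bbox": "0.00, -90.00, 359.75, 90.00", "level": ""}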


def create_catalog(project, ds_id, fpath):
    proj_dict = CONFIG[f"project:{project}"]
    return build_dict(ds_id, fpath, proj_dict)


def write_catalog(df, project, last_updated, csv_dir, compress):
    # version-stamp the CSV with the date of this update, e.g. "v20240101"
    version_stamp = last_updated.strftime("v%Y%m%d")
    cat_name = f"{project}_{version_stamp}.csv"

    if compress:
        cat_name += ".gz"
        compression = "gzip"
    else:
        compression = None

    cat_path = os.path.join(csv_dir, cat_name)
    df.to_csv(cat_path, index=False, compression=compression)
    return cat_path


def update_catalog(project, path, last_updated, cat_dir):
    cat_name = "c3s.yaml"
    cat_path = os.path.join(cat_dir, cat_name)

    # template for this project's entry in the intake catalog
    entry = {
        "description": f"{project} datasets",
        "driver": "intake.source.csv.CSVSource",
        # "cache": [{"argkey": "urlpath", "type": "file"}],
        "csv_kwargs": {
            "blocksize": None,
            "compression": "gzip",
            "dtype": {"level": "object"},
        },
        "args": {"urlpath": ""},
        "metadata": {"last_updated": ""},
    }

    # load the existing catalog, or start a new one
    if os.path.exists(cat_path):
        with open(cat_path) as fin:
            cat = yaml.load(fin, Loader=yaml.SafeLoader)
    else:
        cat = {"sources": {}}

    # add the template entry if the project is not in the catalog yet
    cat["sources"].setdefault(project, entry)

    # point the entry at the new CSV (relative to the catalog file) and
    # record when it was last updated
    cat["sources"][project]["args"]["urlpath"] = (
        "{{ CATALOG_DIR }}/" + os.path.relpath(path, start=cat_dir)
    )
    timestamp = last_updated.strftime("%Y-%m-%dT%H:%M:%SZ")
    cat["sources"][project]["metadata"]["last_updated"] = timestamp

    with open(cat_path, "w") as fout:
        yaml.dump(cat, fout)

    return cat_path
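
# The resulting c3s.yaml holds one entry per project; the paths and
# timestamp below are illustrative:
#
#   sources:
#     <project>:
#       args:
#         urlpath: '{{ CATALOG_DIR }}/<relative-path>/<project>_v20240101.csv.gz'
#       csv_kwargs:
#         blocksize: null
#         compression: gzip
#         dtype:
#           level: object
#       description: <project> datasets
#       driver: intake.source.csv.CSVSource
#       metadata:
#         last_updated: '2024-01-01T00:00:00Z'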


def to_csv(content, project, compress):
    # create the dataframe
    df = pd.DataFrame(content)

    # write the catalog, making sure the output directories exist
    cat_dir = CONFIG[f"project:{project}"]["catalog_dir"]
    csv_dir = CONFIG[f"project:{project}"]["csv_dir"]
    create_dir(cat_dir)
    create_dir(csv_dir)

    last_updated = datetime.utcnow()
    cat_path = write_catalog(
        df,
        project,
        last_updated,
        csv_dir,
        compress=compress,
    )
    print(f"Catalog written to {cat_path}")
    return cat_path, last_updated
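

# Minimal end-to-end usage sketch. The project name and dataset id are
# hypothetical placeholders; real values come from the deployment's CONFIG.
if __name__ == "__main__":
    project = "c3s-cmip6"  # hypothetical project key
    ds_id = "c3s-cmip6.ScenarioMIP.INM.INM-CM5-0.ssp245.r1i1p1f1.Amon.rlds.gr1.v20190619"  # hypothetical

    # one catalog row per file in the dataset
    rows = [create_catalog(project, ds_id, fpath) for fpath in get_files(ds_id)]

    # write the version-stamped CSV and register it in c3s.yaml
    csv_path, last_updated = to_csv(rows, project, compress=True)
    cat_dir = CONFIG[f"project:{project}"]["catalog_dir"]
    update_catalog(project, csv_path, last_updated, cat_dir)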