Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
David Stuebe committed Nov 28, 2023
1 parent f28f41b commit 5809526
Show file tree
Hide file tree
Showing 4 changed files with 582 additions and 59 deletions.
12 changes: 10 additions & 2 deletions kerchunk/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,13 @@ def store_coords(self):
for _ in v
]
).ravel()
if "fill_value" not in kw and data.dtype.kind == "i":
kw["fill_value"] = None
if "fill_value" not in kw:
if data.dtype.kind == "i":
kw["fill_value"] = None
else:
# Fall back to existing fill value
kw["fill_value"] = z[k].fill_value

arr = group.create_dataset(
name=k,
data=data,
Expand All @@ -323,6 +328,9 @@ def store_coords(self):
)
if k in z:
# copy attributes if values came from an original variable
logger.warning(
"Zarr attrs: %s", {ii: vv for ii, vv in z[k].attrs.items()}
)
arr.attrs.update(z[k].attrs)
arr.attrs["_ARRAY_DIMENSIONS"] = [k]
if self.cf_units and k in self.cf_units:
Expand Down
96 changes: 40 additions & 56 deletions kerchunk/grib2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import fsspec
import zarr
import xarray
import numpy as np

from kerchunk.utils import class_factory, _encode_for_JSON
Expand Down Expand Up @@ -376,6 +377,7 @@ def grib_tree(
Grib message variable names that decode as "unknown" are dropped
Grib typeOfLevel attributes that decode as unknown are treated as a single group
Grib steps that are missing due to WrongStepUnitError are patched with NaT
The input message_groups should not be modified by this method
:param message_groups: a collection of zarr store like dictionaries as produced by scan_grib
:param remote_options: remote options to pass to ZarrToMultiZarr
Expand Down Expand Up @@ -418,10 +420,10 @@ def grib_tree(
# To resolve unknown variables add custom grib tables.
# https://confluence.ecmwf.int/display/UDOC/Creating+your+own+local+definitions+-+ecCodes+GRIB+FAQ
# If you process the groups from a single file in order, you can use the msg# to compare with the
# IDX file.
# IDX file. The idx files message index is 1 based where the grib_tree message count is zero based
logger.warning(
"Dropping unknown variable in msg# %d. Compare with the grib idx file to identify and build"
" a ecCodes local grib definitions to fix it.",
"Dropping unknown variable in msg# %d. Compare with the grib idx file to help identify it"
" and build an ecCodes local grib definitions file to fix it.",
msg_ind,
)
unknown_counter += 1
Expand Down Expand Up @@ -496,21 +498,13 @@ def grib_tree(
catdims,
)

fix_group_step = add_missing_step_var(aggregations[path], path)
mzz = MultiZarrToZarr(
fix_group_step,
aggregations[path],
remote_options=remote_options,
concat_dims=catdims,
identical_dims=idims,
)
try:
group = mzz.translate()
except Exception:
import pprint

gstr = pprint.pformat(fix_group_step)
logger.exception(f"Failed to multizarr {path}\n{gstr}")
raise
group = mzz.translate()

for key, value in group["refs"].items():
if key not in [".zattrs", ".zgroup"]:
Expand All @@ -519,53 +513,43 @@ def grib_tree(
return result


def add_missing_step_var(groups: List[dict], path: str) -> List[dict]:
def correct_hrrr_subhf_group_step(group: dict) -> dict:
"""
Attempt to fill in missing step var. Should this be done where the step unit error is handled
in scan grib?
:param groups:
:param path:
:return:
Overrides the definition of the step variable. Sets the value equal to the `valid_time - time`
in hours as a floating point value. This fixes issues with the HRRR SubHF grib2 step as read by
cfgrib.
The result is a deep copy, the original data is unmodified.
:param groups: the list of groups to fix
:param path: the path of the group
:return: a new deep copy of the corrected listed
"""
result = []
for group in groups:
group = copy.deepcopy(group)
if "step/.zarray" not in group["refs"]:
logger.warning("Adding missing step variable to group path %s", path)
group["refs"]["step/.zarray"] = (
'{"chunks":[],"compressor":null,"dtype":"<f8","fill_value":"NaN","filters":null,"order":"C",'
'"shape":[],"zarr_format":2}'
)
group["refs"]["step/.zattrs"] = (
'{"_ARRAY_DIMENSIONS":[],"long_name":"time since forecast_reference_time",'
'"standard_name":"forecast_period","units":"hours"}'
)
group = copy.deepcopy(group)
group["refs"]["step/.zarray"] = (
'{"chunks":[],"compressor":null,"dtype":"<f8","fill_value":"NaN","filters":null,"order":"C",'
'"shape":[],"zarr_format":2}'
)
group["refs"]["step/.zattrs"] = (
'{"_ARRAY_DIMENSIONS":[],"long_name":"time since forecast_reference_time",'
'"standard_name":"forecast_period","units":"hours"}'
)

# Try to set the value - this doesn't work
import xarray
fo = fsspec.filesystem("reference", fo=group, mode="r")
xd = xarray.open_dataset(fo.get_mapper(), engine="zarr", consolidated=False)

gcopy = copy.deepcopy(group)
fo = fsspec.filesystem("reference", fo=gcopy, mode="r")
xd = xarray.open_dataset(fo.get_mapper(), engine="zarr", consolidated=False)
correct_step = xd.valid_time.values - xd.time.values

logger.info("%s has step val %s", path, xd.step.values)
correct_step = xd.valid_time.values - xd.time.values
assert correct_step.shape == ()
step_float = correct_step.astype("timedelta64[s]").astype("float") / 3600.0
step_bytes = step_float.tobytes()
try:
# easiest way to test if data is ascii
bytes = step_bytes.decode("ascii")
except UnicodeDecodeError:
bytes = (b"base64:" + base64.b64encode(step_bytes)).decode("ascii")
assert correct_step.shape == ()
step_float = correct_step.astype("timedelta64[s]").astype("float") / 3600.0
logger.info(
"Orig val %s, new val %s", xd.step.values.astype("timedelta64[s]"), step_float
)

group["refs"]["step/0"] = bytes
logger.info(
"Computed step float: %s, current step/0 %s, orig step %s",
step_float,
group["refs"].get("step/0"),
gcopy["refs"].get("step/0"),
)
result.append(group)
step_bytes = step_float.tobytes()
try:
enocded_val = step_bytes.decode("ascii")
except UnicodeDecodeError:
enocded_val = (b"base64:" + base64.b64encode(step_bytes)).decode("ascii")

return result
group["refs"]["step/0"] = enocded_val

return group
Loading

0 comments on commit 5809526

Please sign in to comment.