Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add grib_tree method #399

Merged
merged 11 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions kerchunk/grib2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import base64
import copy
import logging
from collections import defaultdict
from typing import Iterable, List, Dict, Set

import ujson

try:
import cfgrib
Expand Down Expand Up @@ -354,3 +359,199 @@ def example_combine(
identical_dims=["heightAboveGround", "latitude", "longitude"],
)
return mzz.translate()


def grib_tree(
    message_groups: Iterable[Dict],
    remote_options=None,
) -> Dict:
    """
    Build a hierarchical data model from a set of scanned grib messages.

    The iterable input groups should
    be a collection of results from scan_grib. Multiple grib files can be processed together to produce an
    FMRC like collection.
    The time (reference_time) and step coordinates will be used as concat_dims in the MultiZarrToZarr
    aggregation. Each variable name will become a group with nested subgroups representing the grib
    step type and grib level. The resulting hierarchy can be opened as a zarr_group or a xarray datatree.
    Grib message variable names that decode as "unknown" are dropped
    Grib typeOfLevel attributes that decode as unknown are treated as a single group
    Grib steps that are missing due to WrongStepUnitError are patched with NaT

    :param message_groups: a collection of zarr store like dictionaries as produced by scan_grib
    :param remote_options: remote options to pass to ZarrToMultiZarr
    :return: A new zarr store like dictionary for use as a reference filesystem mapper
    """
    # Local import to avoid a circular import between kerchunk.grib2 and kerchunk.combine.
    from kerchunk.combine import MultiZarrToZarr

    # Hard code the filters in the correct order for the group hierarchy
    filters = ["stepType", "typeOfLevel"]

    # The flat key/value reference store that backs the zarr hierarchy we build.
    zarr_store = {}
    zroot = zarr.open_group(store=zarr_store)
    result = dict(refs=zarr_store)

    # Map from a hierarchy path (e.g. "t/instant/isobaricInhPa") to the scanned
    # message groups that belong at that path, and to the set of level
    # coordinate values observed there.
    aggregations: Dict[str, List] = defaultdict(list)
    aggregation_dims: Dict[str, Set] = defaultdict(set)

    unknown_counter = 0
    for msg_ind, group in enumerate(message_groups):
        # Propagate the kerchunk spec version from the first group that has one.
        if "version" not in result:
            result["version"] = group["version"]

        gattrs = ujson.loads(group["refs"][".zattrs"])
        coordinates = gattrs["coordinates"].split(" ")

        # Find the data variable: the first ref key whose top-level name is
        # neither zarr metadata nor a coordinate variable.
        vname = None
        for key, entry in group["refs"].items():
            name = key.split("/")[0]
            if name not in [".zattrs", ".zgroup"] and name not in coordinates:
                vname = name
                break

        if vname is None:
            raise RuntimeError(
                f"Can not find a data var for msg# {msg_ind} in {group['refs'].keys()}"
            )

        if vname == "unknown":
            # To resolve unknown variables add custom grib tables.
            # https://confluence.ecmwf.int/display/UDOC/Creating+your+own+local+definitions+-+ecCodes+GRIB+FAQ
            # If you process the groups from a single file in order, you can use the msg# to compare with the
            # IDX file.
            logger.warning(
                "Found unknown variable in msg# %s... it will be dropped", msg_ind
            )
            unknown_counter += 1
            continue

        logger.debug("Processing vname: %s", vname)
        dattrs = ujson.loads(group["refs"][f"{vname}/.zattrs"])
        # filter order matters - it determines the hierarchy
        gfilters = {}
        for key in filters:
            attr_val = dattrs.get(f"GRIB_{key}")
            if attr_val is None:
                continue
            if attr_val == "unknown":
                logger.warning(
                    "Found 'unknown' attribute value for key %s in var %s of msg# %s",
                    key,
                    vname,
                    msg_ind,
                )
                # Use unknown as a group or drop it?
            gfilters[key] = attr_val

        zgroup = zroot.require_group(vname)
        if "name" not in zgroup.attrs:
            zgroup.attrs["name"] = dattrs.get("GRIB_name")

        for key, value in gfilters.items():
            if value:  # Ignore empty string and None
                # name the group after the attribute values: surface, instant, etc
                zgroup = zgroup.require_group(value)
                # Add an attribute to give context
                zgroup.attrs[key] = value

        # add to the list of groups to multi-zarr
        aggregations[zgroup.path].append(group)

        # keep track of the level coordinate variables and their values
        for key, entry in group["refs"].items():
            name = key.split("/")[0]
            if name == gfilters.get("typeOfLevel") and key.endswith("0"):
                # Lists are unhashable; convert to tuple for set membership.
                if isinstance(entry, list):
                    entry = tuple(entry)
                aggregation_dims[zgroup.path].add(entry)

    concat_dims = ["time", "step"]
    identical_dims = ["longitude", "latitude"]
    for path in aggregations.keys():
        # Parallelize this step!
        catdims = concat_dims.copy()
        idims = identical_dims.copy()

        level_dimension_value_count = len(aggregation_dims.get(path, ()))
        level_group_name = path.split("/")[-1]
        if level_dimension_value_count == 0:
            # BUG FIX: the format string previously read "Path % has ..." so the
            # path argument was never interpolated.
            logger.debug(
                "Path %s has no value coordinate value associated with the level name %s",
                path,
                level_group_name,
            )
        elif level_dimension_value_count == 1:
            # A single level value: treat the level coordinate as identical
            # across all members of the aggregation.
            idims.append(level_group_name)
        elif level_dimension_value_count > 1:
            # Multiple level values: concat along the level dimension too.
            # The level name should be the last element in the path
            catdims.insert(3, level_group_name)

        logger.info(
            "%s calling MultiZarrToZarr with idims %s and catdims %s",
            path,
            idims,
            catdims,
        )

        # Patch groups whose step variable is missing (WrongStepUnitError in scan_grib).
        fix_group_step = add_missing_step_var(aggregations[path], path)
        mzz = MultiZarrToZarr(
            fix_group_step,
            remote_options=remote_options,
            concat_dims=catdims,
            identical_dims=idims,
        )
        try:
            group = mzz.translate()
        except KeyError:
            # Log the whole failing aggregation for debugging, then skip it so
            # one bad variable does not abort the entire tree build.
            import pprint

            gstr = pprint.pformat(fix_group_step)
            logger.exception(f"Failed to multizarr {path}\n{gstr}")
            continue

        # Re-key the aggregated refs under this group's hierarchy path,
        # dropping the per-aggregation root metadata.
        for key, value in group["refs"].items():
            if key not in [".zattrs", ".zgroup"]:
                zarr_store[f"{path}/{key}"] = value

    return result


def add_missing_step_var(groups: List[dict], path: str) -> List[dict]:
    """
    Attempt to fill in a missing zero-dimensional ``step`` variable.

    Groups whose step could not be decoded by scan_grib (cfgrib
    WrongStepUnitError) arrive here without ``step`` references. To keep the
    MultiZarrToZarr concat dims consistent, patch each such group with a scalar
    float64 step array whose fill value is NaN.

    Should this be done where the step unit error is handled in scan grib?

    :param groups: zarr store like reference dicts, as produced by scan_grib
    :param path: the hierarchy path, used only in the warning log message
    :return: a new list; patched groups are deep copies, groups that already
        have a step variable are passed through unchanged
    """
    result = []
    for group in groups:
        if "step/.zarray" not in group["refs"]:
            # Deep copy before patching so the caller's input is not mutated.
            group = copy.deepcopy(group)
            logger.warning("Adding missing step variable to group path %s", path)
            group["refs"]["step/.zarray"] = (
                '{"chunks":[],"compressor":null,"dtype":"<f8","fill_value":"NaN","filters":null,"order":"C",'
                '"shape":[],"zarr_format":2}'
            )
            group["refs"]["step/.zattrs"] = (
                '{"_ARRAY_DIMENSIONS":[],"long_name":"time since forecast_reference_time",'
                '"standard_name":"forecast_period","units":"hours"}'
            )

        result.append(group)

    return result
18 changes: 16 additions & 2 deletions kerchunk/tests/test_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import numpy as np
import pytest
import xarray as xr

from kerchunk.grib2 import scan_grib, _split_file, GribToZarr
import zarr
from kerchunk.grib2 import scan_grib, _split_file, GribToZarr, grib_tree

cfgrib = pytest.importorskip("cfgrib")
here = os.path.dirname(__file__)
Expand Down Expand Up @@ -83,3 +83,17 @@ def test_subhourly():
fpath = os.path.join(here, "hrrr.wrfsubhf.sample.grib2")
result = scan_grib(fpath)
assert len(result) == 2, "Expected two grib messages"


def test_grib_tree():
    """
    Additional testing here would be good.
    Maybe add json files with scan_grib output?
    """
    fpath = os.path.join(here, "hrrr.wrfsubhf.sample.grib2")
    scanned_msg_groups = scan_grib(fpath)
    result = grib_tree(scanned_msg_groups)
    fs = fsspec.filesystem("reference", fo=result)
    zg = zarr.open_group(fs.get_mapper(""))
    # BUG FIX: the isinstance results were previously discarded, so these
    # checks could never fail; they must be asserted.
    assert isinstance(zg["refc/instant/atmosphere/refc"], zarr.Array)
    assert isinstance(zg["vbdsf/avg/surface/vbdsf"], zarr.Array)
Loading