"""Functions for creating/updating a recipe with `Dataset`s."""
from __future__ import annotations
import itertools
import logging
import re
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Sequence
from nested_lookup import nested_delete
from esmvalcore.exceptions import RecipeError
from ._io import _load_recipe
if TYPE_CHECKING:
from esmvalcore.dataset import Dataset
logger = logging.getLogger(__name__)
Recipe = Dict[str, Any]
Facets = Dict[str, Any]
def _datasets_to_raw_recipe(datasets: Iterable[Dataset]) -> Recipe:
"""Convert datasets to a recipe dict."""
diagnostics: dict[str, dict[str, Any]] = {}
for dataset in datasets:
diagnostic_name: str = dataset.facets['diagnostic'] # type: ignore
if diagnostic_name not in diagnostics:
diagnostics[diagnostic_name] = {'variables': {}}
variables = diagnostics[diagnostic_name]['variables']
if 'variable_group' in dataset.facets:
variable_group = dataset.facets['variable_group']
else:
variable_group = dataset.facets['short_name']
if variable_group not in variables:
variables[variable_group] = {'additional_datasets': []}
facets: dict[str, Any] = dataset.minimal_facets
facets.pop('diagnostic', None)
if facets['short_name'] == variable_group:
facets.pop('short_name')
if dataset.supplementaries:
facets['supplementary_variables'] = []
for supplementary in dataset.supplementaries:
anc_facets = {}
for key, value in supplementary.minimal_facets.items():
if facets.get(key) != value:
anc_facets[key] = value
facets['supplementary_variables'].append(anc_facets)
variables[variable_group]['additional_datasets'].append(facets)
recipe = {'diagnostics': diagnostics}
return recipe
def _datasets_to_recipe(datasets: Iterable[Dataset]) -> Recipe:
"""Convert datasets to a condensed recipe dict."""
for dataset in datasets:
if 'diagnostic' not in dataset.facets:
raise RecipeError(f"'diagnostic' facet missing from {dataset},"
"unable to convert to recipe.")
recipe = _datasets_to_raw_recipe(datasets)
diagnostics = recipe['diagnostics'].values()
# Group ensemble members
for diagnostic in diagnostics:
for variable in diagnostic['variables'].values():
variable['additional_datasets'] = _group_ensemble_members(
variable['additional_datasets'])
# Move identical facets from dataset to variable
for diagnostic in diagnostics:
diagnostic['variables'] = {
variable_group: _group_identical_facets(variable)
for variable_group, variable in diagnostic['variables'].items()
}
# Deduplicate by moving datasets up from variable to diagnostic to recipe
recipe = _move_datasets_up(recipe)
return recipe
def _move_datasets_up(recipe: Recipe) -> Recipe:
"""Move datasets from variable to diagnostic to recipe."""
# Move `additional_datasets` from variable to diagnostic level
for diagnostic in recipe['diagnostics'].values():
_move_one_level_up(diagnostic, 'variables', 'additional_datasets')
# Move `additional_datasets` from diagnostic to `datasets` at recipe level
_move_one_level_up(recipe, 'diagnostics', 'datasets')
return recipe
def _to_frozen(item):
"""Return a frozen and sorted copy of nested dicts and lists."""
if isinstance(item, list):
return tuple(sorted(_to_frozen(elem) for elem in item))
if isinstance(item, dict):
return tuple(sorted((k, _to_frozen(v)) for k, v in item.items()))
return item
def _move_one_level_up(base: dict, level: str, target: str):
"""Move datasets one level up in the recipe."""
groups = base[level]
if not groups:
return
# Create a mapping from objects that can be hashed to the dicts
# describing the datasets.
dataset_mapping = {}
for name, group in groups.items():
dataset_mapping[name] = {
_to_frozen(ds): ds
for ds in group['additional_datasets']
}
# Set datasets that are common to all groups
first_datasets = next(iter(dataset_mapping.values()))
common_datasets = set(first_datasets)
for datasets in dataset_mapping.values():
common_datasets &= set(datasets)
base[target] = [
v for k, v in first_datasets.items() if k in common_datasets
]
# Remove common datasets from groups
for name, datasets in dataset_mapping.items():
group = groups[name]
var_datasets = set(datasets) - common_datasets
if var_datasets:
group['additional_datasets'] = [
v for k, v in datasets.items() if k in var_datasets
]
else:
group.pop('additional_datasets')
def _group_identical_facets(variable: Mapping[str, Any]) -> Recipe:
"""Move identical facets from datasets to variable."""
result = dict(variable)
dataset_facets = result.pop('additional_datasets')
variable_keys = [
k for k, v in dataset_facets[0].items()
if k != 'dataset' # keep at least one key in every dataset
and all((k, v) in d.items() for d in dataset_facets[1:])
]
result.update(
(k, v) for k, v in dataset_facets[0].items() if k in variable_keys)
result['additional_datasets'] = [{
k: v
for k, v in d.items() if k not in variable_keys
} for d in dataset_facets]
return result
def _group_ensemble_members(dataset_facets: Iterable[Facets]) -> list[Facets]:
"""Group ensemble members.
This is the inverse operation of `Dataset.from_ranges` for
ensembles.
"""
def grouper(facets):
return tuple((k, facets[k]) for k in sorted(facets) if k != 'ensemble')
result = []
for group_facets, group in itertools.groupby(dataset_facets, key=grouper):
ensembles = [f['ensemble'] for f in group if 'ensemble' in f]
if not ensembles:
result.append(dict(group_facets))
else:
for ensemble in _group_ensemble_names(ensembles):
facets = dict(group_facets)
facets['ensemble'] = ensemble
result.append(facets)
return result
def _group_ensemble_names(ensemble_names: Iterable[str]) -> list[str]:
"""Group ensemble names.
Examples
--------
ensemble_names=[
'r1i1p1',
'r2i1p1',
'r3i1p1',
'r1i1p2',
]
will return [
'r(1:3)i1p1',
'r1i1p2',
].
"""
ensemble_tuples = [
tuple(int(i) for i in re.findall(r'\d+', ens))
for ens in ensemble_names
]
ensemble_ranges = _create_ensemble_ranges(ensemble_tuples)
groups = []
for ensemble_range in ensemble_ranges:
txt = ''
for name, value in zip('ripf', ensemble_range):
txt += name
if value[0] == value[1]:
txt += f"{value[0]}"
else:
txt += f"({value[0]}:{value[1]})"
groups.append(txt)
return groups
def _create_ensemble_ranges(
ensembles: Sequence[tuple[int,
...]], ) -> list[tuple[tuple[int, int], ...]]:
"""Create ranges from tuples.
Examples
--------
Input ensemble member tuple (1, 1, 1) represents 'r1i1p1'.
The input tuples will be converted to ranges, for example
ensembles=[
(1, 1, 1),
(2, 1, 1),
(3, 1, 1),
(1, 1, 2),
]
will return [
((1, 3), (1, 1), (1, 1)),
((1, 1), (1, 1), (2, 2)),
].
"""
def order(i, ens):
prefix, suffix = ens[:i], ens[i + 1:]
return (prefix, suffix, ens[i])
def grouper(i, ens):
prefix, suffix = ens[:i], ens[i + 1:]
return (prefix, suffix)
for i in range(len(ensembles[0])):
grouped_ensembles = []
ensembles = sorted(ensembles, key=partial(order, i))
for (prefix,
suffix), ibunch in itertools.groupby(ensembles,
key=partial(grouper, i)):
bunch = list(ibunch)
prev = bunch[0][i]
groups = [[prev]]
for ensemble in bunch[1:]:
if ensemble[i] == prev + 1:
prev += 1
else:
groups[-1].append(prev)
prev = ensemble[i]
groups.append([prev])
groups[-1].append(prev)
result = []
for group in groups:
item = prefix + (tuple(group), ) + suffix
result.append(item)
grouped_ensembles.extend(result)
ensembles = grouped_ensembles
return sorted(ensembles) # type: ignore
def _clean_recipe(recipe: Recipe, diagnostics: list[str]) -> Recipe:
"""Clean up the input recipe."""
# Format description nicer
if 'documentation' in recipe:
doc = recipe['documentation']
for key in ['title', 'description']:
if key in doc:
doc[key] = doc[key].strip()
# Filter out unused diagnostics
recipe['diagnostics'] = {
k: v
for k, v in recipe['diagnostics'].items() if k in diagnostics
}
# Remove legacy supplementary definitions form the recipe
nested_delete(
recipe.get('preprocessors', {}),
'fx_variables',
in_place=True,
)
return recipe
[docs]def datasets_to_recipe(
datasets: Iterable[Dataset],
recipe: Path | str | dict[str, Any] | None = None,
) -> dict:
"""Create or update a recipe from datasets.
Parameters
----------
datasets
Datasets to use in the recipe.
recipe
:ref:`Recipe <recipe>` to load the datasets from. The value
provided here should be either a path to a file, a recipe file
that has been loaded using e.g. :func:`yaml.safe_load`, or an
:obj:`str` that can be loaded using :func:`yaml.safe_load`.
Examples
--------
See :ref:`/notebooks/composing-recipes.ipynb` for example use cases.
Returns
-------
dict
The recipe with the datasets. To convert the :obj:`dict` to a
:ref:`recipe <recipe>`, use e.g. :func:`yaml.safe_dump`.
Raises
------
RecipeError
Raised when a dataset is missing the ``diagnostic`` facet.
"""
recipe = _load_recipe(recipe)
dataset_recipe = _datasets_to_recipe(datasets)
_clean_recipe(recipe, diagnostics=dataset_recipe['diagnostics'])
# Remove dataset sections from recipe
recipe.pop('datasets', None)
nested_delete(recipe, 'additional_datasets', in_place=True)
# Update datasets section
if 'datasets' in dataset_recipe:
recipe['datasets'] = dataset_recipe['datasets']
for diag, dataset_diagnostic in dataset_recipe['diagnostics'].items():
if diag not in recipe['diagnostics']:
recipe['diagnostics'][diag] = {}
diagnostic = recipe['diagnostics'][diag]
# Update diagnostic level datasets
if 'additional_datasets' in dataset_diagnostic:
additional_datasets = dataset_diagnostic['additional_datasets']
diagnostic['additional_datasets'] = additional_datasets
# Update variable level datasets
if 'variables' in dataset_diagnostic:
diagnostic['variables'] = dataset_diagnostic['variables']
return recipe