Source code for esmvalcore._recipe.from_datasets

"""Functions for creating/updating a recipe with `Dataset`s."""
from __future__ import annotations

import itertools
import logging
import re
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Sequence

from nested_lookup import nested_delete

from esmvalcore.exceptions import RecipeError

from ._io import _load_recipe

    from esmvalcore.dataset import Dataset

logger = logging.getLogger(__name__)

Recipe = Dict[str, Any]
Facets = Dict[str, Any]

def _datasets_to_raw_recipe(datasets: Iterable[Dataset]) -> Recipe:
    """Convert datasets to a recipe dict."""
    diagnostics: dict[str, dict[str, Any]] = {}

    for dataset in datasets:
        diagnostic_name: str = dataset.facets['diagnostic']  # type: ignore
        if diagnostic_name not in diagnostics:
            diagnostics[diagnostic_name] = {'variables': {}}
        variables = diagnostics[diagnostic_name]['variables']
        if 'variable_group' in dataset.facets:
            variable_group = dataset.facets['variable_group']
            variable_group = dataset.facets['short_name']
        if variable_group not in variables:
            variables[variable_group] = {'additional_datasets': []}
        facets: dict[str, Any] = dataset.minimal_facets
        facets.pop('diagnostic', None)
        if facets['short_name'] == variable_group:
        if dataset.supplementaries:
            facets['supplementary_variables'] = []
        for supplementary in dataset.supplementaries:
            anc_facets = {}
            for key, value in supplementary.minimal_facets.items():
                if facets.get(key) != value:
                    anc_facets[key] = value

    recipe = {'diagnostics': diagnostics}
    return recipe

def _datasets_to_recipe(datasets: Iterable[Dataset]) -> Recipe:
    """Convert datasets to a condensed recipe dict."""
    for dataset in datasets:
        if 'diagnostic' not in dataset.facets:
            raise RecipeError(f"'diagnostic' facet missing from {dataset},"
                              "unable to convert to recipe.")

    recipe = _datasets_to_raw_recipe(datasets)
    diagnostics = recipe['diagnostics'].values()

    # Group ensemble members
    for diagnostic in diagnostics:
        for variable in diagnostic['variables'].values():
            variable['additional_datasets'] = _group_ensemble_members(

    # Move identical facets from dataset to variable
    for diagnostic in diagnostics:
        diagnostic['variables'] = {
            variable_group: _group_identical_facets(variable)
            for variable_group, variable in diagnostic['variables'].items()

    # Deduplicate by moving datasets up from variable to diagnostic to recipe
    recipe = _move_datasets_up(recipe)

    return recipe

def _move_datasets_up(recipe: Recipe) -> Recipe:
    """Move datasets from variable to diagnostic to recipe."""
    # Move `additional_datasets` from variable to diagnostic level
    for diagnostic in recipe['diagnostics'].values():
        _move_one_level_up(diagnostic, 'variables', 'additional_datasets')

    # Move `additional_datasets` from diagnostic to `datasets` at recipe level
    _move_one_level_up(recipe, 'diagnostics', 'datasets')

    return recipe

def _to_frozen(item):
    """Return a frozen and sorted copy of nested dicts and lists."""
    if isinstance(item, list):
        return tuple(sorted(_to_frozen(elem) for elem in item))
    if isinstance(item, dict):
        return tuple(sorted((k, _to_frozen(v)) for k, v in item.items()))
    return item

def _move_one_level_up(base: dict, level: str, target: str):
    """Move datasets one level up in the recipe."""
    groups = base[level]
    if not groups:

    # Create a mapping from objects that can be hashed to the dicts
    # describing the datasets.
    dataset_mapping = {}
    for name, group in groups.items():
        dataset_mapping[name] = {
            _to_frozen(ds): ds
            for ds in group['additional_datasets']

    # Set datasets that are common to all groups
    first_datasets = next(iter(dataset_mapping.values()))
    common_datasets = set(first_datasets)
    for datasets in dataset_mapping.values():
        common_datasets &= set(datasets)
    base[target] = [
        v for k, v in first_datasets.items() if k in common_datasets

    # Remove common datasets from groups
    for name, datasets in dataset_mapping.items():
        group = groups[name]
        var_datasets = set(datasets) - common_datasets
        if var_datasets:
            group['additional_datasets'] = [
                v for k, v in datasets.items() if k in var_datasets

def _group_identical_facets(variable: Mapping[str, Any]) -> Recipe:
    """Move identical facets from datasets to variable."""
    result = dict(variable)
    dataset_facets = result.pop('additional_datasets')
    variable_keys = [
        k for k, v in dataset_facets[0].items()
        if k != 'dataset'  # keep at least one key in every dataset
        and all((k, v) in d.items() for d in dataset_facets[1:])
        (k, v) for k, v in dataset_facets[0].items() if k in variable_keys)
    result['additional_datasets'] = [{
        k: v
        for k, v in d.items() if k not in variable_keys
    } for d in dataset_facets]
    return result

def _group_ensemble_members(dataset_facets: Iterable[Facets]) -> list[Facets]:
    """Group ensemble members.

    This is the inverse operation of `Dataset.from_ranges` for

    def grouper(facets):
        return sorted(
            (f, str(v)) for f, v in facets.items() if f != 'ensemble')

    result = []
    dataset_facets = sorted(dataset_facets, key=grouper)
    for _, group_iter in itertools.groupby(dataset_facets, key=grouper):
        group = list(group_iter)
        ensembles = [f['ensemble'] for f in group if 'ensemble' in f]
        group_facets = group[0]
        if not ensembles:
            for ensemble in _group_ensemble_names(ensembles):
                facets = dict(group_facets)
                facets['ensemble'] = ensemble
    return result

def _group_ensemble_names(ensemble_names: Iterable[str]) -> list[str]:
    """Group ensemble names.

    will return [
    ensemble_tuples = [
        tuple(int(i) for i in re.findall(r'\d+', ens))
        for ens in ensemble_names

    ensemble_ranges = _create_ensemble_ranges(ensemble_tuples)

    groups = []
    for ensemble_range in ensemble_ranges:
        txt = ''
        for name, value in zip('ripf', ensemble_range):
            txt += name
            if value[0] == value[1]:
                txt += f"{value[0]}"
                txt += f"({value[0]}:{value[1]})"

    return groups

def _create_ensemble_ranges(
    ensembles: Sequence[tuple[int,
                              ...]], ) -> list[tuple[tuple[int, int], ...]]:
    """Create ranges from tuples.

    Input ensemble member tuple (1, 1, 1) represents 'r1i1p1'.
    The input tuples will be converted to ranges, for example
        (1, 1, 1),
        (2, 1, 1),
        (3, 1, 1),
        (1, 1, 2),
    will return [
        ((1, 3), (1, 1), (1, 1)),
        ((1, 1), (1, 1), (2, 2)),

    def order(i, ens):
        prefix, suffix = ens[:i], ens[i + 1:]
        return (prefix, suffix, ens[i])

    def grouper(i, ens):
        prefix, suffix = ens[:i], ens[i + 1:]
        return (prefix, suffix)

    for i in range(len(ensembles[0])):
        grouped_ensembles = []
        ensembles = sorted(ensembles, key=partial(order, i))
        for (prefix,
             suffix), ibunch in itertools.groupby(ensembles,
                                                  key=partial(grouper, i)):
            bunch = list(ibunch)
            prev = bunch[0][i]
            groups = [[prev]]
            for ensemble in bunch[1:]:
                if ensemble[i] == prev + 1:
                    prev += 1
                    prev = ensemble[i]
            result = []
            for group in groups:
                item = prefix + (tuple(group), ) + suffix

        ensembles = grouped_ensembles

    return sorted(ensembles)  # type: ignore

def _clean_recipe(recipe: Recipe, diagnostics: list[str]) -> Recipe:
    """Clean up the input recipe."""
    # Format description nicer
    if 'documentation' in recipe:
        doc = recipe['documentation']
        for key in ['title', 'description']:
            if key in doc:
                doc[key] = doc[key].strip()

    # Filter out unused diagnostics
    recipe['diagnostics'] = {
        k: v
        for k, v in recipe['diagnostics'].items() if k in diagnostics

    # Remove legacy supplementary definitions form the recipe
        recipe.get('preprocessors', {}),

    return recipe

[docs] def datasets_to_recipe( datasets: Iterable[Dataset], recipe: Path | str | dict[str, Any] | None = None, ) -> dict: """Create or update a recipe from datasets. Parameters ---------- datasets Datasets to use in the recipe. recipe :ref:`Recipe <recipe>` to load the datasets from. The value provided here should be either a path to a file, a recipe file that has been loaded using e.g. :func:`yaml.safe_load`, or an :obj:`str` that can be loaded using :func:`yaml.safe_load`. Examples -------- See :ref:`/notebooks/composing-recipes.ipynb` for example use cases. Returns ------- dict The recipe with the datasets. To convert the :obj:`dict` to a :ref:`recipe <recipe>`, use e.g. :func:`yaml.safe_dump`. Raises ------ RecipeError Raised when a dataset is missing the ``diagnostic`` facet. """ recipe = _load_recipe(recipe) dataset_recipe = _datasets_to_recipe(datasets) _clean_recipe(recipe, diagnostics=dataset_recipe['diagnostics']) # Remove dataset sections from recipe recipe.pop('datasets', None) nested_delete(recipe, 'additional_datasets', in_place=True) # Update datasets section if 'datasets' in dataset_recipe: recipe['datasets'] = dataset_recipe['datasets'] for diag, dataset_diagnostic in dataset_recipe['diagnostics'].items(): if diag not in recipe['diagnostics']: recipe['diagnostics'][diag] = {} diagnostic = recipe['diagnostics'][diag] # Update diagnostic level datasets if 'additional_datasets' in dataset_diagnostic: additional_datasets = dataset_diagnostic['additional_datasets'] diagnostic['additional_datasets'] = additional_datasets # Update variable level datasets if 'variables' in dataset_diagnostic: diagnostic['variables'] = dataset_diagnostic['variables'] return recipe