"""Classes and functions for defining, finding, and loading data."""
from __future__ import annotations
import logging
import pprint
import re
import textwrap
import uuid
from copy import deepcopy
from fnmatch import fnmatchcase
from itertools import groupby
from pathlib import Path
from typing import Any, Iterator, Sequence, Union
from iris.cube import Cube
from esmvalcore import esgf, local
from esmvalcore._recipe import check
from esmvalcore._recipe.from_datasets import datasets_to_recipe
from esmvalcore.cmor.table import _get_mips, _update_cmor_facets
from esmvalcore.config import CFG, Session
from esmvalcore.config._config import (
get_activity,
get_extra_facets,
get_ignored_warnings,
get_institutes,
)
from esmvalcore.exceptions import InputFilesNotFound, RecipeError
from esmvalcore.local import (
_dates_to_timerange,
_get_output_file,
_get_start_end_date,
)
from esmvalcore.preprocessor import preprocess
from esmvalcore.typing import Facets, FacetValue
__all__ = [
"Dataset",
"INHERITED_FACETS",
"datasets_to_recipe",
]
logger = logging.getLogger(__name__)
File = Union[esgf.ESGFFile, local.LocalFile]
INHERITED_FACETS: list[str] = [
"dataset",
"domain",
"driver",
"grid",
"project",
"timerange",
]
"""Inherited facets.
Supplementary datasets created based on the available files using the
:func:`Dataset.from_files` method will inherit the values of these facets from
the main dataset.
"""
def _augment(base: dict, update: dict):
"""Update dict `base` with values from dict `update`."""
for key in update:
if key not in base:
base[key] = update[key]
def _isglob(facet_value: FacetValue | None) -> bool:
"""Check if a facet value is a glob pattern."""
return isinstance(facet_value, str) and bool(
re.match(r".*[\*\?]+.*|.*\[.*\].*", facet_value)
)
def _ismatch(facet_value: FacetValue, pattern: FacetValue) -> bool:
"""Check if a facet value matches a glob pattern."""
return (
isinstance(pattern, str)
and isinstance(facet_value, str)
and fnmatchcase(facet_value, pattern)
)
[docs]
class Dataset:
"""Define datasets, find the related files, and load them.
Parameters
----------
**facets
Facets describing the dataset. See
:obj:`esmvalcore.esgf.facets.FACETS` for the mapping between
the facet names used by ESMValCore and those used on ESGF.
Attributes
----------
supplementaries : list[Dataset]
List of supplementary datasets.
facets: :obj:`esmvalcore.typing.Facets`
Facets describing the dataset.
"""
_SUMMARY_FACETS = (
"short_name",
"mip",
"project",
"dataset",
"rcm_version",
"driver",
"domain",
"activity",
"exp",
"ensemble",
"grid",
"version",
)
"""Facets used to create a summary of a Dataset instance."""
def __init__(self, **facets: FacetValue):
self.facets: Facets = {}
self.supplementaries: list["Dataset"] = []
self._persist: set[str] = set()
self._session: Session | None = None
self._files: Sequence[File] | None = None
self._file_globs: Sequence[Path] | None = None
for key, value in facets.items():
self.set_facet(key, deepcopy(value), persist=True)
[docs]
@staticmethod
def from_recipe(
recipe: Path | str | dict,
session: Session,
) -> list["Dataset"]:
"""Read datasets from a recipe.
Parameters
----------
recipe
:ref:`Recipe <recipe>` to load the datasets from. The value
provided here should be either a path to a file, a recipe file
that has been loaded using e.g. :func:`yaml.safe_load`, or an
:obj:`str` that can be loaded using :func:`yaml.safe_load`.
session
Datasets to use in the recipe.
Returns
-------
list[Dataset]
A list of datasets.
"""
from esmvalcore._recipe.to_datasets import datasets_from_recipe
return datasets_from_recipe(recipe, session)
def _file_to_dataset(
self,
file: esgf.ESGFFile | local.LocalFile,
) -> Dataset:
"""Create a dataset from a file with a `facets` attribute."""
facets = dict(file.facets)
if "version" not in self.facets:
# Remove version facet if no specific version requested
facets.pop("version", None)
updated_facets = {
f: v
for f, v in facets.items()
if f in self.facets
and _isglob(self.facets[f])
and _ismatch(v, self.facets[f])
}
dataset = self.copy()
dataset.facets.update(updated_facets)
# If possible, remove unexpanded facets that can be automatically
# populated.
unexpanded = {f for f, v in dataset.facets.items() if _isglob(v)}
required_for_augment = {"project", "mip", "short_name", "dataset"}
if unexpanded and not unexpanded & required_for_augment:
copy = dataset.copy()
copy.supplementaries = []
for facet in unexpanded:
copy.facets.pop(facet)
copy.augment_facets()
for facet in unexpanded:
if facet in copy.facets:
dataset.facets.pop(facet)
return dataset
def _get_available_datasets(self) -> Iterator[Dataset]:
"""Yield datasets based on the available files.
This function requires that self.facets['mip'] is not a glob pattern.
"""
dataset_template = self.copy()
dataset_template.supplementaries = []
if _isglob(dataset_template.facets.get("timerange")):
# Remove wildcard `timerange` facet, because data finding cannot
# handle it
dataset_template.facets.pop("timerange")
seen = set()
partially_defined = []
expanded = False
for file in dataset_template.files:
dataset = self._file_to_dataset(file)
# Filter out identical datasets
facetset = frozenset(
(f, frozenset(v) if isinstance(v, list) else v)
for f, v in dataset.facets.items()
)
if facetset not in seen:
seen.add(facetset)
if any(
_isglob(v)
for f, v in dataset.facets.items()
if f != "timerange"
):
partially_defined.append((dataset, file))
else:
dataset._update_timerange()
dataset._supplementaries_from_files()
expanded = True
yield dataset
# Only yield datasets with globs if there is no better alternative
for dataset, file in partially_defined:
msg = (
f"{dataset} with unexpanded wildcards, created from file "
f"{file} with facets {file.facets}. Are the missing facets "
"in the path to the file?"
if isinstance(file, local.LocalFile)
else "available on ESGF?"
)
if expanded:
logger.info("Ignoring %s", msg)
else:
logger.debug(
"Not updating timerange and supplementaries for %s "
"because it still contains wildcards.",
msg,
)
yield dataset
[docs]
def from_files(self) -> Iterator["Dataset"]:
"""Create datasets based on the available files.
The facet values for local files are retrieved from the directory tree
where the directories represent the facets values.
Reading facet values from file names is not yet supported.
See :ref:`CMOR-DRS` for more information on this kind of file
organization.
:func:`glob.glob` patterns can be used as facet values to select
multiple datasets.
If for some of the datasets not all glob patterns can be expanded
(e.g. because the required facet values cannot be inferred from the
directory names), these datasets will be ignored, unless this happens
to be all datasets.
If :func:`glob.glob` patterns are used in supplementary variables and
multiple matching datasets are found, only the supplementary dataset
that has most facets in common with the main dataset will be attached.
Supplementary datasets will in inherit the facet values from the main
dataset for those facets listed in :obj:`INHERITED_FACETS`.
Examples
--------
See :ref:`/notebooks/discovering-data.ipynb` for example use cases.
Yields
------
Dataset
Datasets representing the available files.
"""
expanded = False
if any(_isglob(v) for v in self.facets.values()):
if _isglob(self.facets["mip"]):
available_mips = _get_mips(
self.facets["project"], # type: ignore
self.facets["short_name"], # type: ignore
)
mips = [
mip
for mip in available_mips
if _ismatch(mip, self.facets["mip"])
]
else:
mips = [self.facets["mip"]] # type: ignore
for mip in mips:
dataset_template = self.copy(mip=mip)
for dataset in dataset_template._get_available_datasets():
expanded = True
yield dataset
if not expanded:
# If the definition contains no wildcards, no files were found,
# or the file facets didn't match the specification, yield the
# original, but do expand any supplementary globs.
self._supplementaries_from_files()
yield self
def _supplementaries_from_files(self) -> None:
"""Expand wildcards in supplementary datasets."""
supplementaries: list[Dataset] = []
for supplementary_ds in self.supplementaries:
for facet in INHERITED_FACETS:
if facet in self.facets:
supplementary_ds.facets[facet] = self.facets[facet]
supplementaries.extend(supplementary_ds.from_files())
self.supplementaries = supplementaries
self._remove_unexpanded_supplementaries()
self._remove_duplicate_supplementaries()
self._fix_fx_exp()
def _remove_unexpanded_supplementaries(self) -> None:
"""Remove supplementaries where wildcards could not be expanded."""
supplementaries = []
for supplementary_ds in self.supplementaries:
unexpanded = [
f for f, v in supplementary_ds.facets.items() if _isglob(v)
]
if unexpanded:
logger.info(
"For %s: ignoring supplementary variable '%s', "
"unable to expand wildcards %s.",
self.summary(shorten=True),
supplementary_ds.facets["short_name"],
", ".join(f"'{f}'" for f in unexpanded),
)
else:
supplementaries.append(supplementary_ds)
self.supplementaries = supplementaries
def _match(self, other: Dataset) -> int:
"""Compute the match between two datasets."""
score = 0
for facet, value2 in self.facets.items():
if facet in other.facets:
value1 = other.facets[facet]
if isinstance(value1, (list, tuple)):
if isinstance(value2, (list, tuple)):
score += any(elem in value2 for elem in value1)
else:
score += value2 in value1
else:
if isinstance(value2, (list, tuple)):
score += value1 in value2
else:
score += value1 == value2
return score
def _remove_duplicate_supplementaries(self) -> None:
"""Remove supplementaries that are duplicates."""
not_used = []
supplementaries = list(self.supplementaries)
self.supplementaries.clear()
for _, duplicates in groupby(
supplementaries, key=lambda ds: ds["short_name"]
):
group = sorted(duplicates, key=self._match, reverse=True)
self.supplementaries.append(group[0])
not_used.extend(group[1:])
if not_used:
logger.debug(
"List of all supplementary datasets found for %s:\n%s",
self.summary(shorten=True),
"\n".join(
sorted(ds.summary(shorten=True) for ds in supplementaries)
),
)
def _fix_fx_exp(self) -> None:
for supplementary_ds in self.supplementaries:
exps = supplementary_ds.facets.get("exp")
frequency = supplementary_ds.facets.get("frequency")
if isinstance(exps, list) and len(exps) > 1 and frequency == "fx":
for exp in exps:
dataset = supplementary_ds.copy(exp=exp)
if dataset.files:
supplementary_ds.facets["exp"] = exp
logger.info(
"Corrected wrong 'exp' from '%s' to '%s' for "
"supplementary variable '%s' of %s",
exps,
exp,
supplementary_ds.facets["short_name"],
self.summary(shorten=True),
)
break
[docs]
def copy(self, **facets: FacetValue) -> "Dataset":
"""Create a copy.
Parameters
----------
**facets
Update these facets in the copy. Note that for supplementary
datasets attached to the dataset, the ``'short_name'`` and
``'mip'`` facets will not be updated with these values.
Returns
-------
Dataset
A copy of the dataset.
"""
new = self.__class__()
new._session = self._session
for key, value in self.facets.items():
new.set_facet(key, deepcopy(value), key in self._persist)
for key, value in facets.items():
new.set_facet(key, deepcopy(value))
for supplementary in self.supplementaries:
# The short_name and mip of the supplementary variable are probably
# different from the main variable, so don't copy those facets.
skip = ("short_name", "mip")
supplementary_facets = {
k: v for k, v in facets.items() if k not in skip
}
new_supplementary = supplementary.copy(**supplementary_facets)
new.supplementaries.append(new_supplementary)
return new
def __eq__(self, other) -> bool:
"""Compare with another dataset."""
return (
isinstance(other, self.__class__)
and self._session == other._session
and self.facets == other.facets
and self.supplementaries == other.supplementaries
)
def __repr__(self) -> str:
"""Create a string representation."""
first_keys = (
"diagnostic",
"variable_group",
"dataset",
"project",
"mip",
"short_name",
)
def facets2str(facets):
view = {k: facets[k] for k in first_keys if k in facets}
for key, value in sorted(facets.items()):
if key not in first_keys:
view[key] = value
return pprint.pformat(view, sort_dicts=False)
txt = [
f"{self.__class__.__name__}:",
facets2str(self.facets),
]
if self.supplementaries:
txt.append("supplementaries:")
txt.extend(
textwrap.indent(facets2str(a.facets), " ")
for a in self.supplementaries
)
if self._session:
txt.append(f"session: '{self.session.session_name}'")
return "\n".join(txt)
def _get_joined_summary_facets(
self,
separator: str,
join_lists: bool = False,
) -> str:
"""Get string consisting of joined summary facets."""
summary_facets_vals = []
for key in self._SUMMARY_FACETS:
if key not in self.facets:
continue
val = self.facets[key]
if join_lists and isinstance(val, (tuple, list)):
val = "-".join(str(elem) for elem in val)
else:
val = str(val)
summary_facets_vals.append(val)
return separator.join(summary_facets_vals)
[docs]
def summary(self, shorten: bool = False) -> str:
"""Summarize the content of dataset.
Parameters
----------
shorten
Shorten the summary.
Returns
-------
str
A summary describing the dataset.
"""
if not shorten:
return repr(self)
title = self.__class__.__name__
txt = f"{title}: " + self._get_joined_summary_facets(", ")
def supplementary_summary(dataset):
return ", ".join(
str(dataset.facets[k])
for k in self._SUMMARY_FACETS
if k in dataset.facets and dataset[k] != self.facets.get(k)
)
if self.supplementaries:
txt += (
", supplementaries: "
+ "; ".join(
supplementary_summary(a) for a in self.supplementaries
)
+ ""
)
return txt
def __getitem__(self, key):
"""Get a facet value."""
return self.facets[key]
def __setitem__(self, key, value):
"""Set a facet value."""
self.facets[key] = value
[docs]
def set_facet(self, key: str, value: FacetValue, persist: bool = True):
"""Set facet.
Parameters
----------
key
The name of the facet.
value
The value of the facet.
persist
When writing a dataset to a recipe, only persistent facets
will get written.
"""
self.facets[key] = value
if persist:
self._persist.add(key)
@property
def minimal_facets(self) -> Facets:
"""Return a dictionary with the persistent facets."""
return {k: v for k, v in self.facets.items() if k in self._persist}
[docs]
def set_version(self) -> None:
"""Set the ``'version'`` facet based on the available data."""
versions: set[str] = set()
for file in self.files:
if "version" in file.facets:
versions.add(file.facets["version"]) # type: ignore
version = versions.pop() if len(versions) == 1 else sorted(versions)
if version:
self.set_facet("version", version)
for supplementary_ds in self.supplementaries:
supplementary_ds.set_version()
@property
def session(self) -> Session:
"""A :obj:`esmvalcore.config.Session` associated with the dataset."""
if self._session is None:
session_name = f"session-{uuid.uuid4()}"
self._session = CFG.start_session(session_name)
return self._session
@session.setter
def session(self, session: Session | None) -> None:
self._session = session
for supplementary in self.supplementaries:
supplementary._session = session
[docs]
def add_supplementary(self, **facets: FacetValue) -> None:
"""Add an supplementary dataset.
This is a convenience function that will create a copy of the current
dataset, update its facets with the values specified in ``**facets``,
and append it to :obj:`Dataset.supplementaries`. For more control
over the creation of the supplementary dataset, first create a new
:class:`Dataset` describing the supplementary dataset and then append
it to :obj:`Dataset.supplementaries`.
Parameters
----------
**facets
Facets describing the supplementary variable.
"""
supplementary = self.copy(**facets)
supplementary.supplementaries = []
self.supplementaries.append(supplementary)
[docs]
def augment_facets(self) -> None:
"""Add extra facets.
This function will update the dataset with additional facets
from various sources.
"""
self._augment_facets()
for supplementary in self.supplementaries:
supplementary._augment_facets()
def _augment_facets(self):
extra_facets = get_extra_facets(self, self.session["extra_facets_dir"])
_augment(self.facets, extra_facets)
if "institute" not in self.facets:
institute = get_institutes(self.facets)
if institute:
self.facets["institute"] = institute
if "activity" not in self.facets:
activity = get_activity(self.facets)
if activity:
self.facets["activity"] = activity
_update_cmor_facets(self.facets)
if self.facets.get("frequency") == "fx":
self.facets.pop("timerange", None)
[docs]
def find_files(self) -> None:
"""Find files.
Look for files and populate the :obj:`Dataset.files` property of
the dataset and its supplementary datasets.
"""
self.augment_facets()
if _isglob(self.facets.get("timerange")):
self._update_timerange()
self._find_files()
for supplementary in self.supplementaries:
supplementary.find_files()
def _find_files(self) -> None:
self.files, self._file_globs = local.find_files(
debug=True,
**self.facets,
)
# If project does not support automatic downloads from ESGF, stop here
if self.facets["project"] not in esgf.facets.FACETS:
return
# 'never' mode: never download files from ESGF and stop here
if self.session["search_esgf"] == "never":
return
# 'when_missing' mode: if files are available locally, do not check
# ESGF
if self.session["search_esgf"] == "when_missing":
try:
check.data_availability(self, log=False)
except InputFilesNotFound:
pass # search ESGF for files
else:
return # use local files
# Local files are not available in 'when_missing' mode or 'always' mode
# is used: check ESGF
local_files = {f.name: f for f in self.files}
search_result = esgf.find_files(**self.facets)
for file in search_result:
if file.name not in local_files:
# Use ESGF files that are not available locally.
self.files.append(file)
else:
# Use ESGF files that are newer than the locally available
# files.
local_file = local_files[file.name]
if "version" in local_file.facets:
if file.facets["version"] > local_file.facets["version"]:
idx = self.files.index(local_file)
self.files[idx] = file
@property
def files(self) -> Sequence[File]:
"""The files associated with this dataset."""
if self._files is None:
self.find_files()
return self._files # type: ignore
@files.setter
def files(self, value):
self._files = value
[docs]
def load(self) -> Cube:
"""Load dataset.
Raises
------
InputFilesNotFound
When no files were found.
Returns
-------
iris.cube.Cube
An :mod:`iris` cube with the data corresponding the the dataset.
"""
input_files = list(self.files)
for supplementary_dataset in self.supplementaries:
input_files.extend(supplementary_dataset.files)
esgf.download(input_files, self.session["download_dir"])
cube = self._load()
supplementary_cubes = []
for supplementary_dataset in self.supplementaries:
supplementary_cube = supplementary_dataset._load()
supplementary_cubes.append(supplementary_cube)
output_file = _get_output_file(self.facets, self.session.preproc_dir)
cubes = preprocess(
[cube],
"add_supplementary_variables",
input_files=input_files,
output_file=output_file,
debug=self.session["save_intermediary_cubes"],
supplementary_cubes=supplementary_cubes,
)
return cubes[0]
def _load(self) -> Cube:
"""Load self.files into an iris cube and return it."""
if not self.files:
lines = [
f"No files were found for {self}",
"locally using glob patterns:",
"\n".join(str(f) for f in self._file_globs or []),
]
if self.session["search_esgf"] != "never":
lines.append("or on ESGF.")
msg = "\n".join(lines)
raise InputFilesNotFound(msg)
output_file = _get_output_file(self.facets, self.session.preproc_dir)
fix_dir_prefix = Path(
self.session._fixed_file_dir,
self._get_joined_summary_facets("_", join_lists=True) + "_",
)
settings: dict[str, dict[str, Any]] = {}
settings["fix_file"] = {
"output_dir": fix_dir_prefix,
"add_unique_suffix": True,
"session": self.session,
**self.facets,
}
settings["load"] = {
"ignore_warnings": get_ignored_warnings(
self.facets["project"], "load"
),
}
settings["fix_metadata"] = {
"session": self.session,
**self.facets,
}
settings["concatenate"] = {"check_level": self.session["check_level"]}
settings["cmor_check_metadata"] = {
"check_level": self.session["check_level"],
"cmor_table": self.facets["project"],
"mip": self.facets["mip"],
"frequency": self.facets["frequency"],
"short_name": self.facets["short_name"],
}
if "timerange" in self.facets:
settings["clip_timerange"] = {
"timerange": self.facets["timerange"],
}
settings["fix_data"] = {
"session": self.session,
**self.facets,
}
settings["cmor_check_data"] = {
"check_level": self.session["check_level"],
"cmor_table": self.facets["project"],
"mip": self.facets["mip"],
"frequency": self.facets["frequency"],
"short_name": self.facets["short_name"],
}
result = [
file.local_file(self.session["download_dir"])
if isinstance(file, esgf.ESGFFile)
else file
for file in self.files
]
for step, kwargs in settings.items():
result = preprocess(
result,
step,
input_files=self.files,
output_file=output_file,
debug=self.session["save_intermediary_cubes"],
**kwargs,
)
cube = result[0]
return cube
[docs]
def from_ranges(self) -> list["Dataset"]:
"""Create a list of datasets from short notations.
This expands the ``'ensemble'`` and ``'sub_experiment'`` facets in the
dataset definition if they are ranges.
For example ``'ensemble'='r(1:3)i1p1f1'`` will be expanded to
three datasets, with ``'ensemble'`` values ``'r1i1p1f1'``,
``'r2i1p1f1'``, ``'r3i1p1f1'``.
Returns
-------
list[Dataset]
The datasets.
"""
datasets = [self]
for key in "ensemble", "sub_experiment":
if key in self.facets:
datasets = [
ds.copy(**{key: value})
for ds in datasets
for value in ds._expand_range(key)
]
return datasets
def _expand_range(self, input_tag):
"""Expand ranges such as ensemble members or start dates.
Expansion only supports ensembles defined as strings, not lists.
"""
expanded = []
regex = re.compile(r"\(\d+:\d+\)")
def expand_range(input_range):
match = regex.search(input_range)
if match:
start, end = match.group(0)[1:-1].split(":")
for i in range(int(start), int(end) + 1):
range_ = regex.sub(str(i), input_range, 1)
expand_range(range_)
else:
expanded.append(input_range)
tag = self.facets.get(input_tag, "")
if isinstance(tag, (list, tuple)):
for elem in tag:
if regex.search(elem):
raise RecipeError(
f"In {self}: {input_tag} expansion "
f"cannot be combined with {input_tag} lists"
)
expanded.append(tag)
else:
expand_range(tag)
return expanded
def _update_timerange(self):
"""Update wildcards in timerange with found datetime values.
If the timerange is given as a year, it ensures it's formatted
as a 4-digit value (YYYY).
"""
dataset = self.copy()
dataset.supplementaries = []
dataset.augment_facets()
if "timerange" not in dataset.facets:
self.facets.pop("timerange", None)
return
timerange = self.facets["timerange"]
if not isinstance(timerange, str):
raise TypeError(
f"timerange should be a string, got '{timerange!r}'"
)
check.valid_time_selection(timerange)
if "*" in timerange:
dataset = self.copy()
dataset.facets.pop("timerange")
dataset.supplementaries = []
check.data_availability(dataset)
intervals = [_get_start_end_date(f) for f in dataset.files]
min_date = min(interval[0] for interval in intervals)
max_date = max(interval[1] for interval in intervals)
if timerange == "*":
timerange = f"{min_date}/{max_date}"
if "*" in timerange.split("/")[0]:
timerange = timerange.replace("*", min_date)
if "*" in timerange.split("/")[1]:
timerange = timerange.replace("*", max_date)
# Make sure that years are in format YYYY
start_date, end_date = timerange.split("/")
timerange = _dates_to_timerange(start_date, end_date)
check.valid_time_selection(timerange)
self.set_facet("timerange", timerange)