"""Module for finding files on ESGF."""
import itertools
import logging
from functools import lru_cache
import pyesgf.search
import requests.exceptions
from ..config._esgf_pyclient import get_esgf_config
from ..local import (
_get_start_end_date,
_parse_period,
_replace_years_with_timerange,
_truncate_dates,
)
from ._download import ESGFFile
from .facets import DATASET_MAP, FACETS
logger = logging.getLogger(__name__)
def get_esgf_facets(variable):
"""Translate variable to facets for searching on ESGF."""
project = variable.get("project", "")
facets = {"project": project}
for our_name, esgf_name in FACETS[project].items():
if our_name in variable:
values = variable[our_name]
if values == "*":
# Wildcards can be specified on ESGF by omitting the facet
continue
if isinstance(values, (tuple, list)):
values = list(values)
else:
values = [values]
for i, value in enumerate(values):
if our_name == "dataset":
# Replace dataset name by ESGF name for dataset
values[i] = DATASET_MAP[project].get(value, value)
facets[esgf_name] = ",".join(values)
return facets
def select_latest_versions(files, versions):
"""Select only the latest version of files."""
result = []
def same_file(file):
"""Return a versionless identifier for a file."""
# Dataset without the version number
dataset = file.dataset.rsplit(".", 1)[0]
return (dataset, file.name)
if isinstance(versions, str):
versions = (versions,)
files = sorted(files, key=same_file)
for _, group in itertools.groupby(files, key=same_file):
group = sorted(group, reverse=True)
if versions:
selection = [f for f in group if f.facets["version"] in versions]
if not selection:
# Skip the file if it is not the requested version(s).
continue
group = selection
latest_version = group[0]
result.append(latest_version)
if len(group) > 1:
logger.debug(
"Only using the latest version %s, not %s",
latest_version,
group[1:],
)
return result
FIRST_ONLINE_INDEX_NODE = None
"""Remember the first index node that is online."""
def _search_index_nodes(facets):
"""Search for files on ESGF.
Parameters
----------
facets: :obj:`dict` of :obj:`str`
Facets to constrain the search.
Raises
------
FileNotFoundError
If the function was unable to connect to ESGF.
Returns
-------
pyesgf.search.results.ResultSet
A ResultSet containing :obj:`pyesgf.search.results.FileResult`s.
"""
cfg = get_esgf_config()
search_args = dict(cfg["search_connection"])
urls = search_args.pop("urls")
global FIRST_ONLINE_INDEX_NODE
if FIRST_ONLINE_INDEX_NODE:
urls.insert(0, urls.pop(urls.index(FIRST_ONLINE_INDEX_NODE)))
errors = []
for url in urls:
connection = pyesgf.search.SearchConnection(url=url, **search_args)
context = connection.new_context(
pyesgf.search.context.FileSearchContext,
**facets,
)
logger.debug("Searching %s for datasets using facets=%s", url, facets)
try:
results = context.search(
batch_size=500,
ignore_facet_check=True,
)
FIRST_ONLINE_INDEX_NODE = url
return list(results)
except (
requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.Timeout,
) as error:
logger.debug("Unable to connect to %s due to %s", url, error)
errors.append(error)
raise FileNotFoundError(
"Failed to search ESGF, unable to connect:\n"
+ "\n".join(f"- {e}" for e in errors)
)
def esgf_search_files(facets):
"""Search for files on ESGF.
Parameters
----------
facets: :obj:`dict` of :obj:`str`
Facets to constrain the search.
Returns
-------
list of :py:class:`~ESGFFile`
The found files.
"""
results = _search_index_nodes(facets)
files = ESGFFile._from_results(results, facets)
msg = "none" if not files else "\n" + "\n".join(str(f) for f in files)
logger.debug(
"Found the following files matching facets %s: %s", facets, msg
)
return files
def select_by_time(files, timerange):
"""Select files containing data between a timerange."""
if "*" in timerange:
# TODO: support * combined with a period
return files
selection = []
for file in files:
start_date, end_date = _parse_period(timerange)
try:
start, end = _get_start_end_date(file)
except ValueError:
# If start and end year cannot be read from the filename
# just select everything.
selection.append(file)
else:
start_date, end = _truncate_dates(start_date, end)
end_date, start = _truncate_dates(end_date, start)
if start <= end_date and end >= start_date:
selection.append(file)
return selection
[docs]
def find_files(*, project, short_name, dataset, **facets):
"""Search for files on ESGF.
Parameters
----------
project : str
Choose from CMIP3, CMIP5, CMIP6, CORDEX, or obs4MIPs.
short_name : str
The name of the variable.
dataset : str
The name of the dataset.
**facets : typing.Union[str, list[str]]
Any other search facets. An ``'*'`` can be used to match
any value. By default, only the latest version of a file will
be returned. To select all versions use ``version='*'`` while other
omitted facets will default to ``'*'``. It is also
possible to specify multiple values for a facet, e.g.
``exp=['historical', 'ssp585']`` will match any file that belongs
to either the historical or ssp585 experiment.
The ``timerange`` facet can be specified in `ISO 8601 format
<https://en.wikipedia.org/wiki/ISO_8601>`__.
Note
----
A value of ``timerange='*'`` is supported, but combining a ``'*'`` with
a time or period :ref:`as supported in the recipe <datasets>` is currently
not supported and will return all found files.
Examples
--------
Examples of how to use this function for all supported projects.
Search for a CMIP3 dataset:
>>> find_files(
... project='CMIP3',
... frequency='mon',
... short_name='tas',
... dataset='cccma_cgcm3_1',
... exp='historical',
... ensemble='run1',
... ) # doctest: +SKIP
[ESGFFile:cmip3/CCCma/cccma_cgcm3_1/historical/mon/atmos/run1/tas/v1/tas_a1_20c3m_1_cgcm3.1_t47_1850_2000.nc]
Search for a CMIP5 dataset:
>>> find_files(
... project='CMIP5',
... mip='Amon',
... short_name='tas',
... dataset='inmcm4',
... exp='historical',
... ensemble='r1i1p1',
... ) # doctest: +SKIP
[ESGFFile:cmip5/output1/INM/inmcm4/historical/mon/atmos/Amon/r1i1p1/v20130207/tas_Amon_inmcm4_historical_r1i1p1_185001-200512.nc]
Search for a CMIP6 dataset:
>>> find_files(
... project='CMIP6',
... mip='Amon',
... short_name='tas',
... dataset='CanESM5',
... exp='historical',
... ensemble='r1i1p1f1',
... ) # doctest: +SKIP
[ESGFFile:CMIP6/CMIP/CCCma/CanESM5/historical/r1i1p1f1/Amon/tas/gn/v20190429/tas_Amon_CanESM5_historical_r1i1p1f1_gn_185001-201412.nc]
Search for a CORDEX dataset and limit the search results to files
containing data to the years in the range 1990-2000:
>>> find_files(
... project='CORDEX',
... frequency='mon',
... dataset='COSMO-crCLIM-v1-1',
... short_name='tas',
... exp='historical',
... ensemble='r1i1p1',
... domain='EUR-11',
... driver='MPI-M-MPI-ESM-LR',
... timerange='1990/2000',
... ) # doctest: +SKIP
[ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_198101-199012.nc,
ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_199101-200012.nc]
Search for an obs4MIPs dataset:
>>> find_files(
... project='obs4MIPs',
... frequency='mon',
... dataset='CERES-EBAF',
... short_name='rsutcs',
... ) # doctest: +SKIP
[ESGFFile:obs4MIPs/NASA-LaRC/CERES-EBAF/atmos/mon/v20160610/rsutcs_CERES-EBAF_L3B_Ed2-8_200003-201404.nc]
Search for any ensemble member:
>>> find_files(
... project='CMIP6',
... mip='Amon',
... short_name='tas',
... dataset='BCC-CSM2-MR',
... exp='historical',
... ensemble='*',
... ) # doctest: +SKIP
[ESGFFile:CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r1i1p1f1/Amon/tas/gn/v20181126/tas_Amon_BCC-CSM2-MR_historical_r1i1p1f1_gn_185001-201412.nc,
ESGFFile:CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r2i1p1f1/Amon/tas/gn/v20181115/tas_Amon_BCC-CSM2-MR_historical_r2i1p1f1_gn_185001-201412.nc,
ESGFFile:CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r3i1p1f1/Amon/tas/gn/v20181119/tas_Amon_BCC-CSM2-MR_historical_r3i1p1f1_gn_185001-201412.nc]
Search for all available versions of a file:
>>> find_files(
... project='CMIP5',
... mip='Amon',
... short_name='tas',
... dataset='CCSM4',
... exp='historical',
... ensemble='r1i1p1',
... version='*',
... ) # doctest: +SKIP
[ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20121031/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc,
ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20130425/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc,
ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20160829/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc]
Search for a specific version of a file:
>>> find_files(
... project='CMIP5',
... mip='Amon',
... short_name='tas',
... dataset='CCSM4',
... exp='historical',
... ensemble='r1i1p1',
... version='v20130425',
... ) # doctest: +SKIP
[ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20130425/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc]
Returns
-------
:obj:`list` of :obj:`ESGFFile`
A list of files that have been found.
""" # pylint: disable=locally-disabled, line-too-long
if project not in FACETS:
raise ValueError(
f"Unable to download from ESGF, because project {project} is not"
" on it or is not supported by the esmvalcore.esgf module."
)
# The project is required for the function to work.
facets["project"] = project
# The dataset and short_name facet are not strictly required,
# but without these it seems likely that the user is requesting
# more results than they intended.
facets["dataset"] = dataset
facets["short_name"] = short_name
# Convert lists to tuples to allow caching results
for facet, value in facets.items():
if isinstance(value, list):
facets[facet] = tuple(value)
return cached_search(**facets)
@lru_cache(10000)
def cached_search(**facets):
"""Search for files on ESGF.
A cached search function will speed up recipes that use the same
variable multiple times.
"""
esgf_facets = get_esgf_facets(facets)
files = esgf_search_files(esgf_facets)
if "version" not in facets or facets["version"] != "*":
files = select_latest_versions(files, facets.get("version"))
_replace_years_with_timerange(facets)
if "timerange" in facets:
files = select_by_time(files, facets["timerange"])
logger.debug("Selected files:\n%s", "\n".join(str(f) for f in files))
return files