Source code for esmvalcore.cmor.table

"""
CMOR information reader for ESMValTool.

Read variable information from CMOR 2 and CMOR 3 tables and make it easily
available for the other components of ESMValTool
"""
import copy
import errno
import glob
import json
import logging
import os
from functools import total_ordering
from pathlib import Path

import yaml

logger = logging.getLogger(__name__)

CMOR_TABLES = {}
"""dict of str, obj: CMOR info objects."""


[docs]def get_var_info(project, mip, short_name): """Get variable information. Parameters ---------- project : str Dataset's project. mip : str Variable's cmor table. short_name : str Variable's short name. """ return CMOR_TABLES[project].get_variable(mip, short_name)
[docs]def read_cmor_tables(cfg_developer=None): """Read cmor tables required in the configuration. Parameters ---------- cfg_developer : dict of str Parsed config-developer file """ if cfg_developer is None: cfg_file = Path(__file__).parent.parent / 'config-developer.yml' with cfg_file.open() as file: cfg_developer = yaml.safe_load(file) custom = CustomInfo() CMOR_TABLES.clear() CMOR_TABLES['custom'] = custom install_dir = os.path.dirname(os.path.realpath(__file__)) for table in cfg_developer: project = cfg_developer[table] cmor_type = project.get('cmor_type', 'CMIP5') default_path = os.path.join(install_dir, 'tables', cmor_type.lower()) table_path = project.get('cmor_path', default_path) table_path = os.path.expandvars(os.path.expanduser(table_path)) cmor_strict = project.get('cmor_strict', True) default_table_prefix = project.get('cmor_default_table_prefix', '') if cmor_type == 'CMIP3': CMOR_TABLES[table] = CMIP3Info( table_path, default=custom, strict=cmor_strict, ) elif cmor_type == 'CMIP5': CMOR_TABLES[table] = CMIP5Info( table_path, default=custom, strict=cmor_strict, ) elif cmor_type == 'CMIP6': CMOR_TABLES[table] = CMIP6Info( table_path, default=custom, strict=cmor_strict, default_table_prefix=default_table_prefix)
[docs]class CMIP6Info(object): """ Class to read CMIP6-like data request. This uses CMOR 3 json format Parameters ---------- cmor_tables_path: basestring Path to the folder containing the Tables folder with the json files default: object Default table to look variables on if not found strict: bool If False, will look for a variable in other tables if it can not be found in the requested one """ _CMIP_5to6_varname = { 'sic': 'siconc', 'sit': 'sivol', 'tro3': 'o3', 'usi': 'siu', 'vsi': 'siv', } def __init__(self, cmor_tables_path, default=None, strict=True, default_table_prefix=''): cmor_tables_path = self._get_cmor_path(cmor_tables_path) self._cmor_folder = os.path.join(cmor_tables_path, 'Tables') if glob.glob(os.path.join(self._cmor_folder, '*_CV.json')): self._load_controlled_vocabulary() self.default = default self.strict = strict self.default_table_prefix = default_table_prefix self.tables = {} self.var_to_freq = {} self.strict = strict self._load_coordinates() for json_file in glob.glob(os.path.join(self._cmor_folder, '*.json')): if 'CV_test' in json_file or 'grids' in json_file: continue try: self._load_table(json_file) except Exception: msg = f"Exception raised when loading {json_file}" # Logger may not be ready at this stage if logger.handlers: logger.error(msg) else: print(msg) raise @staticmethod def _get_cmor_path(cmor_tables_path): if os.path.isdir(cmor_tables_path): return cmor_tables_path cwd = os.path.dirname(os.path.realpath(__file__)) cmor_tables_path = os.path.join(cwd, 'tables', cmor_tables_path) if os.path.isdir(cmor_tables_path): return cmor_tables_path raise ValueError( 'CMOR tables not found in {}'.format(cmor_tables_path)) def _load_table(self, json_file): with open(json_file) as inf: raw_data = json.loads(inf.read()) if not self._is_table(raw_data): return table = TableInfo() header = raw_data['Header'] table.name = header['table_id'].split(' ')[-1] self.tables[table.name] = table generic_levels = header['generic_levels'].split() table.frequency = header.get('frequency', '') self.var_to_freq[table.name] = {} for var_name, var_data in raw_data['variable_entry'].items(): var = VariableInfo('CMIP6', var_name) var.read_json(var_data, table.frequency) self._assign_dimensions(var, generic_levels) table[var_name] = var self.var_to_freq[table.name][var_name] = var.frequency if not table.frequency: from collections import Counter var_freqs = (var.frequency for var in table.values()) table_freq, _ = Counter(var_freqs).most_common(1)[0] table.frequency = table_freq self.tables[table.name] = table def _assign_dimensions(self, var, generic_levels): for dimension in var.dimensions: if dimension in generic_levels: coord = CoordinateInfo(dimension) coord.generic_level = True coord.axis = 'Z' else: try: coord = self.coords[dimension] except KeyError: logger.exception( 'Can not find dimension %s for variable %s', dimension, var) raise var.coordinates[dimension] = coord def _load_coordinates(self): self.coords = {} for json_file in glob.glob( os.path.join(self._cmor_folder, '*coordinate*.json')): with open(json_file) as inf: table_data = json.loads(inf.read()) for coord_name in table_data['axis_entry'].keys(): coord = CoordinateInfo(coord_name) coord.read_json(table_data['axis_entry'][coord_name]) self.coords[coord_name] = coord def _load_controlled_vocabulary(self): self.activities = {} self.institutes = {} for json_file in glob.glob(os.path.join(self._cmor_folder, '*_CV.json')): with open(json_file) as inf: table_data = json.loads(inf.read()) try: exps = table_data['CV']['experiment_id'] for exp_id in exps: activity = exps[exp_id]['activity_id'][0].split(' ') self.activities[exp_id] = activity except (KeyError, AttributeError): pass try: sources = table_data['CV']['source_id'] for source_id in sources: institution = sources[source_id]['institution_id'] self.institutes[source_id] = institution except (KeyError, AttributeError): pass
[docs] def get_table(self, table): """ Search and return the table info. Parameters ---------- table: basestring Table name Returns ------- TableInfo Return the TableInfo object for the requested table if found, returns None if not """ try: return self.tables[table] except KeyError: return self.tables.get(''.join((self.default_table_prefix, table)))
[docs] def get_variable(self, table_name, short_name, derived=False): """ Search and return the variable info. Parameters ---------- table_name: basestring Table name short_name: basestring Variable's short name derived: bool, optional Variable is derived. Info retrieval is less strict Returns ------- VariableInfo Return the VariableInfo object for the requested variable if found, returns None if not """ table = self.get_table(table_name) if table: try: return table[short_name] except KeyError: pass if short_name in CMIP6Info._CMIP_5to6_varname: new_short_name = CMIP6Info._CMIP_5to6_varname[short_name] return self.get_variable(table_name, new_short_name, derived) var_info = None if not self.strict: for table_vars in sorted(self.tables.values()): if short_name in table_vars: var_info = table_vars[short_name] break if not var_info and (not self.strict or derived): var_info = self.default.get_variable(table_name, short_name) if var_info: mip_info = self.get_table(table_name) if mip_info: var_info = var_info.copy() var_info.frequency = mip_info.frequency return var_info
@staticmethod def _is_table(table_data): if 'variable_entry' not in table_data: return False if 'Header' not in table_data: return False return True
[docs]@total_ordering class TableInfo(dict): """Container class for storing a CMOR table.""" def __init__(self, *args, **kwargs): """Create a new TableInfo object for storing VariableInfo objects.""" super(TableInfo, self).__init__(*args, **kwargs) self.name = '' self.frequency = '' self.realm = '' def __eq__(self, other): return (self.name, self.frequency, self.realm) == \ (other.name, other.frequency, other.realm) def __ne__(self, other): return (self.name, self.frequency, self.realm) != \ (other.name, other.frequency, other.realm) def __lt__(self, other): return (self.name, self.frequency, self.realm) < \ (other.name, other.frequency, other.realm)
[docs]class JsonInfo(object): """ Base class for the info classes. Provides common utility methods to read json variables """ def __init__(self): self._json_data = {} def _read_json_variable(self, parameter, default=''): """ Read a json parameter in json_data. Parameters ---------- parameter: str parameter to read Returns ------- str Option's value or empty string if parameter is not present """ if parameter not in self._json_data: return default return str(self._json_data[parameter]) def _read_json_list_variable(self, parameter): """ Read a json list parameter in json_data. Parameters ---------- parameter: str parameter to read Returns ------- str Option's value or empty list if parameter is not present """ if parameter not in self._json_data: return [] return self._json_data[parameter]
[docs]class VariableInfo(JsonInfo): """Class to read and store variable information.""" def __init__(self, table_type, short_name): """ Class to read and store variable information. Parameters ---------- short_name: str variable's short name """ super(VariableInfo, self).__init__() self.table_type = table_type self.modeling_realm = [] """Modeling realm""" self.short_name = short_name """Short name""" self.standard_name = '' """Standard name""" self.long_name = '' """Long name""" self.units = '' """Data units""" self.valid_min = '' """Minimum admitted value""" self.valid_max = '' """Maximum admitted value""" self.frequency = '' """Data frequency""" self.positive = '' """Increasing direction""" self.dimensions = [] """List of dimensions""" self.coordinates = {} """Coordinates This is a dict with the names of the dimensions as keys and CoordinateInfo objects as values. """ self._json_data = None
[docs] def copy(self): """ Return a shallow copy of VariableInfo. Returns ------- VariableInfo Shallow copy of this object """ return copy.copy(self)
[docs] def read_json(self, json_data, default_freq): """ Read variable information from json. Non-present options will be set to empty Parameters ---------- json_data: dict dictionary created by the json reader containing variable information default_freq: str Default frequency to use if it is not defined at variable level """ self._json_data = json_data self.standard_name = self._read_json_variable('standard_name') self.long_name = self._read_json_variable('long_name') self.units = self._read_json_variable('units') self.valid_min = self._read_json_variable('valid_min') self.valid_max = self._read_json_variable('valid_max') self.positive = self._read_json_variable('positive') self.modeling_realm = \ self._read_json_variable('modeling_realm').split() self.frequency = self._read_json_variable('frequency', default_freq) self.dimensions = self._read_json_variable('dimensions').split()
[docs]class CoordinateInfo(JsonInfo): """Class to read and store coordinate information.""" def __init__(self, name): """ Class to read and store coordinate information. Parameters ---------- name: str coordinate's name """ super(CoordinateInfo, self).__init__() self.name = name self.generic_level = False self.axis = "" """Axis""" self.value = "" """Coordinate value""" self.standard_name = "" """Standard name""" self.long_name = "" """Long name""" self.out_name = "" """ Out name This is the name of the variable in the file """ self.var_name = "" """Short name""" self.units = "" """Units""" self.stored_direction = "" """Direction in which the coordinate increases""" self.requested = [] """Values requested""" self.valid_min = "" """Minimum allowed value""" self.valid_max = "" """Maximum allowed value""" self.must_have_bounds = "" """Whether bounds are required on this dimension"""
[docs] def read_json(self, json_data): """ Read coordinate information from json. Non-present options will be set to empty Parameters ---------- json_data: dict dictionary created by the json reader containing coordinate information """ self._json_data = json_data self.axis = self._read_json_variable('axis') self.value = self._read_json_variable('value') self.out_name = self._read_json_variable('out_name') self.var_name = self._read_json_variable('var_name') self.standard_name = self._read_json_variable('standard_name') self.long_name = self._read_json_variable('long_name') self.units = self._read_json_variable('units') self.stored_direction = self._read_json_variable('stored_direction') self.valid_min = self._read_json_variable('valid_min') self.valid_max = self._read_json_variable('valid_max') self.requested = self._read_json_list_variable('requested') self.must_have_bounds = self._read_json_variable('must_have_bounds')
[docs]class CMIP5Info(object): """ Class to read CMIP5-like data request. Parameters ---------- cmor_tables_path: basestring Path to the folder containing the Tables folder with the json files default: object Default table to look variables on if not found strict: bool If False, will look for a variable in other tables if it can not be found in the requested one """ def __init__(self, cmor_tables_path, default=None, strict=True): cmor_tables_path = self._get_cmor_path(cmor_tables_path) self._cmor_folder = os.path.join(cmor_tables_path, 'Tables') if not os.path.isdir(self._cmor_folder): raise OSError(errno.ENOTDIR, "CMOR tables path is not a directory", self._cmor_folder) self.strict = strict self.tables = {} self.coords = {} self.default = default self.strict = strict self._current_table = None self._last_line_read = None for table_file in glob.glob(os.path.join(self._cmor_folder, '*')): if '_grids' in table_file: continue try: self._load_table(table_file) except Exception: msg = f"Exception raised when loading {table_file}" # Logger may not be ready at this stage if logger.handlers: logger.error(msg) else: print(msg) raise @staticmethod def _get_cmor_path(cmor_tables_path): if os.path.isdir(cmor_tables_path): return cmor_tables_path cwd = os.path.dirname(os.path.realpath(__file__)) cmor_tables_path = os.path.join(cwd, 'tables', cmor_tables_path) return cmor_tables_path def _load_table(self, table_file, table_name=''): if table_name and table_name in self.tables: # special case used for updating a table with custom variable file table = self.tables[table_name] else: # default case: table name is first line of table file table = None self._read_table_file(table_file, table) def _read_table_file(self, table_file, table=None): with open(table_file) as self._current_table: self._read_line() while True: key, value = self._last_line_read if key == 'table_id': table = TableInfo() table.name = value[len('Table '):] self.tables[table.name] = table elif key == 'frequency': table.frequency = value elif key == 'modeling_realm': table.realm = value elif key == 'generic_levels': for dim in value.split(' '): coord = CoordinateInfo(dim) coord.generic_level = True coord.axis = 'Z' self.coords[dim] = coord elif key == 'axis_entry': self.coords[value] = self._read_coordinate(value) continue elif key == 'variable_entry': table[value] = self._read_variable(value, table.frequency) continue if not self._read_line(): return def _read_line(self): line = self._current_table.readline() if line == '': return False if line.startswith('!'): return self._read_line() line = line.replace('\n', '') if '!' in line: line = line[:line.index('!')] line = line.strip() if not line: self._last_line_read = ('', '') else: index = line.index(':') self._last_line_read = (line[:index].strip(), line[index + 1:].strip()) return True def _read_coordinate(self, value): coord = CoordinateInfo(value) while self._read_line(): key, value = self._last_line_read if key in ('variable_entry', 'axis_entry'): return coord if key == 'requested': coord.requested.extend( (val for val in value.split(' ') if val)) continue if hasattr(coord, key): setattr(coord, key, value) return coord def _read_variable(self, short_name, frequency): var = VariableInfo('CMIP5', short_name) var.frequency = frequency while self._read_line(): key, value = self._last_line_read if key in ('variable_entry', 'axis_entry'): break if key in ('dimensions', 'modeling_realm'): setattr(var, key, value.split()) elif hasattr(var, key): setattr(var, key, value) for dim in var.dimensions: var.coordinates[dim] = self.coords[dim] return var
[docs] def get_table(self, table): """ Search and return the table info. Parameters ---------- table: basestring Table name Returns ------- TableInfo Return the TableInfo object for the requested table if found, returns None if not """ return self.tables.get(table)
[docs] def get_variable(self, table, short_name, derived=False): """ Search and return the variable info. Parameters ---------- table: basestring Table name short_name: basestring Variable's short name derived: bool, optional Variable is derived. Info retrieval is less strict Returns ------- VariableInfo Return the VariableInfo object for the requested variable if found, returns None if not """ var_info = self.tables.get(table, {}).get(short_name, None) if var_info: return var_info if not self.strict: for table_vars in sorted(self.tables.values()): if short_name in table_vars: var_info = table_vars[short_name] break if not var_info and (derived or not self.strict): var_info = self.default.get_variable(table, short_name) if var_info: mip_info = self.get_table(table) var_info = var_info.copy() if mip_info: var_info.frequency = mip_info.frequency return var_info
[docs]class CMIP3Info(CMIP5Info): """ Class to read CMIP3-like data request. Parameters ---------- cmor_tables_path: basestring Path to the folder containing the Tables folder with the json files default: object Default table to look variables on if not found strict: bool If False, will look for a variable in other tables if it can not be found in the requested one """ def _read_table_file(self, table_file, table=None): for dim in ('zlevel', ): coord = CoordinateInfo(dim) coord.generic_level = True coord.axis = 'Z' self.coords[dim] = coord super()._read_table_file(table_file, table) def _read_coordinate(self, value): coord = super()._read_coordinate(value) if not coord.out_name: coord.out_name = coord.name coord.var_name = coord.name return coord def _read_variable(self, short_name, frequency): var = super()._read_variable(short_name, frequency) var.frequency = None var.modeling_realm = None return var
[docs]class CustomInfo(CMIP5Info): """ Class to read custom var info for ESMVal. Parameters ---------- cmor_tables_path: basestring or None Full path to the table or name for the table if it is present in ESMValTool repository """ def __init__(self, cmor_tables_path=None): cwd = os.path.dirname(os.path.realpath(__file__)) self._cmor_folder = os.path.join(cwd, 'tables', 'custom') self.tables = {} self.var_to_freq = {} table = TableInfo() table.name = 'custom' self.tables[table.name] = table self._coordinates_file = os.path.join( self._cmor_folder, 'CMOR_coordinates.dat', ) self.coords = {} self._read_table_file(self._coordinates_file, self.tables['custom']) for dat_file in glob.glob(os.path.join(self._cmor_folder, '*.dat')): if dat_file == self._coordinates_file: continue try: self._read_table_file(dat_file, self.tables['custom']) except Exception: msg = f"Exception raised when loading {dat_file}" # Logger may not be ready at this stage if logger.handlers: logger.error(msg) else: print(msg) raise
[docs] def get_table(self, table): """ Search and return the table info. Parameters ---------- table: basestring Table name Returns ------- TableInfo Return the TableInfo object for the requested table if found, returns None if not """ return self.tables.get(table)
[docs] def get_variable(self, table, short_name, derived=False): """ Search and return the variable info. Parameters ---------- table: basestring Table name short_name: basestring Variable's short name derived: bool, optional Variable is derived. Info retrieval is less strict Returns ------- VariableInfo Return the VariableInfo object for the requested variable if found, returns None if not """ return self.tables['custom'].get(short_name, None)
def _read_table_file(self, table_file, table=None): with open(table_file) as self._current_table: self._read_line() while True: key, value = self._last_line_read if key == 'generic_levels': for dim in value.split(' '): coord = CoordinateInfo(dim) coord.generic_level = True coord.axis = 'Z' self.coords[dim] = coord elif key == 'axis_entry': self.coords[value] = self._read_coordinate(value) continue elif key == 'variable_entry': table[value] = self._read_variable(value, '') continue if not self._read_line(): return
read_cmor_tables()