Source code for primap2.csg._models

"""Models for the composite source generator."""

import typing
from collections.abc import Hashable

import attrs
import xarray as xr
from attr import define

import primap2
from primap2._data_format import ProcessingStepDescription
from primap2._dim_names import dim_names


def equal_or_in(a, b):
    """Compare ``a`` against a single string or a container of candidates.

    When ``b`` is a ``str`` it is treated as one atomic value and compared with
    ``==`` (so no substring matching takes place). For any other ``b`` a
    membership test ``a in b`` is performed.
    """
    # Guard first: everything that is not a plain string is a collection.
    if not isinstance(b, str):
        return a in b
    return a == b


def match_selector(*, selector: dict[Hashable, str | list[str]], ts: xr.DataArray) -> bool:
    """Tell whether the timeseries ``ts`` satisfies every entry of ``selector``.

    Each selector key is resolved to a value taken from ``ts``:

    * ``"entity"`` is read from ``ts.attrs["entity"]``,
    * ``"variable"`` is read from ``ts.name``,
    * any other key is looked up in ``ts.coords``.

    The resolved value is then compared to the selector value with
    ``equal_or_in``. An empty selector matches every timeseries.
    """
    for dim, wanted in selector.items():
        # Work out where the value for this selector key lives on the timeseries.
        if dim == "entity":
            actual = ts.attrs["entity"]
        elif dim == "variable":
            actual = ts.name
        else:
            # NOTE(review): presumably a scalar coordinate of a single
            # timeseries - confirm against callers.
            actual = ts.coords[dim]

        # The first mismatch decides the result.
        if not equal_or_in(actual, wanted):
            return False
    return True


@define(frozen=True, kw_only=True)
class PriorityDefinition:
    """
    Defines source priorities for composing a full dataset or a single timeseries.

    Attributes
    ----------
    priority_dimensions
        List of dimensions from which source timeseries are selected. Each priority
        has to specify all priority dimensions.
    priorities
        List of priority selections. Higher priority selections come first. Each
        selection consists of a dict which maps dimension names to values. Each
        selection has to specify all priority_dimensions, but may specify additional
        dimensions (fixed dimensions) to limit the selection to specific cases.
        You can use primap2.Not as a value for fixed dimensions to easily specify
        negative selections in the priorities. This should generally only be used
        if you want to change the priority of a source in specific circumstances.
        If you want to avoid using a source completely in specific cases, use
        exclude_input instead.
    exclude_input
        Set of selections from the input dataset to exclude from processing.
        Each selection consists of a dict which maps dimension names to values.
        Each selection can specify any number of priority dimensions or additional
        dimensions (fixed dimensions) to limit the selection.
        All input timeseries which match any selection will be skipped in processing.
        Note that after the exclusions, there still must be at least one applicable
        source for each result timeseries, otherwise errors will be raised. If you
        want to exclude timeseries from the result without raising errors, use
        exclude_result instead.
    exclude_result
        Set of selections from the result to exclude from all processing.
        Each selection consists of a dict which maps dimension names to values.
        Each selection can specify any number of additional dimensions
        (fixed dimensions) to limit the selection. Because it excludes timeseries
        from the result not from the input, it may not contain priority_dimensions.
        All timeseries which match any selection will be excluded entirely from
        processing and will be all-NaN in the result.

    Examples
    --------
    priorities::

        [{"area (ISO3)": ["MEX", "COL"], "source": "A"}, {"source": "B"}]

    would select source "A" as highest-priority source and source "B" as
    lower-priority source for Colombia and Mexico, but source "B" as
    highest-priority (and only) source for all other countries.
    """

    priority_dimensions: list[Hashable]
    priorities: list[dict[Hashable, str | list[str] | primap2.Not]]
    # Use a factory so that every instance gets its own empty list. A literal
    # ``default=[]`` is one list object shared by all instances which omit the
    # argument; ``frozen=True`` only prevents rebinding, not in-place mutation.
    exclude_input: list[dict[Hashable, str | list[str]]] = attrs.field(factory=list)
    exclude_result: list[dict[Hashable, str | list[str]]] = attrs.field(factory=list)

    def limit(self, dim: Hashable, value: str) -> "PriorityDefinition":
        """Remove one fixed dimension by limiting to a single value.

        You can't remove priority dimensions, only fixed dimensions.

        Parameters
        ----------
        dim
            Dimension to remove from the priority selections.
        value
            The single value of ``dim`` the definition is limited to.

        Returns
        -------
        A new PriorityDefinition. Priority selections which do not mention ``dim``
        are kept unchanged, selections which mention ``dim`` and match ``value``
        are kept with ``dim`` removed, and selections which do not match ``value``
        are dropped. ``priority_dimensions``, ``exclude_input`` and
        ``exclude_result`` are passed on unchanged.
        """
        new_priorities = []
        for sel in self.priorities:
            if dim not in sel:
                # selection is independent of dim, so it applies for any value
                new_priorities.append(sel)
                continue
            match_value = sel[dim]
            # for each possible type of match_value, skip this if it *doesn't* match
            if isinstance(match_value, primap2.Not):
                # negative selection: matches unless value is among the negated values
                not_value = match_value.value
                if equal_or_in(value, not_value):
                    continue
            elif isinstance(match_value, str):
                if match_value != value:
                    continue
            elif value not in match_value:
                continue

            # now we know it is matching, filter out the matching dim
            new_priorities.append({k: v for k, v in sel.items() if k != dim})

        return PriorityDefinition(
            priority_dimensions=self.priority_dimensions,
            priorities=new_priorities,
            exclude_result=self.exclude_result,
            exclude_input=self.exclude_input,
        )

    def excludes_result(self, ts: xr.DataArray) -> bool:
        """Check if a selected result timeseries is excluded from processing.

        Returns True if ``ts`` matches at least one selector in ``exclude_result``.
        """
        return any(
            match_selector(selector=exclude_selector, ts=ts)
            for exclude_selector in self.exclude_result
        )

    def excludes_input(self, ts: xr.DataArray) -> bool:
        """Check if a selected input timeseries is excluded from processing.

        Returns True if ``ts`` matches at least one selector in ``exclude_input``.
        """
        return any(
            match_selector(selector=exclude_selector, ts=ts)
            for exclude_selector in self.exclude_input
        )

    def check_dimensions(self):
        """Raise an error if priorities or exclusions use wrong dimensions.

        Raises
        ------
        ValueError
            If a priority selection does not specify a priority dimension, if it
            specifies anything but a single string for a priority dimension, or if
            a result exclusion specifies a priority dimension.
        """
        for sel in self.priorities:
            for dim in self.priority_dimensions:
                if dim not in sel:
                    raise ValueError(f"In priority={sel}: missing priority dimension={dim}")
                if not isinstance(sel[dim], str):
                    raise ValueError(
                        f"In priority={sel}: specified multiple values for priority "
                        f"dimension={dim}, values={sel[dim]}"
                    )
        for sel in self.exclude_result:
            for dim in self.priority_dimensions:
                if dim in sel:
                    raise ValueError(
                        f"In result exclusion={sel}: excluded priority dimension={dim}"
                    )
class FillingStrategyModel(typing.Protocol, Hashable):
    """
    Protocol for strategies which fill missing data in one timeseries using another.

    Custom filling strategies can be used with ``compose`` provided that they
    follow this protocol and are Hashable. Following the protocol means:

    * implementing the ``fill`` method with exactly the parameters and return
      types defined below, and
    * defining the ``type`` attribute (see below).

    To be hashable, instances have to be immutable after initialization (this
    helps caching). Usually the easiest way to get immutable instances is the
    ``attrs.define`` decorator from the ``attrs`` package with the ``frozen=True``
    argument.

    Attributes
    ----------
    type
        Short human-readable identifier for your strategy. Avoid special
        characters and spaces.
    """

    type: str

    def fill(
        self, *, ts: xr.DataArray, fill_ts: xr.DataArray, fill_ts_repr: str
    ) -> tuple[xr.DataArray, list[ProcessingStepDescription]]:
        """Fill gaps in ts using data from the fill_ts.

        From the two input timeseries, a composite timeseries is built together
        with a description of the processing steps done. Neither input timeseries
        may be modified.

        The usual case is filling missing data (NaNs) in the first timeseries `ts`
        with data from the second timeseries `fill_ts`. Other processing is
        allowed too: for example, data in `ts` could be checked for consistency
        with data in `fill_ts` and non-conforming data points discarded, so that
        the result has more missing data points than `ts`.

        Parameters
        ----------
        ts
            Base timeseries. Missing data (NaNs) in this timeseries will be filled
            or other processing is done. This function must not modify the data in
            ts, work on a copy instead.
        fill_ts
            Fill timeseries. Data from this timeseries will be used (possibly after
            modification) to fill missing data in the base timeseries or alter the
            base timeseries in a different form. This function must not modify the
            data in fill_ts.
        fill_ts_repr
            String representation of fill_ts. Human-readable short representation
            of the fill_ts (e.g. the source).

        Returns
        -------
        filled_ts, descriptions.
            filled_ts contains the result, where missing data in ts is (partly)
            filled using information from fill_ts or other processing is done using
            ts and fill_ts.
            descriptions contains human-readable, structured descriptions of how
            the data was processed, grouped by years for which the same processing
            steps were taken. Every year for which data in filled_ts is different
            from data in ts has to be described and no year for which data was not
            changed is allowed to be described.

        Raises
        ------
        StrategyUnableToProcess
            Raised when the strategy is unable to process the given timeseries,
            possibly due to missing data (e.g. insufficient overlap of the two
            timeseries), bad numerical conditioning or other reasons. When this
            exception is raised, the strategy will be skipped and processing
            continues as if the strategy was not configured for this timeseries,
            i.e. the next applicable filling strategy is used. If no other
            applicable filling strategy is available, an error will be raised.
        """
        ...
@define(frozen=True)
class StrategyDefinition:
    """
    Defines filling strategies for a single timeseries.

    Attributes
    ----------
    strategies
        List of mappings from a timeseries selector to a filling strategy. When a
        timeseries will be used to fill missing data, the list will be checked from
        the start, and the first matching TimeseriesSelector determines the
        FillingStrategy.
        Example: [({"source": ["FAOSTAT", "UNFCCC"]}, StraightStrategy()),
        ({}, GlobalStrategy())]
        Note that the strategy can depend on fixed dimensions as well as priority
        dimensions.
        In practice, it is usually a good idea to include a default strategy using
        the empty selector {} which matches everything. It has to be the last
        entry - since it matches everything, all entries behind it will be ignored.
    """

    strategies: list[tuple[dict[Hashable, str | list[str]], FillingStrategyModel]]

    def find_strategy(self, fill_ts: xr.DataArray) -> FillingStrategyModel:
        """Return the first configured strategy whose selector matches ``fill_ts``.

        Raises
        ------
        KeyError
            If none of the configured selectors matches ``fill_ts``.
        """
        candidates = self.find_strategies(fill_ts)
        try:
            first_match = next(candidates)
        except StopIteration:
            # An exhausted generator means that no selector matched at all.
            raise KeyError(f"No matching strategy found for {fill_ts.coords}") from None
        return first_match

    def find_strategies(
        self, fill_ts: xr.DataArray
    ) -> typing.Generator[FillingStrategyModel, None, None]:
        """Lazily yield every strategy matching ``fill_ts``, in configured order."""
        yield from (
            strategy
            for selector, strategy in self.strategies
            if match_selector(selector=selector, ts=fill_ts)
        )

    def limit(self, dim: Hashable, value: str) -> "StrategyDefinition":
        """Build a new definition restricted to ``value`` in dimension ``dim``.

        Entries whose selector cannot match ``value`` in ``dim`` are dropped. For
        the remaining entries, ``dim`` is removed from the selector.
        """
        narrowed = []
        for selector, strategy in self.strategies:
            if not self.match_single_dim(selector=selector, dim=dim, value=value):
                continue
            remaining = {key: val for key, val in selector.items() if key != dim}
            narrowed.append((remaining, strategy))
        return StrategyDefinition(strategies=narrowed)

    def check_dimensions(self, ds: xr.Dataset):
        """Validate that all selectors only use dimensions applicable to ``ds``.

        Applicable are the names returned by ``dim_names(ds)`` plus the special
        keys "entity" and "variable".

        Raises
        ------
        ValueError
            If any selector uses a key which is not applicable.
        """
        applicable_dimensions = set(dim_names(ds)) | {"entity", "variable"}
        for sel, _strategy in self.strategies:
            for dim in sel:
                if dim in applicable_dimensions:
                    continue
                raise ValueError(
                    f"In selector={sel!r}: {dim=} is not a valid dimension. Valid "
                    f"dimensions: {applicable_dimensions}."
                )

    @staticmethod
    def match_single_dim(
        *, selector: dict[Hashable, str | list[str]], dim: Hashable, value: str
    ) -> bool:
        """Check if a literal value in one dimension can match the selector.

        A selector which does not mention ``dim`` places no restriction on it and
        therefore always matches.
        """
        if dim not in selector:
            return True
        return equal_or_in(value, selector[dim])