Source code for primap2.csg._strategies.gaps

import typing

import numpy as np
import pandas as pd
import xarray as xr
from attrs import frozen
from loguru import logger


@frozen
class Gap:
    """
    Class to define a gap in a time-series

    Attributes
    ----------
    type
        type of the gap
        possible types:
            'start': start of timeseries boundary (nan, nan, X, X)
            'end': end of timeseries boundary (X, X, nan, nan)
            'gap': gap (X, nan, nan, X)
    left
        left end of the gap
    right
        right end of the gap
    """

    type: str

    left: np.datetime64  # left end of the gap
    right: np.datetime64  # right end of the gap

    def get_date_slice(self) -> dict[str, slice]:
        """Return a xr.loc type filter for 'time' with a slice from left to right
        end of the gap."""
        return {"time": slice(self.left, self.right)}


[docs] @frozen class FitParameters: """ Class to represent parameters for a polynomial fit. While `min_data_points` refers to the actual number of data points `trend_length` does not. `trend_length` and `trend_length_unit` together define a time span which is independent of the actual data points and their spacing. Note: Very unevenly distributed data points can lead to fit problems, e.g. if we use a 10 year period and have 5 data point all in one year. But as the normal use case are evenly distributed data points, sometimes with gaps it's currently not relevant Attributes ---------- fit_degree : degree of the polynomial to fit to calculate the trend value. 0 for mean value and 1 for linear trend make most sense. The higher the order, the higher the chance of unexpected results fallback_degree : Fallback degree to use if less than min_trend_points of not-nan data trend_length : length of the trend in time steps (usually years) trend_length_unit : Unit for the length of the trend. String passed to the `freq` argument of `pd.date_range`. Default is 'YS' (yearly at start of year) min_trend_points : minimal number of points to calculate the trend. Default is 1, but if the degree of the fit polynomial is higher than 1, the minimal number of data points the degree of the fit polynomial """ fit_degree: int = 1 fallback_degree: int = 0 trend_length: int = 10 trend_length_unit: str = "YS" min_trend_points: int = 5 def __attrs_post_init__(self): if self.min_trend_points < self.fit_degree: raise ValueError( f"min_trend_points ({self.min_trend_points}) " f"must not be smaller than " f"fit_degree ({self.fit_degree})." ) def log_string(self, fallback: bool = False) -> str: """Create a string with the classes parameters.""" log_str = ( f"fit_degree: {self.fit_degree}, " f"trend_length: {self.trend_length}, " f"trend_length_unit: {self.trend_length_unit}, " f"min_trend_points: {self.min_trend_points}" ) if fallback: log_str = log_str + f", fallback_degree: {self.fallback_degree}.\n" else: log_str = log_str + ".\n" return log_str def get_fallback(self): """Return FitParameters object with the `fit_degree` set to the `fallback_degree` of the original object.""" return FitParameters( fit_degree=self.fallback_degree, trend_length=self.trend_length, trend_length_unit=self.trend_length_unit, min_trend_points=1, )
def get_gaps(ts: xr.DataArray) -> list[Gap]: """ Find nans on the boundaries of the timeseries and gaps in the time-series Parameters ---------- ts : Time-series to analyze Returns ------- list of Gaps """ ts_roll = ts.rolling(time=3, min_periods=1, center=True).sum() gaps = [] # check for left boundary if ts[0].isnull(): right = ts_roll.dropna(dim="time").coords["time"].data[0] gaps.append(Gap(type="start", left=ts.coords["time"].data[0], right=right)) left_bound = ts.dropna(dim="time").coords["time"].data[0] else: left_bound = ts.coords["time"].data[0] # check for right boundary if ts[-1].isnull(): left = ts_roll.dropna(dim="time").coords["time"].data[-1] gaps.append(Gap(type="end", left=left, right=ts.coords["time"].data[-1])) right_bound = ts.dropna(dim="time").coords["time"].data[-1] else: right_bound = ts.coords["time"].data[-1] # get gaps. constrain ts using left and right boundary ts_inner = ts.pr.loc[{"time": slice(left_bound, right_bound)}] if any(ts_inner.isnull()): # we have gaps, detect them # first make a mask for the nan values ts_nan = ts_inner.notnull() # make a rolling sum with width two and left alignment and take a nan mask ts_left_nan = ts_inner.rolling(time=2, min_periods=1, center=False).sum().notnull() # shift the sum right for a right alignment rolling sum ts_right_nan = ts_left_nan.shift(time=-1, fill_value=True) # by xor-ing with the original nan mask we can get the left and the right boundaries left_boundary_mask = xr.apply_ufunc(np.logical_xor, ts_nan, ts_left_nan) right_boundary_mask = xr.apply_ufunc(np.logical_xor, ts_nan, ts_right_nan) right_boundaries = list(ts_inner.coords["time"].data[right_boundary_mask]) left_boundaries = list(ts_inner.coords["time"].data[left_boundary_mask]) for i, left_bound in enumerate(left_boundaries): gaps.append(Gap(type="gap", left=left_bound, right=right_boundaries[i])) return gaps def calculate_boundary_trend_with_fallback( ts: xr.DataArray, gap: Gap, fit_params: FitParameters, ) -> np.array: """ Calculate trend values for boundary points. Uses fallback if not enough fit points available. Parameters ---------- ts : Time-series to calculate trend for gap : Gap definition fit_params : FitParameters object which holds all parameters for the fit Returns ------- Tuple with calculated trend values for left and right boundary of the gap. If trend calculation is not possible, `None` is returned so the calling strategy can raise the StrategyUnableToProcess error. """ trend_ts = calculate_boundary_trend( ts, gap=gap, fit_params=fit_params, ) if any(np.isnan(trend_ts)): trend_ts = calculate_boundary_trend( ts, gap=gap, fit_params=fit_params.get_fallback(), ) if all(np.isnan(trend_ts)): logger.info( f"Not enough values to calculate fit for ts and gap:" f"{gap.type}, [{gap.left}:{gap.right}].\n" f"{fit_params.log_string(fallback=True)}" f"Timeseries info: {timeseries_coord_repr(ts)}" ) # fill nan from other boundary if np.isnan(trend_ts[0]): trend_ts[0] = trend_ts[1] elif np.isnan(trend_ts[1]): trend_ts[1] = trend_ts[0] return trend_ts def calculate_boundary_trend( ts: xr.DataArray, gap: Gap, fit_params: FitParameters, ) -> np.array: """ Calculate trend values for boundary points. Parameters ---------- ts : Time-series to calculate trend for gap : Gap definition fit_params : FitParameters object which holds all parameters for the fit. This function does not handle fallback options thus the fallback attribute is ignored. Returns ------- Tuple with calculated trend values for left and right boundary of the gap. If trend calculation is not possible, `None` is returned so the calling strategy can raise the StrategyUnableToProcess error. """ if gap.type == "gap": # right boundary right = calculate_boundary_trend_inner( ts, side="right", boundary=gap.right, fit_params=fit_params, ) # left boundary left = calculate_boundary_trend_inner( ts, side="left", boundary=gap.left, fit_params=fit_params, ) elif gap.type == "end": # left boundary left = calculate_boundary_trend_inner( ts, side="left", boundary=gap.left, fit_params=fit_params, ) right = left elif gap.type == "start": # right boundary right = calculate_boundary_trend_inner( ts, side="right", boundary=gap.right, fit_params=fit_params, ) left = right else: raise ValueError(f"Unknown gap type: {gap.type}") return np.array([left, right]) def calculate_boundary_trend_inner( ts: xr.DataArray, side: typing.Literal["left", "right"], boundary: np.datetime64, fit_params: FitParameters, ) -> float: """ Calculate trend value for leftmost or rightmost boundary point. Parameters ---------- ts : Time-series to calculate trend for side : "left" or "right" If the left or right boundary point should be processed. boundary : time index boundary point (last NaN value) fit_params : FitParameters object which holds all parameters for the fit. This function does not handle fallback options thus the fallback attribute is ignored. Returns ------- Calculated trend value for boundary point. If trend calculation is not possible, `None` is returned so the calling strategy can raise the StrategyUnableToProcess error. """ point_to_modify = get_shifted_time_value( ts, original_value=boundary, shift=1 if side == "right" else -1 ) trend_index = pd.date_range( start=point_to_modify if side == "right" else None, end=point_to_modify if side == "left" else None, periods=fit_params.trend_length, freq=fit_params.trend_length_unit, ) trend_index = trend_index.intersection(ts.coords["time"]) ts_fit = ts.pr.loc[{"time": trend_index}] if len(ts_fit.where(ts_fit.notnull(), drop=True)) >= fit_params.min_trend_points: fit = ts_fit.polyfit(dim="time", deg=fit_params.fit_degree, skipna=True) value = xr.polyval( ts_fit.coords["time"].pr.loc[{"time": point_to_modify}], fit.polyfit_coefficients, ) return float(value.data) else: logger.info( f"Not enough values to calculate fit for {side} boundary at " f"{point_to_modify}.\n" f"{fit_params.log_string(fallback=False)}" f"Timeseries info: {timeseries_coord_repr(ts)}" ) return np.nan def calculate_scaling_factor( ts: xr.DataArray, fill_ts: xr.DataArray, gap: Gap, fit_params: FitParameters, ) -> np.array: """ Calculate scaling factor(s) to fill gaps Both trend values (for ts and fill_ts) are calculated in the same way. If default trend fails for one of the timeseries we use the fallback for both, so we compare the same thing when calculating the factors Parameters ---------- ts : Timeseries ith gaps to fill fill_ts : Timeseries to fill gaps with gap : Definition of the gap fit_params : FitParameters object which holds all parameters for the fit. Returns ------- tuple with left and right scaling factors. For a start and end boundary one of the elements is None """ trend_ts = calculate_boundary_trend_with_fallback( ts, gap=gap, fit_params=fit_params, ) if any(np.isnan(trend_ts)): # logging has been done already return np.array([np.nan, np.nan]) # trend values for fill_ts trend_fill = calculate_boundary_trend_with_fallback( fill_ts, gap=gap, fit_params=fit_params, ) if any(np.isnan(trend_fill)): # logging has been done already return np.array([np.nan, np.nan]) factor = np.divide(trend_ts, trend_fill) if any(np.isnan(factor)) or any(np.isinf(factor)): # we have some nan or inf values which have to come from division by zero # we fill them with 0 in case the trend values are zero as well nan_mask_factor = np.isnan(factor) | np.isinf(factor) zero_mask_ts = trend_ts == 0 factor[nan_mask_factor & zero_mask_ts] = trend_ts[nan_mask_factor & zero_mask_ts] return factor def fill_gap( ts: xr.DataArray, fill_ts: xr.DataArray, gap: Gap, factor: np.array, ) -> xr.DataArray: """Fill a gap in a timeseries using data from another timeseries and factors. Parameters ---------- ts: xr.DataArray Timeseries with a gap to be filled. fill_ts: xr.DataArray Timeseries from which to take the data to fill in the gap in ts. gap: Gap The gap that should be filled. factor: np.array Numpy array containing two floats, the scaling factor to use at the left side of the gap and the scaling factor to use at the right side of the gap. The data from fill_ts will be multiplied with the factor before filling the gap in ts. If the factors differ, a linear interpolation of the scaling factor between both values will be performed. Returns ----------- result DataArray with the same size and shape as ts, but with the given gap filled from fill_ts. """ if factor[0] == factor[1]: fill_gap_ts = fill_ts.pr.loc[gap.get_date_slice()] * factor[0] else: delta_t = gap.right - gap.left factor_ts = fill_ts.pr.loc[gap.get_date_slice()].copy() factor_ts.data = [ factor[0] * (gap.right - t) / delta_t + factor[1] * (t - gap.left) / delta_t for t in fill_ts.coords["time"].pr.loc[gap.get_date_slice()] ] fill_gap_ts = fill_ts.pr.loc[gap.get_date_slice()] * factor_ts # fill nans (in the gap only) ts_aligned, fill_ts_aligned = xr.align(ts, fill_gap_ts, join="left") filled_ts = ts_aligned.fillna(fill_ts_aligned) return filled_ts def get_shifted_time_value( ts: xr.DataArray, original_value: np.datetime64, shift: int, ) -> np.datetime64: """Get time coordinate value `shift` positions from `original_value` Parameters ---------- ts : time series to use (for time coordinate) shift : position of the value to get relative to the `original_value` Returns ------- time coordinate value at desired relative position """ # For actually getting the index of a value in an index, it is easiest to work with # the underlying numpy arrays. time_points = ts["time"].values original_index = np.where(time_points == original_value)[0][0] new_index = original_index + shift return time_points[new_index] def timeseries_coord_repr(ts: xr.DataArray) -> str: """Make short string representation for coordinate values for logging""" dims = set(ts.coords.keys()) - {"time"} coords: dict[str, str] = {str(k): ts[k].item() for k in dims} coords = dict(sorted(coords.items())) return repr(coords)