"""Compose a harmonized dataset from multiple input datasets."""
import math
import typing
from collections.abc import Hashable
import numpy as np
import tqdm
import xarray as xr
from loguru import logger
import primap2._data_format
from . import _models
from ._strategies.exceptions import StrategyUnableToProcess
[docs]
def compose(
*,
input_data: xr.Dataset,
priority_definition: _models.PriorityDefinition,
strategy_definition: _models.StrategyDefinition,
progress_bar: type[tqdm.tqdm] | None = tqdm.tqdm,
) -> xr.Dataset:
"""
Compose a harmonized dataset from multiple input datasets.
The input datasets are treated at the timeseries level, for each timeseries:
* the highest priority dataset is chosen according to the priority_definition
* if the dataset contains any nan values (denoting missing information), the next
highest priority dataset is selected to fill the missing values
* a filling strategy is chosen according to the strategy_definition and the filling
is done with the dataset selected in the previous step
* if afterwards there are still missing nan values, the next highest priority
dataset is selected and the previous step is repeated
* when all missing values are filled or all input datasets are exhausted, the
timeseries is finished and written to the result.
In addition to the harmonized data, also a description of the processing steps
done for each timeseries is returned in the result dataset, where for each
variable, a variable of the form "Processing of $variable" is returned, with the
same dimensions as the variable, apart from the time dimension.
Parameters
----------
input_data
Dataset with the input data. The input data dimensions determine the output
data dimensions: the output data has the same dimensions as the input data,
minus the priority dimensions defined in the priority_definition. From the
priority dimensions, the different datasets for the filling are selected, so
they vanish in the result.
priority_definition
Defines the priorities to select timeseries from the input data. Priorities
are formed by a list of selections and are used "from left to right", where the
first matching selection has the highest priority. Each selection has to specify
values for all priority dimensions (so that exactly one timeseries is selected
from the input data), but can also specify other dimensions. That way it is,
e.g., possible to define a different priority for a specific country by listing
it early (i.e. with high priority) before the more general rules which should
be applied for all other countries.
You can also specify the "entity" or "variable" in the selection, which will
limit the rule to a specific entity or variable, respectively. For each
DataArray in the input_data Dataset, the variable is its name, the entity is
the value of the key `entity` in its attrs.
strategy_definition
Defines the filling strategies to be used when filling timeseries with other
timeseries. Again, the priority is defined by a list of selections and
corresponding strategies which are used "from left to right". Selections can use
any dimension and don't have to apply to only one timeseries. For example, to
define a default strategy which should be used for all timeseries unless
something else is configured, configure an empty selection as the last
(rightmost) entry.
You can also specify the "entity" or "variable" in the selection, which will
limit the rule to a specific entity or variable, respectively. For each
DataArray in the input_data Dataset, the variable is its name, the entity is
the value of the key `entity` in its attrs.
progress_bar
By default, show progress bars using the tqdm package during the
operation. If None, don't show any progress bars. You can supply a class
compatible to tqdm.tqdm's protocol if you want to customize the progress bar.
Returns
-------
result
Dataset with the same entities and dimensions as input_data, but with
following changes: the data is composed and filled according to the rules,
the priority dimensions are reduced and not included in the result, and
additional variables of the form "Processing of $variable" are added which
describe the processing steps done for each timeseries.
"""
result_das = {}
input_data = input_data.pr.dequantify()
if progress_bar is None:
variable_iterator = input_data
else:
variable_iterator = progress_bar(input_data)
priority_dimensions = priority_definition.priority_dimensions
priority_definition.check_dimensions()
strategy_definition.check_dimensions(input_data)
for variable in variable_iterator:
if progress_bar is not None:
variable_iterator.set_postfix_str(str(variable))
input_da = input_data[variable]
entity = input_da.attrs["entity"]
# all dimensions are either time, priority selection dimensions, or need to
# be iterated over
group_by_dimensions = tuple(
dim for dim in input_da.dims if dim != "time" and dim not in priority_dimensions
)
(
result_das[variable],
result_das[f"Processing of {variable}"],
) = preallocate_result_arrays(
input_da=input_da,
group_by_dimensions=group_by_dimensions,
priority_dimensions=priority_dimensions,
)
number_of_timeseries = math.prod(len(input_da[dim]) for dim in group_by_dimensions)
if progress_bar is None:
pbar = None
else:
pbar = progress_bar(total=number_of_timeseries, unit="ts", unit_scale=True)
iterate_next_fixed_dimension(
input_da=input_da,
priority_definition=priority_definition.limit("entity", entity).limit(
"variable", variable
),
strategy_definition=strategy_definition.limit("entity", entity).limit(
"variable", variable
),
group_by_dimensions=group_by_dimensions,
result_da=result_das[variable],
result_processing_da=result_das[f"Processing of {variable}"],
progress_bar=pbar,
)
if pbar is not None:
pbar.close()
result_ds = xr.Dataset(result_das).pr.quantify()
# composing removes the priority dimensions, also remove the attrs describing
# the priority dimensions
result_ds.attrs = input_data.attrs.copy()
for dim_key in priority_dimensions:
if isinstance(dim_key, str) and "(" in dim_key:
dim = dim_key.split("(", 1)[0][:-1]
else:
dim = dim_key
if dim == "area":
del result_ds.attrs["area"]
elif dim == "category":
del result_ds.attrs["cat"]
elif dim == "scenario":
del result_ds.attrs["scen"]
return result_ds
def preallocate_result_arrays(
*,
group_by_dimensions: typing.Iterable[Hashable],
input_da: xr.DataArray,
priority_dimensions: typing.Iterable[Hashable],
) -> tuple[xr.DataArray, xr.DataArray]:
"""Create empty arrays in the right shape to hold the result."""
result_da = xr.DataArray(
data=np.nan,
dims=["time", *group_by_dimensions],
coords={
dim: input_da.coords[dim] for dim in input_da.coords if dim not in priority_dimensions
},
attrs=input_da.attrs,
name=input_da.name,
)
processing_result_da = xr.DataArray(
data=np.empty([len(input_da.coords[dim]) for dim in group_by_dimensions], dtype=object),
dims=group_by_dimensions,
coords=[input_da.coords[dim] for dim in group_by_dimensions],
attrs={
"entity": f"Processing of {input_da.name}",
"described_variable": input_da.name,
},
)
return result_da, processing_result_da
def priority_coordinates_repr(*, fill_ts: xr.DataArray, priority_dimensions: list[Hashable]) -> str:
"""Reduce the priority coordinates to a short string representation."""
priority_coordinates: dict[str, str] = {str(k): fill_ts[k].item() for k in priority_dimensions}
if len(priority_coordinates) == 1:
# only one priority dimension, just output the value because it is clear what is
# meant
return repr(next(iter(priority_coordinates.values())))
return repr(priority_coordinates)
def iterate_next_fixed_dimension(
*,
input_da: xr.DataArray,
priority_definition: _models.PriorityDefinition,
strategy_definition: _models.StrategyDefinition,
group_by_dimensions: tuple[Hashable, ...],
result_da: xr.DataArray,
result_processing_da: xr.DataArray,
progress_bar: tqdm.tqdm | None,
) -> None:
"""Recursively iterate over dimensions in group_by_dimensions.
If there is only one dimension left, actually compute results and store
them in the result_da and the result_processing_da. Otherwise, iterate over one
dimension and recursively call iterate_next_fixed_dimension again.
"""
my_dim = group_by_dimensions[0]
new_group_by_dimensions = group_by_dimensions[1:]
for val_array in input_da[my_dim]:
val = val_array.item()
limited_strategy_definition = strategy_definition.limit(dim=my_dim, value=val)
limited_priority_definition = priority_definition.limit(dim=my_dim, value=val)
if new_group_by_dimensions:
# have to iterate further until all dimensions are consumed
iterate_next_fixed_dimension(
input_da=input_da.loc[{my_dim: val}],
priority_definition=limited_priority_definition,
strategy_definition=limited_strategy_definition,
group_by_dimensions=new_group_by_dimensions,
result_da=result_da.loc[{my_dim: val}],
result_processing_da=result_processing_da.loc[{my_dim: val}],
progress_bar=progress_bar,
)
else:
# Result exclusions are handled here (per definition, we don't do any
# processing on result exclusions) but input data exclusions are handled
# in compose_timeseries (per definition, we skip to the next source when
# a source is skipped due to input data exclusions).
if not limited_priority_definition.excludes_result(result_da.loc[{my_dim: val}]):
# actually compute results
(
result_da.loc[{my_dim: val}],
result_processing_da.loc[{my_dim: val}],
) = compose_timeseries(
input_data=input_da.loc[{my_dim: val}],
priority_definition=limited_priority_definition,
strategy_definition=limited_strategy_definition,
)
if progress_bar is not None:
progress_bar.update()
def compose_timeseries(
*,
input_data: xr.DataArray,
priority_definition: _models.PriorityDefinition,
strategy_definition: _models.StrategyDefinition,
) -> tuple[xr.DataArray, primap2._data_format.TimeseriesProcessingDescription]:
"""
Compute a single timeseries from given input data, priorities, and strategies.
Parameters
----------
input_data
The input data which has dimensions time plus the priority dimensions. The
fixed coordinates are supplied as zero-dimensional coordinates.
priority_definition
The definition of priorities within input_data. Each priority selects a single
timeseries (i.e. an array with only time as the dimension), so has to specify
values for all priority dimensions.
strategy_definition
The definition of strategies for timeseries in input_data.
Returns
-------
result_ts, processing_description. In result_ts is the numerical result, with
the time as the only dimension.
processing_description is the representation of the processing steps taken to
derive the result.
"""
context_logger = logger.bind(
fixed_coordinates={k: v for k, v in input_data.coords.items() if v.shape == ()},
priority_coordinates={
k: list(v.data) for k, v in input_data.coords.items() if v.shape != ()
},
priorities=priority_definition.priorities,
strategies=strategy_definition.strategies,
)
result_ts: None | xr.DataArray = None
processing_steps_descriptions = []
for selector in priority_definition.priorities:
try:
fill_ts = input_data.loc[selector]
except KeyError:
context_logger.debug(f"{selector=} matched no input_data, skipping.")
continue
fill_ts_repr = priority_coordinates_repr(
fill_ts=fill_ts,
priority_dimensions=priority_definition.priority_dimensions,
)
# remove priority dimension information, it messes with automatic alignment
# in computations. The corresponding information is now in fill_ts_repr.
fill_ts_no_prio_dims = fill_ts.drop_vars(priority_definition.priority_dimensions)
if result_ts is None:
result_ts = xr.full_like(fill_ts_no_prio_dims, np.nan)
if priority_definition.excludes_input(fill_ts):
processing_steps_descriptions.append(
primap2.ProcessingStepDescription(
time="all",
description=f"{fill_ts_repr} is excluded from processing, skipped",
function="compose_timeseries",
source=fill_ts_repr,
)
)
continue
if fill_ts.isnull().all():
processing_steps_descriptions.append(
primap2.ProcessingStepDescription(
time="all",
description=f"{fill_ts_repr} is fully NaN, skipped",
function="compose_timeseries",
source=fill_ts_repr,
)
)
continue
context_logger.debug(f"Filling with {fill_ts_repr} now.")
for strategy in strategy_definition.find_strategies(fill_ts):
try:
result_ts, descriptions = strategy.fill(
ts=result_ts,
fill_ts=fill_ts_no_prio_dims,
fill_ts_repr=fill_ts_repr,
)
processing_steps_descriptions += descriptions
break
except StrategyUnableToProcess:
processing_steps_descriptions.append(
primap2.ProcessingStepDescription(
time="all",
description=f"strategy {strategy.type} unable to process "
f"{fill_ts_repr}, skipping to next strategy",
function="compose_timeseries",
source=fill_ts_repr,
)
)
# next strategy
continue
else:
# only executed if there was no `break` in the for loop, i.e. if no strategy
# was applicable and worked
raise ValueError(
f"No configured strategy was able to process "
f"\n{input_data.coords}\n{input_data.attrs}\n{strategy_definition=}"
)
if not result_ts.isnull().any():
context_logger.debug("No NaNs remaining, skipping the rest of the sources.")
break
if result_ts is None:
raise ValueError(
f"No priority selector matched for "
f"\n{input_data.coords}\n{input_data.attrs}\n{priority_definition=}"
)
return result_ts, primap2._data_format.TimeseriesProcessingDescription(
steps=processing_steps_descriptions
)