Source code for earthkit.transforms._aggregate

# Copyright 2024-, European Centre for Medium Range Weather Forecasts.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing as T

import xarray as xr

# Use numpy.ndarray for Array type hinting
from numpy import ndarray as Array

from earthkit.transforms import _tools


def how_label_rename(
    dataarray: xr.Dataset | xr.DataArray,
    how_label: str | None = None,
) -> xr.Dataset | xr.DataArray:
    """Append ``how_label`` as a suffix to the variable name(s) of a data object.

    Parameters
    ----------
    dataarray : xarray.Dataset or xarray.DataArray
        Object whose variable name(s) are to be suffixed.
    how_label : str or None
        Suffix to append, joined to the existing name with an underscore.
        If None (default) the object is returned unchanged.

    Returns
    -------
    xarray.Dataset or xarray.DataArray
        The input object when ``how_label`` is None, otherwise the result of ``rename``
        with each name mapped to ``"<name>_<how_label>"``.

    Notes
    -----
    The DataArray name is interpolated into the new name as-is, so an unnamed DataArray
    (``name`` is None) is renamed to ``"None_<how_label>"``.

    """
    # Nothing to append: hand the object back untouched
    if how_label is None:
        return dataarray

    if not isinstance(dataarray, xr.Dataset):
        # A DataArray carries a single name
        return dataarray.rename(f"{dataarray.name}_{how_label}")

    # One rename entry per key obtained by iterating the Dataset
    suffixed = {}
    for var_name in dataarray:
        suffixed[var_name] = f"{var_name}_{how_label}"
    return dataarray.rename(suffixed)


def _reduce_dataarray(
    dataarray: xr.DataArray,
    how: T.Callable | str = "mean",
    weights: None | str | Array = None,
    how_label: str | None = None,
    how_dropna: str | bool | None = False,
    xp: T.Any = None,
    **kwargs,
):
    """Reduce an xarray.DataArray using a specified `how` method.

    With the option to apply weights either directly or using a specified
    `weights` method.

    Parameters
    ----------
    dataarray : xarray.DataArray
        Data object to reduce
    how: str or callable
        Method used to reduce data. Default='mean', which will implement the xarray in-built mean.
        If string, it must be an in-built xarray reduce method, a earthkit how method or any numpy method.
        In the case of duplicate names, method selection is first in the order: xarray, earthkit, numpy.
        Otherwise it can be any function which can be called in the form `f(x, axis=axis, **kwargs)`
        to return the result of reducing an xp.ndarray over an integer valued axis
    weights : str or array, optional
        If a string, a recognised method used to create the weights (currently available
        methods are; 'latitude'). Any other non-None value is used directly as the weights.
    how_label : str
        Label to append to the name of the variable in the reduced object, default is nothing
    how_dropna : str, bool or None
        Choose how to drop nan values from the reduced result.
        Falsy values (the default) and the strings 'None'/'none' preserve na values.
        'any' or 'all' is used as the `how` option of `dropna`, applied along every dimension
        that remains in the reduced result; True is treated as 'any'.
        For backward compatibility, a value which is the name of a dimension of the reduced
        result is passed to `dropna` as the dimension to drop along.
    xp : T.Any
        The array namespace to use for the reduction. If None, it will be inferred from the dataarray.
    **kwargs :
        kwargs recognised by the how :func: `reduce`

    Returns
    -------
    A data array with reduce dimensions removed.

    """
    if xp is None:
        xp = _tools.array_namespace_from_object(dataarray)

    if weights is not None:
        # Weighted reduction: use the xarray weighted methods.
        # Create any standard weights, e.g. latitude
        # NOTE: the same kwargs are forwarded to both the weights factory and the reduction.
        if isinstance(weights, str):
            _weights = _tools.standard_weights(dataarray, weights, **kwargs)
        else:
            _weights = weights
        # Weighted methods are looked up by name, so a callable is replaced by its name
        if callable(how):
            how = how.__name__
        # map any alias methods:
        weighted_how = _tools.WEIGHTED_HOW_METHODS.get(how, how)

        red_array = getattr(dataarray.weighted(_weights), weighted_how)(**kwargs)

    elif isinstance(how, str) and how in dir(dataarray):
        # The object provides a method with this name: call it directly
        red_array = getattr(dataarray, how)(**kwargs)

    else:
        # Otherwise resolve a string to a function for the array namespace
        if isinstance(how, str):
            how = _tools.get_how_xp(how, xp=xp)
        assert callable(how), f"how method not recognised: {how}"

        red_array = dataarray.reduce(how, **kwargs)

    red_array = how_label_rename(red_array, how_label=how_label)

    if how_dropna:
        if how_dropna in red_array.dims:
            # Legacy behaviour: the value names a dimension and is used as `dim`
            red_array = red_array.dropna(how_dropna)
        elif how_dropna not in ("None", "none"):
            # `dropna` takes the dimension first and `how` as a keyword, so the
            # 'any'/'all' option must be applied one dimension at a time.
            dropna_how = "any" if how_dropna is True else how_dropna
            for dim in red_array.dims:
                red_array = red_array.dropna(dim, how=dropna_how)

    return red_array


def reduce(
    dataarray: xr.Dataset | xr.DataArray,
    *_args,
    **kwargs,
):
    """Reduce an xarray.DataArray or xarray.Dataset using a specified `how` method.

    With the option to apply weights either directly or using a specified
    `weights` method. A Dataset is reduced one data variable at a time and the
    results are collected in a new Dataset carrying the attributes of the input.

    Parameters
    ----------
    dataarray : xarray.DataArray or xarray.Dataset
        Data object to reduce
    how: str or callable
        Method used to reduce data. Default='mean', which will implement the xarray in-built mean.
        May be given as the first positional argument after `dataarray` or as a keyword;
        a positional value takes precedence over the keyword.
        If string, it must be an in-built xarray reduce method, a earthkit how method or any numpy method.
        In the case of duplicate names, method selection is first in the order: xarray, earthkit, numpy.
        Otherwise it can be any function which can be called in the form `f(x, axis=axis, **kwargs)`
        to return the result of reducing an xp.ndarray over an integer valued axis
    weights : str
        Choose a recognised method to apply weighting. Currently available methods are; 'latitude'
    how_label : str
        Label to append to the name of the variable in the reduced object
    how_dropna : str or None
        Choose how to drop nan values.
        Default is None and na values are preserved. Options are 'any' and 'all'.
    **kwargs :
        kwargs recognised by the how :func: `reduce`

    Returns
    -------
    xarray.DataArray or xarray.Dataset
        A data array with reduced dimensions removed.

    """
    # `how` may arrive positionally or by keyword; fall back to "mean" when absent
    if _args:
        kwargs["how"] = _args[0]
    else:
        kwargs.setdefault("how", "mean")

    # A single DataArray needs no per-variable handling
    if not isinstance(dataarray, xr.Dataset):
        return _reduce_dataarray(dataarray, **kwargs)

    # Reduce each data variable and store it under the name of the reduced result
    reduced = xr.Dataset().assign_attrs(dataarray.attrs)
    for var_name in dataarray.data_vars:
        reduced_var = _reduce_dataarray(dataarray[var_name], **kwargs)
        reduced[reduced_var.name] = reduced_var
    return reduced


def rolling_reduce(dataarray: xr.Dataset | xr.DataArray, *_args, **kwargs) -> xr.Dataset | xr.DataArray:
    """Return reduced data using a moving window over which to apply the reduction.

    A Dataset is processed one data variable at a time and the results are collected
    in a new Dataset carrying the attributes of the input. All positional and keyword
    arguments are forwarded unchanged to the per-DataArray implementation.

    Parameters
    ----------
    dataarray : xarray.DataArray or xarray.Dataset
        Data over which the moving window is applied according to the reduction method.
    windows :
        windows for the rolling groups, for example `time=10` to perform a reduction
        in the time dimension with a bin size of 10. the rolling groups can be defined
        over any number of dimensions. **see documentation for xarray.dataarray.rolling**.
    min_periods : integer
        The minimum number of observations in the window required to have a value
        (otherwise result is NaN). Default is to set **min_periods** equal to the size of the window.
        **see documentation for xarray.dataarray.rolling**
    center : bool
        Set the labels at the centre of the window, **see documentation for xarray.dataarray.rolling**.
    how_reduce : str,
        Function to be applied for reduction. Default is 'mean'.
    how_dropna : str or None
        Determine if dimension is removed from the output when we have at least one NaN or
        all NaN. **how_dropna** can be None, 'any' or 'all'. Default is None.
    **kwargs :
        Any kwargs that are compatible with the select `how_reduce` method.

    Returns
    -------
    xarray.DataArray or xarray.Dataset (as provided)

    """
    # A single DataArray is handled directly
    if not isinstance(dataarray, (xr.Dataset)):
        return _rolling_reduce_dataarray(dataarray, *_args, **kwargs)

    # Apply the rolling reduction to each data variable in turn
    rolled_ds = xr.Dataset().assign_attrs(dataarray.attrs)
    for var_name in dataarray.data_vars:
        rolled_var = _rolling_reduce_dataarray(dataarray[var_name], *_args, **kwargs)
        rolled_ds[rolled_var.name] = rolled_var
    return rolled_ds


def _rolling_reduce_dataarray(
    dataarray: xr.DataArray, how_reduce="mean", how_dropna=None, chunk=True, **kwargs
) -> xr.DataArray:
    """Return reduced data using a moving window over which to apply the reduction.

    Parameters
    ----------
    dataarray : xarray.DataArray
        Data over which the moving window is applied according to the reduction method.
    windows :
        windows for the rolling groups, for example `time=10` to perform a reduction
        in the time dimension with a bin size of 10. the rolling groups can be defined
        over any number of dimensions. They may also be supplied together as a
        dictionary through the `dim` keyword.
        **see documentation for xarray.dataarray.rolling**.
    min_periods : integer
        The minimum number of observations in the window required to have a value
        (otherwise result is NaN). Default is to set **min_periods** equal to the size of the window.
        **see documentation for xarray.dataarray.rolling**
    center : bool
        Set the labels at the centre of the window, **see documentation for xarray.dataarray.rolling**.
    how_reduce : str,
        Function to be applied for reduction. Default is 'mean'.
        A `how` keyword argument, if given, takes precedence over this value.
    how_dropna : str or None
        Determine if dimension is removed from the output when we have at least one NaN or
        all NaN. **how_dropna** can be None, 'any' or 'all'. Default is None.
    chunk: bool
        If True, the dataarray is chunked before the rolling operation.
    **kwargs :
        Any kwargs that are compatible with the select `how_reduce` method.

    Returns
    -------
    xarray.DataArray

    """
    # Resolve the array namespace from the data as provided, before any chunking
    xp = _tools.array_namespace_from_object(dataarray)
    if chunk:
        dataarray = dataarray.chunk()

    # A dict-valued `dim` is flattened into individual `<dimension>=<window>` kwargs
    dim_spec = kwargs.get("dim")
    if isinstance(dim_spec, dict):
        del kwargs["dim"]
        kwargs.update(dim_spec)

    # Dimensions of the data for which a window was supplied
    window_dims = [str(dim_name) for dim_name in dataarray.dims if dim_name in kwargs]

    # Move the arguments understood by `rolling` out of kwargs, keeping their order
    rolling_names = {"min_periods", "center", *window_dims}
    rolling_kwargs = {}
    for key in list(kwargs):
        if key in rolling_names:
            rolling_kwargs[key] = kwargs.pop(key)

    # Create rolling groups:
    rolled = dataarray.rolling(**rolling_kwargs)

    # Whatever is left in kwargs is intended for the reduction method
    kwargs.setdefault("how", how_reduce)

    # TODO: remove type ignore when xarray puts types in stable location
    windowed = _reduce_dataarray(rolled, xp=xp, **kwargs)  # type: ignore

    windowed = _dropna(windowed, window_dims, how_dropna)
    windowed.attrs.update(dataarray.attrs)
    return windowed


def _dropna(data, dims, how):
    """Drop nan values from data along each dimension in `dims`.

    `data` is returned unchanged when `how` is None, "None" or "none"; otherwise
    `how` is passed as the `how` option of `dropna` for every dimension in turn.
    """
    skip_markers = (None, "None", "none")
    if how not in skip_markers:
        for dim_name in dims:
            data = data.dropna(dim_name, how=how)
    return data


@_tools.time_dim_decorator
def resample(
    dataarray: xr.Dataset | xr.DataArray,
    frequency: str | int | float,
    time_dim: str = "time",
    how: str = "mean",
    skipna: bool = True,
    how_args: list[T.Any] | None = None,
    how_kwargs: dict[str, T.Any] | None = None,
    how_label: str | None = None,
    extra_reduce_dims: str | list[str] | None = None,
    **kwargs,
) -> xr.Dataset | xr.DataArray:
    """Resample dataarray to a user-defined frequency using a user-defined "how" method.

    Parameters
    ----------
    dataarray : xarray.DataArray or xarray.Dataset
        Data object to be resampled.
    frequency : str, int, float
        The frequency at which to resample the chosen dimension.
        The format must be applicable to the chosen dimension.
    time_dim: str
        The dimension to resample along, default is `time`.
        The legacy keyword `dim`, if given, overrides this value.
    how: str
        The reduction method for resampling, default is `mean`
    how_label : str
        Label to append to the name of the variable in the reduced object, default is nothing
    skipna : bool
        If True, exclude missing values (na values) from the reduction.
    how_args : list
        List of arguments to be passed to the reduction method.
    how_kwargs : dict
        Dictionary of keyword arguments to be passed to the reduction method.
        The dictionary provided by the caller is not modified.
    extra_reduce_dims : str or list of str, optional
        Extra dimensions to reduce over in addition to the resampling dimension.
        These dimensions will be reduced over using the same `how` method as the resampling dimension.
        Default is None (no extra dimensions).
    **kwargs
        Keyword arguments to be passed to :func:`resample`.
        The keywords `q` and `p` are passed to the reduction method instead.
        Defaults have been set as: `{"skipna": True}`

    Returns
    -------
    xarray.Dataset | xarray.DataArray

    """
    # Normalise mutable defaults
    if how_args is None:
        how_args = []
    # Work on a copy so that adding `q`/`p` below never mutates the caller's dictionary
    how_kwargs = {} if how_kwargs is None else dict(how_kwargs)

    # Normalise extra_reduce_dims: None → [], str → [str]
    _extra_reduce_dims = _tools.normalize_dims(extra_reduce_dims)

    # Handle legacy API instances:
    time_dim = kwargs.pop("dim", time_dim)

    # Get any how kwargs into appropriate dictionary:
    for _key in ("q", "p"):
        if _key in kwargs:
            how_kwargs[_key] = kwargs.pop(_key)

    # Translate any xarray frequencies to pandas language:
    frequency = _tools._PANDAS_FREQUENCIES_R.get(frequency, frequency)

    # The resampling frequency is given to xarray as `<time_dim>=<frequency>`
    kwargs[time_dim] = frequency
    _resample = dataarray.resample(skipna=skipna, **kwargs)

    # Look up the reduction method by name and reduce over the time and any extra dimensions
    reducer = getattr(_resample, how)
    result = reducer(*how_args, dim=[time_dim] + _extra_reduce_dims, **how_kwargs)

    return how_label_rename(result, how_label=how_label)