Source code for earthkit.transforms.temporal._rates

# Copyright 2024-, European Centre for Medium Range Weather Forecasts.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import typing as T

import pandas as pd
import xarray as xr
from earthkit.utils.decorators import format_handler

from earthkit.transforms import _tools

logger = logging.getLogger(__name__)


[docs] def deaccumulate( dataarray: xr.Dataset | xr.DataArray, *_args, **_kwargs, ) -> xr.Dataset | xr.DataArray: """Alias for `accumulation_to_rate` function with rate_units set to 'step_length'. The returned object will preserve the units and long_name attributes of the input dataarray. Parameters ---------- dataarray : xarray.DataArray | xarray.Dataset Data accumulated along time to be converted into rate (per time step). step : timedelta | str, optional Interval between consecutive time steps. If a string, it should be a valid pandas time frequency string (e.g., '15min', '3h', '1 day'). If not provided, it will be inferred from the data. rate_label : str, optional Suffix to append to the name and long_name of the output dataarray. xp : T.Any The array namespace to use for the reduction. If None, it will be inferred from the dataarray. time_dim : str, optional Name of the time dimension, or coordinate, in the xarray object to use for the calculation, default behaviour is to deduce time dimension from attributes of coordinates, then fall back to `"time"`. accumulation_type : str, optional Type of accumulation used in the input data. Default is "start_of_forecast". Options are: - "start_of_step": accumulation restarts at the beginning of each time step. - "start_of_forecast": accumulation starts at the beginning of the forecast and continues throughout the forecast period. - "start_of_day": accumulation restarts at the beginning of each day (00:00 UTC). from_first_step : bool, optional Only used if `accumulation_type` is "start_of_forecast". If True, the first time step's rate is calculated by dividing the first accumulation value by the step duration. Default is False. provenance : bool, optional If True, appends a history entry to the output dataarray's attributes indicating that the transformation was applied. Default is True. Returns ------- xarray.DataArray | xarray.Dataset Data object with deaccumulated data. """ if "rate_units" in _kwargs: logger.warning("The 'rate_units' parameter is not applicable for `deaccumulate` and will be ignored.") _kwargs["rate_units"] = "step_length" _kwargs.setdefault("accumulation_type", "start_of_forecast") _kwargs.setdefault("rate_label", "") return accumulation_to_rate(dataarray, *_args, **_kwargs)
[docs] @_tools.time_dim_decorator @format_handler() def accumulation_to_rate( dataarray: xr.Dataset | xr.DataArray, *_args, **_kwargs, ) -> xr.Dataset | xr.DataArray: """Convert a variable accumulated from the beginning of the forecast to a rate. The rate is computed by considering first-order discrete differences in data along the inferred, or specified, time dimension. The difference are converted to a rate by dividing by the time step duration, unless specified otherwise. Parameters ---------- dataarray : xarray.DataArray | xarray.Dataset Data accumulated along time to be converted into rate (per second). step : timedelta | str, optional Interval between consecutive time steps. If a string, it should be a valid pandas time frequency string (e.g., '15min', '3h', '1 day'). If not provided, it will be inferred from the data. rate_units : timedelta | str, optional Units for the output rate. If a string, it must be a valid pandas time frequency string (e.g., '15min', '3h', '1 day') or simple units like 'seconds', 'minutes', 'hours', 'days'. If set to 'step_length', the rate will be accumulation per time step ("deaccumulated") and the returned object will preserve the units and long_name attributes of the input dataarray. The default is 'seconds'. rate_label : str or None, optional Suffix to append to the name of the output dataarray. If None, defaults to 'rate' or 'per_step' depending on the rate_units. xp : T.Any The array namespace to use for the reduction. If None, it will be inferred from the dataarray. time_dim : str, optional Name of the time dimension, or coordinate, in the xarray object to use for the calculation, default behaviour is to deduce time dimension from attributes of coordinates, then fall back to `"time"`. accumulation_type : str, optional Type of accumulation used in the input data. Default is "start_of_step". Options are: - "start_of_step": accumulation restarts at the beginning of each time step. - "start_of_forecast": accumulation starts at the beginning of the forecast and continues throughout the forecast period. - "start_of_day": accumulation restarts at the beginning of each day (00:00 UTC). from_first_step : bool, optional Only used if `accumulation_type` is "start_of_forecast". If True, the first time step's rate is calculated by dividing the first accumulation value by the step duration. Default is False. provenance : bool, optional If True, appends a history entry to the output dataarray's attributes indicating that the transformation was applied. Default is True. Returns ------- xarray.DataArray | xarray.Dataset Data object with rate calculated based on the accumulation data. """ if isinstance(dataarray, xr.Dataset): out_ds = xr.Dataset().assign_attrs(dataarray.attrs) for var in dataarray.data_vars: out_da = _accumulation_to_rate_dataarray(dataarray[var], *_args, **_kwargs) out_ds[out_da.name] = out_da return out_ds return _accumulation_to_rate_dataarray(dataarray, *_args, **_kwargs)
def _accumulation_to_rate_dataarray( dataarray: xr.DataArray, step: None | str | pd.Timedelta = None, rate_units: str | pd.Timedelta = "seconds", xp: T.Any = None, time_dim: str | None = None, accumulation_type: str = "start_of_step", from_first_step: bool = False, provenance: bool = True, rate_label: str | None = None, ) -> xr.DataArray: """Convert a variable accumulated from the beginning of the forecast to a rate. The rate is computed by considering first-order discrete differences in data along the time (or leadtime, if time is not a dimension) axis, divided by the number of seconds in the step. # Check it does this If time[k] - time[k-1] != step, then the corresponding output values will be NaN. Example: Compute snowfall rate in m/s based on accumulated snowfall, using a time step of 24 hours. >>> snow_fall_rate = accumulation_to_rate(snow_fall, step=24) Parameters ---------- dataarray : xarray.DataArray Data accumulated along time to be converted into rate (per second). accumulation_type : str, optional Type of accumulation used in the input data. Options are: - "start_of_step": accumulation restarts at the beginning of each time step. - "start_of_forecast": accumulation starts at the beginning of the forecast and continues throughout the forecast period. - "start_of_day": accumulation restarts at the beginning of each day (00:00 UTC). Default is "start_of_step". step : timedelta | str, optional Interval between consecutive time steps. If a string, it should be a valid pandas time frequency string (e.g., '15min', '3h', '1 day'). If not provided, it will be inferred from the data. rate_units : timedelta | str, optional Units for the output rate. If a string, it must be a valid pandas time frequency string (e.g., '15min', '3h', '1 day') or simple units like 'seconds', 'minutes', 'hours', 'days'. If set to 'step_length', the rate will be accumulation per time step ("deaccumulated") and the returned object will preserve the units and long_name attributes of the input dataarray. The default is 'seconds'. rate_label : str or None, optional Suffix to append to the name of the output dataarray name, as _{rate_label}. If None, defaults to 'rate' or 'per_step' depending on the rate_units. xp : T.Any The array namespace to use for the reduction. If None, it will be inferred from the dataarray. time_dim : str, optional Name of the time dimension, or coordinate, in the xarray object to use for the calculation, default behaviour is to deduce time dimension from attributes of coordinates, then fall back to `"time"`. from_first_step : bool, optional Only used if `accumulation_type` is "start_of_forecast" or "start_of_day". If True, the first time step's rate is calculated by dividing the first accumulation value by the step duration. If `accumulation_type` is "start_of_day" and the first time step is at midnight, the first time step's rate will be set to NaN as it is not possible to know the accumulation before it. Default is False. provenance : bool, optional If True, appends a history entry to the output dataarray's attributes indicating that the transformation was applied. Default is True. Returns ------- xarray.DataArray Data object with rate calculated based on the accumulation data. """ if xp is None: xp = _tools.array_namespace_from_object(dataarray) time_dim_array = dataarray[time_dim] # If the time_dim_array is a "forecast_reference_time", then we see if there is a "forecast_period" period_dim_array = None if ( time_dim_array.attrs.get("standard_name") == "forecast_reference_time" or time_dim_array.name == "forecast_reference_time" ): # Check for forecast_period as standard_name or name in coords for coord in dataarray.coords.values(): if coord.attrs.get("standard_name") == "forecast_period" or coord.name == "forecast_period": period_dim_array = coord break else: for name in ["step", "leadtime", "forecast_period"]: if name in dataarray.coords: period_dim_array = dataarray[name] break # decide whether to use period_dim_array or time_dim_array for step calculation if period_dim_array is not None: step_dim_array = period_dim_array else: step_dim_array = time_dim_array step_dim = step_dim_array.name if step is None: if len(step_dim_array) < 2: if period_dim_array is not None: # This means that there is only one forecast period per forecast reference time # So we can treat the period_dim_array as the step_obj directly step_obj = step_dim_array else: raise ValueError("Cannot infer step from time dimension with less than two entries.") else: # step_obj is an array of time differences step_obj = step_dim_array.diff(step_dim, label="upper") if from_first_step or accumulation_type in ["start_of_step"]: # Prepend the first step assuming it's the same as the second step first_step = step_obj.isel(**{step_dim: 0}).expand_dims(step_dim) first_step = first_step.assign_coords( {step_dim: step_dim_array.isel(**{step_dim: 0}).expand_dims(step_dim)} ) step_obj = xr.concat([first_step, step_obj], dim=step_dim) else: # Convert to timedelta step_obj = pd.to_timedelta(step) if rate_units == "step_length": rate_scale_factor = 1.0 rate_units_str = "" # Do not append anything to units attribute if rate_label is None: rate_label = "per_step" else: if isinstance(rate_units, str) and not rate_units[0].isnumeric(): rate_units = "1 " + rate_units rate_obj = pd.to_timedelta(rate_units) rate_units_str = f" {_tools.timedelta_to_largest_unit(rate_obj)}^-1" rate_scale_factor = step_obj / rate_obj if rate_label is None: rate_label = "rate" # Tidy up the rate_units_str for common abbreviations rate_units_str = rate_units_str.replace("seconds", "s").replace("minutes", "min") match accumulation_type: case "start_of_step": # Simple case: rate = accumulation / step output = dataarray / rate_scale_factor case "start_of_forecast": # Compute forward differences diff_data = dataarray.diff(step_dim, label="upper") if from_first_step: # Prepend diff with first value being the same as the first dataarray value first_diff = dataarray.isel({step_dim: 0}).expand_dims(step_dim) diff_data = xr.concat([first_diff, diff_data], dim=step_dim) output = diff_data / rate_scale_factor case "start_of_day": if period_dim_array is not None: # This sceanario should be treated as "start_of_forecast", but we check that no step is > 24h if step_dim_array.max() > pd.Timedelta("1 days"): logger.warning( "For accumulation_type 'start_of_day' with time represented as forecast periods, " "the step between periods should not be greater than 24 hours. " "accumulation_to_rate is proceeding treating accumulation as 'start_of_forecast'. " "Please check that you are performing the correct transformation." ) # where the start of forecast is at the start of each day # Compute forward differences diff_data = dataarray.diff(step_dim, label="upper") if from_first_step: # Prepend diff with first value being the same as the first dataarray value first_diff = dataarray.isel({step_dim: 0}).expand_dims(step_dim) diff_data = xr.concat([first_diff, diff_data], dim=step_dim) output = diff_data / rate_scale_factor else: # TODO: Think of a more robust check to include here instead of a blanket warning logger.warning( "Please be aware that using accumulation_type='start_of_day' with a 'valid_time' " "representation assumes that the data is contiguous with steps that can be identified " "as midnight in order to reset accumulation." ) # Mask for midnight steps of the day (True = midnight) midnight_mask = step_dim_array.dt.floor("h").dt.hour == 0 # Shift the mask forward one step for first step of day. # Leave the first element as NaN as not possible to know if it was the first step of the day, # this eventuality is handled explicitly by the "from_first_step" logic below first_step_of_day_mask = midnight_mask.shift(**{step_dim: 1}) # Compute forward differences diff_data = dataarray.diff(step_dim, label="upper") if from_first_step: if not midnight_mask.data[0]: first_diff = dataarray.isel({step_dim: 0}).expand_dims(step_dim) # diff_data = xr.concat([first_diff, diff_data], dim=step_dim) else: # If the first step is midnight, we do not know the accumulation before it, # so we set it to NaN first_diff = xp.nan * dataarray.isel({step_dim: 0}).expand_dims(step_dim) # And set first step_dim of first_step_of_day_mask to False to avoid # it being repopulated by the original dataarray values in the output below first_step_of_day_mask[{step_dim: 0}] = False diff_data = xr.concat([first_diff, diff_data], dim=step_dim) else: # We must drop the first element of the first_step_of_day_mask to match the diffed array first_step_of_day_mask = first_step_of_day_mask.isel({step_dim: slice(1, None)}) # Calculate the rate for values, excluding the first step of each day output = diff_data / rate_scale_factor # Now calculate the rate for the first step of each day using the original dataarray _indexer: dict[str, xr.DataArray] = {step_dim: output[step_dim]} output = xr.where( first_step_of_day_mask, dataarray.sel(_indexer) / rate_scale_factor, output, ) case _: raise ValueError(f"Unknown accumulation_type: {accumulation_type}") new_name = "_".join(filter(None, [str(dataarray.name), rate_label])) output = output.rename(new_name) # Clear any existing attributes and set new ones output.attrs = {} if "units" in dataarray.attrs: output.attrs.update({"units": dataarray.attrs["units"] + rate_units_str}) if "long_name" in dataarray.attrs: output.attrs["long_name"] = " ".join(filter(None, [dataarray.attrs["long_name"], rate_label.replace("_", " ")])) if provenance or "history" in dataarray.attrs: output.attrs["history"] = "\n".join( filter( None, [ dataarray.attrs.get("history", ""), ("Converted from accumulation to rate using earthkit.transforms.temporal.accumulation_to_rate."), ], ) ) return output