Utilities API Reference

The utilities module provides helper functions for climate data processing.

Overview

The utilities include:

  • Coordinate name detection and validation

  • Data selection and processing

  • Spatial averaging functions

  • Dask client management

  • Advanced chunking strategies and optimization

Available Functions

Coordinate Utilities

climate_diagnostics.utils.coord_utils.get_coord_name(xarray_like_obj, possible_names)[source]

Find the name of a coordinate in an xarray object from a list of possible names.

This function checks for coordinate names in a case-sensitive manner first, then falls back to a case-insensitive check.

Parameters:
  • xarray_like_obj (xr.DataArray or xr.Dataset) – The xarray object to search for coordinates.

  • possible_names (list of str) – A list of possible coordinate names to look for.

Returns:

The found coordinate name, or None if no matching coordinate is found.

Return type:

str or None

climate_diagnostics.utils.coord_utils.filter_by_season(data_subset, season='annual')[source]

Filter climate data for a specific season using month-based selection.

This function implements robust seasonal filtering that handles various time coordinate formats commonly found in climate datasets, including standard datetime64, cftime objects, and numeric time coordinates.

Supported seasons are ‘annual’, ‘jjas’, ‘djf’, ‘mam’, ‘son’, ‘jja’.

Parameters:
  • data_subset (xr.DataArray or xr.Dataset) – The climate data to filter by season.

  • season (str, optional) – The season to filter by. Defaults to ‘annual’ (no filtering). Options:
      - ‘annual’: All months (no filtering)
      - ‘jjas’: June-July-August-September (monsoon season)
      - ‘djf’: December-January-February (winter/dry season)
      - ‘mam’: March-April-May (spring/pre-monsoon)
      - ‘son’: September-October-November (autumn/post-monsoon)
      - ‘jja’: June-July-August (summer)

Returns:

The filtered data containing only the specified season.

Return type:

xr.DataArray or xr.Dataset

Raises:

ValueError – If time coordinate cannot be found or processed for seasonal filtering.

Notes

This function automatically detects time coordinate naming conventions and handles different datetime formats including CF-compliant cftime objects.
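
A minimal usage sketch (illustrative; assumes ds is an already-opened dataset with a datetime64 coordinate named ‘time’):

>>> from climate_diagnostics.utils import filter_by_season
>>> djf = filter_by_season(ds, season='djf')
>>> assert set(djf.time.dt.month.values) == {12, 1, 2}  # only winter months remain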

Data Utilities

climate_diagnostics.utils.data_utils.validate_and_get_sel_slice(coord_val_param: float | int | slice | List | ndarray, data_coord: DataArray, coord_name_str: str, is_datetime_intent: bool = False) → Tuple[float | int | slice | List | ndarray, bool][source]

Validate a coordinate selection parameter against the data’s coordinate range.

Parameters:
  • coord_val_param (Union[float, int, slice, List, np.ndarray]) – The coordinate selection parameter to validate

  • data_coord (xr.DataArray) – The data coordinate array to validate against

  • coord_name_str (str) – Name of the coordinate for error messages

  • is_datetime_intent (bool, optional) – Whether the coordinate is intended to be datetime, by default False

Returns:

Validated selection value and whether nearest neighbor selection is needed

Return type:

Tuple[Union[float, int, slice, List, np.ndarray], bool]

Raises:

ValueError – If coordinate validation fails
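
A minimal usage sketch (illustrative; the tutorial dataset and the way the returned flag is applied are assumptions, not part of the documented API):

>>> import xarray as xr
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> sel_val, needs_nearest = validate_and_get_sel_slice(
...     coord_val_param=50.0, data_coord=ds["lat"], coord_name_str="lat")
>>> subset = ds["air"].sel(lat=sel_val, method="nearest" if needs_nearest else None)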

climate_diagnostics.utils.data_utils.select_process_data(xarray_obj: Dataset, variable: str, latitude: float | slice | List | None = None, longitude: float | slice | List | None = None, level: float | slice | List | None = None, time_range: slice | None = None, season: str = 'annual', year: int | None = None) → DataArray[source]

Select, filter, and process a data variable from the dataset.

Parameters:
  • xarray_obj (xr.Dataset) – The input dataset

  • variable (str) – Name of the variable to select

  • latitude (Optional[Union[float, slice, List]], optional) – Latitude selection parameter, by default None

  • longitude (Optional[Union[float, slice, List]], optional) – Longitude selection parameter, by default None

  • level (Optional[Union[float, slice, List]], optional) – Level selection parameter, by default None

  • time_range (Optional[slice], optional) – Time range selection, by default None

  • season (str, optional) – Season filter, by default ‘annual’

  • year (Optional[int], optional) – Year filter, by default None

Returns:

Processed data variable

Return type:

xr.DataArray

Raises:

ValueError – If variable not found or selection results in empty data

climate_diagnostics.utils.data_utils.get_spatial_mean(data_var, area_weighted=True)[source]

Calculate the spatial mean of a DataArray.

Dask Utilities

climate_diagnostics.utils.dask_utils.get_or_create_dask_client()[source]

Get the active Dask client or create a new one if none exists.

This function provides a centralized way to manage Dask client connections for distributed computing. It follows the pattern of reusing existing clients when available to avoid resource conflicts and connection overhead.

Returns:

The active Dask client for distributed computing tasks.

Return type:

dask.distributed.Client

Notes

This function will:

  1. Check for an existing active Dask client

  2. Return it if found (avoids duplicate connections)

  3. Create a new LocalCluster client if none exists

  4. Handle graceful fallback for minimal Dask environments

Examples

>>> client = get_or_create_dask_client()
>>> print(f"Dask client dashboard: {client.dashboard_link}")

Chunking Utilities

climate_diagnostics.utils.chunking_utils.get_system_memory_info() → Dict[str, float][source]

Get system memory information.

Returns:

Dictionary with ‘total’, ‘available’, and ‘percent_used’ memory in GB.

Return type:

Dict[str, float]
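
For example (illustrative; the exact values depend on the machine):

>>> mem = get_system_memory_info()
>>> print(f"{mem['available']:.1f} GB available of {mem['total']:.1f} GB "
...       f"({mem['percent_used']:.0f}% used)")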

climate_diagnostics.utils.chunking_utils.inspect_disk_chunking(dataset: Dataset, variable: str | None = None) → Dict[str, Any][source]

Inspect on-disk chunking from file encoding.

Parameters:
  • dataset (xr.Dataset) – The dataset to inspect.

  • variable (str, optional) – Specific variable to inspect. If None, inspects first available.

Returns:

Dictionary with disk chunking information.

Return type:

Dict[str, Any]
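
A minimal usage sketch (illustrative; the file and variable names are assumptions):

>>> import xarray as xr
>>> ds = xr.open_dataset("temperature_data.nc")
>>> disk_info = inspect_disk_chunking(ds, variable="air")
>>> print(disk_info)  # on-disk chunk layout, if the file was written with chunking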

climate_diagnostics.utils.chunking_utils.choose_time_chunk(bytes_per_tstep: float, total_time_steps: int, target_mb: float = 50, max_mb: float = 200, min_chunks: int = 32) → int[source]

Return a time-chunk length that optimizes memory usage and performance.

This implementation follows a chunking strategy that:

  • Stays close to target_mb (in MiB) per chunk

  • Never exceeds max_mb

  • Produces at least min_chunks chunks over the dataset

  • Ensures reasonable parallelization opportunities

Parameters:
  • bytes_per_tstep (float) – Bytes required per time step for all spatial dimensions.

  • total_time_steps (int) – Total number of time steps in the dataset.

  • target_mb (float, optional) – Target chunk size in megabytes. Defaults to 50 MB.

  • max_mb (float, optional) – Maximum allowed chunk size in megabytes. Defaults to 200 MB.

  • min_chunks (int, optional) – Minimum number of chunks to create. Defaults to 32.

Returns:

Optimal time chunk size in number of time steps.

Return type:

int

Examples

>>> # For hourly data with 10 MB per time step
>>> bytes_per_step = 10 * 1024**2  # 10 MB in bytes
>>> total_steps = 365 * 24 * 10  # 10 years of hourly data
>>> chunk_size = choose_time_chunk(bytes_per_step, total_steps, target_mb=50)
>>> print(f"Optimal chunk: {chunk_size} steps ≈ {chunk_size * bytes_per_step / 1e6:5.1f} MB per variable")

climate_diagnostics.utils.chunking_utils.calculate_optimal_chunks_from_disk(dataset: Dataset, variable: str | None = None, target_mb: float = 50, max_mb: float = 200, min_chunks: int = 32) → Dict[str, int][source]

Calculate optimal chunk sizes using on-disk chunking information together with the target/max/min-chunk constraints described above.

This function leverages on-disk chunk information from NetCDF/HDF5 files to make intelligent chunking decisions that respect the original data layout while optimizing for the specified memory and performance constraints.

Parameters:
  • dataset (xr.Dataset) – The dataset to analyze for optimal chunking.

  • variable (str, optional) – Variable to optimize chunking for. If None, uses first available variable.

  • target_mb (float, optional) – Target chunk size in megabytes. Defaults to 50 MB.

  • max_mb (float, optional) – Maximum allowable chunk size in megabytes. Defaults to 200 MB.

  • min_chunks (int, optional) – Minimum number of chunks to ensure good parallelization. Defaults to 32.

Returns:

Dictionary mapping dimension names to optimal chunk sizes.

Return type:

Dict[str, int]
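
A minimal usage sketch (illustrative; the file and variable names are assumptions):

>>> import xarray as xr
>>> ds = xr.open_dataset("temperature_data.nc")
>>> chunks = calculate_optimal_chunks_from_disk(ds, variable="air", target_mb=100)
>>> ds = ds.chunk(chunks)  # apply the suggested chunk sizes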

climate_diagnostics.utils.chunking_utils.dynamic_chunk_calculator(dataset: Dataset, operation_type: str = 'general', memory_limit_gb: float | None = None, performance_priority: str = 'balanced') → Dict[str, int][source]

Dynamically calculate optimal chunk sizes based on data characteristics, operation type, and system constraints.

Parameters:
  • dataset (xr.Dataset) – The dataset to analyze for optimal chunking.

  • operation_type (str, optional) – Type of operation. Options: ‘general’, ‘timeseries’, ‘spatial’, ‘statistical’, ‘io’.

  • memory_limit_gb (float, optional) – Memory limit in GB. If None, uses 25% of available system memory.

  • performance_priority (str, optional) – Optimization priority. Options: ‘memory’, ‘speed’, ‘balanced’.

Returns:

Dictionary mapping dimension names to optimal chunk sizes.

Return type:

Dict[str, int]

climate_diagnostics.utils.chunking_utils.estimate_bytes_per_timestep(dataset: Dataset, variable: str | None = None) → float[source]

Estimate bytes required per time step for a dataset or variable.
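
This pairs naturally with choose_time_chunk; a minimal sketch (illustrative; assumes ds is an already-opened dataset with a time dimension named ‘time’):

>>> bytes_per_step = estimate_bytes_per_timestep(ds, variable="air")
>>> time_chunk = choose_time_chunk(bytes_per_step, ds.sizes["time"], target_mb=50)
>>> ds_chunked = ds.chunk({"time": time_chunk})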

climate_diagnostics.utils.chunking_utils.get_optimal_chunks(dataset: Dataset, **kwargs) → Dict[str, int][source]

Generate optimal chunk sizes for a climate dataset. This is a convenience wrapper around the core chunking functions.

Parameters:
  • dataset (xr.Dataset) – The dataset to analyze.

  • **kwargs – Arguments passed to calculate_optimal_chunks_from_disk or dynamic_chunk_calculator. E.g., target_mb, max_mb, min_chunks, preserve_disk_chunks, operation_type.

Returns:

Dictionary mapping dimension names to optimal chunk sizes.

Return type:

Dict[str, int]
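
A minimal usage sketch (illustrative; assumes ds is an already-opened dataset):

>>> chunks = get_optimal_chunks(ds, target_mb=50, max_mb=200, min_chunks=32)
>>> ds = ds.chunk(chunks)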

climate_diagnostics.utils.chunking_utils.rechunk_dataset(dataset: Dataset, **kwargs) → Dataset[source]

Rechunk a dataset with optimal chunk sizes for performance.

Parameters:
  • dataset (xr.Dataset) – Dataset to rechunk.

  • **kwargs – Arguments passed to get_optimal_chunks, e.g., target_mb, max_mb.

Returns:

Rechunked dataset.

Return type:

xr.Dataset
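
A minimal usage sketch (illustrative; assumes ds is an already-opened dataset):

>>> ds_rechunked = rechunk_dataset(ds, target_mb=100)
>>> print_chunking_info(ds_rechunked)  # verify the resulting chunk layout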

climate_diagnostics.utils.chunking_utils.print_chunking_info(dataset: Dataset, detailed: bool = False) → None[source]

Print detailed information about dataset chunking.

climate_diagnostics.utils.chunking_utils.suggest_chunking_strategy(dataset: Dataset, target_use_case: str = 'general') → Dict[str, Any][source]

Suggest chunking strategies for different use cases.

Parameters:
  • dataset (xr.Dataset) – Dataset to analyze.

  • target_use_case (str, optional) – Options: ‘general’, ‘time_series’, ‘spatial_analysis’, ‘memory_limited’.

Returns:

Dictionary with chunking recommendations and rationale.

Return type:

Dict[str, Any]
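
A minimal usage sketch (illustrative; assumes ds is an already-opened dataset; only the presence of recommendations and rationale in the result is documented):

>>> suggestions = suggest_chunking_strategy(ds, target_use_case="time_series")
>>> for key, value in suggestions.items():
...     print(key, value)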

Basic Examples

Working with Coordinates

import xarray as xr
from climate_diagnostics.utils import get_coord_name

# Open any dataset with time and latitude coordinates
ds = xr.open_dataset("temperature_data.nc")

# Find coordinate names automatically
time_coord = get_coord_name(ds, ['time', 't'])
lat_coord = get_coord_name(ds, ['lat', 'latitude'])

print(f"Time coordinate: {time_coord}")
print(f"Latitude coordinate: {lat_coord}")

Spatial Averaging

from climate_diagnostics.utils import get_spatial_mean

# Calculate area-weighted spatial mean
global_mean = get_spatial_mean(ds.air, area_weighted=True)

# Simple spatial mean (no area weighting)
simple_mean = get_spatial_mean(ds.air, area_weighted=False)

Data Selection

from climate_diagnostics.utils import select_process_data

# Select and process data with automatic coordinate handling
processed_data = select_process_data(
    ds,
    variable="air",
    latitude=slice(30, 60),
    longitude=slice(-10, 40),
    season="jja"  # Summer season
)

Advanced Chunking

The chunking_utils module provides sophisticated tools for optimizing Dask chunking strategies, which is critical for performance when working with large climate datasets. The dynamic_chunk_calculator function automatically determines optimal chunk sizes based on the operation being performed and the desired performance characteristics.

from climate_diagnostics.utils.chunking_utils import (
    dynamic_chunk_calculator,
    suggest_chunking_strategy,
    print_chunking_info
)
import xarray as xr

# Load a sample dataset
ds = xr.tutorial.load_dataset("air_temperature")

# Calculate optimal chunks for a time-series analysis that is memory-intensive
optimal_chunks = dynamic_chunk_calculator(
    ds,
    operation_type='timeseries',
    performance_priority='memory'
)
print("Optimal chunks for memory-optimized time-series analysis:", optimal_chunks)

# Rechunk the dataset with the optimal chunking scheme
ds_optimized = ds.chunk(optimal_chunks)

# Print chunking information to verify
print_chunking_info(ds_optimized)

Seasonal Filtering

from climate_diagnostics.utils import filter_by_season

# Filter data by season
summer_data = filter_by_season(ds, season="jja")
winter_data = filter_by_season(ds, season="djf")

Practical Usage

Complete Analysis Workflow

import xarray as xr
from climate_diagnostics.utils import (
    get_coord_name,
    select_process_data,
    get_spatial_mean
)

# Load data
ds = xr.open_dataset("temperature_data.nc")

# Check coordinates
time_coord = get_coord_name(ds, ['time', 't'])
print(f"Time coordinate found: {time_coord}")

# Select and process regional data
arctic_data = select_process_data(
    ds,
    variable="air",
    latitude=slice(60, 90),
    season="annual"
)

# Calculate regional mean
arctic_mean = get_spatial_mean(arctic_data, area_weighted=True)

# Plot results
import matplotlib.pyplot as plt
arctic_mean.plot()
plt.title("Arctic Mean Temperature")
plt.show()

Memory-Efficient Processing

import xarray as xr
from climate_diagnostics.utils import get_or_create_dask_client, get_spatial_mean

# Ensure Dask client is available for large datasets
client = get_or_create_dask_client()

# Process large dataset
large_ds = xr.open_dataset("large_file.nc", chunks={'time': 100})
result = get_spatial_mean(large_ds.air, area_weighted=True)

# Compute result
computed_result = result.compute()

See Also