Utilities API Reference¶
The utilities module provides helper functions for climate data processing.
Overview¶
The utilities include:
- Coordinate name detection and validation
- Data selection and processing
- Spatial averaging functions
- Dask client management
- Advanced chunking strategies and optimization
Available Functions¶
Coordinate Utilities¶
- climate_diagnostics.utils.coord_utils.get_coord_name(xarray_like_obj, possible_names)[source]¶
Find the name of a coordinate in an xarray object from a list of possible names.
This function checks for coordinate names in a case-sensitive manner first, then falls back to a case-insensitive check.
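Examples
A minimal usage sketch; the tutorial dataset used here is illustrative, and the returned name assumes its latitude coordinate is called 'lat':
>>> import xarray as xr
>>> from climate_diagnostics.utils import get_coord_name
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> get_coord_name(ds, ["latitude", "lat"])  # case-insensitive fallback would also match e.g. "LAT"
'lat'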
- climate_diagnostics.utils.coord_utils.filter_by_season(data_subset, season='annual')[source]¶
Filter climate data for a specific season using month-based selection.
This function implements robust seasonal filtering that handles various time coordinate formats commonly found in climate datasets, including standard datetime64, cftime objects, and numeric time coordinates.
Supported seasons are ‘annual’, ‘jjas’, ‘djf’, ‘mam’, ‘son’, ‘jja’.
- Parameters:
data_subset (xr.DataArray or xr.Dataset) – The climate data to filter by season.
season (str, optional) – The season to filter by. Defaults to ‘annual’ (no filtering). Options:
- ‘annual’: All months (no filtering)
- ‘jjas’: June-July-August-September (monsoon season)
- ‘djf’: December-January-February (winter/dry season)
- ‘mam’: March-April-May (spring/pre-monsoon)
- ‘son’: September-October-November (autumn/post-monsoon)
- ‘jja’: June-July-August (summer)
- Returns:
The filtered data containing only the specified season.
- Return type:
xr.DataArray or xr.Dataset
- Raises:
ValueError – If time coordinate cannot be found or processed for seasonal filtering.
Notes
This function automatically detects time coordinate naming conventions and handles different datetime formats including CF-compliant cftime objects.
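Examples
A short sketch using the same tutorial dataset as the examples later on this page:
>>> import xarray as xr
>>> from climate_diagnostics.utils import filter_by_season
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> djf = filter_by_season(ds, season="djf")       # December-February only
>>> monsoon = filter_by_season(ds, season="jjas")   # June-September monsoon months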
Data Utilities¶
- climate_diagnostics.utils.data_utils.validate_and_get_sel_slice(coord_val_param: float | int | slice | List | ndarray, data_coord: DataArray, coord_name_str: str, is_datetime_intent: bool = False) → Tuple[float | int | slice | List | ndarray, bool] [source]¶
Validate a coordinate selection parameter against the data’s coordinate range.
- Parameters:
coord_val_param (Union[float, int, slice, List, np.ndarray]) – The coordinate selection parameter to validate
data_coord (xr.DataArray) – The data coordinate array to validate against
coord_name_str (str) – Name of the coordinate for error messages
is_datetime_intent (bool, optional) – Whether the coordinate is intended to be datetime, by default False
- Returns:
Validated selection value and whether nearest neighbor selection is needed
- Return type:
Tuple[float | int | slice | List | ndarray, bool]
- Raises:
ValueError – If coordinate validation fails
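Examples
A hedged sketch of direct use; this helper is normally called internally by select_process_data, and the dataset and coordinate names below are illustrative:
>>> import xarray as xr
>>> from climate_diagnostics.utils.data_utils import validate_and_get_sel_slice
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> sel_val, use_nearest = validate_and_get_sel_slice(40.0, ds["lat"], "lat")
>>> point = ds.sel(lat=sel_val, method="nearest" if use_nearest else None)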
- climate_diagnostics.utils.data_utils.select_process_data(xarray_obj: Dataset, variable: str, latitude: float | slice | List | None = None, longitude: float | slice | List | None = None, level: float | slice | List | None = None, time_range: slice | None = None, season: str = 'annual', year: int | None = None) → DataArray [source]¶
Select, filter, and process a data variable from the dataset.
- Parameters:
xarray_obj (xr.Dataset) – The input dataset
variable (str) – Name of the variable to select
latitude (Optional[Union[float, slice, List]], optional) – Latitude selection parameter, by default None
longitude (Optional[Union[float, slice, List]], optional) – Longitude selection parameter, by default None
level (Optional[Union[float, slice, List]], optional) – Level selection parameter, by default None
time_range (Optional[slice], optional) – Time range selection, by default None
season (str, optional) – Season filter, by default ‘annual’
year (Optional[int], optional) – Year filter, by default None
- Returns:
Processed data variable
- Return type:
xr.DataArray
- Raises:
ValueError – If variable not found or selection results in empty data
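Examples
A sketch combining a spatial window, a time range, and the seasonal filter; the variable name and selections follow the tutorial dataset used in the Basic Examples below (which has no vertical level dimension, so level is omitted):
>>> import xarray as xr
>>> from climate_diagnostics.utils import select_process_data
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> winter_air = select_process_data(
...     ds,
...     variable="air",
...     latitude=slice(30, 60),
...     time_range=slice("2013-01-01", "2013-12-31"),
...     season="djf",
... )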
Dask Utilities¶
- climate_diagnostics.utils.dask_utils.get_or_create_dask_client()[source]¶
Get the active Dask client or create a new one if none exists.
This function provides a centralized way to manage Dask client connections for distributed computing. It follows the pattern of reusing existing clients when available to avoid resource conflicts and connection overhead.
- Returns:
The active Dask client for distributed computing tasks.
- Return type:
dask.distributed.Client
Notes
This function will:
1. Check for an existing active Dask client
2. Return it if found (avoids duplicate connections)
3. Create a new LocalCluster client if none exists
4. Handle graceful fallback for minimal Dask environments
Examples
>>> client = get_or_create_dask_client()
>>> print(f"Dask client dashboard: {client.dashboard_link}")
Chunking Utilities¶
- climate_diagnostics.utils.chunking_utils.get_system_memory_info() → Dict[str, float] [source]¶
Get system memory information.
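Examples
A small sketch; the dictionary keys are whatever memory metrics the function reports and are not specified here:
>>> from climate_diagnostics.utils.chunking_utils import get_system_memory_info
>>> mem = get_system_memory_info()
>>> print(mem)  # mapping of memory metrics to float values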
- climate_diagnostics.utils.chunking_utils.inspect_disk_chunking(dataset: Dataset, variable: str | None = None) → Dict[str, Any] [source]¶
Inspect on-disk chunking from file encoding.
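Examples
A sketch for checking how a file is chunked on disk; the file name and variable are hypothetical:
>>> import xarray as xr
>>> from climate_diagnostics.utils.chunking_utils import inspect_disk_chunking
>>> ds = xr.open_dataset("temperature_data.nc")  # hypothetical NetCDF file
>>> print(inspect_disk_chunking(ds, variable="air"))  # chunk layout recorded in the file encoding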
- climate_diagnostics.utils.chunking_utils.choose_time_chunk(bytes_per_tstep: float, total_time_steps: int, target_mb: float = 50, max_mb: float = 200, min_chunks: int = 32) → int [source]¶
Return a time-chunk length that optimizes memory usage and performance.
This implementation follows a chunking strategy that:
- Stays close to target_mb (in MiB) per chunk
- Never exceeds max_mb
- Produces at least min_chunks chunks over the dataset
- Ensures reasonable parallelization opportunities
- Parameters:
bytes_per_tstep (float) – Bytes required per time step for all spatial dimensions.
total_time_steps (int) – Total number of time steps in the dataset.
target_mb (float, optional) – Target chunk size in megabytes. Defaults to 50 MB.
max_mb (float, optional) – Maximum allowed chunk size in megabytes. Defaults to 200 MB.
min_chunks (int, optional) – Minimum number of chunks to create. Defaults to 32.
- Returns:
Optimal time chunk size in number of time steps.
- Return type:
int
Examples
>>> # For hourly data with 10 MB per time step
>>> bytes_per_step = 10 * 1024**2  # 10 MB in bytes
>>> total_steps = 365 * 24 * 10  # 10 years of hourly data
>>> chunk_size = choose_time_chunk(bytes_per_step, total_steps, target_mb=50)
>>> print(f"Optimal chunk: {chunk_size} steps ≈ {chunk_size * bytes_per_step / 1e6:5.1f} MB per variable")
- climate_diagnostics.utils.chunking_utils.calculate_optimal_chunks_from_disk(dataset: Dataset, variable: str | None = None, target_mb: float = 50, max_mb: float = 200, min_chunks: int = 32) → Dict[str, int] [source]¶
Calculate optimal chunk sizes using on-disk chunking information and the chunking strategy described under choose_time_chunk.
This function leverages on-disk chunk information from NetCDF/HDF5 files to make intelligent chunking decisions that respect the original data layout while optimizing for the specified memory and performance constraints.
- Parameters:
dataset (xr.Dataset) – The dataset to analyze for optimal chunking.
variable (str, optional) – Variable to optimize chunking for. If None, uses first available variable.
target_mb (float, optional) – Target chunk size in megabytes. Defaults to 50 MB.
max_mb (float, optional) – Maximum allowable chunk size in megabytes. Defaults to 200 MB.
min_chunks (int, optional) – Minimum number of chunks to ensure good parallelization. Defaults to 32.
- Returns:
Dictionary mapping dimension names to optimal chunk sizes.
- Return type:
Dict[str, int]
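Examples
A sketch using the same hypothetical file and variable names as the workflow examples below:
>>> import xarray as xr
>>> from climate_diagnostics.utils.chunking_utils import calculate_optimal_chunks_from_disk
>>> ds = xr.open_dataset("temperature_data.nc")  # hypothetical NetCDF file
>>> chunks = calculate_optimal_chunks_from_disk(ds, variable="air", target_mb=100)
>>> ds = ds.chunk(chunks)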
- climate_diagnostics.utils.chunking_utils.dynamic_chunk_calculator(dataset: Dataset, operation_type: str = 'general', memory_limit_gb: float | None = None, performance_priority: str = 'balanced') → Dict[str, int] [source]¶
Dynamically calculate optimal chunk sizes based on data characteristics, operation type, and system constraints.
- Parameters:
dataset (xr.Dataset) – The dataset to analyze for optimal chunking.
operation_type (str, optional) – Type of operation. Options: ‘general’, ‘timeseries’, ‘spatial’, ‘statistical’, ‘io’.
memory_limit_gb (float, optional) – Memory limit in GB. If None, uses 25% of available system memory.
performance_priority (str, optional) – Optimization priority. Options: ‘memory’, ‘speed’, ‘balanced’.
- Returns:
Dictionary mapping dimension names to optimal chunk sizes.
- Return type:
Dict[str, int]
- climate_diagnostics.utils.chunking_utils.estimate_bytes_per_timestep(dataset: Dataset, variable: str | None = None) → float [source]¶
Estimate bytes required per time step for a dataset or variable.
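Examples
A sketch showing how this pairs with choose_time_chunk above; the tutorial dataset and variable name are illustrative:
>>> import xarray as xr
>>> from climate_diagnostics.utils.chunking_utils import estimate_bytes_per_timestep, choose_time_chunk
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> bytes_per_step = estimate_bytes_per_timestep(ds, variable="air")
>>> time_chunk = choose_time_chunk(bytes_per_step, total_time_steps=ds.sizes["time"])
>>> ds = ds.chunk({"time": time_chunk})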
- climate_diagnostics.utils.chunking_utils.get_optimal_chunks(dataset: Dataset, **kwargs) → Dict[str, int] [source]¶
Generate optimal chunk sizes for a climate dataset. This is a convenience wrapper around the core chunking functions.
- Parameters:
dataset (xr.Dataset) – The dataset to analyze.
**kwargs – Arguments passed to calculate_optimal_chunks_from_disk or dynamic_chunk_calculator. E.g., target_mb, max_mb, min_chunks, preserve_disk_chunks, operation_type.
- Returns:
Dictionary mapping dimension names to optimal chunk sizes.
- Return type:
Dict[str, int]
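Examples
A sketch of the convenience wrapper; the dataset is the same illustrative tutorial data used elsewhere on this page:
>>> import xarray as xr
>>> from climate_diagnostics.utils.chunking_utils import get_optimal_chunks
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> ds = ds.chunk(get_optimal_chunks(ds, target_mb=50, min_chunks=16))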
- climate_diagnostics.utils.chunking_utils.rechunk_dataset(dataset: Dataset, **kwargs) → Dataset [source]¶
Rechunk a dataset with optimal chunk sizes for performance.
- Parameters:
dataset (xr.Dataset) – Dataset to rechunk.
**kwargs – Arguments passed to get_optimal_chunks, e.g., target_mb, max_mb.
- Returns:
Rechunked dataset.
- Return type:
xr.Dataset
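Examples
A sketch; this is equivalent in spirit to calling ds.chunk(get_optimal_chunks(ds, ...)) yourself:
>>> import xarray as xr
>>> from climate_diagnostics.utils.chunking_utils import rechunk_dataset, print_chunking_info
>>> ds = xr.tutorial.load_dataset("air_temperature")
>>> ds_rechunked = rechunk_dataset(ds, target_mb=100)
>>> print_chunking_info(ds_rechunked)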
- climate_diagnostics.utils.chunking_utils.print_chunking_info(dataset: Dataset, detailed: bool = False) → None [source]¶
Print detailed information about dataset chunking.
Basic Examples¶
Working with Coordinates¶
from climate_diagnostics.utils import get_coord_name
# Find coordinate names automatically
time_coord = get_coord_name(ds, ['time', 't'])
lat_coord = get_coord_name(ds, ['lat', 'latitude'])
print(f"Time coordinate: {time_coord}")
print(f"Latitude coordinate: {lat_coord}")
Spatial Averaging¶
from climate_diagnostics.utils import get_spatial_mean
# Calculate area-weighted spatial mean
global_mean = get_spatial_mean(ds.air, area_weighted=True)
# Simple spatial mean (no area weighting)
simple_mean = get_spatial_mean(ds.air, area_weighted=False)
Data Selection¶
from climate_diagnostics.utils import select_process_data
# Select and process data with automatic coordinate handling
processed_data = select_process_data(
    ds,
    variable="air",
    latitude=slice(30, 60),
    longitude=slice(-10, 40),
    season="jja"  # Summer season
)
Advanced Chunking¶
The chunking_utils module provides sophisticated tools for optimizing Dask chunking strategies, which is critical for performance when working with large climate datasets. The dynamic_chunk_calculator function automatically determines optimal chunk sizes based on the operation being performed and the desired performance characteristics.
from climate_diagnostics.utils.chunking_utils import (
    dynamic_chunk_calculator,
    print_chunking_info
)
import xarray as xr
# Load a sample dataset
ds = xr.tutorial.load_dataset("air_temperature")
# Calculate optimal chunks for a time-series analysis that is memory-intensive
optimal_chunks = dynamic_chunk_calculator(
    ds,
    operation_type='timeseries',
    performance_priority='memory'
)
print("Optimal chunks for memory-optimized time-series analysis:", optimal_chunks)
# Rechunk the dataset with the optimal chunking scheme
ds_optimized = ds.chunk(optimal_chunks)
# Print chunking information to verify
print_chunking_info(ds_optimized)
Seasonal Filtering¶
from climate_diagnostics.utils import filter_by_season
# Filter data by season
summer_data = filter_by_season(ds, season="jja")
winter_data = filter_by_season(ds, season="djf")
Practical Usage¶
Complete Analysis Workflow¶
import xarray as xr
from climate_diagnostics.utils import (
    get_coord_name,
    select_process_data,
    get_spatial_mean
)
# Load data
ds = xr.open_dataset("temperature_data.nc")
# Check coordinates
time_coord = get_coord_name(ds, ['time', 't'])
print(f"Time coordinate found: {time_coord}")
# Select and process regional data
arctic_data = select_process_data(
    ds,
    variable="air",
    latitude=slice(60, 90),
    season="annual"
)
# Calculate regional mean
arctic_mean = get_spatial_mean(arctic_data, area_weighted=True)
# Plot results
import matplotlib.pyplot as plt
arctic_mean.plot()
plt.title("Arctic Mean Temperature")
plt.show()
Memory-Efficient Processing¶
import xarray as xr
from climate_diagnostics.utils import get_or_create_dask_client, get_spatial_mean
# Ensure Dask client is available for large datasets
client = get_or_create_dask_client()
# Process large dataset
large_ds = xr.open_dataset("large_file.nc", chunks={'time': 100})
result = get_spatial_mean(large_ds.air, area_weighted=True)
# Compute result
computed_result = result.compute()
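For very large files, the chunking utilities documented above can replace the hand-picked chunks; a sketch that continues the snippet above (the file and variable names remain hypothetical):
from climate_diagnostics.utils.chunking_utils import rechunk_dataset
# Let the library choose chunk sizes instead of hard-coding {'time': 100}
large_ds = rechunk_dataset(large_ds, target_mb=100)
result = get_spatial_mean(large_ds.air, area_weighted=True)
computed_result = result.compute()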
See Also¶
Time Series Analysis API Reference - Time series analysis methods
Trend Analysis API Reference - Trend analysis methods
Plotting API Reference - Plotting functions