Source code for sgu_client.client.levels.observed

"""Observed groundwater level client endpoints."""

import logging
from datetime import datetime
from typing import Any

from sgu_client.client.base import BaseClient
from sgu_client.models.observed import (
    GroundwaterMeasurement,
    GroundwaterMeasurementCollection,
    GroundwaterStation,
    GroundwaterStationCollection,
)

logger = logging.getLogger(__name__)


[docs] class ObservedGroundwaterLevelClient: """Client for observed groundwater level-related SGU API endpoints.""" BASE_PATH = "collections"
[docs] def __init__(self, base_client: BaseClient): """Initialize observed groundwater level client. Args: base_client: Base HTTP client instance """ self._client = base_client
[docs] def get_stations( self, bbox: list[float] | None = None, datetime: str | None = None, limit: int | None = None, filter_expr: str | None = None, sortby: list[str] | None = None, **kwargs: Any, ) -> GroundwaterStationCollection: """Get groundwater monitoring stations. Args: bbox: Bounding box as [min_lon, min_lat, max_lon, max_lat] datetime: Date/time filter (RFC 3339 format or interval) limit: Maximum number of features to return (automatically paginated if needed) filter_expr: CQL filter expression sortby: List of sort expressions (e.g., ['+name', '-date']) **kwargs: Additional query parameters Returns: Typed collection of groundwater monitoring stations Example: >>> from sgu_client import SGUClient >>> client = SGUClient() >>> >>> # get stations in southern Sweden >>> stations = client.levels.observed.get_stations( ... bbox=[12.0, 55.0, 16.0, 58.0] ... ) >>> >>> # filter stations by municipality using CQL >>> stations = client.levels.observed.get_stations( ... filter_expr="kommun='Uppsala' AND akvifer='JS'" ... ) """ endpoint = f"{self.BASE_PATH}/stationer/items" params = self._build_query_params( bbox=bbox, datetime=datetime, limit=limit, filter=filter_expr, sortby=sortby, **kwargs, ) response = self._make_request(endpoint, params) return GroundwaterStationCollection(**response)
[docs] def get_station(self, station_id: str) -> GroundwaterStation: """Get a specific groundwater monitoring station by ID. This endpoint is provided by the OGC API but likely not used by any user. Args: station_id: Station identifier Returns: Typed groundwater monitoring station Raises: ValueError: If station not found or multiple stations returned """ endpoint = f"{self.BASE_PATH}/stationer/items/{station_id}" response = self._make_request(endpoint, {}) # SGU API returns a FeatureCollection even for single items collection = GroundwaterStationCollection(**response) if not collection.features: raise ValueError(f"Station {station_id} not found") if len(collection.features) > 1: raise ValueError(f"Multiple stations returned for ID {station_id}") return collection.features[0]
[docs] def get_measurements( self, bbox: list[float] | None = None, datetime: str | None = None, limit: int | None = None, filter_expr: str | None = None, sortby: list[str] | None = None, **kwargs: Any, ) -> GroundwaterMeasurementCollection: """Get groundwater level measurements. This method is used internally by convenience functions like `get_measurements_by_name() by constructing filter expressions. A user may also do this, but readability is greater when using the built-in convenience functions. Args: bbox: Bounding box as [min_lon, min_lat, max_lon, max_lat] datetime: Date/time filter (RFC 3339 format or interval) limit: Maximum number of features to return (automatically paginated if needed) filter_expr: CQL filter expression sortby: List of sort expressions (e.g., ['+date', '-value']) **kwargs: Additional query parameters Returns: Typed collection of groundwater level measurements """ endpoint = f"{self.BASE_PATH}/nivaer/items" params = self._build_query_params( bbox=bbox, datetime=datetime, limit=limit, filter=filter_expr, sortby=sortby, **kwargs, ) response = self._make_request(endpoint, params) return GroundwaterMeasurementCollection(**response)
[docs] def get_measurement(self, measurement_id: str) -> GroundwaterMeasurement: """Get a specific groundwater level measurement by ID. This endpoint is provided by the OGGC API but likely not used by any user. Args: measurement_id: Measurement identifier Returns: Typed groundwater level measurement Raises: ValueError: If measurement not found or multiple measurements returned """ endpoint = f"{self.BASE_PATH}/nivaer/items/{measurement_id}" response = self._make_request(endpoint, {}) # SGU API returns a FeatureCollection even for single items collection = GroundwaterMeasurementCollection(**response) if not collection.features: raise ValueError(f"Measurement {measurement_id} not found") if len(collection.features) > 1: raise ValueError(f"Multiple measurements returned for ID {measurement_id}") return collection.features[0]
[docs] def get_station_by_name( self, station_id: str | None = None, station_name: str | None = None, **kwargs: Any, ) -> GroundwaterStation: """Convenience function to get a station by name ('station_id' or 'station_name'). Args: station_id: Station identifier (maps to 'platsbeteckning' in API) station_name: Station name (maps to 'obsplatsnamn' in API) **kwargs: Additional query parameters (e.g., limit) Returns: Typed groundwater monitoring station Raises: ValueError: If neither parameter is provided, both are provided, or if multiple stations are found Example: >>> from sgu_client import SGUClient >>> client = SGUClient() >>> >>> # find station by platsbeteckning (recommended) >>> station = client.levels.observed.get_station_by_name(station_id="95_2") >>> >>> # find station by obsplatsnamn >>> station = client.levels.observed.get_station_by_name( ... station_name="Lagga_2" ... ) >>> print(station.properties.aquifer_description) """ if not station_id and not station_name: raise ValueError("Either 'station_id' or 'station_name' must be provided.") if station_id and station_name: raise ValueError( "Only one of 'station_id' or 'station_name' can be provided." ) # Map English parameter names to Swedish API field names if station_id: name_type = "platsbeteckning" station = station_id else: name_type = "obsplatsnamn" station = station_name filter_expr = f"{name_type}='{station}'" response = self.get_stations(filter_expr=filter_expr, **kwargs) if len(response.features) > 1: raise ValueError(f"Multiple stations found for {filter_expr}") return response.features[0]
[docs] def get_stations_by_names( self, station_id: list[str] | None = None, station_name: list[str] | None = None, **kwargs: Any, ) -> GroundwaterStationCollection: """Convenience function to get multiple stations by name ('station_id' or 'station_name'). Args: station_id: List of station identifiers (maps to 'platsbeteckning' in API) station_name: List of station names (maps to 'obsplatsnamn' in API) **kwargs: Additional query parameters (e.g., limit) Returns: Typed collection of groundwater monitoring stations Raises: ValueError: If neither parameter is provided or both are provided Example: >>> from sgu_client import SGUClient >>> client = SGUClient() >>> >>> # get multiple stations by their IDs >>> stations = client.levels.observed.get_stations_by_names( ... station_id=["95_2", "101_1", "102_3"] ... ) >>> for station in stations.features: ... print(f"{station.properties.station_id}: {station.properties.aquifer_code}") """ if not station_id and not station_name: raise ValueError("Either 'station_id' or 'station_name' must be provided.") if station_id and station_name: raise ValueError( "Only one of 'station_id' or 'station_name' can be provided." ) # Map English parameter names to Swedish API field names if station_id: name_type = "platsbeteckning" stations = station_id else: name_type = "obsplatsnamn" stations = station_name # Build filter expression for multiple stations using IN clause # stations is guaranteed to not be None by the validation above quoted_stations = [f"'{station}'" for station in stations] # type: ignore[union-attr] filter_expr = f"{name_type} in ({', '.join(quoted_stations)})" return self.get_stations(filter_expr=filter_expr, **kwargs)
[docs] def get_measurements_by_name( self, station_id: str | None = None, station_name: str | None = None, tmin: str | datetime | None = None, tmax: str | datetime | None = None, limit: int | None = None, **kwargs: Any, ) -> GroundwaterMeasurementCollection: """Get measurements for a specific station by name with optional time filtering. Args: station_id: Station identifier (maps to 'platsbeteckning' in API) station_name: Station name (maps to 'obsplatsnamn' in API) tmin: Start time (ISO string or datetime object) tmax: End time (ISO string or datetime object) limit: Maximum number of measurements to return **kwargs: Additional query parameters Returns: Typed collection of groundwater level measurements Raises: ValueError: If neither or both name parameters are provided, or if station lookup fails Example: >>> from sgu_client import SGUClient >>> from datetime import datetime, timezone >>> client = SGUClient() >>> >>> # get all measurements for a station >>> measurements = client.levels.observed.get_measurements_by_name( ... station_id="95_2", ... limit=100 ... ) >>> >>> # get measurements with time filtering >>> measurements = client.levels.observed.get_measurements_by_name( ... station_id="95_2", ... tmin=datetime(2020, 1, 1, tzinfo=timezone.utc), ... tmax=datetime(2021, 1, 1, tzinfo=timezone.utc) ... ) >>> >>> # using ISO string for time filtering >>> measurements = client.levels.observed.get_measurements_by_name( ... station_id="95_2", ... tmin="2020-01-01T00:00:00Z", ... tmax="2021-01-01T00:00:00Z" ... ) """ if not station_id and not station_name: raise ValueError("Either 'station_id' or 'station_name' must be provided.") if station_id and station_name: raise ValueError( "Only one of 'station_id' or 'station_name' can be provided." ) # If station_name provided, look up the station to get station_id if station_name: logger.warning( "Using 'station_name' requires an additional API request to lookup the station. " "For better performance, use 'station_id' directly if available." ) # Don't pass kwargs to station lookup as it's a single result operation station = self.get_station_by_name(station_name=station_name) target_platsbeteckning = station.properties.station_id if not target_platsbeteckning: raise ValueError( f"Station with station_name '{station_name}' has no station_id" ) else: target_platsbeteckning = station_id # Build filter expressions filters = [f"platsbeteckning='{target_platsbeteckning}'"] # Add datetime filters if tmin/tmax provided datetime_filters = self._build_datetime_filters(tmin, tmax) filters.extend(datetime_filters) # Combine all filters with AND combined_filter = " AND ".join(filters) return self.get_measurements( filter_expr=combined_filter, limit=limit, **kwargs, )
[docs] def get_measurements_by_names( self, station_id: list[str] | None = None, station_name: list[str] | None = None, tmin: str | datetime | None = None, tmax: str | datetime | None = None, limit: int | None = None, **kwargs: Any, ) -> GroundwaterMeasurementCollection: """Get measurements for multiple stations by name with optional time filtering. Args: station_id: List of station identifiers (maps to 'platsbeteckning' in API) station_name: List of station names (maps to 'obsplatsnamn' in API) tmin: Start time (ISO string or datetime object) tmax: End time (ISO string or datetime object) limit: Maximum number of measurements to return **kwargs: Additional query parameters Returns: Typed collection of groundwater level measurements Raises: ValueError: If neither or both name parameters are provided, or if station lookup fails Example: >>> from sgu_client import SGUClient >>> client = SGUClient() >>> >>> # get measurements for multiple stations >>> measurements = client.levels.observed.get_measurements_by_names( ... station_id=["95_2", "101_1"], ... tmin="2020-01-01T00:00:00Z", ... tmax="2021-01-01T00:00:00Z", ... limit=1000 ... ) >>> print(f"Found {len(measurements.features)} measurements") """ if not station_id and not station_name: raise ValueError("Either 'station_id' or 'station_name' must be provided.") if station_id and station_name: raise ValueError( "Only one of 'station_id' or 'station_name' can be provided." ) # If station_name provided, look up stations to get station_id values if station_name: logger.warning( "Using 'station_name' requires an additional API request to lookup stations. " "For better performance, use 'station_id' directly if available." ) # Don't pass kwargs to station lookup as it's a separate operation stations = self.get_stations_by_names(station_name=station_name) target_platsbeteckningar = [] for station in stations.features: if station.properties.station_id: target_platsbeteckningar.append(station.properties.station_id) else: raise ValueError( f"Station {station.id} with station_name '{station.properties.station_name}' " f"has no station_id" ) else: target_platsbeteckningar = station_id # Build filter expressions # target_platsbeteckningar is guaranteed to not be None by validation above quoted_stations = [f"'{station}'" for station in target_platsbeteckningar] # type: ignore[union-attr] filters = [f"platsbeteckning in ({', '.join(quoted_stations)})"] # Add datetime filters if tmin/tmax provided datetime_filters = self._build_datetime_filters(tmin, tmax) filters.extend(datetime_filters) # Combine all filters with AND combined_filter = " AND ".join(filters) return self.get_measurements( filter_expr=combined_filter, limit=limit, **kwargs, )
def _build_query_params(self, **params: Any) -> dict[str, Any]: """Build query parameters for API requests. Args: **params: Raw parameter values Returns: Cleaned dictionary of query parameters """ query_params = {} for key, value in params.items(): if value is None: continue if key == "bbox" and isinstance(value, list): query_params[key] = ",".join(map(str, value)) elif key == "sortby" and isinstance(value, list): query_params[key] = ",".join(value) else: query_params[key] = value return query_params def _build_datetime_filters( self, tmin: str | datetime | None, tmax: str | datetime | None ) -> list[str]: """Build CQL datetime filter expressions from tmin/tmax parameters. Args: tmin: Start time (ISO string or datetime object) tmax: End time (ISO string or datetime object) Returns: List of CQL filter expressions for datetime constraints """ filters = [] # Convert datetime objects to ISO strings def to_iso_string(dt: str | datetime | None) -> str | None: if dt is None: return None if isinstance(dt, datetime): return dt.isoformat() return dt if tmin: tmin_str = to_iso_string(tmin) filters.append(f"obsdatum >= '{tmin_str}'") if tmax: tmax_str = to_iso_string(tmax) filters.append(f"obsdatum <= '{tmax_str}'") return filters def _make_request(self, endpoint: str, params: dict[str, Any]) -> dict[str, Any]: """Make HTTP request to SGU API. Args: endpoint: API endpoint path params: Query parameters Returns: JSON response data Raises: Various HTTP and API exceptions via base client """ return self._client.get(endpoint, params=params)