"""Pydantic models for groundwater chemistry data from SGU API."""
from datetime import datetime
from typing import TYPE_CHECKING, Literal
from pydantic import Field
from sgu_client.models.base import SGUBaseModel, SGUResponse
from sgu_client.models.shared import CRS, Geometry, Link
from sgu_client.utils.pandas_helpers import get_pandas, optional_pandas_method
if TYPE_CHECKING:
import pandas as pd
# Sampling site properties (English field names with Swedish API aliases)
[docs]
class SamplingSiteProperties(SGUBaseModel):
"""Properties for a groundwater chemistry sampling site."""
# Site identification
station_id: str | None = Field(
None, alias="platsbeteckning", description="Station identifier"
)
site_name: str | None = Field(
None, alias="provplatsnamn", description="Sampling site name"
)
national_site_id: int | None = Field(
None, alias="nationellt_provplatsid", description="National sampling site ID"
)
eu_station_code: str | None = Field(
None, alias="eucd_stn", description="EU station code"
)
eu_groundwater_body: str | None = Field(
None, alias="eucd_gwb", description="EU groundwater body"
)
# Site classification
site_type_code: str | None = Field(
None, alias="provplatstyp", description="Sampling site type code"
)
site_type_description: str | None = Field(
None, alias="provplatstyp_tx", description="Sampling site type description"
)
site_category_code: str | None = Field(
None,
alias="provplatskat_bedgr",
description="Sampling site category code (assessment basis)",
)
site_category_description: str | None = Field(
None,
alias="provplatskat_bedgr_tx",
description="Sampling site category description",
)
# Coordinates (projected)
north_coordinate: float | None = Field(
None, alias="n", description="North coordinate"
)
east_coordinate: float | None = Field(
None, alias="e", description="East coordinate"
)
# Position metadata
positioning_method_code: str | None = Field(
None, alias="positioneringsmetod", description="Positioning method code"
)
positioning_method_description: str | None = Field(
None,
alias="positioneringsmetod_tx",
description="Positioning method description",
)
position_quality_code: str | None = Field(
None, alias="positionskvalitet", description="Position quality code"
)
position_quality_description: str | None = Field(
None, alias="positionskvalitet_tx", description="Position quality description"
)
# Administrative location
county_code: str | None = Field(None, alias="lanskod", description="County code")
county: str | None = Field(None, alias="lan", description="County")
municipality_code: str | None = Field(
None, alias="kommunkod", description="Municipality code"
)
municipality: str | None = Field(None, alias="kommun", description="Municipality")
region_code: str | None = Field(
None, alias="region_bdgr", description="Region code (assessment basis)"
)
region_description: str | None = Field(
None, alias="region_bdgr_tx", description="Region description"
)
water_district_code: str | None = Field(
None, alias="vattendistrikt", description="Water district code"
)
water_district_description: str | None = Field(
None, alias="vattendistrikt_tx", description="Water district description"
)
# Well/site characteristics
reference_level: float | None = Field(
None, alias="refniva", description="Reference level"
)
elevation_system: str | None = Field(
None, alias="hojdsystem", description="Elevation reference system"
)
well_depth: float | None = Field(None, alias="brunnsdjup", description="Well depth")
well_depth_qualifier: str | None = Field(
None, alias="tecken_brunnsdjup", description="Well depth sign/qualifier"
)
filter_depth_top: float | None = Field(
None, alias="filterdjup_fran", description="Filter depth from (top)"
)
filter_depth_bottom: float | None = Field(
None, alias="filterdjup_till", description="Filter depth to (bottom)"
)
filter_depth_qualifier: str | None = Field(
None, alias="tecken_filterdjup", description="Filter depth sign/qualifier"
)
# Aquifer information
aquifer_code: str | None = Field(None, alias="akvifer", description="Aquifer code")
aquifer_description: str | None = Field(
None, alias="akvifer_tx", description="Aquifer description"
)
# Geology
soil_genesis_code: str | None = Field(
None, alias="genes_jord", description="Soil genesis code"
)
soil_genesis_description: str | None = Field(
None, alias="genes_jord_tx", description="Soil genesis description"
)
rock_type_code: str | None = Field(
None, alias="bergart", description="Rock type code"
)
rock_type_description: str | None = Field(
None, alias="bergart_tx", description="Rock type description"
)
# Site history and monitoring
established_date: str | None = Field(
None, alias="etabldatum", description="Site establishment date"
) # ISO date string
decommissioned_date: str | None = Field(
None, alias="nedlagdatum", description="Site decommissioning date"
) # ISO date string
sample_count: int | None = Field(
None, alias="antal_prov", description="Number of samples collected"
)
program_affiliation: str | None = Field(
None, alias="programkoppl", description="Program affiliation/connections"
)
# Monitoring program flags
national_monitoring: str | None = Field(
None, alias="nationell", description="National monitoring program (ja/nej)"
)
regional_monitoring: str | None = Field(
None, alias="regional", description="Regional monitoring program (ja/nej)"
)
local_monitoring: str | None = Field(
None, alias="lokal", description="Local monitoring program (ja/nej)"
)
# Classification and symbols
symbol: str | None = Field(
None, alias="symbol", description="Symbol/classification marker"
)
# Analysis result links
analyses_csv_url: str | None = Field(
None, alias="analyser_csv", description="URL to CSV file with analysis results"
)
analyses_json_url: str | None = Field(
None,
alias="analyser_json",
description="URL to JSON file with analysis results",
)
@property
def established_datetime(self) -> datetime | None:
"""Parse establishment date as datetime object."""
if self.established_date:
try:
return datetime.fromisoformat(
self.established_date.replace("Z", "+00:00")
)
except (ValueError, AttributeError):
return None
return None
@property
def decommissioned_datetime(self) -> datetime | None:
"""Parse decommissioning date as datetime object."""
if self.decommissioned_date:
try:
return datetime.fromisoformat(
self.decommissioned_date.replace("Z", "+00:00")
)
except (ValueError, AttributeError):
return None
return None
[docs]
class SamplingSite(SGUBaseModel):
"""A groundwater chemistry sampling site (GeoJSON Feature)."""
type: Literal["Feature"] = "Feature"
id: str = Field(..., description="Site ID")
geometry: Geometry | None = Field(None, description="Site geometry")
properties: SamplingSiteProperties = Field(..., description="Site properties")
# Analysis result properties (English field names with Swedish API aliases)
[docs]
class AnalysisResultProperties(SGUBaseModel):
"""Properties for a groundwater chemistry analysis result."""
# Site identification
station_id: str | None = Field(
None, alias="platsbeteckning", description="Station identifier"
)
national_site_id: int | None = Field(
None, alias="nationellt_provplatsid", description="National sampling site ID"
)
county_code: str | None = Field(None, alias="lan", description="County code")
# Sample identification
sample_id: str | None = Field(None, alias="provid", description="Sample ID")
sample_type: str | None = Field(None, alias="provtyp", description="Sample type")
delivery_id: str | None = Field(None, alias="inlevid", description="Delivery ID")
# Monitoring program
program_name: str | None = Field(
None, alias="programnamn", description="Monitoring program name"
)
program_id: str | None = Field(
None, alias="programid", description="Monitoring program ID"
)
monitoring_manual: str | None = Field(
None, alias="overvakningsmanual", description="Monitoring manual reference"
)
# Dates
sampling_date: str | None = Field(
None, alias="provtagningsdat", description="Sampling date"
) # ISO datetime string
submission_date: str | None = Field(
None, alias="inlamningsdat", description="Sample submission date"
) # ISO datetime string
# Parameter (chemical substance measured)
parameter_name: str | None = Field(
None, alias="param", description="Parameter/chemical substance name"
)
parameter_short_name: str | None = Field(
None, alias="param_kort", description="Short parameter name"
)
parameter_sequence_number: int | None = Field(
None, alias="paramlopnr", description="Parameter sequence number"
)
# Sample and analysis preparation
water_preparation: str | None = Field(
None, alias="vattenberedn", description="Water sample preparation"
)
sample_preparation: str | None = Field(
None, alias="provberedn", description="Sample preparation method"
)
# Laboratory and method
laboratory: str | None = Field(None, alias="labb", description="Laboratory name")
method: str | None = Field(None, alias="metod", description="Analysis method used")
# Detection and reporting limits
reporting_limit: float | None = Field(
None, alias="rapporteringsgrans", description="Reporting limit"
)
detection_limit: float | None = Field(
None, alias="detektionsgrans", description="Detection limit"
)
# Measurement value
measurement_value_annotation: str | None = Field(
None,
alias="matvardetalanm",
description="Measurement value annotation/qualifier",
)
measurement_value: float | None = Field(
None, alias="matvardetal", description="Measurement value (numeric)"
)
measurement_value_span: str | None = Field(
None, alias="matvardespar", description="Measurement value span/range"
)
measurement_value_text: str | None = Field(
None, alias="matvardetext", description="Measurement value as text"
)
unit: str | None = Field(None, alias="enhet", description="Unit of measurement")
measurement_uncertainty: str | None = Field(
None, alias="matosakerhet", description="Measurement uncertainty"
)
# Metadata
last_updated: str | None = Field(
None, alias="lastupdate", description="Last update timestamp"
) # ISO datetime string
row_number: int | None = Field(None, alias="radnummer", description="Row number")
@property
def sampling_datetime(self) -> datetime | None:
"""Parse sampling date as datetime object."""
if self.sampling_date:
try:
return datetime.fromisoformat(self.sampling_date.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
return None
@property
def submission_datetime(self) -> datetime | None:
"""Parse submission date as datetime object."""
if self.submission_date:
try:
return datetime.fromisoformat(
self.submission_date.replace("Z", "+00:00")
)
except (ValueError, AttributeError):
return None
return None
@property
def last_updated_datetime(self) -> datetime | None:
"""Parse last update as datetime object."""
if self.last_updated:
try:
return datetime.fromisoformat(self.last_updated.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
return None
[docs]
class AnalysisResult(SGUBaseModel):
"""A groundwater chemistry analysis result (GeoJSON Feature)."""
type: Literal["Feature"] = "Feature"
id: str = Field(..., description="Result ID")
geometry: Geometry | None = Field(None, description="Result geometry")
properties: AnalysisResultProperties = Field(
..., description="Analysis result properties"
)
# Collection response models
[docs]
class SamplingSiteCollection(SGUResponse):
"""Collection of groundwater chemistry sampling sites (GeoJSON FeatureCollection)."""
type: Literal["FeatureCollection"] = "FeatureCollection"
features: list[SamplingSite] = Field(
default_factory=list, description="Sampling site features"
)
# OGC API Features metadata
totalFeatures: int | None = Field(None, description="Total number of features")
numberMatched: int | None = Field(
None, description="Number of features matching query"
)
numberReturned: int | None = Field(None, description="Number of features returned")
timeStamp: str | None = Field(None, description="Response timestamp")
# Links and CRS
links: list[Link] | None = Field(None, description="Related links")
crs: CRS | None = Field(None, description="Coordinate reference system")
@optional_pandas_method("to_dataframe() method")
def to_dataframe(self) -> "pd.DataFrame":
"""Convert to pandas DataFrame with flattened sampling site properties.
Returns:
DataFrame containing sampling site data with parsed datetime columns.
Examples:
>>> from sgu_client import SGUClient
>>> client = SGUClient()
>>>
>>> sites = client.chemistry.get_sampling_sites(limit=10)
>>> df = sites.to_dataframe()
>>>
>>> # dataFrame includes site properties with datetime parsing
>>> print(df[['station_id', 'site_name', 'municipality', 'established_date', 'sample_count']].head())
>>> # established_date and decommissioned_date are parsed as datetime objects
"""
data = []
for feature in self.features:
row = {
"site_id": feature.id,
}
# Add geometry if present
if feature.geometry:
row.update(
{
"geometry_type": feature.geometry.type,
"longitude": feature.geometry.coordinates[0]
if feature.geometry.coordinates
else None,
"latitude": feature.geometry.coordinates[1]
if len(feature.geometry.coordinates) > 1
else None,
}
)
# Add parsed datetime columns
row["established_date"] = feature.properties.established_datetime
row["decommissioned_date"] = feature.properties.decommissioned_datetime
# Add all properties
row.update(feature.properties.model_dump())
data.append(row)
pd = get_pandas()
df = pd.DataFrame(data)
# Ensure datetime columns are properly typed
if not df.empty:
if "established_date" in df.columns:
df["established_date"] = pd.to_datetime(df["established_date"])
if "decommissioned_date" in df.columns:
df["decommissioned_date"] = pd.to_datetime(df["decommissioned_date"])
return df
[docs]
class AnalysisResultCollection(SGUResponse):
"""Collection of groundwater chemistry analysis results (GeoJSON FeatureCollection)."""
type: Literal["FeatureCollection"] = "FeatureCollection"
features: list[AnalysisResult] = Field(
default_factory=list, description="Analysis result features"
)
# OGC API Features metadata
totalFeatures: int | None = Field(None, description="Total number of features")
numberMatched: int | None = Field(
None, description="Number of features matching query"
)
numberReturned: int | None = Field(None, description="Number of features returned")
timeStamp: str | None = Field(None, description="Response timestamp")
# Links and CRS
links: list[Link] | None = Field(None, description="Related links")
crs: CRS | None = Field(None, description="Coordinate reference system")
@optional_pandas_method("to_dataframe() method")
def to_dataframe(self, sort_by_date: bool = True) -> "pd.DataFrame":
"""Convert to pandas DataFrame with analysis result data.
Args:
sort_by_date: Whether to sort the DataFrame by sampling date.
Returns:
DataFrame containing analysis result data.
Examples:
>>> from sgu_client import SGUClient
>>> client = SGUClient()
>>>
>>> results = client.chemistry.get_results_by_site(site_id="10001_1", limit=100)
>>> df = results.to_dataframe()
>>>
>>> # dataFrame includes chemical analysis results with multiple datetime columns
>>> print(df[['sampling_date', 'parameter_short_name', 'measurement_value', 'unit']].head())
>>> # sampling_date, submission_date, and last_update are all parsed as datetime objects
"""
data = []
for feature in self.features:
row = {
"result_id": feature.id,
"sampling_date": feature.properties.sampling_datetime,
"submission_date": feature.properties.submission_datetime,
"last_update": feature.properties.last_updated_datetime,
}
# Add geometry if present
if feature.geometry:
row.update(
{
"geometry_type": feature.geometry.type,
"longitude": feature.geometry.coordinates[0]
if feature.geometry.coordinates
else None,
"latitude": feature.geometry.coordinates[1]
if len(feature.geometry.coordinates) > 1
else None,
}
)
# Add all properties
row.update(feature.properties.model_dump())
data.append(row)
pd = get_pandas()
df = pd.DataFrame(data)
# Ensure datetime columns are properly typed
if not df.empty:
if "sampling_date" in df.columns:
df["sampling_date"] = pd.to_datetime(df["sampling_date"])
if "submission_date" in df.columns:
df["submission_date"] = pd.to_datetime(df["submission_date"])
if "last_update" in df.columns:
df["last_update"] = pd.to_datetime(df["last_update"])
if sort_by_date and not df.empty and "sampling_date" in df.columns:
df = df.sort_values(by="sampling_date")
return df
@optional_pandas_method("to_series() method")
def to_series(
self,
index: str | None = None,
data: str | None = None,
sort_by_date: bool = True,
) -> "pd.Series":
"""Convert to pandas Series with analysis result data.
Args:
index: Column name to use as index. If None, `sampling_date` is used.
data: Column name to use as data. If None, `measurement_value` is used.
sort_by_date: Whether to sort the data by sampling date before creating the Series.
Returns:
Series containing analysis result data.
Examples:
>>> from sgu_client import SGUClient
>>> client = SGUClient()
>>> results = client.chemistry.get_results_by_site(site_id="10001_1", limit=100)
>>>
>>> # create time series with default columns (sampling_date, measurement_value)
>>> series = results.to_series()
>>> print(series.head())
>>>
>>> # use custom columns - e.g., parameter names as data
>>> series_params = results.to_series(
... index="sampling_date",
... data="parameter_short_name"
... )
"""
df = self.to_dataframe(sort_by_date=sort_by_date)
pd = get_pandas()
if data is None:
data = "measurement_value"
if index is None:
index = "sampling_date"
if df.empty:
return pd.Series(dtype=float)
if index and index not in df.columns:
raise ValueError(f"Index column '{index}' not found in DataFrame.")
if data and data not in df.columns:
raise ValueError(f"Data column '{data}' not found in DataFrame.")
series = pd.Series(data=df[data].values, index=df[index] if index else None)
series.name = data
return series
@optional_pandas_method("pivot_by_parameter() method")
def pivot_by_parameter(
self,
values: str = "measurement_value",
index: str = "sampling_date",
columns: str = "parameter_short_name",
aggfunc: str = "mean",
) -> "pd.DataFrame":
"""Pivot analysis results by parameter for easier time series analysis.
This creates a wide-format DataFrame where each chemical parameter becomes
a column, making it easy to analyze multiple parameters over time.
Args:
values: Column to use for values (default: 'measurement_value')
index: Column to use as index (default: 'sampling_date')
columns: Column to pivot into columns (default: 'parameter_short_name')
aggfunc: Aggregation function if there are duplicate index/column pairs
(default: 'mean'). Can be 'mean', 'median', 'first', 'last', etc.
Returns:
Pivoted DataFrame with parameters as columns.
Example:
>>> from sgu_client import SGUClient()
>>> client = SGUClient()
>>>
>>> results = client.chemistry.get_results_by_site(site_id="10001_1")
>>> df_pivot = results.pivot_by_parameter()
>>> # now df_pivot has columns like 'PH', 'NITRATE', 'CHLORIDE', etc.
"""
df = self.to_dataframe(sort_by_date=True)
pd = get_pandas()
if df.empty:
return pd.DataFrame()
# Validate columns exist
for col in [values, index, columns]:
if col not in df.columns:
raise ValueError(f"Column '{col}' not found in DataFrame.")
# Create pivot table
pivot_df = df.pivot_table(
values=values, index=index, columns=columns, aggfunc=aggfunc
)
return pivot_df