Skip to content

IO & Data Management (API)

Measurement Data

pyadm1ode_calibration.io.loaders.measurement_data.MeasurementData(data, metadata=None)

Container for biogas plant measurement data.

This class manages time-series data from biogas plants, providing methods for loading, validation, cleaning (outlier removal), and pre-processing (gap filling, resampling).

Parameters:

Name Type Description Default
data DataFrame

DataFrame containing measurements. If a 'timestamp' column exists, it will be converted to datetime and used as the index.

required
metadata Optional[Dict[str, Any]]

Optional dictionary containing contextual information (e.g., plant ID, location).

None
Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def __init__(self, data: pd.DataFrame, metadata: Optional[Dict[str, Any]] = None):
    """
    Store measurement data and optional metadata.

    If the frame has a 'timestamp' column, it is coerced to datetime
    (when not already datetime-typed) and promoted to a sorted index.

    Args:
        data (pd.DataFrame): Measurement table; may contain a 'timestamp' column.
        metadata (Optional[Dict[str, Any]]): Contextual information (e.g., plant ID).
    """
    self.data = data
    self.metadata = metadata or {}

    if "timestamp" in self.data.columns:
        ts = self.data["timestamp"]
        if not pd.api.types.is_datetime64_any_dtype(ts):
            self.data["timestamp"] = pd.to_datetime(ts)
        # Time-indexed, chronologically ordered frame from here on.
        self.data = self.data.set_index("timestamp").sort_index()

Functions

fill_gaps(columns=None, method='interpolate', **kwargs)

Fill missing values (NaNs) in the data.

Parameters:

Name Type Description Default
columns Optional[List[str]]

Columns to fill.

None
method str

Fill method ('interpolate', 'forward', 'backward', 'mean', 'median').

'interpolate'
**kwargs Any

Additional arguments for filling (e.g., 'limit').

{}
Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def fill_gaps(self, columns: Optional[List[str]] = None, method: str = "interpolate", **kwargs: Any) -> None:
    """
    Fill missing values (NaNs) in the data in place.

    Args:
        columns (Optional[List[str]]): Columns to fill; defaults to every column.
        method (str): Fill method ('interpolate', 'forward', 'backward', 'mean', 'median').
        **kwargs (Any): Additional arguments for filling (e.g., 'limit').

    Raises:
        ValueError: If ``method`` is not one of the supported strategies.
    """
    targets = self.data.columns.tolist() if columns is None else columns
    # 'limit' is only consulted by the positional fill strategies.
    limit = kwargs.get("limit", None)

    for name in targets:
        if name not in self.data.columns:
            continue  # silently skip unknown columns, matching permissive loading
        series = self.data[name]
        if method == "interpolate":
            filled = series.interpolate(method="linear", limit=limit)
        elif method == "forward":
            filled = series.ffill(limit=limit)
        elif method == "backward":
            filled = series.bfill(limit=limit)
        elif method == "mean":
            filled = series.fillna(series.mean())
        elif method == "median":
            filled = series.fillna(series.median())
        else:
            raise ValueError(f"Unknown fill method: {method}")
        self.data[name] = filled

from_csv(filepath, timestamp_column='timestamp', sep=',', parse_dates=True, resample=None, **kwargs) classmethod

Load measurement data from a CSV file.

Parameters:

Name Type Description Default
filepath str

Path to the CSV file.

required
timestamp_column str

Name of the column containing time information.

'timestamp'
sep str

CSV delimiter. Defaults to ','.

','
parse_dates bool

Whether to parse dates. Defaults to True.

True
resample Optional[str]

Frequency string to resample to (e.g., '1h').

None
**kwargs Any

Additional arguments passed to pd.read_csv.

{}

Returns:

Name Type Description
MeasurementData MeasurementData

A new instance with the loaded data.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
@classmethod
def from_csv(
    cls,
    filepath: str,
    timestamp_column: str = "timestamp",
    sep: str = ",",
    parse_dates: bool = True,
    resample: Optional[str] = None,
    **kwargs: Any,
) -> "MeasurementData":
    """
    Load measurement data from a CSV file.

    Args:
        filepath (str): Path to the CSV file.
        timestamp_column (str): Name of the column containing time information.
        sep (str): CSV delimiter. Defaults to ','.
        parse_dates (bool): Accepted for interface compatibility; note that the
            timestamp column is always converted to datetime here regardless of
            this flag.
        resample (Optional[str]): Frequency string to resample to (e.g., '1h').
        **kwargs (Any): Additional arguments passed to pd.read_csv.

    Returns:
        MeasurementData: A new instance with the loaded data.
    """
    frame = pd.read_csv(filepath, sep=sep, **kwargs)

    if timestamp_column in frame.columns:
        # Normalize the time column to the canonical name 'timestamp' so the
        # constructor picks it up as the index.
        frame["timestamp"] = pd.to_datetime(frame[timestamp_column])
        if timestamp_column != "timestamp":
            frame = frame.drop(columns=[timestamp_column])

    instance = cls(frame)
    if resample is not None:
        instance.resample(resample)
    return instance

get_measurement(column, start_time=None, end_time=None)

Get a specific measurement series, optionally windowed.

Parameters:

Name Type Description Default
column str

Name of the measurement column.

required
start_time Optional[datetime]

Start of window.

None
end_time Optional[datetime]

End of window.

None

Returns:

Type Description
Series

pd.Series: The requested time series.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def get_measurement(
    self, column: str, start_time: Optional[Union[str, datetime]] = None, end_time: Optional[Union[str, datetime]] = None
) -> pd.Series:
    """
    Get a specific measurement series, optionally windowed.

    Args:
        column (str): Name of the measurement column.
        start_time (Optional[Union[str, datetime]]): Start of window (inclusive).
        end_time (Optional[Union[str, datetime]]): End of window (inclusive).

    Returns:
        pd.Series: The requested time series.

    Raises:
        ValueError: If ``column`` is not present in the data.
    """
    if column not in self.data.columns:
        raise ValueError(f"Column '{column}' not found")

    result = self.data[column]
    wants_window = start_time is not None or end_time is not None
    if wants_window:
        # Label-based slicing on the (datetime) index; both bounds inclusive.
        result = result.loc[start_time:end_time]  # type: ignore
    return result

get_substrate_feeds(substrate_columns=None)

Extract substrate feed rates as a 2D numpy array.

Parameters:

Name Type Description Default
substrate_columns Optional[List[str]]

Column names for substrates.

None

Returns:

Type Description
ndarray

np.ndarray: Matrix of feed rates.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def get_substrate_feeds(self, substrate_columns: Optional[List[str]] = None) -> np.ndarray:
    """
    Extract substrate feed rates as a 2D numpy array.

    Args:
        substrate_columns (Optional[List[str]]): Column names for substrates.
            When omitted, every column whose name starts with 'Q_sub' is used.

    Returns:
        np.ndarray: Matrix of feed rates (rows = time steps, cols = substrates).

    Raises:
        ValueError: If no substrate columns are given or detected.
    """
    cols = substrate_columns
    if cols is None:
        # Auto-detect by the 'Q_sub' naming convention for feed-rate columns.
        cols = [name for name in self.data.columns if name.startswith("Q_sub")]
    if not cols:
        raise ValueError("No substrate columns found")
    return self.data[cols].values

get_time_window(start_time, end_time)

Create a new MeasurementData instance for a specific time window.

Parameters:

Name Type Description Default
start_time Union[str, datetime]

Start timestamp.

required
end_time Union[str, datetime]

End timestamp.

required

Returns:

Name Type Description
MeasurementData MeasurementData

A subset of the data.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def get_time_window(self, start_time: Union[str, datetime], end_time: Union[str, datetime]) -> "MeasurementData":
    """
    Create a new MeasurementData instance for a specific time window.

    Args:
        start_time (Union[str, datetime]): Start timestamp (inclusive).
        end_time (Union[str, datetime]): End timestamp (inclusive).

    Returns:
        MeasurementData: A subset of the data; the frame and metadata are
        copied so the original instance is left untouched.
    """
    subset = self.data.loc[start_time:end_time].copy()  # type: ignore
    return MeasurementData(subset, metadata=self.metadata.copy())

remove_outliers(columns=None, method='zscore', threshold=3.0, **kwargs)

Detect and remove outliers from specified columns.

Outliers are replaced with NaN.

Parameters:

Name Type Description Default
columns Optional[List[str]]

Columns to check. Defaults to all numeric.

None
method str

Detection method ('zscore', 'iqr', 'moving_window').

'zscore'
threshold float

Threshold for outlier detection.

3.0
**kwargs Any

Additional arguments for the detection method.

{}

Returns:

Name Type Description
int int

Total number of outliers removed.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def remove_outliers(
    self, columns: Optional[List[str]] = None, method: str = "zscore", threshold: float = 3.0, **kwargs: Any
) -> int:
    """
    Detect and remove outliers from specified columns.

    Outliers are replaced with NaN (use :meth:`fill_gaps` afterwards to
    re-fill them).

    Args:
        columns (Optional[List[str]]): Columns to check. Defaults to all numeric.
        method (str): Detection method ('zscore', 'iqr', 'moving_window').
        threshold (float): Threshold for outlier detection (interpreted as the
            IQR multiplier for the 'iqr' method).
        **kwargs (Any): Additional arguments for the detection method
            (e.g., 'window' for 'moving_window', default 5).

    Returns:
        int: Total number of outliers removed.

    Raises:
        ValueError: If ``method`` is not a supported detection strategy.
    """
    targets = columns
    if targets is None:
        targets = self.data.select_dtypes(include=[np.number]).columns.tolist()

    total_removed = 0
    for name in targets:
        if name not in self.data.columns:
            continue  # unknown columns are ignored, mirroring fill_gaps
        series = self.data[name]
        if method == "zscore":
            mask = OutlierDetector.detect_zscore(series, threshold=threshold)
        elif method == "iqr":
            mask = OutlierDetector.detect_iqr(series, multiplier=threshold)
        elif method == "moving_window":
            mask = OutlierDetector.detect_moving_window(series, window=kwargs.get("window", 5), threshold=threshold)
        else:
            raise ValueError(f"Unknown outlier detection method: {method}")

        total_removed += int(mask.sum())
        # Blank the flagged samples rather than dropping rows, so the
        # time index stays intact.
        self.data.loc[mask, name] = np.nan
    return total_removed

resample(freq, aggregation='mean')

Resample the time series data to a new frequency.

Parameters:

Name Type Description Default
freq str

Frequency string (e.g., '1h', '1d').

required
aggregation str

Aggregation function ('mean', 'sum', 'first', 'last').

'mean'
Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def resample(self, freq: str, aggregation: str = "mean") -> None:
    """
    Resample the time series data to a new frequency in place.

    Args:
        freq (str): Frequency string (e.g., '1h', '1d').
        aggregation (str): Aggregation function ('mean', 'sum', 'first', 'last').

    Raises:
        ValueError: If ``aggregation`` is not a supported function.
    """
    grouped = self.data.resample(freq)
    # Dispatch table instead of an if/elif chain; bound methods on the
    # resampler are called lazily only for the chosen aggregation.
    dispatch = {
        "mean": grouped.mean,
        "sum": grouped.sum,
        "first": grouped.first,
        "last": grouped.last,
    }
    if aggregation not in dispatch:
        raise ValueError(f"Unknown aggregation method: {aggregation}")
    self.data = dispatch[aggregation]()

summary()

Get a statistical summary of all measurement columns.

Returns:

Type Description
DataFrame

pd.DataFrame: Descriptive statistics.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def summary(self) -> pd.DataFrame:
    """
    Get a statistical summary of all measurement columns.

    Returns:
        pd.DataFrame: Descriptive statistics (count, mean, std, quartiles, ...)
        as produced by :meth:`pandas.DataFrame.describe`.
    """
    stats = self.data.describe()
    return stats

to_csv(filepath, **kwargs)

Save the current data to a CSV file.

Parameters:

Name Type Description Default
filepath str

Destination path.

required
**kwargs Any

Passed to pd.DataFrame.to_csv.

{}
Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def to_csv(self, filepath: str, **kwargs: Any) -> None:
    """
    Save the current data to a CSV file.

    Args:
        filepath (str): Destination path.
        **kwargs (Any): Passed straight through to pd.DataFrame.to_csv
            (e.g., ``sep``, ``index``).
    """
    frame = self.data
    frame.to_csv(filepath, **kwargs)

validate(required_columns=None, expected_ranges=None)

Validate measurement data against schema and range expectations.

Parameters:

Name Type Description Default
required_columns Optional[List[str]]

Columns that must be present.

None
expected_ranges Optional[Dict[str, Tuple[float, float]]]

Mapping of column names to (min, max) range tuples.

None

Returns:

Name Type Description
ValidationResult ValidationResult

Result of the validation checks.

Source code in pyadm1ode_calibration/io/loaders/measurement_data.py
def validate(
    self, required_columns: Optional[List[str]] = None, expected_ranges: Optional[Dict[str, Tuple[float, float]]] = None
) -> ValidationResult:
    """
    Validate measurement data against schema and range expectations.

    Args:
        required_columns (Optional[List[str]]): Columns that must be present.
        expected_ranges (Optional[Dict[str, Tuple[float, float]]]):
            Mapping of column names to (min, max) range tuples. When omitted,
            plausibility ranges for common biogas process variables are used
            (temperatures in Kelvin).

    Returns:
        ValidationResult: Result of the validation checks.
    """
    # Default plausibility envelope for typical biogas plant measurements.
    default_ranges: Dict[str, Tuple[float, float]] = {
        "pH": (5.0, 9.0),
        "VFA": (0.0, 20.0),
        "TAC": (0.0, 50.0),
        "Q_gas": (0.0, 5000.0),
        "Q_ch4": (0.0, 3000.0),
        "T_digester": (273.15, 333.15),
    }
    ranges = default_ranges if expected_ranges is None else expected_ranges
    return DataValidator.validate(self.data, required_columns=required_columns, expected_ranges=ranges)

CSV Handler

pyadm1ode_calibration.io.loaders.csv_handler.CSVHandler(decimal_separator='.', thousands_separator=',')

Handler for CSV file operations in PyADM1.

Supports reading and writing various CSV formats used in biogas plant operation and laboratory analysis.

Example

handler = CSVHandler()
data = handler.load_substrate_lab_data("lab_results.csv")

Initialize CSV handler.

Parameters:

Name Type Description Default
decimal_separator str

Decimal separator ("." or ",")

'.'
thousands_separator str

Thousands separator ("," or "." or "")

','
Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def __init__(self, decimal_separator: str = ".", thousands_separator: str = ","):
    """
    Initialize CSV handler.

    Args:
        decimal_separator: Decimal separator ("." or ",").
        thousands_separator: Thousands separator ("," or "." or "").
    """
    # Separator conventions differ between English and German lab exports;
    # keep both configurable.
    self.thousands_separator = thousands_separator
    self.decimal_separator = decimal_separator

Functions

create_template_substrate_csv(filepath, format_type='horizontal')

Create template CSV file for substrate data entry.

Parameters:

Name Type Description Default
filepath str

Output file path

required
format_type str

"horizontal" or "vertical"

'horizontal'
Example

handler.create_template_substrate_csv("template.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def create_template_substrate_csv(self, filepath: str, format_type: str = "horizontal") -> None:
    """
    Create template CSV file for substrate data entry.

    Args:
        filepath: Output file path
        format_type: "horizontal" (one row per sample) or "vertical"
            (one row per parameter with value and unit columns)

    Example:
        >>> handler.create_template_substrate_csv("template.csv")
    """
    if format_type == "horizontal":
        # One row per sample; column order mirrors a typical lab report.
        header = [
            "substrate_name", "substrate_type", "sample_date",
            "TS", "VS", "oTS", "foTS", "RP", "RL", "RF",
            "NDF", "ADF", "ADL", "pH", "NH4_N", "TAC",
            "COD_S", "BMP", "C_content", "N_content", "C_to_N",
        ]
        # Example values for a maize silage sample, in header order.
        example_row = [
            "Maize silage", "maize", "2024-01-15",
            32.5, 96.2, 31.3, 28.5, 8.5, 3.2, 21.5,
            42.1, 22.3, 2.1, 3.9, 0.5, 11.0,
            18.5, 345.0, 45.2, 1.8, 25.1,
        ]
        template = pd.DataFrame([example_row], columns=header)

    else:  # vertical: Parameter / Value / Unit triples
        parameters = [
            "Substrate name", "Substrate type", "TS", "VS", "RP", "RL",
            "NDF", "ADF", "ADL", "pH", "NH4-N", "TAC", "COD_S", "BMP",
        ]
        values = ["Maize silage", "maize", 32.5, 96.2, 8.5, 3.2, 42.1, 22.3, 2.1, 3.9, 0.5, 11.0, 18.5, 345.0]
        units = [
            "", "", "% FM", "% TS", "% TS", "% TS", "% TS", "% TS",
            "% TS", "-", "g/L", "mmol/L", "g/L", "L CH4/kg oTS",
        ]
        template = pd.DataFrame({"Parameter": parameters, "Value": values, "Unit": units})

    template.to_csv(filepath, index=False)
    print(f"✓ Created template CSV at {filepath}")

export_measurement_data(data, filepath, sep=',', encoding='utf-8', include_index=True)

Export measurement data to CSV.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with measurements

required
filepath str

Output file path

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'
include_index bool

Include index (timestamp) in output

True
Example

handler.export_measurement_data(measurements, "export.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def export_measurement_data(
    self, data: pd.DataFrame, filepath: str, sep: str = ",", encoding: str = "utf-8", include_index: bool = True
) -> None:
    """
    Export measurement data to CSV.

    Args:
        data: DataFrame with measurements
        filepath: Output file path
        sep: Column separator
        encoding: File encoding
        include_index: Include index (timestamp) in output

    Example:
        >>> handler.export_measurement_data(measurements, "export.csv")
    """
    row_count = len(data)
    data.to_csv(filepath, sep=sep, encoding=encoding, index=include_index)
    print(f"✓ Exported measurement data to {filepath} ({row_count} rows)")

export_parameter_table(data, filepath, sep=',', encoding='utf-8')

Export parameter table to CSV.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with parameters

required
filepath str

Output file path

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'
Example

handler.export_parameter_table(params_df, "parameters.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def export_parameter_table(self, data: pd.DataFrame, filepath: str, sep: str = ",", encoding: str = "utf-8") -> None:
    """
    Export parameter table to CSV.

    The index (parameter names) is written out along with the data.

    Args:
        data: DataFrame with parameters
        filepath: Output file path
        sep: Column separator
        encoding: File encoding

    Example:
        >>> handler.export_parameter_table(params_df, "parameters.csv")
    """
    frame = data
    frame.to_csv(filepath, sep=sep, encoding=encoding)
    print(f"✓ Exported parameter table to {filepath}")

export_simulation_results(results, filepath, sep=',', encoding='utf-8', flatten_components=True)

Export simulation results to CSV.

Parameters:

Name Type Description Default
results List[Dict[str, Any]]

List of result dicts from plant.simulate()

required
filepath str

Output file path

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'
flatten_components bool

Flatten component results into columns

True
Example

results = plant.simulate(duration=30, dt=1/24) handler.export_simulation_results(results, "simulation.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def export_simulation_results(
    self,
    results: List[Dict[str, Any]],
    filepath: str,
    sep: str = ",",
    encoding: str = "utf-8",
    flatten_components: bool = True,
) -> None:
    """
    Export simulation results to CSV.

    Args:
        results: List of result dicts from plant.simulate()
        filepath: Output file path
        sep: Column separator
        encoding: File encoding
        flatten_components: Flatten component results into columns; when
            False, only the first component's scalar metrics are exported

    Example:
        >>> results = plant.simulate(duration=30, dt=1/24)
        >>> handler.export_simulation_results(results, "simulation.csv")
    """
    if not results:
        warnings.warn("No results to export")
        return

    def _flatten(result: Dict[str, Any]) -> Dict[str, Any]:
        # One column per component/metric pair; nested dicts (like
        # gas_storage) carry no scalar value and are dropped.
        flat = {"time": result["time"]}
        for comp_id, comp_data in result["components"].items():
            for metric, value in comp_data.items():
                if isinstance(value, dict):
                    continue
                flat[f"{comp_id}_{metric}"] = value
        return flat

    if flatten_components:
        df = pd.DataFrame([_flatten(entry) for entry in results])
    else:
        # Simple format: time plus the first component's scalar metrics.
        first_comp_id = next(iter(results[0]["components"]))
        records = []
        for entry in results:
            merged = {"time": entry["time"], **entry["components"][first_comp_id]}
            records.append({k: v for k, v in merged.items() if not isinstance(v, dict)})
        df = pd.DataFrame(records)

    df.to_csv(filepath, sep=sep, encoding=encoding, index=False)
    print(f"✓ Exported simulation results to {filepath} ({len(df)} time points)")

export_substrate_data(data, filepath, sep=',', encoding='utf-8')

Export substrate data to CSV.

Parameters:

Name Type Description Default
data Union[Dict[str, Any], DataFrame]

Dict or DataFrame with substrate data

required
filepath str

Output file path

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'
Example

handler.export_substrate_data(substrate_data, "export.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def export_substrate_data(
    self, data: Union[Dict[str, Any], pd.DataFrame], filepath: str, sep: str = ",", encoding: str = "utf-8"
) -> None:
    """
    Export substrate data to CSV.

    Args:
        data: Dict (single sample, written as one row) or DataFrame with
            substrate data
        filepath: Output file path
        sep: Column separator
        encoding: File encoding

    Example:
        >>> handler.export_substrate_data(substrate_data, "export.csv")
    """
    # A plain dict represents one sample -> wrap it as a one-row frame.
    frame = pd.DataFrame([data]) if isinstance(data, dict) else data
    frame.to_csv(filepath, sep=sep, encoding=encoding, index=False)
    print(f"✓ Exported substrate data to {filepath}")

load_measurement_data(filepath, timestamp_column='timestamp', sep=',', encoding='utf-8', parse_dates=True, resample=None)

Load time series measurement data from CSV.

Expected columns:

- timestamp (or Zeit, Zeitstempel)
- Q_sub_* (substrate feeds)
- pH, VFA, TAC, FOS_TAC
- T_digester
- Q_gas, Q_ch4, Q_co2, CH4_content, P_gas
- P_el, P_th

Parameters:

Name Type Description Default
filepath str

Path to CSV file

required
timestamp_column str

Name of timestamp column

'timestamp'
sep str

Column separator

','
encoding str

File encoding

'utf-8'
parse_dates bool

Parse timestamp column

True
resample Optional[str]

Resample frequency (e.g., "1h", "1d")

None

Returns:

Type Description
DataFrame

DataFrame with measurements

Example

handler = CSVHandler()
data = handler.load_measurement_data(
    "plant_data.csv",
    resample="1h"
)

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def load_measurement_data(
    self,
    filepath: str,
    timestamp_column: str = "timestamp",
    sep: str = ",",
    encoding: str = "utf-8",
    parse_dates: bool = True,
    resample: Optional[str] = None,
) -> pd.DataFrame:
    """
    Load time series measurement data from CSV.

    Expected columns:
    - timestamp (or Zeit, Zeitstempel)
    - Q_sub_* (substrate feeds)
    - pH, VFA, TAC, FOS_TAC
    - T_digester
    - Q_gas, Q_ch4, Q_co2, CH4_content, P_gas
    - P_el, P_th

    Args:
        filepath: Path to CSV file
        timestamp_column: Name of timestamp column
        sep: Column separator, or "auto" to detect it from the file
        encoding: File encoding
        parse_dates: Parse timestamp column
        resample: Resample frequency (e.g., "1h", "1d"); aggregated by mean

    Returns:
        DataFrame with measurements

    Example:
        >>> handler = CSVHandler()
        >>> data = handler.load_measurement_data(
        ...     "plant_data.csv",
        ...     resample="1h"
        ... )
    """
    if sep == "auto":
        sep = self._detect_separator(filepath)

    # Read and normalize column names (German/English aliases -> canonical).
    frame = pd.read_csv(filepath, sep=sep, encoding=encoding)
    frame = self._map_column_names(frame)

    if timestamp_column in frame.columns:
        if parse_dates:
            frame[timestamp_column] = pd.to_datetime(frame[timestamp_column])
        frame = frame.set_index(timestamp_column).sort_index()

    if resample is not None:
        frame = frame.resample(resample).mean()

    return frame

load_multiple_substrate_samples(filepath, sep=',', encoding='utf-8', date_column='sample_date', name_column='substrate_name')

Load multiple substrate samples from CSV.

Expected format: Each row is one sample with columns for all parameters.

Parameters:

Name Type Description Default
filepath str

Path to CSV file

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'
date_column str

Name of date column

'sample_date'
name_column str

Name of substrate name column

'substrate_name'

Returns:

Type Description
DataFrame

DataFrame with substrate data

Example

handler = CSVHandler() samples = handler.load_multiple_substrate_samples( ... "substrate_database.csv" ... ) print(samples.head())

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def load_multiple_substrate_samples(
    self,
    filepath: str,
    sep: str = ",",
    encoding: str = "utf-8",
    date_column: str = "sample_date",
    name_column: str = "substrate_name",
) -> pd.DataFrame:
    """
    Load multiple substrate samples from CSV.

    Expected format: Each row is one sample with columns for all parameters.

    Args:
        filepath: Path to CSV file
        sep: Column separator
        encoding: File encoding
        date_column: Name of date column
        name_column: Name of substrate name column (kept as-is; listed for
            interface completeness)

    Returns:
        DataFrame with substrate data

    Example:
        >>> handler = CSVHandler()
        >>> samples = handler.load_multiple_substrate_samples(
        ...     "substrate_database.csv"
        ... )
        >>> print(samples.head())
    """
    # Read, then normalize column names via the shared alias mapping.
    frame = self._map_column_names(pd.read_csv(filepath, sep=sep, encoding=encoding))

    if date_column in frame.columns:
        frame[date_column] = pd.to_datetime(frame[date_column])

    return frame

load_parameter_table(filepath, sep=',', encoding='utf-8', index_col=None)

Load parameter table from CSV.

Expected format: - Rows: Parameters - Columns: Different scenarios/substrates

Parameters:

Name Type Description Default
filepath str

Path to CSV file

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'
index_col Optional[str]

Column to use as index (usually parameter name)

None

Returns:

Type Description
DataFrame

DataFrame with parameters

Example

params = handler.load_parameter_table("parameters.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def load_parameter_table(
    self, filepath: str, sep: str = ",", encoding: str = "utf-8", index_col: Optional[str] = None
) -> pd.DataFrame:
    """
    Load parameter table from CSV.

    Expected format:
    - Rows: Parameters
    - Columns: Different scenarios/substrates

    Args:
        filepath: Path to CSV file
        sep: Column separator
        encoding: File encoding
        index_col: Column to use as index (usually parameter name)

    Returns:
        DataFrame with parameters

    Example:
        >>> params = handler.load_parameter_table("parameters.csv")
    """
    # Thin wrapper over read_csv; kept for a uniform handler interface.
    return pd.read_csv(filepath, sep=sep, encoding=encoding, index_col=index_col)

load_simulation_results(filepath, sep=',', encoding='utf-8')

Load simulation results from CSV.

Parameters:

Name Type Description Default
filepath str

Path to CSV file

required
sep str

Column separator

','
encoding str

File encoding

'utf-8'

Returns:

Type Description
List[Dict[str, Any]]

List of result dicts

Example

results = handler.load_simulation_results("simulation.csv")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def load_simulation_results(self, filepath: str, sep: str = ",", encoding: str = "utf-8") -> List[Dict[str, Any]]:
    """
    Load simulation results from CSV.

    Reverses the flattened layout written by export_simulation_results:
    columns named '<component>_<metric>' are regrouped into per-component
    dicts. Note the split is on the FIRST underscore, so component ids
    containing underscores will not round-trip exactly.

    Args:
        filepath: Path to CSV file
        sep: Column separator
        encoding: File encoding

    Returns:
        List of result dicts ({"time": ..., "components": {...}})

    Example:
        >>> results = handler.load_simulation_results("simulation.csv")
    """
    df = pd.read_csv(filepath, sep=sep, encoding=encoding)

    # Determine once which columns encode component metrics; columns without
    # an underscore (other than 'time') carry no component info and are dropped.
    metric_columns = [name for name in df.columns if name != "time" and "_" in name]

    results: List[Dict[str, Any]] = []
    for _, row in df.iterrows():
        components: Dict[str, Dict[str, Any]] = {}
        for name in metric_columns:
            comp_id, metric = name.split("_", 1)
            components.setdefault(comp_id, {})[metric] = row[name]
        results.append({"time": row["time"], "components": components})

    return results

load_substrate_lab_data(filepath, substrate_name=None, substrate_type=None, sample_date=None, sep=',', encoding='utf-8', validate=True)

Load substrate characterization data from laboratory CSV.

Expected columns (German or English): - Trockensubstanzgehalt (TS) [% FM] - Organische Trockensubstanz (VS) [% TS] - Fermentierbare organische Trockensubstanz (foTS) [% TS] - Rohprotein (RP) [% TS] - Rohfett (RL) [% TS] - Rohfaser (RF) [% TS] - NDF, ADF, ADL [% TS] - pH-Wert (pH) - Ammoniumstickstoff (NH4-N) [g/L or mg/L] - Alkalinität (TAC) [mmol/L] - Biochemisches Methanpotential (BMP) [L CH4/kg oTS] - CSB des Filtrats (COD_S) [g/L]

Parameters:

Name Type Description Default
filepath str

Path to CSV file

required
substrate_name Optional[str]

Substrate name (if not in file)

None
substrate_type Optional[str]

Substrate type (maize, manure, grass, etc.)

None
sample_date Optional[Union[str, datetime]]

Sample date (if not in file)

None
sep str

Column separator

','
encoding str

File encoding

'utf-8'
validate bool

Validate data ranges

True

Returns:

Type Description
Dict[str, Any]

Dict with substrate data

Example

handler = CSVHandler() data = handler.load_substrate_lab_data( ... "maize_analysis.csv", ... substrate_name="Maize silage batch 23", ... substrate_type="maize", ... sample_date="2024-01-15" ... ) print(f"TS: {data['TS']:.1f}% FM")

Source code in pyadm1ode_calibration/io/loaders/csv_handler.py
def load_substrate_lab_data(
    self,
    filepath: str,
    substrate_name: Optional[str] = None,
    substrate_type: Optional[str] = None,
    sample_date: Optional[Union[str, datetime]] = None,
    sep: str = ",",
    encoding: str = "utf-8",
    validate: bool = True,
) -> Dict[str, Any]:
    """
    Load substrate characterization data from laboratory CSV.

    Expected columns (German or English):
    - Trockensubstanzgehalt (TS) [% FM]
    - Organische Trockensubstanz (VS) [% TS]
    - Fermentierbare organische Trockensubstanz (foTS) [% TS]
    - Rohprotein (RP) [% TS]
    - Rohfett (RL) [% TS]
    - Rohfaser (RF) [% TS]
    - NDF, ADF, ADL [% TS]
    - pH-Wert (pH)
    - Ammoniumstickstoff (NH4-N) [g/L or mg/L]
    - Alkalinität (TAC) [mmol/L]
    - Biochemisches Methanpotential (BMP) [L CH4/kg oTS]
    - CSB des Filtrats (COD_S) [g/L]

    Args:
        filepath: Path to CSV file
        substrate_name: Substrate name (if not in file); takes precedence over file contents
        substrate_type: Substrate type (maize, manure, grass, etc.)
        sample_date: Sample date (if not in file)
        sep: Column separator ('auto' to detect automatically)
        encoding: File encoding
        validate: Validate data ranges

    Returns:
        Dict with substrate data

    Raises:
        ValueError: If the CSV contains no data rows after parsing.

    Example:
        >>> handler = CSVHandler()
        >>> data = handler.load_substrate_lab_data(
        ...     "maize_analysis.csv",
        ...     substrate_name="Maize silage batch 23",
        ...     substrate_type="maize",
        ...     sample_date="2024-01-15"
        ... )
        >>> print(f"TS: {data['TS']:.1f}% FM")
    """
    # Auto-detect separator if needed
    if sep == "auto":
        sep = self._detect_separator(filepath)

    # Read CSV
    df = pd.read_csv(filepath, sep=sep, encoding=encoding)

    # Try to detect if file is in "vertical" format (parameter, value, unit).
    # NOTE: the parentheses matter. Without them the condition parsed as
    # "(len <= 3 and 'Parameter' in cols) or 'Messgröße' in cols", which
    # forced vertical parsing on any wide file that merely contained a
    # "Messgröße" column.
    if len(df.columns) <= 3 and ("Parameter" in df.columns or "Messgröße" in df.columns):
        df = self._parse_vertical_format(df)

    # Map column names
    df = self._map_column_names(df)

    # Guard against an empty file: df.iloc[0] below would raise an opaque
    # IndexError otherwise.
    if df.empty:
        raise ValueError(f"No data rows found in {filepath}")

    # If multiple rows, take the first one (or could aggregate)
    if len(df) > 1:
        warnings.warn(f"CSV contains {len(df)} rows, using first row only")

    row = df.iloc[0]

    # Extract data; explicit arguments take precedence over file contents.
    result = {
        "substrate_name": substrate_name or row.get("substrate_name", "Unknown"),
        "substrate_type": substrate_type or row.get("substrate_type", "unknown"),
        "sample_date": sample_date or row.get("sample_date", datetime.now()),
    }

    # Add all available parameters
    for param in [
        "TS",
        "VS",
        "oTS",
        "foTS",
        "RP",
        "RL",
        "RF",
        "RA",
        "NfE",
        "NDF",
        "ADF",
        "ADL",
        "pH",
        "NH4_N",
        "TAC",
        "COD",
        "COD_S",
        "BMP",
        "C_content",
        "N_content",
        "C_to_N",
        "TKN",
    ]:
        if param in df.columns:
            value = row[param]
            # Handle both scalar values and Series (duplicate column names)
            if isinstance(value, pd.Series):
                value = value.iloc[0] if len(value) > 0 else None
            if pd.notna(value):
                result[param] = float(value)

    # Validate if requested
    if validate:
        result = self._validate_substrate_data(result)

    # TODO: these substrate parameters need to be written into the
    #  substrate_....xml; there may already be a callable method for this in
    #  a C# DLL, but probably not.

    return result

Database

pyadm1ode_calibration.io.persistence.database.Database(connection_string=None, config=None)

PostgreSQL database interface for PyADM1.

Handles connection pooling, session management, and CRUD operations for all calibration-related entities.

Parameters:

Name Type Description Default
connection_string Optional[str]

Database URL.

None
config Optional[DatabaseConfig]

Database configuration object.

None
Source code in pyadm1ode_calibration/io/persistence/database.py
def __init__(self, connection_string: Optional[str] = None, config: Optional[DatabaseConfig] = None):
    """
    Set up the connection manager and expose its engine and session factory.

    Args:
        connection_string (Optional[str]): Database URL.
        config (Optional[DatabaseConfig]): Database configuration object.
    """
    manager = ConnectionManager(connection_string, config)
    self.connection_manager = manager
    self.engine = manager.engine
    self.SessionLocal = manager.SessionLocal
    self.connection_string = manager.connection_string

Functions

close()

Close the database connection and dispose of the engine.

Source code in pyadm1ode_calibration/io/persistence/database.py
def close(self) -> None:
    """Release all pooled connections by disposing of the SQLAlchemy engine."""
    self.engine.dispose()

create_all_tables()

Create all tables defined in the ORM models.

Source code in pyadm1ode_calibration/io/persistence/database.py
def create_all_tables(self) -> None:
    """Create every table declared on the ORM ``Base`` metadata against this engine."""
    Base.metadata.create_all(bind=self.engine)

create_plant(plant_id, name, location=None, operator=None, V_liq=None, V_gas=None, T_ad=None, P_el_nom=None, configuration=None)

Register a new biogas plant in the database.

Parameters:

Name Type Description Default
plant_id str

Unique identifier.

required
name str

Human-readable name.

required
location Optional[str]

Geographic location.

None
operator Optional[str]

Entity operating the plant.

None
V_liq Optional[float]

Liquid volume in m3.

None
V_gas Optional[float]

Gas volume in m3.

None
T_ad Optional[float]

Operating temperature in K.

None
P_el_nom Optional[float]

Nominal electrical power in kW.

None
configuration Optional[Dict]

Additional technical configuration.

None

Returns:

Name Type Description
Plant Plant

The created plant instance.

Raises:

Type Description
ValueError

If plant_id already exists.

Source code in pyadm1ode_calibration/io/persistence/database.py
def create_plant(
    self,
    plant_id: str,
    name: str,
    location: Optional[str] = None,
    operator: Optional[str] = None,
    V_liq: Optional[float] = None,
    V_gas: Optional[float] = None,
    T_ad: Optional[float] = None,
    P_el_nom: Optional[float] = None,
    configuration: Optional[Dict] = None,
) -> Plant:
    """
    Register a new biogas plant in the database.

    Args:
        plant_id (str): Unique identifier.
        name (str): Human-readable name.
        location (Optional[str]): Geographic location.
        operator (Optional[str]): Entity operating the plant.
        V_liq (Optional[float]): Liquid volume in m3.
        V_gas (Optional[float]): Gas volume in m3.
        T_ad (Optional[float]): Operating temperature in K.
        P_el_nom (Optional[float]): Nominal electrical power in kW.
        configuration (Optional[Dict]): Additional technical configuration.

    Returns:
        Plant: The created plant instance, detached from the session.

    Raises:
        ValueError: If plant_id already exists.
    """
    session = self.SessionLocal()
    try:
        new_plant = Plant(
            id=plant_id,
            name=name,
            location=location,
            operator=operator,
            V_liq=V_liq,
            V_gas=V_gas,
            T_ad=T_ad,
            P_el_nom=P_el_nom,
            configuration=configuration,
        )
        session.add(new_plant)
        session.commit()
        # Refresh to pull DB-generated fields, then detach so the instance
        # stays usable after the session closes.
        session.refresh(new_plant)
        session.expunge(new_plant)
        return new_plant
    except IntegrityError:
        session.rollback()
        raise ValueError(f"Plant with ID '{plant_id}' already exists")
    finally:
        session.close()

drop_all_tables()

Drop all tables from the database.

Source code in pyadm1ode_calibration/io/persistence/database.py
def drop_all_tables(self) -> None:
    """Remove every ORM-managed table from the connected database. Destructive."""
    Base.metadata.drop_all(bind=self.engine)

execute_query(query, params=None)

Execute a custom read-only SQL query.

Parameters:

Name Type Description Default
query str

SQL query string.

required
params Optional[Dict[str, Any]]

Query parameters.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Query results.

Raises:

Type Description
ValueError

If dangerous keywords are detected.

Source code in pyadm1ode_calibration/io/persistence/database.py
def execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """
    Execute a custom read-only SQL query.

    Args:
        query (str): SQL query string.
        params (Optional[Dict[str, Any]]): Query parameters.

    Returns:
        pd.DataFrame: Query results.

    Raises:
        ValueError: If dangerous keywords are detected.
    """
    import re

    # Match whole keywords only: a plain substring test rejects legitimate
    # identifiers such as "dropped_at" or "ALTERNATE". NOTE(review): this
    # blacklist is a convenience guard, not a security boundary — callers
    # should still use parameterized queries and restricted DB roles.
    dangerous = ("DROP", "DELETE", "TRUNCATE", "ALTER")
    pattern = r"\b(?:" + "|".join(dangerous) + r")\b"
    if re.search(pattern, query, flags=re.IGNORECASE):
        raise ValueError("Dangerous keyword detected in query")
    return pd.read_sql(query, self.engine, params=params)

from_env(prefix='DB') classmethod

Create a database instance from environment variables.

Parameters:

Name Type Description Default
prefix str

Prefix for environment variables (e.g., 'DB' -> 'DB_HOST').

'DB'

Returns:

Name Type Description
Database Database

A configured database instance.

Raises:

Type Description
ValueError

If required variables are missing.

Source code in pyadm1ode_calibration/io/persistence/database.py
@classmethod
def from_env(cls, prefix: str = "DB") -> "Database":
    """
    Create a database instance from environment variables.

    Reads ``{prefix}_HOST``, ``{prefix}_PORT``, ``{prefix}_NAME``,
    ``{prefix}_USER`` and ``{prefix}_PASSWORD``; host and port default to
    ``localhost:5432``.

    Args:
        prefix (str): Prefix for environment variables (e.g., 'DB' -> 'DB_HOST').

    Returns:
        Database: A configured database instance.

    Raises:
        ValueError: If required variables are missing.
    """
    import os
    from urllib.parse import quote_plus

    host = os.getenv(f"{prefix}_HOST", "localhost")
    port = os.getenv(f"{prefix}_PORT", "5432")
    database = os.getenv(f"{prefix}_NAME")
    username = os.getenv(f"{prefix}_USER")
    password = os.getenv(f"{prefix}_PASSWORD")

    if not all([database, username, password]):
        raise ValueError("Missing required environment variables")

    # Quote the username as well as the password: either may contain
    # URL-significant characters (e.g. '@', ':') that would corrupt the URL.
    conn_str = f"postgresql://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{database}"
    return cls(connection_string=conn_str)

get_latest_calibration(plant_id)

Get the most recent calibration for a plant.

Parameters:

Name Type Description Default
plant_id str

Plant ID.

required

Returns:

Type Description
Optional[Dict[str, Any]]

Optional[Dict[str, Any]]: Latest calibration record.

Source code in pyadm1ode_calibration/io/persistence/database.py
def get_latest_calibration(self, plant_id: str) -> Optional[Dict[str, Any]]:
    """
    Return the newest calibration record for a plant.

    Args:
        plant_id (str): Plant ID.

    Returns:
        Optional[Dict[str, Any]]: Latest calibration record, or None if the
        plant has no stored calibrations.
    """
    recent = self.load_calibrations(plant_id, limit=1)
    if not recent:
        return None
    return recent[0]

get_plant(plant_id)

Retrieve a plant by its ID.

Parameters:

Name Type Description Default
plant_id str

The plant identifier.

required

Returns:

Name Type Description
Plant Plant

The plant instance.

Raises:

Type Description
ValueError

If plant is not found.

DatabaseError

On SQL failure.

Source code in pyadm1ode_calibration/io/persistence/database.py
def get_plant(self, plant_id: str) -> Plant:
    """
    Retrieve a plant by its ID.

    Args:
        plant_id (str): The plant identifier.

    Returns:
        Plant: The plant instance, detached from the session.

    Raises:
        ValueError: If plant is not found.
        DatabaseError: On SQL failure.
    """
    session = self.SessionLocal()
    try:
        plant = session.query(Plant).filter(Plant.id == plant_id).first()
        if plant is None:
            raise ValueError(f"Plant '{plant_id}' not found")
        # Detach so the instance stays usable after the session closes.
        session.expunge(plant)
        return plant
    except SQLAlchemyError as e:
        # Chain the original exception so the SQL-level cause stays visible.
        raise DatabaseError(f"Failed to retrieve plant '{plant_id}': {e}") from e
    finally:
        session.close()

get_session()

Context manager for SQLAlchemy database sessions.

Yields:

Name Type Description
Session Session

An active database session.

Source code in pyadm1ode_calibration/io/persistence/database.py
@contextmanager
def get_session(self) -> Session:
    """
    Context manager for SQLAlchemy database sessions.

    Commits on normal exit, rolls back (and re-raises) on any exception, and
    always closes the session afterwards.

    Yields:
        Session: An active database session.
    """
    db_session = self.SessionLocal()
    try:
        yield db_session
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise
    finally:
        db_session.close()

get_statistics(plant_id)

Get database usage statistics for a specific plant.

Parameters:

Name Type Description Default
plant_id str

Plant ID.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Counts and time ranges of stored data.

Source code in pyadm1ode_calibration/io/persistence/database.py
def get_statistics(self, plant_id: str) -> Dict[str, Any]:
    """
    Get database usage statistics for a specific plant.

    Args:
        plant_id (str): Plant ID.

    Returns:
        Dict[str, Any]: Counts and time ranges of stored data. The
        'first_measurement'/'last_measurement' entries are None when the
        plant has no measurements.
    """
    with self.get_session() as session:
        ts_query = session.query(Measurement.timestamp).filter(Measurement.plant_id == plant_id)
        # Fetch the boundary rows directly instead of issuing extra COUNT
        # queries just to decide whether .first() would succeed: .first()
        # already returns None when no rows match.
        first_row = ts_query.order_by(Measurement.timestamp).first()
        last_row = ts_query.order_by(Measurement.timestamp.desc()).first()
        return {
            "plant_id": plant_id,
            "n_measurements": session.query(Measurement).filter(Measurement.plant_id == plant_id).count(),
            "n_simulations": session.query(Simulation).filter(Simulation.plant_id == plant_id).count(),
            "n_calibrations": session.query(Calibration).filter(Calibration.plant_id == plant_id).count(),
            "n_substrates": session.query(Substrate).filter(Substrate.plant_id == plant_id).count(),
            "first_measurement": first_row[0] if first_row is not None else None,
            "last_measurement": last_row[0] if last_row is not None else None,
        }

list_plants()

List all registered plants.

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: List of plant summary dictionaries.

Source code in pyadm1ode_calibration/io/persistence/database.py
def list_plants(self) -> List[Dict[str, Any]]:
    """
    List all registered plants.

    Returns:
        List[Dict[str, Any]]: One summary dictionary per plant with keys
        'id', 'name', 'location', 'V_liq', 'V_gas', 'T_ad', 'created_at'.
    """
    summary_fields = ("id", "name", "location", "V_liq", "V_gas", "T_ad", "created_at")
    with self.get_session() as session:
        all_plants = session.query(Plant).all()
        return [{field: getattr(plant, field) for field in summary_fields} for plant in all_plants]

list_simulations(plant_id=None, scenario=None)

List simulations matching specific criteria.

Parameters:

Name Type Description Default
plant_id Optional[str]

Filter by plant.

None
scenario Optional[str]

Filter by scenario.

None

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: List of simulation summaries.

Source code in pyadm1ode_calibration/io/persistence/database.py
def list_simulations(self, plant_id: Optional[str] = None, scenario: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    List simulations matching specific criteria.

    Args:
        plant_id (Optional[str]): Filter by plant.
        scenario (Optional[str]): Filter by scenario.

    Returns:
        List[Dict[str, Any]]: Simulation summaries, newest first.
    """
    summary_fields = ("id", "plant_id", "name", "scenario", "duration", "avg_Q_ch4", "status", "created_at")
    with self.get_session() as session:
        query = session.query(Simulation)
        if plant_id:
            query = query.filter(Simulation.plant_id == plant_id)
        if scenario:
            query = query.filter(Simulation.scenario == scenario)
        matching = query.order_by(Simulation.created_at.desc()).all()
        return [{field: getattr(sim, field) for field in summary_fields} for sim in matching]

load_calibrations(plant_id, calibration_type=None, limit=10)

Load past calibrations for a plant.

Parameters:

Name Type Description Default
plant_id str

Plant ID.

required
calibration_type Optional[str]

Filter by type.

None
limit int

Max records to return.

10

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: List of calibration records.

Source code in pyadm1ode_calibration/io/persistence/database.py
def load_calibrations(
    self, plant_id: str, calibration_type: Optional[str] = None, limit: int = 10
) -> List[Dict[str, Any]]:
    """
    Load past calibrations for a plant, newest first.

    Args:
        plant_id (str): Plant ID.
        calibration_type (Optional[str]): Filter by type.
        limit (int): Max records to return.

    Returns:
        List[Dict[str, Any]]: One dict per calibration, keyed by column name.
    """
    with self.get_session() as session:
        query = session.query(Calibration).filter(Calibration.plant_id == plant_id)
        if calibration_type:
            query = query.filter(Calibration.calibration_type == calibration_type)
        records = query.order_by(Calibration.created_at.desc()).limit(limit).all()
        column_names = [col.name for col in Calibration.__table__.columns]
        return [{name: getattr(record, name) for name in column_names} for record in records]

load_measurements(plant_id, start_time=None, end_time=None, columns=None, source=None)

Load measurements as a pandas DataFrame.

Parameters:

Name Type Description Default
plant_id str

Plant ID.

required
start_time Optional[datetime]

Start of window.

None
end_time Optional[datetime]

End of window.

None
columns Optional[List[str]]

Specific columns to load.

None
source Optional[str]

Filter by data source.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Measurements indexed by timestamp.

Source code in pyadm1ode_calibration/io/persistence/database.py
def load_measurements(
    self,
    plant_id: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    columns: Optional[List[str]] = None,
    source: Optional[str] = None,
) -> pd.DataFrame:
    """
    Load measurements as a pandas DataFrame.

    Args:
        plant_id (str): Plant ID.
        start_time (Optional[Union[str, datetime]]): Start of window
            (inclusive); ISO strings are parsed.
        end_time (Optional[Union[str, datetime]]): End of window (inclusive).
        columns (Optional[List[str]]): Specific columns to load. Defaults to
            every measurement column except bookkeeping fields.
        source (Optional[str]): Filter by data source.

    Returns:
        pd.DataFrame: Measurements indexed by timestamp (empty if none match).
    """
    if isinstance(start_time, str):
        start_time = pd.to_datetime(start_time)
    if isinstance(end_time, str):
        end_time = pd.to_datetime(end_time)
    with self.get_session() as session:
        query = session.query(Measurement).filter(Measurement.plant_id == plant_id)
        if start_time:
            query = query.filter(Measurement.timestamp >= start_time)
        if end_time:
            query = query.filter(Measurement.timestamp <= end_time)
        if source:
            query = query.filter(Measurement.source == source)
        results = query.order_by(Measurement.timestamp).all()
        if not results:
            return pd.DataFrame()
        data_dict = {"timestamp": [r.timestamp for r in results]}
        if columns is None:
            # Default to all measurement columns, excluding bookkeeping fields.
            columns = [
                c.name
                for c in Measurement.__table__.columns
                if c.name not in ["id", "plant_id", "timestamp", "source", "created_at"]
            ]
        for col in columns:
            data_dict[col] = [getattr(r, col) for r in results]
        return pd.DataFrame(data_dict).set_index("timestamp")

load_simulation(simulation_id)

Load simulation metadata and its full time series.

Parameters:

Name Type Description Default
simulation_id str

ID of the simulation.

required

Returns:

Type Description
Optional[Dict[str, Any]]

Optional[Dict[str, Any]]: Dictionary containing metadata and 'time_series' DataFrame.

Source code in pyadm1ode_calibration/io/persistence/database.py
def load_simulation(self, simulation_id: str) -> Optional[Dict[str, Any]]:
    """
    Load simulation metadata and its full time series.

    Args:
        simulation_id (str): ID of the simulation.

    Returns:
        Optional[Dict[str, Any]]: Metadata dict with an added 'time_series'
        DataFrame, or None if the simulation does not exist.
    """
    with self.get_session() as session:
        sim = session.query(Simulation).filter(Simulation.id == simulation_id).first()
        if sim is None:
            return None
        rows = (
            session.query(SimulationTimeSeries)
            .filter(SimulationTimeSeries.simulation_id == simulation_id)
            .order_by(SimulationTimeSeries.time)
            .all()
        )
        if rows:
            series_columns = [
                c.name for c in SimulationTimeSeries.__table__.columns if c.name not in ["id", "simulation_id"]
            ]
            ts_frame = pd.DataFrame({name: [getattr(r, name) for r in rows] for name in series_columns})
        else:
            ts_frame = pd.DataFrame()
        record = {c.name: getattr(sim, c.name) for c in Simulation.__table__.columns}
        record["time_series"] = ts_frame
        return record

load_substrates(plant_id, substrate_type=None, start_date=None, end_date=None)

Load substrate data as a DataFrame.

Parameters:

Name Type Description Default
plant_id str

Plant ID.

required
substrate_type Optional[str]

Filter by type.

None
start_date Optional[datetime]

Start date.

None
end_date Optional[datetime]

End date.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Table of substrate analyses.

Source code in pyadm1ode_calibration/io/persistence/database.py
def load_substrates(
    self,
    plant_id: str,
    substrate_type: Optional[str] = None,
    start_date: Optional[Union[str, datetime]] = None,
    end_date: Optional[Union[str, datetime]] = None,
) -> pd.DataFrame:
    """
    Load substrate analyses as a DataFrame, ordered by sample date.

    Args:
        plant_id (str): Plant ID.
        substrate_type (Optional[str]): Filter by type.
        start_date (Optional[Union[str, datetime]]): Earliest sample date
            (inclusive); ISO strings are parsed.
        end_date (Optional[Union[str, datetime]]): Latest sample date (inclusive).

    Returns:
        pd.DataFrame: Table of substrate analyses (empty if none match).
    """
    start_date = pd.to_datetime(start_date) if isinstance(start_date, str) else start_date
    end_date = pd.to_datetime(end_date) if isinstance(end_date, str) else end_date
    exported_columns = (
        "sample_date",
        "substrate_name",
        "substrate_type",
        "sample_id",
        "TS",
        "VS",
        "oTS",
        "foTS",
        "RP",
        "RL",
        "RF",
        "NDF",
        "ADF",
        "ADL",
        "pH",
        "NH4_N",
        "TAC",
        "COD_S",
        "BMP",
        "C_to_N",
        "lab_name",
    )
    with self.get_session() as session:
        query = session.query(Substrate).filter(Substrate.plant_id == plant_id)
        if substrate_type:
            query = query.filter(Substrate.substrate_type == substrate_type)
        if start_date:
            query = query.filter(Substrate.sample_date >= start_date)
        if end_date:
            query = query.filter(Substrate.sample_date <= end_date)
        matches = query.order_by(Substrate.sample_date).all()
        if not matches:
            return pd.DataFrame()
        return pd.DataFrame([{name: getattr(s, name) for name in exported_columns} for s in matches])

store_calibration(plant_id, calibration_type, method, parameters, objective_value, objectives, validation_metrics=None, data_start=None, data_end=None, success=True, message=None)

Store a calibration result.

Parameters:

Name Type Description Default
plant_id str

Plant ID.

required
calibration_type str

'initial' or 'online'.

required
method str

Optimization method.

required
parameters Dict[str, float]

Calibrated values.

required
objective_value float

Final cost value.

required
objectives List[str]

Variables used in objective.

required
validation_metrics Optional[Dict[str, float]]

RMSE, R2 etc.

None
data_start Optional[datetime]

Start of data window.

None
data_end Optional[datetime]

End of data window.

None
success bool

Whether calibration converged.

True
message Optional[str]

Status message.

None

Returns:

Name Type Description
Calibration Calibration

Stored record.

Source code in pyadm1ode_calibration/io/persistence/database.py
def store_calibration(
    self,
    plant_id: str,
    calibration_type: str,
    method: str,
    parameters: Dict[str, float],
    objective_value: float,
    objectives: List[str],
    validation_metrics: Optional[Dict[str, float]] = None,
    data_start: Optional[datetime] = None,
    data_end: Optional[datetime] = None,
    success: bool = True,
    message: Optional[str] = None,
) -> Calibration:
    """
    Store a calibration result.

    Args:
        plant_id (str): Plant ID.
        calibration_type (str): 'initial' or 'online'.
        method (str): Optimization method.
        parameters (Dict[str, float]): Calibrated values.
        objective_value (float): Final cost value.
        objectives (List[str]): Variables used in objective.
        validation_metrics (Optional[Dict[str, float]]): RMSE, R2 etc.
        data_start (Optional[datetime]): Start of data window.
        data_end (Optional[datetime]): End of data window.
        success (bool): Whether calibration converged.
        message (Optional[str]): Status message.

    Returns:
        Calibration: Stored record, detached from the session.
    """
    with self.get_session() as session:
        cal = Calibration(
            plant_id=plant_id,
            calibration_type=calibration_type,
            method=method,
            parameters=parameters,
            objective_value=objective_value,
            objectives=objectives,
            validation_metrics=validation_metrics,
            data_start=data_start,
            data_end=data_end,
            success=success,
            message=message,
        )
        session.add(cal)
        # Commit, refresh and detach before returning so the caller receives
        # a fully populated instance that is safe to use after the session
        # closes (mirrors create_plant/store_substrate; previously the
        # instance was returned still attached and was expired by the commit
        # at context exit).
        session.commit()
        session.refresh(cal)
        session.expunge(cal)
        return cal

store_measurements(plant_id, data, source='SCADA', validate=True)

Bulk store measurement data for a plant.

Parameters:

Name Type Description Default
plant_id str

ID of the plant.

required
data DataFrame

Measurements with 'timestamp' column.

required
source str

Data source name. Defaults to 'SCADA'.

'SCADA'
validate bool

Whether to run quality checks before storing.

True

Returns:

Name Type Description
int int

Number of records stored.

Source code in pyadm1ode_calibration/io/persistence/database.py
def store_measurements(self, plant_id: str, data: pd.DataFrame, source: str = "SCADA", validate: bool = True) -> int:
    """
    Bulk store measurement data for a plant.

    Args:
        plant_id (str): ID of the plant.
        data (pd.DataFrame): Measurements with 'timestamp' column. The caller's
            DataFrame is not modified.
        source (str): Data source name. Defaults to 'SCADA'.
        validate (bool): Whether to run quality checks before storing.

    Returns:
        int: Number of records stored.

    Raises:
        ValueError: If the 'timestamp' column is missing or the plant is unknown.
        DatabaseError: If the bulk insert fails.
    """
    # Ensures the plant exists (raises ValueError otherwise).
    self.get_plant(plant_id)
    if "timestamp" not in data.columns:
        raise ValueError("DataFrame must have 'timestamp' column")
    if not pd.api.types.is_datetime64_any_dtype(data["timestamp"]):
        # Convert on a copy so the caller's DataFrame is never mutated as a
        # side effect of storing it.
        data = data.copy()
        data["timestamp"] = pd.to_datetime(data["timestamp"])
    if validate:
        from ..validation.validators import DataValidator

        DataValidator.validate(data)

    records = []
    for _, row in data.iterrows():
        record = {"plant_id": plant_id, "timestamp": row["timestamp"], "source": source}
        for col in data.columns:
            # Only persist columns that exist on the Measurement table.
            if col != "timestamp" and col in Measurement.__table__.columns:
                val = row[col]
                if pd.notna(val):
                    # Coerce numpy scalars to plain floats for the DB driver.
                    record[col] = float(val) if isinstance(val, (int, float, np.number)) else val
        records.append(record)
    with self.get_session() as session:
        try:
            session.bulk_insert_mappings(Measurement, records)
            return len(records)
        except SQLAlchemyError as e:
            raise DatabaseError(f"Failed to store measurements: {e}") from e

store_simulation(simulation_id, plant_id, results, name=None, description=None, duration=None, parameters=None, scenario='baseline')

Store simulation metadata and time series.

Parameters:

Name Type Description Default
simulation_id str

Unique ID for the simulation.

required
plant_id str

Associated plant ID.

required
results List[Dict[str, Any]]

Time-series results from simulation.

required
name Optional[str]

Simulation name.

None
description Optional[str]

Optional description.

None
duration Optional[float]

Duration in days.

None
parameters Optional[Dict]

Parameters used in this run.

None
scenario str

Scenario label. Defaults to 'baseline'.

'baseline'

Returns:

Name Type Description
Simulation Simulation

Stored simulation record.

Source code in pyadm1ode_calibration/io/persistence/database.py
def store_simulation(
    self,
    simulation_id: str,
    plant_id: str,
    results: List[Dict[str, Any]],
    name: Optional[str] = None,
    description: Optional[str] = None,
    duration: Optional[float] = None,
    parameters: Optional[Dict] = None,
    scenario: str = "baseline",
) -> Simulation:
    """
    Store simulation metadata and time series.

    Args:
        simulation_id (str): Unique ID for the simulation.
        plant_id (str): Associated plant ID.
        results (List[Dict[str, Any]]): Time-series results from simulation;
            each entry has a 'time' key and a 'components' dict.
        name (Optional[str]): Simulation name.
        description (Optional[str]): Optional description.
        duration (Optional[float]): Duration in days. Defaults to the last
            result time.
        parameters (Optional[Dict]): Parameters used in this run.
        scenario (str): Scenario label. Defaults to 'baseline'.

    Returns:
        Simulation: Stored simulation record.

    Raises:
        ValueError: If the simulation ID already exists or the plant is unknown.
    """
    # Ensures the plant exists (raises ValueError otherwise).
    self.get_plant(plant_id)
    metrics = self._calculate_simulation_metrics(results)
    with self.get_session() as session:
        sim = Simulation(
            id=simulation_id,
            plant_id=plant_id,
            name=name,
            description=description,
            duration=duration or (results[-1]["time"] if results else 0),
            scenario=scenario,
            parameters=parameters,
            avg_Q_gas=metrics.get("avg_Q_gas"),
            avg_Q_ch4=metrics.get("avg_Q_ch4"),
            avg_CH4_content=metrics.get("avg_CH4_content"),
            avg_pH=metrics.get("avg_pH"),
            avg_VFA=metrics.get("avg_VFA"),
            total_energy=metrics.get("total_energy"),
            status="completed",
            started_at=datetime.utcnow(),
            completed_at=datetime.utcnow(),
        )
        try:
            session.add(sim)
            session.flush()
            ts_records = []
            for res in results:
                # The first component's values serve as the plant-level series.
                comp_data = next(iter(res["components"].values()))
                record = {
                    "simulation_id": simulation_id,
                    "time": res["time"],
                    "Q_gas": comp_data.get("Q_gas"),
                    "Q_ch4": comp_data.get("Q_ch4"),
                    "Q_co2": comp_data.get("Q_co2"),
                    "pH": comp_data.get("pH"),
                    "VFA": comp_data.get("VFA"),
                    "TAC": comp_data.get("TAC"),
                }
                # Explicit None checks: a methane flow of exactly 0.0 is a
                # valid value and must yield CH4_content = 0, not a missing
                # entry (truthiness would silently skip it).
                if record["Q_gas"] is not None and record["Q_ch4"] is not None and record["Q_gas"] > 0:
                    record["CH4_content"] = (record["Q_ch4"] / record["Q_gas"]) * 100
                ts_records.append(record)
            session.bulk_insert_mappings(SimulationTimeSeries, ts_records)
            return sim
        except IntegrityError:
            raise ValueError(f"Simulation with ID '{simulation_id}' already exists")

store_substrate(plant_id, substrate_name, substrate_type, sample_date, lab_data, sample_id=None, lab_name=None, notes=None)

Store substrate laboratory analysis data.

Parameters:

Name Type Description Default
plant_id str

Associated plant ID.

required
substrate_name str

Name (e.g., 'Maize Silage').

required
substrate_type str

Category.

required
sample_date datetime

Date of sampling.

required
lab_data Dict[str, float]

Chemical properties (TS, VS, oTS, etc.).

required
sample_id Optional[str]

Lab internal ID.

None
lab_name Optional[str]

Lab name.

None
notes Optional[str]

Additional comments.

None

Returns:

Name Type Description
Substrate Substrate

Stored record.

Source code in pyadm1ode_calibration/io/persistence/database.py
def store_substrate(
    self,
    plant_id: str,
    substrate_name: str,
    substrate_type: str,
    sample_date: Union[str, datetime],
    lab_data: Dict[str, float],
    sample_id: Optional[str] = None,
    lab_name: Optional[str] = None,
    notes: Optional[str] = None,
) -> Substrate:
    """
    Store substrate laboratory analysis data.

    Args:
        plant_id (str): Associated plant ID.
        substrate_name (str): Name (e.g., 'Maize Silage').
        substrate_type (str): Category.
        sample_date (Union[str, datetime]): Date of sampling. Strings are
            parsed with ``pd.to_datetime``; invalid strings raise a
            pandas parsing error before any DB work happens.
        lab_data (Dict[str, float]): Chemical properties (TS, VS, oTS, etc.).
            Keys that do not match a ``Substrate`` attribute are silently
            ignored.
        sample_id (Optional[str]): Lab internal ID.
        lab_name (Optional[str]): Lab name.
        notes (Optional[str]): Additional comments.

    Returns:
        Substrate: Stored record, detached from the session so it remains
        usable after the session is closed.
    """
    # Normalize string dates up front, before opening a session.
    if isinstance(sample_date, str):
        sample_date = pd.to_datetime(sample_date)
    session = self.SessionLocal()
    try:
        substrate = Substrate(
            plant_id=plant_id,
            substrate_name=substrate_name,
            substrate_type=substrate_type,
            sample_date=sample_date,
            sample_id=sample_id,
            lab_name=lab_name,
            notes=notes,
        )
        # Copy lab measurements onto matching ORM columns only, so an
        # unexpected key cannot raise AttributeError on the model.
        for key, value in lab_data.items():
            if hasattr(substrate, key):
                setattr(substrate, key, value)
        session.add(substrate)
        session.commit()
        # Refresh to populate DB-generated fields, then expunge so the
        # returned object is detached from the soon-to-be-closed session.
        session.refresh(substrate)
        session.expunge(substrate)
        return substrate
    except Exception:
        # Roll back on any failure so the session closes cleanly; re-raise
        # for the caller to handle.
        session.rollback()
        raise
    finally:
        session.close()

Validation

pyadm1ode_calibration.io.validation.validators.DataValidator

Validates measurement data quality and consistency.

Functions

validate(data, required_columns=None, expected_ranges=None) staticmethod

Perform comprehensive data validation.

Parameters:

Name Type Description Default
data DataFrame

DataFrame to validate

required
required_columns Optional[List[str]]

Columns that must be present

None
expected_ranges Optional[Dict[str, Tuple[float, float]]]

Mapping of column names to (min, max)

None

Returns:

Type Description
ValidationResult

ValidationResult object

Source code in pyadm1ode_calibration/io/validation/validators.py
@staticmethod
def validate(
    data: pd.DataFrame,
    required_columns: Optional[List[str]] = None,
    expected_ranges: Optional[Dict[str, Tuple[float, float]]] = None,
) -> ValidationResult:
    """
    Perform comprehensive data validation.

    Runs five checks — required columns, emptiness, missing-value
    fractions, expected value ranges, and duplicate index entries —
    then derives a quality score from the issue/warning counts.

    Args:
        data: DataFrame to validate
        required_columns: Columns that must be present
        expected_ranges: Mapping of column names to (min, max)

    Returns:
        ValidationResult object
    """
    issues: List[str] = []
    warnings: List[str] = []
    # Per-column NaN counts and fractions, computed once up front.
    missing_counts = data.isnull().sum().to_dict()
    missing_pct = data.isnull().mean()

    # 1. Check required columns
    if required_columns:
        for col in required_columns:
            if col not in data.columns:
                issues.append(f"Required column '{col}' is missing")

    # 2. Check for empty data — short-circuit, since the remaining
    # checks are meaningless without rows.
    if data.empty:
        issues.append("Dataset is empty")
        # Fix: pass `warnings` through here as well; the original early
        # return dropped it, inconsistent with the final return below.
        return ValidationResult(
            is_valid=False,
            quality_score=0.0,
            issues=issues,
            warnings=warnings,
            missing_data=missing_counts,
        )

    # 3. Check for missing values (NaN): >30% missing is an issue,
    # anything above 0% is a warning.
    for col, pct in missing_pct.items():
        if pct > 0.3:
            issues.append(f"Column '{col}' has {pct*100:.1f}% missing values")
        elif pct > 0:
            warnings.append(f"Column '{col}' has {pct*100:.1f}% missing values")

    # 4. Check expected ranges: >20% out-of-range is an issue,
    # otherwise a warning. Columns absent from `data` are skipped.
    if expected_ranges:
        for col, (vmin, vmax) in expected_ranges.items():
            if col in data.columns:
                out_of_range = (data[col] < vmin) | (data[col] > vmax)
                n_out = out_of_range.sum()
                if n_out > 0:
                    pct = n_out / len(data)
                    if pct > 0.2:
                        issues.append(f"Column '{col}' has {n_out} values outside range [{vmin}, {vmax}]")
                    else:
                        warnings.append(f"Column '{col}' has {n_out} values outside range [{vmin}, {vmax}]")

    # 5. Check for duplicates in index (timestamps)
    if data.index.duplicated().any():
        issues.append("Dataset has duplicate timestamps")

    # Calculate quality score from issue/warning counts and mean missingness.
    quality_score = DataValidator._calculate_quality_score(len(issues), len(warnings), missing_pct.mean())

    return ValidationResult(
        is_valid=len(issues) == 0,
        quality_score=quality_score,
        issues=issues,
        warnings=warnings,
        missing_data=missing_counts,
        statistics={"missing_pct_avg": float(missing_pct.mean())},
    )