Zum Inhalt

Sensor-Komponenten API

pyadm1.components.sensors._base.AbstractSensor

Bases: Component

Abstract base class for all biogas plant sensor components.

Handles the constructor parameters, state variables, and helper methods that are identical across PhysicalSensor, ChemicalSensor, and GasSensor.

Subclasses must implement step(), to_dict(), and from_dict(). The shared initialize() calls the _initialize_subclass() hook to let subclasses reset their own state and build state / outputs_data dicts; the _initialized flag is set automatically afterwards.

Subclasses must set sensor_type (a string-valued Enum) before calling super().__init__(), since shared helpers read its .value.

Source code in pyadm1/components/sensors/_base.py
class AbstractSensor(Component):
    """
    Abstract base class for all biogas plant sensor components.

    Handles the constructor parameters, state variables, and helper methods
    that are identical across PhysicalSensor, ChemicalSensor, and GasSensor.

    Subclasses must implement ``step()``, ``to_dict()``, and ``from_dict()``.
    The shared ``initialize()`` calls the ``_initialize_subclass()`` hook to
    let subclasses reset their own state and build ``state`` / ``outputs_data``
    dicts; the ``_initialized`` flag is set automatically afterwards.

    Subclasses must set ``sensor_type`` (a string-valued ``Enum``) before
    calling ``super().__init__()``, since shared helpers read its ``.value``.
    """

    sensor_type: Enum

    def __init__(
        self,
        component_id: str,
        signal_key: str,
        candidate_keys: Tuple[str, ...],
        measurement_range: Tuple[float, float],
        measurement_noise: float = 0.0,
        accuracy: float = 0.0,
        drift_rate: float = 0.0,
        sample_interval: float = 0.0,
        unit: str = "",
        output_key: Optional[str] = None,
        rng_seed: Optional[int] = None,
        name: Optional[str] = None,
    ):
        super().__init__(component_id, ComponentType.SENSOR, name)

        self.signal_key = signal_key
        self._candidate_keys = candidate_keys
        self.measurement_range: Tuple[float, float] = tuple(measurement_range)  # type: ignore[assignment]
        self.measurement_noise = float(max(0.0, measurement_noise))
        self.accuracy = float(max(0.0, accuracy))
        self.drift_rate = float(drift_rate)
        self.sample_interval = float(max(0.0, sample_interval))
        self.unit = unit
        self.output_key = output_key or f"{component_id}_measurement"

        self._rng = np.random.default_rng(rng_seed)
        self.calibration_offset = float(self._rng.uniform(-self.accuracy, self.accuracy)) if self.accuracy > 0 else 0.0

        # Common state — properly initialised by each subclass's initialize()
        self.true_value: float = np.nan
        self.measured_value: float = np.nan
        self.drift_offset: float = 0.0
        self.last_sample_time: float = -np.inf
        self.is_valid: bool = False
        self.in_range: bool = True

    # ------------------------------------------------------------------
    # Abstract interface
    # ------------------------------------------------------------------

    @abstractmethod
    def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Advance the sensor by one timestep and return its output dictionary."""
        ...

    @abstractmethod
    def to_dict(self) -> Dict[str, Any]:
        """Serialize the sensor to a configuration dictionary."""
        ...

    @classmethod
    @abstractmethod
    def from_dict(cls, config: Dict[str, Any]) -> "AbstractSensor":
        """Reconstruct a sensor instance from a configuration dictionary."""
        ...

    # ------------------------------------------------------------------
    # Shared initialize — template-method pattern
    # ------------------------------------------------------------------

    def initialize(self, initial_state: Optional[Dict[str, Any]] = None) -> None:
        """Reset common sensor state, restore from *initial_state*, run subclass hook."""
        self.true_value = np.nan
        self.measured_value = np.nan
        self.drift_offset = 0.0
        self.last_sample_time = -np.inf
        self.is_valid = False
        self.in_range = True

        if initial_state:
            self.true_value = float(initial_state.get("true_value", np.nan))
            self.measured_value = float(initial_state.get("measured_value", np.nan))
            self.drift_offset = float(initial_state.get("drift_offset", 0.0))
            self.last_sample_time = float(initial_state.get("last_sample_time", -np.inf))
            self.is_valid = bool(initial_state.get("is_valid", False))
            self.in_range = bool(initial_state.get("in_range", True))

        self._initialize_subclass(initial_state)
        self._initialized = True

    def _initialize_subclass(self, initial_state: Optional[Dict[str, Any]]) -> None:
        """Hook for subclass-specific reset/restore and ``state`` / ``outputs_data`` build.

        Default no-op so subclasses without extra state need not override.
        """

    # ------------------------------------------------------------------
    # Common measurement helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_enum(
        value: str,
        aliases: Mapping[str, _E],
        enum_cls: Type[_E],  # noqa: ARG004 — kept for type symmetry / future use
        label: str,
    ) -> _E:
        """Normalize *value* (case- and whitespace-insensitive) via *aliases*."""
        normalized = value.strip().lower()
        if normalized not in aliases:
            raise ValueError(f"Unsupported {label} '{value}'")
        return aliases[normalized]

    def _advance_drift_and_read(self, dt: float, inputs: Dict[str, Any]) -> None:
        """Step preamble: integrate drift over *dt* and refresh ``true_value``."""
        self.drift_offset += self.drift_rate * dt
        true_value = self._read_true_value(inputs)
        if true_value is not None:
            self.true_value = true_value

    def _read_true_value(self, inputs: Dict[str, Any]) -> Optional[float]:
        """Resolve the measured signal from upstream component outputs."""
        for key in self._candidate_keys:
            if key in inputs:
                try:
                    return float(inputs[key])
                except (TypeError, ValueError):
                    return None
        return None

    def _should_sample(self, t: float) -> bool:
        """Return True when the next discrete sample is due."""
        if self.sample_interval <= 0:
            return True
        return (t - self.last_sample_time) >= self.sample_interval

    @staticmethod
    def _apply_response_lag(
        true_value: float,
        filtered_value: float,
        response_time: float,
        dt: float,
    ) -> float:
        """Apply a first-order lag filter and return the updated filtered value."""
        if np.isnan(filtered_value) or response_time <= 0:
            return true_value
        alpha = min(1.0, dt / max(response_time, 1.0e-12))
        return filtered_value + alpha * (true_value - filtered_value)

    def _apply_errors(self, value: float) -> float:
        """Apply calibration offset, accumulated drift, and Gaussian noise."""
        value = value + self.calibration_offset + self.drift_offset
        if self.measurement_noise > 0:
            value += float(self._rng.normal(0.0, self.measurement_noise))
        return value

    def _clamp_to_range(self, value: float) -> Tuple[float, bool]:
        """Clamp *value* to the measurement range. Returns ``(clamped_value, in_range)``."""
        min_v, max_v = self.measurement_range
        in_range = min_v <= value <= max_v
        return float(np.clip(value, min_v, max_v)), in_range

    # ------------------------------------------------------------------
    # Serialization helpers
    # ------------------------------------------------------------------

    def _base_state_dict(self) -> Dict[str, Any]:
        """Common state fields shared by all sensor types."""
        return {
            "true_value": float(self.true_value),
            "measured_value": float(self.measured_value),
            "drift_offset": float(self.drift_offset),
            "calibration_offset": float(self.calibration_offset),
            "last_sample_time": float(self.last_sample_time),
            "is_valid": bool(self.is_valid),
            "in_range": bool(self.in_range),
        }

    def _base_config_dict(self) -> Dict[str, Any]:
        """Common configuration fields for ``to_dict()``."""
        return {
            "component_id": self.component_id,
            "component_type": self.component_type.value,
            "name": self.name,
            "signal_key": self.signal_key,
            "measurement_range": list(self.measurement_range),
            "measurement_noise": self.measurement_noise,
            "accuracy": self.accuracy,
            "drift_rate": self.drift_rate,
            "sample_interval": self.sample_interval,
            "unit": self.unit,
            "output_key": self.output_key,
            "inputs": self.inputs,
            "outputs": self.outputs,
        }

    def _build_outputs(
        self,
        measurement: float,
        extras: Optional[Dict[str, Any]] = None,
        include_drift: bool = False,
    ) -> Dict[str, Any]:
        """Build the common ``outputs_data`` dict; *extras* are merged on top.

        Includes ``drift_offset`` only when *include_drift* is True (step phase).
        Subclasses pass type-specific keys (``analyzer_method``, ``is_detected``,
        ``temperature_value``, …) via *extras*.
        """
        out: Dict[str, Any] = {
            "measurement": float(measurement),
            self.output_key: float(measurement),
            "true_value": float(self.true_value),
            "sensor_type": self.sensor_type.value,
            "signal_key": self.signal_key,
            "unit": self.unit,
            "is_valid": bool(self.is_valid),
            "in_range": bool(self.in_range),
        }
        if include_drift:
            out["drift_offset"] = float(self.drift_offset)
        if extras:
            out.update(extras)
        return out

    @staticmethod
    def _restore_io(sensor: "AbstractSensor", config: Dict[str, Any]) -> None:
        """Restore ``state`` and wire ``inputs`` / ``outputs`` from a serialized config."""
        if "state" in config:
            sensor.initialize(config["state"])
        for input_id in config.get("inputs", []):
            sensor.add_input(input_id)
        for output_id in config.get("outputs", []):
            sensor.add_output(output_id)

Functions

from_dict(config) abstractmethod classmethod

Reconstruct a sensor instance from a configuration dictionary.

Source code in pyadm1/components/sensors/_base.py
@classmethod
@abstractmethod
def from_dict(cls, config: Dict[str, Any]) -> "AbstractSensor":
    """Reconstruct a sensor instance from a configuration dictionary."""
    ...

initialize(initial_state=None)

Reset common sensor state, restore from initial_state, run subclass hook.

Source code in pyadm1/components/sensors/_base.py
def initialize(self, initial_state: Optional[Dict[str, Any]] = None) -> None:
    """Reset common sensor state, restore from *initial_state*, run subclass hook."""
    self.true_value = np.nan
    self.measured_value = np.nan
    self.drift_offset = 0.0
    self.last_sample_time = -np.inf
    self.is_valid = False
    self.in_range = True

    if initial_state:
        self.true_value = float(initial_state.get("true_value", np.nan))
        self.measured_value = float(initial_state.get("measured_value", np.nan))
        self.drift_offset = float(initial_state.get("drift_offset", 0.0))
        self.last_sample_time = float(initial_state.get("last_sample_time", -np.inf))
        self.is_valid = bool(initial_state.get("is_valid", False))
        self.in_range = bool(initial_state.get("in_range", True))

    self._initialize_subclass(initial_state)
    self._initialized = True

step(t, dt, inputs) abstractmethod

Advance the sensor by one timestep and return its output dictionary.

Source code in pyadm1/components/sensors/_base.py
@abstractmethod
def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Advance the sensor by one timestep and return its output dictionary."""
    ...

to_dict() abstractmethod

Serialize the sensor to a configuration dictionary.

Source code in pyadm1/components/sensors/_base.py
@abstractmethod
def to_dict(self) -> Dict[str, Any]:
    """Serialize the sensor to a configuration dictionary."""
    ...

pyadm1.components.sensors.physical.PhysicalSensor

Bases: AbstractSensor

Generic physical sensor with realistic measurement behavior.

For sensor_type="pH" an optional Nernst temperature compensation can be enabled by supplying temperature_signal_key. Without it the sensor behaves as if the process temperature always equals the calibration reference (no systematic temperature error).

The Nernst correction models what an un-compensated pH electrode reads at the actual process temperature::

pH_apparent = pH_iso + (pH_true − pH_iso) × (T_ref / T_actual)

This systematic shift is applied after the response lag and before the noise / calibration errors, because it is a physical electrode property, not measurement uncertainty.

Parameters:

Name Type Description Default
component_id str

Unique component identifier.

required
sensor_type str

One of pH, temperature, pressure, level, or flow.

'temperature'
signal_key Optional[str]

Input key to read from connected upstream outputs. If omitted a type-specific default is used.

None
measurement_range Optional[Tuple[float, float]]

Inclusive valid measurement range.

None
measurement_noise float

Gaussian noise standard deviation in engineering units.

0.0
accuracy float

Maximum fixed calibration offset in engineering units.

0.0
drift_rate float

Linear drift rate in engineering units per day.

0.0
response_time float

First-order lag time constant in days.

0.0
sample_interval float

Sampling interval in days. The output is held between samples.

0.0
unit Optional[str]

Engineering unit label.

None
output_key Optional[str]

Namespaced output key for the measured value.

None
rng_seed Optional[int]

Optional random seed for deterministic runs.

None
name Optional[str]

Human-readable component name.

None
temperature_signal_key Optional[str]

Input key that provides the process temperature in Kelvin. Only used when sensor_type="pH". When None (default) no Nernst correction is applied.

None
temperature_reference float

Electrode calibration temperature in Kelvin. Defaults to 298.15 K (25 °C).

298.15
pH_isopotential float

pH at which the electrode gives the same reading regardless of temperature. Defaults to 7.0.

7.0
Source code in pyadm1/components/sensors/physical.py
class PhysicalSensor(AbstractSensor):
    """
    Generic physical sensor with realistic measurement behavior.

    For ``sensor_type="pH"`` an optional Nernst temperature compensation can
    be enabled by supplying *temperature_signal_key*.  Without it the sensor
    behaves as if the process temperature always equals the calibration
    reference (no systematic temperature error).

    The Nernst correction models what an un-compensated pH electrode reads at
    the actual process temperature::

        pH_apparent = pH_iso + (pH_true − pH_iso) × (T_ref / T_actual)

    This systematic shift is applied after the response lag and before the
    noise / calibration errors, because it is a physical electrode property,
    not measurement uncertainty.

    Args:
        component_id: Unique component identifier.
        sensor_type: One of ``pH``, ``temperature``, ``pressure``, ``level``,
            or ``flow``.
        signal_key: Input key to read from connected upstream outputs. If
            omitted a type-specific default is used.
        measurement_range: Inclusive valid measurement range.
        measurement_noise: Gaussian noise standard deviation in engineering
            units.
        accuracy: Maximum fixed calibration offset in engineering units.
        drift_rate: Linear drift rate in engineering units per day.
        response_time: First-order lag time constant in days.
        sample_interval: Sampling interval in days. The output is held between
            samples.
        unit: Engineering unit label.
        output_key: Namespaced output key for the measured value.
        rng_seed: Optional random seed for deterministic runs.
        name: Human-readable component name.
        temperature_signal_key: Input key that provides the process temperature
            in Kelvin.  Only used when ``sensor_type="pH"``.  When ``None``
            (default) no Nernst correction is applied.
        temperature_reference: Electrode calibration temperature in Kelvin.
            Defaults to 298.15 K (25 °C).
        pH_isopotential: pH at which the electrode gives the same reading
            regardless of temperature.  Defaults to 7.0.
    """

    def __init__(
        self,
        component_id: str,
        sensor_type: str = "temperature",
        signal_key: Optional[str] = None,
        measurement_range: Optional[Tuple[float, float]] = None,
        measurement_noise: float = 0.0,
        accuracy: float = 0.0,
        drift_rate: float = 0.0,
        response_time: float = 0.0,
        sample_interval: float = 0.0,
        unit: Optional[str] = None,
        output_key: Optional[str] = None,
        rng_seed: Optional[int] = None,
        name: Optional[str] = None,
        temperature_signal_key: Optional[str] = None,
        temperature_reference: float = 298.15,
        pH_isopotential: float = 7.0,
    ):
        self.sensor_type = self._parse_sensor_type(sensor_type)
        defaults = _DEFAULT_SENSOR_CONFIG[self.sensor_type]

        resolved_signal_key = signal_key or defaults["signal_key"]
        resolved_candidate_keys = (resolved_signal_key,) if signal_key else defaults["candidate_keys"]

        super().__init__(
            component_id=component_id,
            signal_key=resolved_signal_key,
            candidate_keys=resolved_candidate_keys,
            measurement_range=measurement_range or defaults["measurement_range"],
            measurement_noise=measurement_noise,
            accuracy=accuracy,
            drift_rate=drift_rate,
            sample_interval=sample_interval,
            unit=unit or defaults["unit"],
            output_key=output_key,
            rng_seed=rng_seed,
            name=name,
        )

        self.response_time = float(max(0.0, response_time))

        # Nernst compensation — only active for pH sensors with a temperature key
        self.temperature_signal_key = temperature_signal_key if self.sensor_type == PhysicalSensorType.PH else None
        self.temperature_reference = float(temperature_reference)
        self.pH_isopotential = float(pH_isopotential)

        self.filtered_value: float = np.nan
        self._temperature_value: float = np.nan

        self.initialize()

    @staticmethod
    def _parse_sensor_type(sensor_type: str) -> PhysicalSensorType:
        """Normalize user input into a supported sensor type."""
        aliases = {
            "ph": PhysicalSensorType.PH,
            "p_h": PhysicalSensorType.PH,
            "temperature": PhysicalSensorType.TEMPERATURE,
            "temp": PhysicalSensorType.TEMPERATURE,
            "pressure": PhysicalSensorType.PRESSURE,
            "level": PhysicalSensorType.LEVEL,
            "flow": PhysicalSensorType.FLOW,
        }
        return AbstractSensor._parse_enum(sensor_type, aliases, PhysicalSensorType, "physical sensor type")

    def _apply_nernst_correction(self, pH_value: float, inputs: Dict[str, Any]) -> float:
        """Apply Nernst temperature correction to *pH_value*.

        Returns *pH_value* unchanged when *temperature_signal_key* is not set,
        when the temperature cannot be read, or when the temperature is
        non-positive.

        Side-effect: updates ``self._temperature_value`` with the temperature
        that was used.
        """
        if self.temperature_signal_key is None:
            return pH_value

        raw_temp = inputs.get(self.temperature_signal_key)
        if raw_temp is None:
            return pH_value
        try:
            T_actual = float(raw_temp)
        except (TypeError, ValueError):
            return pH_value
        if T_actual <= 0.0:
            return pH_value

        self._temperature_value = T_actual

        # Without ATC, an electrode calibrated at T_ref reads at T_actual:
        #   pH_apparent = pH_iso + (pH_true − pH_iso) × (T_ref / T_actual)
        # The Nernst slope S(T) = RT·ln(10)/F scales linearly with T, so the
        # electrode over-reports the deviation from the isopotential point when
        # T_actual > T_ref.
        return self.pH_isopotential + (pH_value - self.pH_isopotential) * (self.temperature_reference / T_actual)

    def _initialize_subclass(self, initial_state: Optional[Dict[str, Any]]) -> None:
        """Reset / restore physical-sensor extras and build state + outputs."""
        self.filtered_value = np.nan
        self._temperature_value = np.nan
        if initial_state:
            self.filtered_value = float(initial_state.get("filtered_value", self.true_value))
            self._temperature_value = float(initial_state.get("temperature_value", np.nan))

        self.state = {
            **self._base_state_dict(),
            "filtered_value": float(self.filtered_value),
            "temperature_value": float(self._temperature_value),
        }
        self.outputs_data = self._build_outputs(
            self.measured_value,
            extras={"temperature_value": float(self._temperature_value)},
        )

    def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Advance the sensor by one simulation step.

        Reads the configured signal from upstream outputs, then applies
        response lag, Nernst temperature correction (pH only), calibration
        error, drift, noise, and range limits.
        """
        self._advance_drift_and_read(dt, inputs)

        should_sample = self._should_sample(t)
        if should_sample and not np.isnan(self.true_value):
            self.filtered_value = self._apply_response_lag(self.true_value, self.filtered_value, self.response_time, dt)
            # Nernst correction: applied after lag, before noise/calibration errors.
            # Only active for pH sensors when temperature_signal_key is set.
            corrected_value = self._apply_nernst_correction(self.filtered_value, inputs)
            raw_value = self._apply_errors(corrected_value)
            self.measured_value, self.in_range = self._clamp_to_range(raw_value)
            self.last_sample_time = t
            self.is_valid = True
        elif should_sample:
            self.is_valid = False

        self.state.update(
            {
                **self._base_state_dict(),
                "filtered_value": float(self.filtered_value),
                "temperature_value": float(self._temperature_value),
            }
        )
        self.outputs_data = self._build_outputs(
            self.measured_value,
            extras={"temperature_value": float(self._temperature_value)},
            include_drift=True,
        )
        return self.outputs_data

    def to_dict(self) -> Dict[str, Any]:
        """Serialize sensor configuration and state."""
        return {
            **self._base_config_dict(),
            "sensor_type": self.sensor_type.value,
            "response_time": self.response_time,
            "temperature_signal_key": self.temperature_signal_key,
            "temperature_reference": self.temperature_reference,
            "pH_isopotential": self.pH_isopotential,
            "state": self.state,
        }

    @classmethod
    def from_dict(cls, config: Dict[str, Any]) -> "PhysicalSensor":
        """Create sensor from serialized configuration."""
        sensor = cls(
            component_id=config["component_id"],
            sensor_type=config.get("sensor_type", "temperature"),
            signal_key=config.get("signal_key"),
            measurement_range=tuple(config.get("measurement_range", (250.0, 400.0))),
            measurement_noise=config.get("measurement_noise", 0.0),
            accuracy=config.get("accuracy", 0.0),
            drift_rate=config.get("drift_rate", 0.0),
            response_time=config.get("response_time", 0.0),
            sample_interval=config.get("sample_interval", 0.0),
            unit=config.get("unit"),
            output_key=config.get("output_key"),
            name=config.get("name"),
            temperature_signal_key=config.get("temperature_signal_key"),
            temperature_reference=config.get("temperature_reference", 298.15),
            pH_isopotential=config.get("pH_isopotential", 7.0),
        )
        cls._restore_io(sensor, config)
        return sensor

Functions

from_dict(config) classmethod

Create sensor from serialized configuration.

Source code in pyadm1/components/sensors/physical.py
@classmethod
def from_dict(cls, config: Dict[str, Any]) -> "PhysicalSensor":
    """Create sensor from serialized configuration."""
    sensor = cls(
        component_id=config["component_id"],
        sensor_type=config.get("sensor_type", "temperature"),
        signal_key=config.get("signal_key"),
        measurement_range=tuple(config.get("measurement_range", (250.0, 400.0))),
        measurement_noise=config.get("measurement_noise", 0.0),
        accuracy=config.get("accuracy", 0.0),
        drift_rate=config.get("drift_rate", 0.0),
        response_time=config.get("response_time", 0.0),
        sample_interval=config.get("sample_interval", 0.0),
        unit=config.get("unit"),
        output_key=config.get("output_key"),
        name=config.get("name"),
        temperature_signal_key=config.get("temperature_signal_key"),
        temperature_reference=config.get("temperature_reference", 298.15),
        pH_isopotential=config.get("pH_isopotential", 7.0),
    )
    cls._restore_io(sensor, config)
    return sensor

step(t, dt, inputs)

Advance the sensor by one simulation step.

Reads the configured signal from upstream outputs, then applies response lag, Nernst temperature correction (pH only), calibration error, drift, noise, and range limits.

Source code in pyadm1/components/sensors/physical.py
def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Advance the sensor by one simulation step.

    Reads the configured signal from upstream outputs, then applies
    response lag, Nernst temperature correction (pH only), calibration
    error, drift, noise, and range limits.
    """
    self._advance_drift_and_read(dt, inputs)

    should_sample = self._should_sample(t)
    if should_sample and not np.isnan(self.true_value):
        self.filtered_value = self._apply_response_lag(self.true_value, self.filtered_value, self.response_time, dt)
        # Nernst correction: applied after lag, before noise/calibration errors.
        # Only active for pH sensors when temperature_signal_key is set.
        corrected_value = self._apply_nernst_correction(self.filtered_value, inputs)
        raw_value = self._apply_errors(corrected_value)
        self.measured_value, self.in_range = self._clamp_to_range(raw_value)
        self.last_sample_time = t
        self.is_valid = True
    elif should_sample:
        self.is_valid = False

    self.state.update(
        {
            **self._base_state_dict(),
            "filtered_value": float(self.filtered_value),
            "temperature_value": float(self._temperature_value),
        }
    )
    self.outputs_data = self._build_outputs(
        self.measured_value,
        extras={"temperature_value": float(self._temperature_value)},
        include_drift=True,
    )
    return self.outputs_data

to_dict()

Serialize sensor configuration and state.

Source code in pyadm1/components/sensors/physical.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize sensor configuration and state."""
    return {
        **self._base_config_dict(),
        "sensor_type": self.sensor_type.value,
        "response_time": self.response_time,
        "temperature_signal_key": self.temperature_signal_key,
        "temperature_reference": self.temperature_reference,
        "pH_isopotential": self.pH_isopotential,
        "state": self.state,
    }

pyadm1.components.sensors.chemical.ChemicalSensor

Bases: AbstractSensor

Generic chemical analyzer with sampling delay and drift.

When measurement_noise, measurement_delay, or detection_limit are omitted, they are taken from _ANALYZER_METHOD_DEFAULTS for the chosen analyzer_method, so different analyzer types produce different measurement fidelity and timing out of the box.

Parameters:

Name Type Description Default
component_id str

Unique component identifier.

required
sensor_type str

One of VFA, ammonia, COD, or nutrients.

'VFA'
analyzer_method Optional[str]

Measurement principle such as titration or spectroscopy.

None
signal_key Optional[str]

Input key to read from upstream outputs. If omitted, a type-specific default is used.

None
measurement_range Optional[Tuple[float, float]]

Inclusive valid measurement range.

None
measurement_noise Optional[float]

Gaussian noise standard deviation in engineering units. None uses the analyzer-method default.

None
accuracy float

Maximum fixed calibration offset in engineering units.

0.0
drift_rate float

Linear drift rate in engineering units per day.

0.0
sample_interval float

Sampling interval in days.

0.0
measurement_delay Optional[float]

Delay between sampling and reported result in days. None uses the analyzer-method default.

None
response_time Optional[float]

First-order lag time constant in days applied to the analytical result as it settles to its final reading (e.g. ISE electrode equilibration). Applied every step toward the latest matured result, independently of the sample queue. None uses the analyzer-method default.

None
detection_limit Optional[float]

Values below this threshold are reported as zero. None uses the analyzer-method default.

None
unit Optional[str]

Engineering unit label.

None
output_key Optional[str]

Namespaced output key for the measured value.

None
rng_seed Optional[int]

Optional random seed for deterministic runs.

None
name Optional[str]

Human-readable component name.

None
Source code in pyadm1/components/sensors/chemical.py
class ChemicalSensor(AbstractSensor):
    """
    Generic chemical analyzer with sampling delay and drift.

    When *measurement_noise*, *measurement_delay*, or *detection_limit* are
    omitted, they are taken from ``_ANALYZER_METHOD_DEFAULTS`` for the chosen
    *analyzer_method*, so different analyzer types produce different
    measurement fidelity and timing out of the box.

    Args:
        component_id: Unique component identifier.
        sensor_type: One of ``VFA``, ``ammonia``, ``COD``, or ``nutrients``.
        analyzer_method: Measurement principle such as titration or spectroscopy.
        signal_key: Input key to read from upstream outputs. If omitted,
            a type-specific default is used.
        measurement_range: Inclusive valid measurement range.
        measurement_noise: Gaussian noise standard deviation in engineering units.
            ``None`` uses the analyzer-method default.
        accuracy: Maximum fixed calibration offset in engineering units.
        drift_rate: Linear drift rate in engineering units per day.
        sample_interval: Sampling interval in days.
        measurement_delay: Delay between sampling and reported result in days.
            ``None`` uses the analyzer-method default.
        response_time: First-order lag time constant in days applied to the
            analytical result as it settles to its final reading (e.g. ISE
            electrode equilibration). Applied every step toward the latest
            matured result, independently of the sample queue.
            ``None`` uses the analyzer-method default.
        detection_limit: Values below this threshold are reported as zero.
            ``None`` uses the analyzer-method default.
        unit: Engineering unit label.
        output_key: Namespaced output key for the measured value.
        rng_seed: Optional random seed for deterministic runs.
        name: Human-readable component name.
    """

    def __init__(
        self,
        component_id: str,
        sensor_type: str = "VFA",
        analyzer_method: Optional[str] = None,
        signal_key: Optional[str] = None,
        measurement_range: Optional[Tuple[float, float]] = None,
        measurement_noise: Optional[float] = None,
        accuracy: float = 0.0,
        drift_rate: float = 0.0,
        sample_interval: float = 0.0,
        measurement_delay: Optional[float] = None,
        response_time: Optional[float] = None,
        detection_limit: Optional[float] = None,
        unit: Optional[str] = None,
        output_key: Optional[str] = None,
        rng_seed: Optional[int] = None,
        name: Optional[str] = None,
    ):
        self.sensor_type = self._parse_sensor_type(sensor_type)
        defaults = _DEFAULT_SENSOR_CONFIG[self.sensor_type]

        self.analyzer_method = self._parse_analyzer_method(analyzer_method or defaults["analyzer_method"])

        method_defaults = _ANALYZER_METHOD_DEFAULTS.get(self.sensor_type, {}).get(self.analyzer_method, {})

        resolved_signal_key = signal_key or defaults["signal_key"]
        resolved_candidate_keys = (resolved_signal_key,) if signal_key else defaults["candidate_keys"]
        resolved_noise = measurement_noise if measurement_noise is not None else method_defaults.get("measurement_noise", 0.0)
        resolved_delay = measurement_delay if measurement_delay is not None else method_defaults.get("measurement_delay", 0.0)
        resolved_response_time = response_time if response_time is not None else method_defaults.get("response_time", 0.0)
        resolved_detection_limit = (
            detection_limit
            if detection_limit is not None
            else method_defaults.get("detection_limit", defaults["detection_limit"])
        )

        super().__init__(
            component_id=component_id,
            signal_key=resolved_signal_key,
            candidate_keys=resolved_candidate_keys,
            measurement_range=measurement_range or defaults["measurement_range"],
            measurement_noise=resolved_noise,
            accuracy=accuracy,
            drift_rate=drift_rate,
            sample_interval=sample_interval,
            unit=unit or defaults["unit"],
            output_key=output_key,
            rng_seed=rng_seed,
            name=name,
        )

        self.measurement_delay = float(max(0.0, resolved_delay))
        self.response_time = float(max(0.0, resolved_response_time))
        self.detection_limit = float(max(0.0, resolved_detection_limit))

        self.filtered_value: float = np.nan
        self.reported_value: float = np.nan
        self.last_result_time: float = -np.inf
        self.is_detected: bool = False
        self._pending_samples: List[Tuple[float, float]] = []

        self.initialize()

    @staticmethod
    def _parse_sensor_type(sensor_type: str) -> ChemicalSensorType:
        """Normalize user input into a supported chemical sensor type."""
        aliases = {
            "vfa": ChemicalSensorType.VFA,
            "volatile_fatty_acids": ChemicalSensorType.VFA,
            "ammonia": ChemicalSensorType.AMMONIA,
            "nh3": ChemicalSensorType.AMMONIA,
            "tan": ChemicalSensorType.AMMONIA,
            "cod": ChemicalSensorType.COD,
            "nutrients": ChemicalSensorType.NUTRIENTS,
            "nutrient": ChemicalSensorType.NUTRIENTS,
        }
        return AbstractSensor._parse_enum(sensor_type, aliases, ChemicalSensorType, "chemical sensor type")

    @staticmethod
    def _parse_analyzer_method(analyzer_method: str) -> ChemicalAnalyzerMethod:
        """Normalize analyzer method input."""
        aliases = {
            "online_titration": ChemicalAnalyzerMethod.ONLINE_TITRATION,
            "titration": ChemicalAnalyzerMethod.ONLINE_TITRATION,
            "gas_chromatography": ChemicalAnalyzerMethod.GAS_CHROMATOGRAPHY,
            "gc": ChemicalAnalyzerMethod.GAS_CHROMATOGRAPHY,
            "ion_selective": ChemicalAnalyzerMethod.ION_SELECTIVE,
            "ion_selective_electrode": ChemicalAnalyzerMethod.ION_SELECTIVE,
            "ise": ChemicalAnalyzerMethod.ION_SELECTIVE,
            "spectroscopy": ChemicalAnalyzerMethod.SPECTROSCOPY,
            "spectroscopic": ChemicalAnalyzerMethod.SPECTROSCOPY,
            "colorimetric": ChemicalAnalyzerMethod.COLORIMETRIC,
        }
        return AbstractSensor._parse_enum(analyzer_method, aliases, ChemicalAnalyzerMethod, "analyzer method")

    def _initialize_subclass(self, initial_state: Optional[Dict[str, Any]]) -> None:
        """Reset / restore chemical-analyzer extras and build state + outputs."""
        self.filtered_value = np.nan
        self.reported_value = np.nan
        self.last_result_time = -np.inf
        self.is_detected = False
        self._pending_samples = []

        if initial_state:
            self.filtered_value = float(initial_state.get("filtered_value", self.measured_value))
            self.reported_value = float(initial_state.get("reported_value", self.filtered_value))
            self.last_result_time = float(initial_state.get("last_result_time", -np.inf))
            self.is_detected = bool(initial_state.get("is_detected", False))
            pending = initial_state.get("pending_samples", [])
            self._pending_samples = [(float(r), float(s)) for r, s in pending]

        self.state = {
            **self._base_state_dict(),
            "filtered_value": float(self.filtered_value),
            "reported_value": float(self.reported_value),
            "last_result_time": float(self.last_result_time),
            "is_detected": bool(self.is_detected),
            "pending_samples": list(self._pending_samples),
        }
        self.outputs_data = self._build_outputs(
            self.reported_value,
            extras={
                "analyzer_method": self.analyzer_method.value,
                "is_detected": bool(self.is_detected),
            },
        )

    def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Advance the analyzer by one simulation step."""
        self._advance_drift_and_read(dt, inputs)

        if self._should_sample(t) and not np.isnan(self.true_value):
            self._pending_samples.append((t + self.measurement_delay, self.true_value))
            self.last_sample_time = t

        matured_sample = self._pop_latest_ready_sample(t + dt)
        if matured_sample is not None:
            raw_value = self._apply_errors(matured_sample)
            self.measured_value, self.in_range = self._clamp_to_range(raw_value)
            self.last_result_time = t
            self.is_valid = True

        # Apply response lag toward the latest analytical result every step.
        # With response_time=0 this is a direct assignment (backward compatible).
        if not np.isnan(self.measured_value):
            self.filtered_value = self._apply_response_lag(self.measured_value, self.filtered_value, self.response_time, dt)
            self.is_detected = self.filtered_value >= self.detection_limit
            self.reported_value = float(self.filtered_value if self.is_detected else 0.0)

        self.state.update(
            {
                **self._base_state_dict(),
                "filtered_value": float(self.filtered_value),
                "reported_value": float(self.reported_value),
                "last_result_time": float(self.last_result_time),
                "is_detected": bool(self.is_detected),
                "pending_samples": list(self._pending_samples),
            }
        )
        self.outputs_data = self._build_outputs(
            self.reported_value,
            extras={
                "analyzer_method": self.analyzer_method.value,
                "is_detected": bool(self.is_detected),
            },
            include_drift=True,
        )
        return self.outputs_data

    def _pop_latest_ready_sample(self, t_end: float) -> Optional[float]:
        """
        Return the latest sample whose analysis delay completes before *t_end*.

        A sample added at time ``t_sample`` with delay ``d`` has release time
        ``t_sample + d``; it matures during a simulation step covering the
        half-open interval ``[t, t + dt)``.  Callers pass ``t_end = t + dt``
        so that samples whose analysis finishes within the current step are
        reported at the end of that step (strict ``<`` preserves the
        half-open convention — a sample releasing exactly at ``t + dt``
        matures on the next step).
        """
        latest_ready: Optional[float] = None
        pending: List[Tuple[float, float]] = []
        for release_time, sample_value in self._pending_samples:
            if release_time < t_end:
                latest_ready = sample_value
            else:
                pending.append((release_time, sample_value))
        self._pending_samples = pending
        return latest_ready

    def to_dict(self) -> Dict[str, Any]:
        """Serialize analyzer configuration and state."""
        return {
            **self._base_config_dict(),
            "sensor_type": self.sensor_type.value,
            "analyzer_method": self.analyzer_method.value,
            "measurement_delay": self.measurement_delay,
            "response_time": self.response_time,
            "detection_limit": self.detection_limit,
            "state": self.state,
        }

    @classmethod
    def from_dict(cls, config: Dict[str, Any]) -> "ChemicalSensor":
        """Create analyzer from serialized configuration."""
        sensor = cls(
            component_id=config["component_id"],
            sensor_type=config.get("sensor_type", "VFA"),
            analyzer_method=config.get("analyzer_method"),
            signal_key=config.get("signal_key"),
            measurement_range=tuple(config.get("measurement_range", (0.0, 25.0))),
            measurement_noise=config.get("measurement_noise"),
            accuracy=config.get("accuracy", 0.0),
            drift_rate=config.get("drift_rate", 0.0),
            sample_interval=config.get("sample_interval", 0.0),
            measurement_delay=config.get("measurement_delay"),
            response_time=config.get("response_time"),
            detection_limit=config.get("detection_limit"),
            unit=config.get("unit"),
            output_key=config.get("output_key"),
            name=config.get("name"),
        )
        cls._restore_io(sensor, config)
        return sensor

Functions

from_dict(config) classmethod

Create analyzer from serialized configuration.

Source code in pyadm1/components/sensors/chemical.py
@classmethod
def from_dict(cls, config: Dict[str, Any]) -> "ChemicalSensor":
    """Create analyzer from serialized configuration."""
    sensor = cls(
        component_id=config["component_id"],
        sensor_type=config.get("sensor_type", "VFA"),
        analyzer_method=config.get("analyzer_method"),
        signal_key=config.get("signal_key"),
        measurement_range=tuple(config.get("measurement_range", (0.0, 25.0))),
        measurement_noise=config.get("measurement_noise"),
        accuracy=config.get("accuracy", 0.0),
        drift_rate=config.get("drift_rate", 0.0),
        sample_interval=config.get("sample_interval", 0.0),
        measurement_delay=config.get("measurement_delay"),
        response_time=config.get("response_time"),
        detection_limit=config.get("detection_limit"),
        unit=config.get("unit"),
        output_key=config.get("output_key"),
        name=config.get("name"),
    )
    cls._restore_io(sensor, config)
    return sensor

step(t, dt, inputs)

Advance the analyzer by one simulation step.

Source code in pyadm1/components/sensors/chemical.py
def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Advance the analyzer by one simulation step."""
    self._advance_drift_and_read(dt, inputs)

    if self._should_sample(t) and not np.isnan(self.true_value):
        self._pending_samples.append((t + self.measurement_delay, self.true_value))
        self.last_sample_time = t

    matured_sample = self._pop_latest_ready_sample(t + dt)
    if matured_sample is not None:
        raw_value = self._apply_errors(matured_sample)
        self.measured_value, self.in_range = self._clamp_to_range(raw_value)
        self.last_result_time = t
        self.is_valid = True

    # Apply response lag toward the latest analytical result every step.
    # With response_time=0 this is a direct assignment (backward compatible).
    if not np.isnan(self.measured_value):
        self.filtered_value = self._apply_response_lag(self.measured_value, self.filtered_value, self.response_time, dt)
        self.is_detected = self.filtered_value >= self.detection_limit
        self.reported_value = float(self.filtered_value if self.is_detected else 0.0)

    self.state.update(
        {
            **self._base_state_dict(),
            "filtered_value": float(self.filtered_value),
            "reported_value": float(self.reported_value),
            "last_result_time": float(self.last_result_time),
            "is_detected": bool(self.is_detected),
            "pending_samples": list(self._pending_samples),
        }
    )
    self.outputs_data = self._build_outputs(
        self.reported_value,
        extras={
            "analyzer_method": self.analyzer_method.value,
            "is_detected": bool(self.is_detected),
        },
        include_drift=True,
    )
    return self.outputs_data

to_dict()

Serialize analyzer configuration and state.

Source code in pyadm1/components/sensors/chemical.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize analyzer configuration and state."""
    return {
        **self._base_config_dict(),
        "sensor_type": self.sensor_type.value,
        "analyzer_method": self.analyzer_method.value,
        "measurement_delay": self.measurement_delay,
        "response_time": self.response_time,
        "detection_limit": self.detection_limit,
        "state": self.state,
    }

pyadm1.components.sensors.gas.GasSensor

Bases: AbstractSensor

Generic gas composition sensor with detection limit and response lag.

Supports two operating modes selected automatically by measurement_delay:

Continuous mode (measurement_delay = 0, default for NDIR, electrochemical, paramagnetic, PID): the response lag is applied to the live input signal every step, matching how analog gas analyzers smooth their output.

Batch mode (measurement_delay > 0, e.g. gas chromatographs): a sample is queued at each sampling interval and released after the analysis delay elapses. The response lag is then applied continuously toward the latest released result, modelling the instrument output settling.

When measurement_noise, response_time, measurement_delay, or detection_limit are omitted, they are taken from _ANALYZER_METHOD_DEFAULTS for the chosen analyzer_method.

Parameters:

Name Type Description Default
component_id str

Unique component identifier.

required
sensor_type str

One of CH4, CO2, H2S, O2, or trace_gas.

'CH4'
analyzer_method Optional[str]

Measurement principle — infrared, calorimetric, electrochemical, paramagnetic, photoionization, or gas_chromatography.

None
signal_key Optional[str]

Input key to read from upstream outputs.

None
measurement_range Optional[Tuple[float, float]]

Inclusive valid measurement range.

None
measurement_noise Optional[float]

Gaussian noise std dev in engineering units. None uses the analyzer-method default.

None
accuracy float

Maximum fixed calibration offset in engineering units.

0.0
drift_rate float

Linear drift rate in engineering units per day.

0.0
response_time Optional[float]

First-order lag time constant in days. None uses the analyzer-method default.

None
sample_interval float

Sampling interval in days.

0.0
measurement_delay Optional[float]

Delay from sampling to result release in days. None uses the analyzer-method default. Values > 0 activate batch mode.

None
detection_limit Optional[float]

Values below this threshold are reported as zero. None uses the analyzer-method default.

None
cross_sensitivity Optional[Dict[str, float]]

Mapping from input signal key to interference coefficient (same engineering units as the sensor output per unit of the interfering gas). The total additive bias is sum(coeff * inputs[key] for key, coeff in cross_sensitivity.items()). Applied after response lag and before calibration errors. Example — CO2 interference on a CH4 NDIR channel: {"CO2": -0.003} (−0.3 % CH4 per 1 % CO2).

None
unit Optional[str]

Engineering unit label.

None
output_key Optional[str]

Namespaced output key for the measured value.

None
rng_seed Optional[int]

Optional random seed for deterministic runs.

None
name Optional[str]

Human-readable component name.

None
Source code in pyadm1/components/sensors/gas.py
class GasSensor(AbstractSensor):
    """
    Generic gas composition sensor with detection limit and response lag.

    Supports two operating modes selected automatically by *measurement_delay*:

    **Continuous mode** (``measurement_delay = 0``, default for NDIR,
    electrochemical, paramagnetic, PID): the response lag is applied to the
    live input signal every step, matching how analog gas analyzers smooth
    their output.

    **Batch mode** (``measurement_delay > 0``, e.g. gas chromatographs):
    a sample is queued at each sampling interval and released after the
    analysis delay elapses.  The response lag is then applied continuously
    toward the latest released result, modelling the instrument output settling.

    When *measurement_noise*, *response_time*, *measurement_delay*, or
    *detection_limit* are omitted, they are taken from
    ``_ANALYZER_METHOD_DEFAULTS`` for the chosen *analyzer_method*.

    Args:
        component_id: Unique component identifier.
        sensor_type: One of ``CH4``, ``CO2``, ``H2S``, ``O2``, or ``trace_gas``.
        analyzer_method: Measurement principle — ``infrared``, ``calorimetric``,
            ``electrochemical``, ``paramagnetic``, ``photoionization``, or
            ``gas_chromatography``.
        signal_key: Input key to read from upstream outputs.
        measurement_range: Inclusive valid measurement range.
        measurement_noise: Gaussian noise std dev in engineering units.
            ``None`` uses the analyzer-method default.
        accuracy: Maximum fixed calibration offset in engineering units.
        drift_rate: Linear drift rate in engineering units per day.
        response_time: First-order lag time constant in days.
            ``None`` uses the analyzer-method default.
        sample_interval: Sampling interval in days.
        measurement_delay: Delay from sampling to result release in days.
            ``None`` uses the analyzer-method default.  Values > 0 activate
            batch mode.
        detection_limit: Values below this threshold are reported as zero.
            ``None`` uses the analyzer-method default.
        cross_sensitivity: Mapping from input signal key to interference
            coefficient (same engineering units as the sensor output per unit
            of the interfering gas).  The total additive bias is
            ``sum(coeff * inputs[key] for key, coeff in cross_sensitivity.items())``.
            Applied after response lag and before calibration errors.
            Example — CO2 interference on a CH4 NDIR channel:
            ``{"CO2": -0.003}`` (−0.3 % CH4 per 1 % CO2).
        unit: Engineering unit label.
        output_key: Namespaced output key for the measured value.
        rng_seed: Optional random seed for deterministic runs.
        name: Human-readable component name.
    """

    def __init__(
        self,
        component_id: str,
        sensor_type: str = "CH4",
        analyzer_method: Optional[str] = None,
        signal_key: Optional[str] = None,
        measurement_range: Optional[Tuple[float, float]] = None,
        measurement_noise: Optional[float] = None,
        accuracy: float = 0.0,
        drift_rate: float = 0.0,
        response_time: Optional[float] = None,
        sample_interval: float = 0.0,
        measurement_delay: Optional[float] = None,
        detection_limit: Optional[float] = None,
        cross_sensitivity: Optional[Dict[str, float]] = None,
        unit: Optional[str] = None,
        output_key: Optional[str] = None,
        rng_seed: Optional[int] = None,
        name: Optional[str] = None,
    ):
        self.sensor_type = self._parse_sensor_type(sensor_type)
        defaults = _DEFAULT_SENSOR_CONFIG[self.sensor_type]

        self.analyzer_method = self._parse_analyzer_method(analyzer_method or defaults["analyzer_method"])

        method_defaults = _ANALYZER_METHOD_DEFAULTS.get(self.sensor_type, {}).get(self.analyzer_method, {})

        resolved_signal_key = signal_key or defaults["signal_key"]
        resolved_candidate_keys = (resolved_signal_key,) if signal_key else defaults["candidate_keys"]
        resolved_noise = measurement_noise if measurement_noise is not None else method_defaults.get("measurement_noise", 0.0)
        resolved_response_time = response_time if response_time is not None else method_defaults.get("response_time", 0.0)
        resolved_delay = measurement_delay if measurement_delay is not None else method_defaults.get("measurement_delay", 0.0)
        resolved_detection_limit = (
            detection_limit
            if detection_limit is not None
            else method_defaults.get("detection_limit", defaults["detection_limit"])
        )

        super().__init__(
            component_id=component_id,
            signal_key=resolved_signal_key,
            candidate_keys=resolved_candidate_keys,
            measurement_range=measurement_range or defaults["measurement_range"],
            measurement_noise=resolved_noise,
            accuracy=accuracy,
            drift_rate=drift_rate,
            sample_interval=sample_interval,
            unit=unit or defaults["unit"],
            output_key=output_key,
            rng_seed=rng_seed,
            name=name,
        )

        self.response_time = float(max(0.0, resolved_response_time))
        self.measurement_delay = float(max(0.0, resolved_delay))
        self.detection_limit = float(max(0.0, resolved_detection_limit))
        self.cross_sensitivity: Dict[str, float] = dict(cross_sensitivity) if cross_sensitivity else {}

        self.filtered_value: float = np.nan
        self.reported_value: float = np.nan
        self.is_detected: bool = False
        self._pending_samples: List[Tuple[float, float]] = []

        self.initialize()

    @staticmethod
    def _parse_sensor_type(sensor_type: str) -> GasSensorType:
        """Normalize user input into a supported gas sensor type."""
        aliases = {
            "ch4": GasSensorType.CH4,
            "methane": GasSensorType.CH4,
            "co2": GasSensorType.CO2,
            "carbon_dioxide": GasSensorType.CO2,
            "h2s": GasSensorType.H2S,
            "hydrogen_sulfide": GasSensorType.H2S,
            "o2": GasSensorType.O2,
            "oxygen": GasSensorType.O2,
            "trace_gas": GasSensorType.TRACE_GAS,
            "tracegas": GasSensorType.TRACE_GAS,
            "trace": GasSensorType.TRACE_GAS,
        }
        return AbstractSensor._parse_enum(sensor_type, aliases, GasSensorType, "gas sensor type")

    @staticmethod
    def _parse_analyzer_method(analyzer_method: str) -> GasAnalyzerMethod:
        """Normalize analyzer method input."""
        aliases = {
            "infrared": GasAnalyzerMethod.INFRARED,
            "ndir": GasAnalyzerMethod.INFRARED,
            "calorimetric": GasAnalyzerMethod.CALORIMETRIC,
            "electrochemical": GasAnalyzerMethod.ELECTROCHEMICAL,
            "paramagnetic": GasAnalyzerMethod.PARAMAGNETIC,
            "photoionization": GasAnalyzerMethod.PHOTOIONIZATION,
            "pid": GasAnalyzerMethod.PHOTOIONIZATION,
            "gas_chromatography": GasAnalyzerMethod.GAS_CHROMATOGRAPHY,
            "gc": GasAnalyzerMethod.GAS_CHROMATOGRAPHY,
        }
        return AbstractSensor._parse_enum(analyzer_method, aliases, GasAnalyzerMethod, "analyzer method")

    def _apply_cross_sensitivity(self, value: float, inputs: Dict[str, Any]) -> float:
        """Add interference bias from other signals in *inputs*."""
        if not self.cross_sensitivity:
            return value
        bias = 0.0
        for key, coefficient in self.cross_sensitivity.items():
            raw = inputs.get(key)
            if raw is None:
                continue
            try:
                bias += coefficient * float(raw)
            except (TypeError, ValueError):
                continue
        return value + bias

    def _initialize_subclass(self, initial_state: Optional[Dict[str, Any]]) -> None:
        """Reset / restore gas-sensor extras and build state + outputs."""
        self.filtered_value = np.nan
        self.reported_value = np.nan
        self.is_detected = False
        self._pending_samples = []

        if initial_state:
            self.filtered_value = float(initial_state.get("filtered_value", self.true_value))
            self.reported_value = float(initial_state.get("reported_value", self.measured_value))
            self.is_detected = bool(initial_state.get("is_detected", False))
            pending = initial_state.get("pending_samples", [])
            self._pending_samples = [(float(r), float(s)) for r, s in pending]

        self.state = {
            **self._base_state_dict(),
            "filtered_value": float(self.filtered_value),
            "reported_value": float(self.reported_value),
            "is_detected": bool(self.is_detected),
            "pending_samples": list(self._pending_samples),
        }
        self.outputs_data = self._build_outputs(
            self.reported_value,
            extras={
                "analyzer_method": self.analyzer_method.value,
                "is_detected": bool(self.is_detected),
            },
        )

    def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Advance the sensor by one simulation step.

        In **continuous mode** (``measurement_delay = 0``) the response lag is
        applied to the live input signal before adding errors, matching the
        analog behaviour of NDIR and electrochemical sensors.

        In **batch mode** (``measurement_delay > 0``) a sample is queued and
        released after the analysis delay, then the response lag tracks the
        released result, matching the discrete output of a GC analyzer.
        """
        self._advance_drift_and_read(dt, inputs)

        should_sample = self._should_sample(t)

        if self.measurement_delay > 0:
            # ---- Batch mode (GC-type) ----------------------------------------
            if should_sample and not np.isnan(self.true_value):
                self._pending_samples.append((t + self.measurement_delay, self.true_value))
                self.last_sample_time = t
            elif should_sample:
                self.is_valid = False

            matured = self._pop_latest_ready_sample(t)
            if matured is not None:
                cross_corrected = self._apply_cross_sensitivity(matured, inputs)
                raw_value = self._apply_errors(cross_corrected)
                self.measured_value, self.in_range = self._clamp_to_range(raw_value)
                self.is_valid = True

            # Lag applied continuously toward the latest released result.
            if not np.isnan(self.measured_value):
                self.filtered_value = self._apply_response_lag(
                    self.measured_value, self.filtered_value, self.response_time, dt
                )
                self.is_detected = self.filtered_value >= self.detection_limit
                self.reported_value = float(self.filtered_value if self.is_detected else 0.0)

        else:
            # ---- Continuous mode (NDIR, electrochemical, PID, …) --------------
            if should_sample and not np.isnan(self.true_value):
                self.filtered_value = self._apply_response_lag(self.true_value, self.filtered_value, self.response_time, dt)
                cross_corrected = self._apply_cross_sensitivity(self.filtered_value, inputs)
                raw_value = self._apply_errors(cross_corrected)
                self.measured_value, self.in_range = self._clamp_to_range(raw_value)
                self.is_detected = self.measured_value >= self.detection_limit
                self.reported_value = float(self.measured_value if self.is_detected else 0.0)
                self.last_sample_time = t
                self.is_valid = True
            elif should_sample:
                self.is_valid = False

        self.state.update(
            {
                **self._base_state_dict(),
                "filtered_value": float(self.filtered_value),
                "reported_value": float(self.reported_value),
                "is_detected": bool(self.is_detected),
                "pending_samples": list(self._pending_samples),
            }
        )
        self.outputs_data = self._build_outputs(
            self.reported_value,
            extras={
                "analyzer_method": self.analyzer_method.value,
                "is_detected": bool(self.is_detected),
            },
            include_drift=True,
        )
        return self.outputs_data

    def _pop_latest_ready_sample(self, t: float) -> Optional[float]:
        """Return the latest sample whose analysis delay has elapsed."""
        latest_ready: Optional[float] = None
        pending: List[Tuple[float, float]] = []
        for release_time, sample_value in self._pending_samples:
            if release_time <= t + 1e-12:
                latest_ready = sample_value
            else:
                pending.append((release_time, sample_value))
        self._pending_samples = pending
        return latest_ready

    def to_dict(self) -> Dict[str, Any]:
        """Serialize sensor configuration and state."""
        return {
            **self._base_config_dict(),
            "sensor_type": self.sensor_type.value,
            "analyzer_method": self.analyzer_method.value,
            "response_time": self.response_time,
            "measurement_delay": self.measurement_delay,
            "detection_limit": self.detection_limit,
            "cross_sensitivity": dict(self.cross_sensitivity),
            "state": self.state,
        }

    @classmethod
    def from_dict(cls, config: Dict[str, Any]) -> "GasSensor":
        """Create sensor from serialized configuration."""
        sensor = cls(
            component_id=config["component_id"],
            sensor_type=config.get("sensor_type", "CH4"),
            analyzer_method=config.get("analyzer_method"),
            signal_key=config.get("signal_key"),
            measurement_range=tuple(config.get("measurement_range", (0.0, 100.0))),
            measurement_noise=config.get("measurement_noise"),
            accuracy=config.get("accuracy", 0.0),
            drift_rate=config.get("drift_rate", 0.0),
            response_time=config.get("response_time"),
            sample_interval=config.get("sample_interval", 0.0),
            measurement_delay=config.get("measurement_delay"),
            detection_limit=config.get("detection_limit"),
            cross_sensitivity=config.get("cross_sensitivity"),
            unit=config.get("unit"),
            output_key=config.get("output_key"),
            name=config.get("name"),
        )
        cls._restore_io(sensor, config)
        return sensor

Functions

from_dict(config) classmethod

Create sensor from serialized configuration.

Source code in pyadm1/components/sensors/gas.py
@classmethod
def from_dict(cls, config: Dict[str, Any]) -> "GasSensor":
    """Create sensor from serialized configuration."""
    sensor = cls(
        component_id=config["component_id"],
        sensor_type=config.get("sensor_type", "CH4"),
        analyzer_method=config.get("analyzer_method"),
        signal_key=config.get("signal_key"),
        measurement_range=tuple(config.get("measurement_range", (0.0, 100.0))),
        measurement_noise=config.get("measurement_noise"),
        accuracy=config.get("accuracy", 0.0),
        drift_rate=config.get("drift_rate", 0.0),
        response_time=config.get("response_time"),
        sample_interval=config.get("sample_interval", 0.0),
        measurement_delay=config.get("measurement_delay"),
        detection_limit=config.get("detection_limit"),
        cross_sensitivity=config.get("cross_sensitivity"),
        unit=config.get("unit"),
        output_key=config.get("output_key"),
        name=config.get("name"),
    )
    cls._restore_io(sensor, config)
    return sensor

step(t, dt, inputs)

Advance the sensor by one simulation step.

In continuous mode (measurement_delay = 0) the response lag is applied to the live input signal before adding errors, matching the analog behaviour of NDIR and electrochemical sensors.

In batch mode (measurement_delay > 0) a sample is queued and released after the analysis delay, then the response lag tracks the released result, matching the discrete output of a GC analyzer.

Source code in pyadm1/components/sensors/gas.py
def step(self, t: float, dt: float, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Advance the sensor by one simulation step.

    In **continuous mode** (``measurement_delay = 0``) the response lag is
    applied to the live input signal before adding errors, matching the
    analog behaviour of NDIR and electrochemical sensors.

    In **batch mode** (``measurement_delay > 0``) a sample is queued and
    released after the analysis delay, then the response lag tracks the
    released result, matching the discrete output of a GC analyzer.
    """
    self._advance_drift_and_read(dt, inputs)

    should_sample = self._should_sample(t)

    if self.measurement_delay > 0:
        # ---- Batch mode (GC-type) ----------------------------------------
        if should_sample and not np.isnan(self.true_value):
            self._pending_samples.append((t + self.measurement_delay, self.true_value))
            self.last_sample_time = t
        elif should_sample:
            self.is_valid = False

        matured = self._pop_latest_ready_sample(t)
        if matured is not None:
            cross_corrected = self._apply_cross_sensitivity(matured, inputs)
            raw_value = self._apply_errors(cross_corrected)
            self.measured_value, self.in_range = self._clamp_to_range(raw_value)
            self.is_valid = True

        # Lag applied continuously toward the latest released result.
        if not np.isnan(self.measured_value):
            self.filtered_value = self._apply_response_lag(
                self.measured_value, self.filtered_value, self.response_time, dt
            )
            self.is_detected = self.filtered_value >= self.detection_limit
            self.reported_value = float(self.filtered_value if self.is_detected else 0.0)

    else:
        # ---- Continuous mode (NDIR, electrochemical, PID, …) --------------
        if should_sample and not np.isnan(self.true_value):
            self.filtered_value = self._apply_response_lag(self.true_value, self.filtered_value, self.response_time, dt)
            cross_corrected = self._apply_cross_sensitivity(self.filtered_value, inputs)
            raw_value = self._apply_errors(cross_corrected)
            self.measured_value, self.in_range = self._clamp_to_range(raw_value)
            self.is_detected = self.measured_value >= self.detection_limit
            self.reported_value = float(self.measured_value if self.is_detected else 0.0)
            self.last_sample_time = t
            self.is_valid = True
        elif should_sample:
            self.is_valid = False

    self.state.update(
        {
            **self._base_state_dict(),
            "filtered_value": float(self.filtered_value),
            "reported_value": float(self.reported_value),
            "is_detected": bool(self.is_detected),
            "pending_samples": list(self._pending_samples),
        }
    )
    self.outputs_data = self._build_outputs(
        self.reported_value,
        extras={
            "analyzer_method": self.analyzer_method.value,
            "is_detected": bool(self.is_detected),
        },
        include_drift=True,
    )
    return self.outputs_data

to_dict()

Serialize sensor configuration and state.

Source code in pyadm1/components/sensors/gas.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize sensor configuration and state."""
    return {
        **self._base_config_dict(),
        "sensor_type": self.sensor_type.value,
        "analyzer_method": self.analyzer_method.value,
        "response_time": self.response_time,
        "measurement_delay": self.measurement_delay,
        "detection_limit": self.detection_limit,
        "cross_sensitivity": dict(self.cross_sensitivity),
        "state": self.state,
    }