Skip to content

Kern-API (Core API)

custom_grid_env.interface.AgentInterface(render=True, render_mode=None, step_delay=100, slip_probability=0.2, slip_type='longitudinal', ghost_agent_class=None, use_particle_filter=True, pf_num_particles=200, pf_sensor_mode='both', show_particles=True, color_sensor_quality=0.8)

Interface for AI agents to interact with the CustomGridEnv.

Usage

interface = AgentInterface(render=True, slip_probability=0.2)
obs = interface.reset()
while not interface.is_terminated():
    action = your_agent.get_action(obs)
    obs, reward, done, info = interface.step(action)
results = interface.get_episode_stats()
interface.close()

Attributes:

Name Type Description
env CustomGridEnv

The gymnasium environment.

render_enabled bool

Whether to render the environment.

step_delay int

Delay between steps in milliseconds.

total_reward float

Cumulative reward in the current episode.

terminated bool

Whether the episode has terminated.

truncated bool

Whether the episode was truncated.

episode_steps int

Number of steps taken in the current episode.

last_info dict

Information from the last step.

Initializes the AgentInterface.

Parameters:

Name Type Description Default
render bool

Whether to render the graphical display. Defaults to True.

True
render_mode str

The mode to render with ("human" or "rgb_array"). Defaults to "rgb_array" if render is True and no mode is provided.

None
step_delay int

Milliseconds to wait between steps when rendering. Defaults to 100.

100
slip_probability float

Probability of slipping. Defaults to 0.2.

0.2
ghost_agent_class type

Class for ghost agent. Defaults to ChaseGhostAgent.

None
color_sensor_quality float

Probability of the color sensor measuring the correct color. Defaults to 0.8.

0.8
Source code in src/custom_grid_env/interface.py
def __init__(
    self,
    render: bool = True,
    render_mode: Optional[str] = None,
    step_delay: int = 100,
    slip_probability: float = 0.2,
    slip_type: str = "longitudinal",
    ghost_agent_class: Optional[Type[Agent]] = None,
    use_particle_filter: bool = True,
    pf_num_particles: int = 200,
    pf_sensor_mode: str = "both",  # 'color', 'cnn', or 'both'
    show_particles: bool = True,
    color_sensor_quality: float = 0.8,
):
    """Initializes the AgentInterface.

    Args:
        render (bool): Whether to render the graphical display. Defaults to True.
        render_mode (str, optional): The mode to render with ("human" or "rgb_array").
            Defaults to "rgb_array" if render is True and no mode is provided.
        step_delay (int): Milliseconds to wait between steps when rendering. Defaults to 100.
        slip_probability (float): Probability of slipping. Defaults to 0.2.
        slip_type (str): Type of slipping, forwarded to the environment.
            Defaults to "longitudinal".
        ghost_agent_class (type, optional): Class for ghost agent. Defaults to ChaseGhostAgent.
        use_particle_filter (bool): Whether to build a ParticleFilter for
            localization. Defaults to True.
        pf_num_particles (int): Number of particles in the filter. Defaults to 200.
        pf_sensor_mode (str): Sensor(s) the filter uses: 'color', 'cnn', or 'both'.
            Defaults to 'both'.
        show_particles (bool): Whether particles should be shown. Defaults to True.
        color_sensor_quality (float): Probability of the color sensor measuring the correct color.
            Defaults to 0.8.
    """
    # When rendering is requested without an explicit mode, default to an
    # off-screen RGB array rather than a window.
    if render_mode is None:
        render_mode = "rgb_array" if render else None

    self.env = CustomGridEnv(
        render_mode=render_mode,
        slip_probability=slip_probability,
        slip_type=slip_type,
        color_sensor_quality=color_sensor_quality,
    )
    self.render_enabled = render
    self.step_delay = step_delay
    # Per-episode bookkeeping; reset() re-initializes all of these.
    self.total_reward = 0.0
    self.terminated = False
    self.truncated = False
    self.episode_steps = 0
    self.last_info = {}

    # NOTE(review): set_ghost_agent() constructs the agent with env=self.env,
    # but the constructor path below does not -- confirm which signature the
    # ghost agent classes expect.
    if ghost_agent_class is None:
        self._ghost_agent = ChaseGhostAgent(self.env.action_space)
    else:
        self._ghost_agent = ghost_agent_class(self.env.action_space)
    self.vision_sensor = VisionSensor()

    self.use_particle_filter = use_particle_filter
    self.pf_sensor_mode = pf_sensor_mode
    self.show_particles = show_particles
    self.pf = None
    if self.use_particle_filter:
        self.pf = ParticleFilter(
            rows=self.env.rows, cols=self.env.cols, num_particles=pf_num_particles
        )

close()

Cleans up resources.

Source code in src/custom_grid_env/interface.py
def close(self):
    """Releases the underlying environment's resources."""
    self.env.close()

get_action_space()

Gets the action space.

Returns:

Type Description
Space

gym.spaces.Space: The action space.

Source code in src/custom_grid_env/interface.py
def get_action_space(self) -> gym.spaces.Space:
    """Exposes the environment's action space.

    Returns:
        gym.spaces.Space: The space of actions the agent may take.
    """
    space = self.env.action_space
    return space

get_episode_stats()

Gets statistics for the current/last episode.

Returns:

Name Type Description
dict Dict[str, Any]

Episode statistics.

Source code in src/custom_grid_env/interface.py
def get_episode_stats(self) -> Dict[str, Any]:
    """Summarizes the current/last episode.

    Returns:
        dict: Cumulative reward, step count, termination flags, and
            whether the goal was reached or the ghost caught the agent.
    """
    info = self.last_info
    stats = {
        "total_reward": self.total_reward,
        "steps": self.episode_steps,
        "terminated": self.terminated,
        "truncated": self.truncated,
    }
    # Outcome flags come from the final step's info dict.
    stats["reached_goal"] = info.get("reached_goal", False)
    stats["caught_by_ghost"] = info.get("caught_by_ghost", False)
    return stats

get_observation_space()

Gets the observation space.

Returns:

Type Description
Space

gym.spaces.Space: The observation space.

Source code in src/custom_grid_env/interface.py
def get_observation_space(self) -> gym.spaces.Space:
    """Exposes the environment's observation space.

    Returns:
        gym.spaces.Space: The space of observations the agent receives.
    """
    space = self.env.observation_space
    return space

get_reward_structure()

Gets the reward structure for the environment.

Returns:

Name Type Description
dict Dict[str, Any]

The reward structure.

Source code in src/custom_grid_env/interface.py
def get_reward_structure(self) -> Dict[str, Any]:
    """Fetches the reward structure, delegating to the environment.

    Returns:
        dict: The environment's reward structure.
    """
    structure = self.env.get_reward_structure()
    return structure

is_terminated()

Checks if the current episode has ended.

Returns:

Name Type Description
bool bool

True if terminated or truncated, False otherwise.

Source code in src/custom_grid_env/interface.py
def is_terminated(self) -> bool:
    """Reports whether the current episode has ended.

    Returns:
        bool: True when the episode terminated or was truncated.
    """
    if self.terminated:
        return True
    return self.truncated

reset(seed=None)

Resets the environment for a new episode.

Parameters:

Name Type Description Default
seed int

Random seed for reproducibility.

None

Returns:

Name Type Description
dict Dict[str, Any]

Initial observation for the agent.

Source code in src/custom_grid_env/interface.py
def reset(self, seed: Optional[int] = None) -> Dict[str, Any]:
    """Starts a fresh episode.

    Args:
        seed (int, optional): Seed forwarded to the environment for
            reproducibility.

    Returns:
        dict: The initial observation for the agent.
    """
    observation, info = self.env.reset(seed=seed)

    # Clear all per-episode bookkeeping.
    self.total_reward = 0.0
    self.terminated = False
    self.truncated = False
    self.episode_steps = 0
    self.last_info = info

    if self.pf:
        # Rebuild the particle filter from scratch, keeping the previous
        # particle count, then fold in the reset measurement.
        count = self.pf.num_particles
        self.pf = ParticleFilter(
            rows=self.env.rows, cols=self.env.cols, num_particles=count
        )
        self._update_pf(info)

    if self.render_enabled:
        self._render_with_pf()

    return observation

set_ghost_agent(agent_class)

Sets the ghost agent.

Parameters:

Name Type Description Default
agent_class Type[Agent]

The class of the new ghost agent.

required
Source code in src/custom_grid_env/interface.py
def set_ghost_agent(self, agent_class: Type[Agent]):
    """Replaces the ghost controller with a new agent instance.

    Args:
        agent_class (Type[Agent]): Class to instantiate; it receives the
            env's action space positionally and the env via ``env=``.
    """
    self._ghost_agent = agent_class(self.env.action_space, env=self.env)

step(action)

Takes a step in the environment (agent moves, then ghost moves automatically).

Parameters:

Name Type Description Default
action int

Agent action (0=left, 1=down, 2=right, 3=up).

required

Returns:

Name Type Description
tuple Tuple[Dict[str, Any], float, bool, Dict[str, Any]]

(observation, reward, done, info)

Source code in src/custom_grid_env/interface.py
def step(self, action: int) -> Tuple[Dict[str, Any], float, bool, Dict[str, Any]]:
    """Takes a step in the environment (agent moves, then ghost moves automatically).

    One call advances TWO environment steps: the agent's move, then --
    unless the agent's move ended the episode -- a ghost move chosen by the
    internal ghost agent. When the particle filter is enabled it predicts
    before the agent's move and updates from the resulting measurement.

    Args:
        action (int): Agent action (0=left, 1=down, 2=right, 3=up).

    Returns:
        tuple: (observation, reward, done, info)

    Raises:
        RuntimeError: If called after the episode has already ended.
    """
    if self.terminated or self.truncated:
        raise RuntimeError(
            "Episode has ended. Call reset() to start a new episode."
        )

    combined_info = {}
    total_step_reward = 0.0

    # Agent's turn
    logger.debug(f"Agent's turn. action={action}")

    # Motion update first: propagate particles through the (possibly
    # slippery) transition model before observing.
    if self.pf:
        self.pf.predict(
            action,
            self.env.slip_probability,
            self.env.slip_type,
            self.env._is_move_valid,
        )

    obs, reward, self.terminated, self.truncated, info = self.env.step(action)
    total_step_reward += reward

    # Measurement update with this step's sensor readings.
    if self.pf:
        self._update_pf(info)

    if self.render_enabled:
        logger.debug("Rendering after agent's turn.")
        self._render_with_pf()
        pygame.time.wait(self.step_delay)

    logger.debug(
        f"env.info after agent's turn and potential render: {self.env.info}"
    )
    combined_info.update(self.env.info)

    # NOTE(review): only `terminated` is checked here; a truncated episode
    # would still let the ghost move -- confirm the env never truncates.
    if self.terminated:
        self.total_reward += total_step_reward
        self.episode_steps += 1
        self.last_info = combined_info
        return obs, float(total_step_reward), True, combined_info

    # Ghost's turn
    logger.debug("Ghost's turn.")
    ghost_obs = self.env._get_ghost_obs()
    ghost_action = self._ghost_agent.get_action(ghost_obs)

    obs, reward, self.terminated, self.truncated, info = self.env.step(ghost_action)

    if self.render_enabled:
        logger.debug("Rendering after ghost's turn.")
        self._render_with_pf()
        pygame.time.wait(self.step_delay)

    logger.debug(
        f"env.info after ghost's turn and potential render: {self.env.info}"
    )
    combined_info.update(self.env.info)

    # The ghost's step reward counts toward the agent only when the ghost
    # caught it; the ghost's ordinary step penalty is not charged.
    if info.get("caught_by_ghost"):
        total_step_reward += reward

    self.total_reward += total_step_reward
    self.episode_steps += 1
    self.last_info = combined_info

    return (
        obs,
        float(total_step_reward),
        self.terminated or self.truncated,
        combined_info,
    )

custom_grid_env.env.CustomGridEnv(render_mode='human', slip_probability=0.2, slip_type='longitudinal', color_sensor_quality=0.8)

Bases: Env

A custom grid environment with an agent and a ghost.

Attributes:

Name Type Description
render_mode str

Current render mode.

slip_probability float

Probability of slipping to a perpendicular direction.

rows int

Number of rows in the grid.

cols int

Number of columns in the grid.

grid ndarray

The grid containing cell information.

agent_pos list

Current position of the agent [row, col].

start_pos list

Starting position of the agent.

ghost_pos list

Current position of the ghost.

ghost_start_pos list

Starting position of the ghost.

step_count int

Current step count in the episode.

current_turn int

Whose turn it is (0 for agent, 1 for ghost).

Initializes the CustomGridEnv.

Parameters:

Name Type Description Default
render_mode str

The mode to render with. Defaults to "human".

'human'
slip_probability float

Chance of slipping instead of making the intended move; the slip direction depends on slip_type. Defaults to 0.2.

0.2
slip_type str

Type of slipping ("perpendicular" or "longitudinal"). Defaults to "longitudinal".

'longitudinal'
color_sensor_quality float

Probability of the color sensor measuring the correct color. Defaults to 0.8.

0.8
Source code in src/custom_grid_env/env.py
def __init__(
    self,
    render_mode: str = "human",
    slip_probability: float = 0.2,
    slip_type: str = "longitudinal",
    color_sensor_quality: float = 0.8,
):
    """Initializes the CustomGridEnv.

    The grid size is fixed at 4 rows x 5 columns. Observations are local:
    the agent's own cell, its four neighbors, and the ghost's relative
    position and distance.

    Args:
        render_mode (str): The mode to render with. Defaults to "human".
        slip_probability (float): Chance of slipping instead of the intended
            move; the slip direction depends on slip_type. Defaults to 0.2.
        slip_type (str): Type of slipping ("perpendicular" or "longitudinal").
            Defaults to "longitudinal".
        color_sensor_quality (float): Probability of the color sensor measuring the correct color.
            Defaults to 0.8.
    """
    super().__init__()
    self.render_mode = render_mode
    self.slip_probability = slip_probability
    self.slip_type = slip_type
    self.color_sensor_quality = color_sensor_quality
    # Grid dimensions are fixed for this environment.
    self.rows = 4
    self.cols = 5
    self.observation_space = gym.spaces.Dict(
        {
            "current_cell": gym.spaces.Dict(
                {
                    "colour": gym.spaces.Discrete(3),  # 0=none, 1=red, 2=green
                    "has_item": gym.spaces.MultiBinary(3),  # [dog, flower, notes]
                    "is_goal": gym.spaces.Discrete(2),
                    "text": gym.spaces.Text(max_length=10),
                }
            ),
            "neighbors": gym.spaces.Dict(
                {
                    "up": gym.spaces.Dict(
                        {
                            "accessible": gym.spaces.Discrete(2),
                            "colour": gym.spaces.Discrete(3),
                        }
                    ),
                    "right": gym.spaces.Dict(
                        {
                            "accessible": gym.spaces.Discrete(2),
                            "colour": gym.spaces.Discrete(3),
                        }
                    ),
                    "down": gym.spaces.Dict(
                        {
                            "accessible": gym.spaces.Discrete(2),
                            "colour": gym.spaces.Discrete(3),
                        }
                    ),
                    "left": gym.spaces.Dict(
                        {
                            "accessible": gym.spaces.Discrete(2),
                            "colour": gym.spaces.Discrete(3),
                        }
                    ),
                }
            ),
            "ghost_relative_pos": gym.spaces.Box(
                low=-4, high=4, shape=(2,), dtype=np.int32
            ),
            "ghost_distance": gym.spaces.Discrete(21),  # Max distance in 4x5 grid
        }
    )

    self.action_space = gym.spaces.Discrete(4)  # 0: left, 1: down, 2: right, 3: up

    # Cells are populated by the setup helpers below.
    self.grid = np.empty((self.rows, self.cols), dtype=object)
    self._setup_grid()
    self._setup_walls()
    # Fixed start positions; reset() restores entities to these.
    self.agent_pos = [0, 2]
    self.start_pos = [0, 2]
    self.ghost_pos = [3, 4]
    self.ghost_start_pos = [3, 4]
    self.step_count = 0
    self.current_turn = 0  # 0 = agent moves next, 1 = ghost
    self.info = {}

    # Rendering setup
    self.renderer = None
    if self.render_mode in self.metadata["render_modes"]:
        self.renderer = PygameRenderer(
            rows=self.rows,
            cols=self.cols,
            render_mode=self.render_mode,
            render_fps=self.metadata["render_fps"],
        )

calculate_reward(caught_by_ghost=False)

Calculates reward based on current game state.

Parameters:

Name Type Description Default
caught_by_ghost bool

Whether the ghost caught the agent. Defaults to False.

False

Returns:

Name Type Description
tuple Tuple[float, bool, Dict[str, Any]]

(reward, terminated, info_dict)

Source code in src/custom_grid_env/env.py
def calculate_reward(
    self, caught_by_ghost: bool = False
) -> Tuple[float, bool, Dict[str, Any]]:
    """Derives the reward and termination state for the current position.

    Args:
        caught_by_ghost (bool): True when the ghost just captured the agent.
            Defaults to False.

    Returns:
        tuple: (reward, terminated, info_dict)
    """
    rewards = self.get_reward_structure()

    # Capture by the ghost ends the episode immediately.
    if caught_by_ghost:
        return float(rewards["caught_by_ghost"]), True, {"caught_by_ghost": True}

    # Reaching the goal cell also terminates.
    cell = self.grid[self.agent_pos[0], self.agent_pos[1]]
    if cell["is_goal"]:
        return float(rewards["reached_goal"]), True, {"reached_goal": True}

    # Ordinary step: small penalty, episode continues.
    return float(rewards["step_penalty"]), False, {}

close()

Cleans up resources.

Source code in src/custom_grid_env/env.py
def close(self):
    """Shuts down the renderer, if one was created."""
    renderer = self.renderer
    if renderer:
        renderer.close()

get_current_turn()

Returns whose turn it is.

Returns:

Name Type Description
str str

'agent' or 'ghost'.

Source code in src/custom_grid_env/env.py
def get_current_turn(self) -> str:
    """Names the entity that moves next.

    Returns:
        str: 'agent' when current_turn is 0, otherwise 'ghost'.
    """
    if self.current_turn == 0:
        return "agent"
    return "ghost"

get_reward_structure()

Gets the reward structure for this environment.

Returns:

Name Type Description
dict Dict[str, Any]

Dictionary describing all rewards and their values.

Source code in src/custom_grid_env/env.py
def get_reward_structure(self) -> Dict[str, Any]:
    """Describes every reward this environment can emit.

    Returns:
        dict: Reward names and values, plus the slip probability and the
            list of terminal states.
    """
    structure = {
        "step_penalty": -1,
        "caught_by_ghost": -50,
        "reached_goal": 100,
    }
    structure["slip_probability"] = self.slip_probability
    structure["terminal_states"] = ["caught_by_ghost", "reached_goal"]
    return structure

move_ghost(ghost_action)

Moves the ghost with an externally provided action.

Parameters:

Name Type Description Default
ghost_action int

Action for ghost (0=left, 1=down, 2=right, 3=up).

required
Source code in src/custom_grid_env/env.py
def move_ghost(self, ghost_action: int):
    """Advances the ghost by one externally chosen action.

    Args:
        ghost_action (int): Action for ghost (0=left, 1=down, 2=right, 3=up).
    """
    new_pos = self._move_entity(self.ghost_pos, ghost_action)
    self.ghost_pos = new_pos

render()

Renders the environment.

Returns:

Type Description
Optional[ndarray]

np.ndarray, optional: RGB array if render_mode is "rgb_array".

Source code in src/custom_grid_env/env.py
def render(self) -> Optional[np.ndarray]:
    """Draws the current state via the renderer, if any.

    Returns:
        np.ndarray, optional: An RGB frame when render_mode is "rgb_array";
            None when no renderer exists.
    """
    # No renderer configured: nothing to draw.
    if not self.renderer:
        return None
    logger.debug(f"Calling renderer.render with info: {self.info}")
    return self.renderer.render(
        agent_pos=self.agent_pos,
        ghost_pos=self.ghost_pos,
        grid=self.grid,
        walls_horizontal=self.walls_horizontal,
        walls_vertical=self.walls_vertical,
        step_count=self.step_count,
        current_turn=self.current_turn,
        info=self.info,
    )

reset(seed=None, options=None)

Resets the environment.

Parameters:

Name Type Description Default
seed int

Random seed.

None
options dict

Additional options.

None

Returns:

Name Type Description
tuple Tuple[Dict[str, Any], Dict[str, Any]]

(observation, info)

Source code in src/custom_grid_env/env.py
def reset(
    self, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Resets positions, counters, and info for a new episode.

    Args:
        seed (int, optional): Random seed passed to the gymnasium base reset.
        options (dict, optional): Additional options (unused here).

    Returns:
        tuple: (observation, info)
    """
    super().reset(seed=seed)

    # Both entities return to their configured start cells.
    self.agent_pos = list(self.start_pos)
    self.ghost_pos = list(self.ghost_start_pos)
    self.step_count = 0
    self.current_turn = 0  # agent moves first

    color = self._get_color_sensor_measurement(self.agent_pos)
    distance = self._calculate_shortest_path_distance(self.agent_pos, self.ghost_pos)
    self.info = {
        "current_turn": "agent",
        "color_measurement": color,
        "ghost_distance": distance,
    }
    return self._get_obs(), self.info

step(action)

Executes one step in the environment.

Parameters:

Name Type Description Default
action int

Action for current entity.

required

Returns:

Name Type Description
tuple Tuple[Dict[str, Any], float, bool, bool, Dict[str, Any]]

(observation, reward, terminated, truncated, info)

Source code in src/custom_grid_env/env.py
def step(
    self, action: int
) -> Tuple[Dict[str, Any], float, bool, bool, Dict[str, Any]]:
    """Executes one step in the environment.

    Moves whichever entity's turn it is (agent when current_turn == 0,
    ghost otherwise) and toggles the turn. This environment never
    truncates: the fourth return value is always False.

    Args:
        action (int): Action for current entity.

    Returns:
        tuple: (observation, reward, terminated, truncated, info)
    """
    logger.debug(f"step(action={action}) called. current_turn={self.current_turn}")

    # Preserve certain keys across info.clear() if needed
    # Actually, if the user calls render() manually, cnn_prediction might be in self.info.
    # But step() usually clears it for the new step.
    info = self.info
    logger.debug(f"info before clear: {info}")

    # Sensor/estimation results survive the per-step reset so a manual
    # render between steps still shows them.
    preserved_info = {}
    keys_to_preserve = [
        "cnn_prediction",
        "cnn_probs",
        "estimated_pos",
        "color_measurement",
        "intended_action",
        "actual_action",
        "slipped",
        "particles",
    ]
    for key in keys_to_preserve:
        if key in info:
            preserved_info[key] = info[key]

    info.clear()
    info.update(preserved_info)

    reward = 0.0
    terminated = False
    action_names = {0: "left", 1: "down", 2: "right", 3: "up"}

    if self.current_turn == 0:
        # --- Agent's turn: subject to slipping, earns rewards/penalties ---
        self.step_count += 1
        actual_actions, slipped = self._apply_slip(action)
        for act in actual_actions:
            self.agent_pos = self._move_entity(self.agent_pos, act)

        info["slipped"] = slipped
        info["intended_action"] = action_names[action]
        info["actual_action"] = (
            ", ".join([action_names[a] for a in actual_actions])
            if actual_actions
            else "stay"
        )
        # Fresh (noisy) color reading from the cell the agent landed on.
        info["color_measurement"] = self._get_color_sensor_measurement(
            self.agent_pos
        )

        current_cell = self.grid[self.agent_pos[0], self.agent_pos[1]]
        if self.agent_pos == self.ghost_pos:
            # Walking into the ghost ends the episode.
            reward = float(self.get_reward_structure()["caught_by_ghost"])
            terminated = True
            info["caught_by_ghost"] = True
        elif current_cell["is_goal"]:
            reward = float(self.get_reward_structure()["reached_goal"])
            terminated = True
            info["reached_goal"] = True
        else:
            reward = float(self.get_reward_structure()["step_penalty"])

        self.current_turn = 1
        info["current_turn"] = "ghost"
        info["mover"] = "agent"
        info["ghost_distance"] = self._calculate_shortest_path_distance(
            self.agent_pos, self.ghost_pos
        )
        self.info = info

    else:
        # --- Ghost's turn: moves with the provided action (no slip applied) ---
        self.move_ghost(action)
        if self.agent_pos == self.ghost_pos:
            reward = float(self.get_reward_structure()["caught_by_ghost"])
            terminated = True
            info["caught_by_ghost"] = True

        self.current_turn = 0
        info["current_turn"] = "agent"
        info["mover"] = "ghost"
        info["ghost_distance"] = self._calculate_shortest_path_distance(
            self.agent_pos, self.ghost_pos
        )

    self.info = info
    return self._get_obs(), float(reward), terminated, False, info