Skip to content

Camera

FrameGrabber

robot_environment.camera.framegrabber.FrameGrabber

Bases: ABC

An abstract class that provides the abstract method get_current_frame() to be implemented by classes inheriting from this one. The FrameGrabber grabs frames from the camera and provides them.

Source code in robot_environment/camera/framegrabber.py
class FrameGrabber(ABC):
    """
    An abstract base class for camera frame grabbers.

    Subclasses implement get_current_frame() to capture frames from a concrete
    camera; the base class stores the most recent frame and exposes accessors
    for it and for the owning environment.
    """

    # *** CONSTRUCTORS ***
    def __init__(self, environment: "Environment", verbose: bool = False):
        """
        Initialize the FrameGrabber.

        Args:
            environment: Environment object this FrameGrabber is installed in.
            verbose: Enable verbose logging.
        """
        # No frame has been grabbed yet.
        self._current_frame = None
        self._environment = environment
        self._verbose = verbose

    # *** PUBLIC GET methods ***

    def get_current_frame_shape(self) -> tuple[int, ...]:
        """
        Return the shape of the current frame.

        Returns:
            tuple[int, ...]: Shape of the frame.

        Raises:
            AttributeError: If no frame has been grabbed yet (frame is None).
        """
        return self._current_frame.shape

    def get_current_frame_width_height(self) -> tuple[int, int]:
        """
        Return width and height of the current frame in pixels.

        Returns:
            tuple[int, int]: (width, height) of the current frame in pixels.

        Raises:
            AttributeError: If no frame has been grabbed yet (frame is None).
        """
        # numpy images are indexed (rows, cols) == (height, width); the
        # previous implementation returned shape[0], shape[1], i.e.
        # (height, width), contradicting the method name. Fixed to return
        # (width, height) as advertised.
        return self._current_frame.shape[1], self._current_frame.shape[0]

    # *** PUBLIC methods ***

    # *** PUBLIC STATIC/CLASS GET methods ***

    @abstractmethod
    def get_current_frame(self) -> np.ndarray:
        """
        Captures an image of the robot's workspace, ensuring proper undistortion in RGB.

        Returns:
            numpy.ndarray: Raw image captured from the robot's camera.
        """
        return self._current_frame

    # *** PRIVATE methods ***

    # *** PRIVATE STATIC/CLASS methods ***

    # *** PUBLIC properties ***

    def current_frame(self) -> np.ndarray:
        """
        Returns current frame.

        Returns:
            numpy.ndarray: Current frame, or None if nothing was grabbed yet.
        """
        return self._current_frame

    def environment(self) -> "Environment":
        """
        Returns the environment object.

        Returns:
            Environment: The environment instance.
        """
        return self._environment

    def verbose(self) -> bool:
        """
        Returns the verbosity status.

        Returns:
            bool: True if verbose is on, else False.
        """
        return self._verbose

    # *** PRIVATE variables ***

    # Most recently grabbed frame (None until the first grab).
    _current_frame = None

    # Owning Environment instance.
    _environment = None

    # Verbose-logging flag.
    _verbose = False

Functions

__init__(environment, verbose=False)

Initialize the FrameGrabber.

Parameters:

Name Type Description Default
environment Environment

Environment object this FrameGrabber is installed in.

required
verbose bool

Enable verbose logging.

False
Source code in robot_environment/camera/framegrabber.py
def __init__(self, environment: "Environment", verbose: bool = False):
    """
    Initialize the FrameGrabber.

    Args:
        environment: Environment object this FrameGrabber is installed in.
        verbose: Enable verbose logging.
    """
    # No frame has been grabbed yet.
    self._current_frame = None
    self._verbose = verbose
    self._environment = environment

current_frame()

Returns current frame.

Returns:

Type Description
ndarray

numpy.ndarray: Current frame.

Source code in robot_environment/camera/framegrabber.py
def current_frame(self) -> np.ndarray:
    """
    Accessor for the most recently grabbed frame.

    Returns:
        numpy.ndarray: Current frame.
    """
    frame = self._current_frame
    return frame

environment()

Returns the environment object.

Returns:

Name Type Description
Environment Environment

The environment instance.

Source code in robot_environment/camera/framegrabber.py
def environment(self) -> "Environment":
    """
    Accessor for the Environment this frame grabber belongs to.

    Returns:
        Environment: The environment instance.
    """
    env = self._environment
    return env

get_current_frame() abstractmethod

Captures an image of the robot's workspace, ensuring proper undistortion in RGB.

Returns:

Type Description
ndarray

numpy.ndarray: Raw image captured from the robot's camera.

Source code in robot_environment/camera/framegrabber.py
@abstractmethod
def get_current_frame(self) -> np.ndarray:
    """
    Capture an image of the robot's workspace, ensuring proper undistortion in RGB.

    Returns:
        numpy.ndarray: Raw image captured from the robot's camera.
    """
    # Base behaviour: simply hand back whatever frame was stored last.
    frame = self._current_frame
    return frame

get_current_frame_shape()

Return the shape of the current frame.

Returns:

Type Description
tuple[int, ...]

tuple[int, ...]: Shape of the frame.

Source code in robot_environment/camera/framegrabber.py
def get_current_frame_shape(self) -> tuple[int, ...]:
    """
    Report the shape of the most recently stored frame.

    Returns:
        tuple[int, ...]: Shape of the frame.
    """
    frame = self._current_frame
    return frame.shape

get_current_frame_width_height()

Returns width and height of current frame in pixels.

Returns:

Type Description
tuple[int, int]

width and height of current frame in pixels.

Source code in robot_environment/camera/framegrabber.py
def get_current_frame_width_height(self) -> tuple[int, int]:
    """
    Returns width and height of current frame in pixels.

    Returns:
        tuple[int, int]: (width, height) of the current frame in pixels.

    Raises:
        AttributeError: If no frame has been grabbed yet (frame is None).
    """
    # numpy images are indexed (rows, cols) == (height, width); returning
    # shape[0], shape[1] (as the code previously did) yielded
    # (height, width) and contradicted the method name.
    return self._current_frame.shape[1], self._current_frame.shape[0]

verbose()

Returns the verbosity status.

Returns:

Name Type Description
bool bool

True if verbose is on, else False.

Source code in robot_environment/camera/framegrabber.py
def verbose(self) -> bool:
    """
    Report whether verbose logging is enabled.

    Returns:
        bool: True if verbose is on, else False.
    """
    flag = self._verbose
    return flag

NiryoFrameGrabber

robot_environment.camera.niryo_framegrabber.NiryoFrameGrabber

Bases: FrameGrabber

A class implementing a framegrabber for the Niryo Ned2 robot arm.

Source code in robot_environment/camera/niryo_framegrabber.py
class NiryoFrameGrabber(FrameGrabber):
    """
    A FrameGrabber implementation for the Niryo Ned2 robot arm.

    Grabs compressed images from the robot's camera, undistorts them using
    the camera intrinsics, tries to crop out the visible workspace, and
    publishes every processed frame to a Redis stream.
    """

    # *** CONSTRUCTORS ***
    @log_start_end_cls()
    def __init__(self, environment: "Environment", stream_name: str = "robot_camera", verbose: bool = False):
        """
        Initialize the Niryo framegrabber.

        Args:
            environment: Environment object this FrameGrabber is installed in.
            stream_name: Name of the Redis stream for image publishing.
            verbose: Enable verbose logging.

        Raises:
            TypeError: If the robot controller is not an instance of NiryoRobotController.
        """
        super().__init__(environment, verbose)

        self._logger = get_package_logger(__name__, verbose)

        robot = environment.get_robot_controller()

        # This grabber relies on the Niryo-specific camera API, so any other
        # controller type is rejected.
        if not isinstance(robot, NiryoRobotController):
            raise TypeError("robot must be an instance of NiryoRobotController.")

        self._robot = robot

        # Camera intrinsic matrix and distortion coefficients reported by the
        # robot; used for undistortion and point projection below.
        self._mtx, self._dist = self._robot.get_camera_intrinsics()

        self.streamer = RedisImageStreamer(stream_name=stream_name)
        self.frame_counter = 0

    # *** PUBLIC GET methods ***

    # *** PUBLIC methods ***

    # @log_start_end_cls()
    def get_current_frame(self) -> np.ndarray:
        """
        Captures an image of the robot's workspace, ensuring proper undistortion in BGR.

        NOTE(review): the abstract base class documents this method as RGB
        while this override says BGR — confirm the actual channel order.

        Returns:
            numpy.ndarray: Raw image captured from the robot's camera.
        """
        try:
            img_compressed = self._robot.get_img_compressed()
        except UnicodeDecodeError as e:
            # On a transient decode error, keep serving the previous frame.
            self._logger.error(f"Error getting compressed image: {e}", exc_info=True)
            return self._current_frame

        img_raw = uncompress_image(img_compressed)
        img = undistort_image(img_raw, self._mtx, self._dist)

        # Try to crop the image down to the (square) workspace area.
        img_work = extract_img_workspace(img, workspace_ratio=1)

        if img_work is not None:
            gripper_pose = self._robot.get_pose()

            # TODO: try to get transformation between camera and gripper
            camera_pose = gripper_pose

            self._logger.debug(f"camera_pose: {camera_pose}")

            current_frame = img_work
            myworkspace = self._environment.get_visible_workspace(camera_pose)

            if myworkspace is not None:
                myworkspace.set_img_shape(img_work.shape)
                workspace_id = myworkspace.id()
            else:
                self._logger.debug(f"No visible workspace: {myworkspace}")
                workspace_id = "unknown"
        else:
            # Workspace extraction failed: fall back to the full image.
            current_frame = img
            workspace_id = "none"

        self._current_frame = current_frame

        # TODO: Do I need the is_point_visible() method? For what? If so, where must it
        #  be called? It has not worked yet. It should be applied to the centre point of
        #  the workspace in get_visible_workspace().
        # is_visible = self.is_point_visible(np.array([0.24, 0.01, 0.001]))
        # print(is_visible)

        # cv2.imshow("Camera View", self._current_frame)
        # Break the loop if ESC key is pressed
        # cv2.waitKey(0)

        self.publish_workspace_image(self._current_frame, workspace_id)

        return self._current_frame

    def publish_workspace_image(
        self, image: np.ndarray, workspace_id: str, robot_pose: Optional[Dict[str, float]] = None
    ) -> str:
        """
        Publish workspace image with robot context via Redis.

        Args:
            image: The image to publish.
            workspace_id: ID of the workspace shown in the image.
            robot_pose: Optional robot pose metadata.

        Returns:
            str: The Redis stream ID of the published image.
        """
        metadata = {
            "workspace_id": workspace_id,
            "frame_id": self.frame_counter,
            "robot_pose": robot_pose or {},
            "image_source": "robot_mounted_camera",
        }

        stream_id = self.streamer.publish_image(image, metadata=metadata, compress_jpeg=True, quality=85)

        self.frame_counter += 1
        return stream_id

    def is_point_visible(self, world_point: np.array, camera_to_gripper_transform=np.eye(4)) -> bool:
        """
        Determines whether a given world point is visible in the camera's field of view.

        Args:
            world_point (np.array): A 3D point in world coordinates as a NumPy array of shape (3,).
            camera_to_gripper_transform (np.array): A 4x4 transformation matrix that defines
                the relationship between the camera and the gripper frame. Defaults to the identity matrix.

        Returns:
            bool: True if the point is visible in the camera's view, False otherwise.

        Explanation:
            1. Transforms the `world_point` from world coordinates to the gripper frame using the
               gripper's pose transformation matrix.
            2. Transforms the point to the camera frame using the provided camera-to-gripper transformation.
            3. Projects the point onto the image plane using the camera's intrinsic matrix.
            4. Checks if the projected point lies within the camera's image bounds (640x480 pixels).
            5. Optionally undistorts the projected point to account for lens distortion.

        Notes:
            - This method assumes the camera intrinsic matrix (`self._mtx`) and distortion coefficients (`self._dist`)
              are precomputed and valid for a resolution of 640x480 pixels.
            - Points behind the camera (with z <= 0 in the camera frame) are not visible.
            - NOTE(review): marked in the code as never having worked; see the
              hedged review notes inline before trusting the result.
        """
        # print(self._mtx, self._dist)
        # Unpack camera intrinsics
        K = np.array(self._mtx)
        distortion = np.array(self._dist)

        # img_width, img_height = super().get_current_frame_width_height()
        # Must work with 640 x 480 pixels here, because the camera intrinsics
        # (u0, v0) are only valid for that resolution.
        img_width, img_height = 640, 480

        gripper_pose = self._robot.get_pose()
        gripper_pose = PoseObjectPNP.convert_niryo_pose_object2pose_object(gripper_pose)

        # Transform the world point to the gripper frame
        world_to_gripper_transform = gripper_pose.to_transformation_matrix()
        # Convert `world_point` to homogeneous coordinates by appending a 1
        world_point_homogeneous = np.append(world_point, 1)

        # print(world_to_gripper_transform)
        # print(gripper_pose)

        # Transform `world_point` to the gripper frame
        # NOTE(review): if to_transformation_matrix() maps gripper -> world
        # (the usual convention for a pose matrix), the inverse — as in the
        # commented-out line below — would be required here; confirm the
        # convention before relying on this method.
        # gripper_frame_point = np.linalg.inv(world_to_gripper_transform) @ world_point_homogeneous
        gripper_frame_point = np.dot(world_to_gripper_transform, world_point_homogeneous)

        # Transform the point to the camera frame
        camera_point = np.dot(camera_to_gripper_transform, gripper_frame_point)

        # print(camera_point)

        # Check if the point is in front of the camera
        if camera_point[2] <= 0:
            return False

        # Project the point to the image plane
        pixel_coords = np.dot(K, camera_point[:3] / camera_point[2])

        # Normalize homogeneous coordinates
        u, v = pixel_coords[:2] / pixel_coords[2]

        # print(u, v)

        # Check if the point is within the image bounds
        if 0 <= u < img_width and 0 <= v < img_height:
            # Optional: Account for lens distortion
            # NOTE(review): cv2.undistortPoints called without a P matrix
            # returns *normalized* camera coordinates, not pixels, so
            # comparing them against pixel bounds below looks wrong — confirm.
            undistorted_points = cv2.undistortPoints(np.array([[u, v]], dtype=np.float32), K, distortion)
            u, v = undistorted_points[0][0]
            return 0 <= u < img_width and 0 <= v < img_height

        return False

    # *** PUBLIC STATIC/CLASS GET methods ***

    # *** PRIVATE methods ***

    # *** PRIVATE STATIC/CLASS methods ***

    # *** PUBLIC properties ***

    def camera_matrix(self) -> np.ndarray:
        """
        Returns the camera intrinsic matrix.

        Returns:
            np.ndarray: 3x3 intrinsic matrix.
        """
        return self._mtx

    def camera_dist_coeff(self) -> np.ndarray:
        """
        Returns the camera distortion coefficients.

        Returns:
            np.ndarray: Distortion coefficients.
        """
        return self._dist

    # *** PRIVATE variables ***

    # NiryoRobotController object
    _robot = None

    # camera intrinsic transformation matrix
    _mtx = None

    # camera distortion coefficients
    _dist = None
    # package logger instance (set in __init__)
    _logger = None

Functions

__init__(environment, stream_name='robot_camera', verbose=False)

Initialize the Niryo framegrabber.

Parameters:

Name Type Description Default
environment Environment

Environment object this FrameGrabber is installed in.

required
stream_name str

Name of the Redis stream for image publishing.

'robot_camera'
verbose bool

Enable verbose logging.

False

Raises:

Type Description
TypeError

If the robot controller is not an instance of NiryoRobotController.

Source code in robot_environment/camera/niryo_framegrabber.py
@log_start_end_cls()
def __init__(self, environment: "Environment", stream_name: str = "robot_camera", verbose: bool = False):
    """
    Initialize the Niryo framegrabber.

    Args:
        environment: Environment object this FrameGrabber is installed in.
        stream_name: Name of the Redis stream for image publishing.
        verbose: Enable verbose logging.

    Raises:
        TypeError: If the robot controller is not an instance of NiryoRobotController.
    """
    super().__init__(environment, verbose)

    self._logger = get_package_logger(__name__, verbose)

    robot = environment.get_robot_controller()

    # This grabber relies on the Niryo-specific camera API, so any other
    # controller type is rejected.
    if not isinstance(robot, NiryoRobotController):
        raise TypeError("robot must be an instance of NiryoRobotController.")

    self._robot = robot

    # Camera intrinsic matrix and distortion coefficients reported by the
    # robot; used for undistortion and point projection.
    self._mtx, self._dist = self._robot.get_camera_intrinsics()

    self.streamer = RedisImageStreamer(stream_name=stream_name)
    self.frame_counter = 0

camera_dist_coeff()

Returns the camera distortion coefficients.

Returns:

Type Description
ndarray

np.ndarray: Distortion coefficients.

Source code in robot_environment/camera/niryo_framegrabber.py
def camera_dist_coeff(self) -> np.ndarray:
    """
    Accessor for the camera distortion coefficients.

    Returns:
        np.ndarray: Distortion coefficients.
    """
    coefficients = self._dist
    return coefficients

camera_matrix()

Returns the camera intrinsic matrix.

Returns:

Type Description
ndarray

np.ndarray: 3x3 intrinsic matrix.

Source code in robot_environment/camera/niryo_framegrabber.py
def camera_matrix(self) -> np.ndarray:
    """
    Accessor for the camera intrinsic matrix.

    Returns:
        np.ndarray: 3x3 intrinsic matrix.
    """
    intrinsics = self._mtx
    return intrinsics

get_current_frame()

Captures an image of the robot's workspace, ensuring proper undistortion in BGR.

Returns:

Type Description
ndarray

numpy.ndarray: Raw image captured from the robot's camera.

Source code in robot_environment/camera/niryo_framegrabber.py
def get_current_frame(self) -> np.ndarray:
    """
    Captures an image of the robot's workspace, ensuring proper undistortion in BGR.

    NOTE(review): the abstract base class documents this method as RGB while
    this override says BGR — confirm the actual channel order.

    Returns:
        numpy.ndarray: Raw image captured from the robot's camera.
    """
    try:
        img_compressed = self._robot.get_img_compressed()
    except UnicodeDecodeError as e:
        # On a transient decode error, keep serving the previous frame.
        self._logger.error(f"Error getting compressed image: {e}", exc_info=True)
        return self._current_frame

    img_raw = uncompress_image(img_compressed)
    img = undistort_image(img_raw, self._mtx, self._dist)

    # Try to crop the image down to the (square) workspace area.
    img_work = extract_img_workspace(img, workspace_ratio=1)

    if img_work is not None:
        gripper_pose = self._robot.get_pose()

        # TODO: try to get transformation between camera and gripper
        camera_pose = gripper_pose

        self._logger.debug(f"camera_pose: {camera_pose}")

        current_frame = img_work
        myworkspace = self._environment.get_visible_workspace(camera_pose)

        if myworkspace is not None:
            myworkspace.set_img_shape(img_work.shape)
            workspace_id = myworkspace.id()
        else:
            self._logger.debug(f"No visible workspace: {myworkspace}")
            workspace_id = "unknown"
    else:
        # Workspace extraction failed: fall back to the full image.
        current_frame = img
        workspace_id = "none"

    self._current_frame = current_frame

    # TODO: Do I need the is_point_visible() method? For what? If so, where must it
    #  be called? It has not worked yet. It should be applied to the centre point of
    #  the workspace in get_visible_workspace().
    # is_visible = self.is_point_visible(np.array([0.24, 0.01, 0.001]))
    # print(is_visible)

    # cv2.imshow("Camera View", self._current_frame)
    # Break the loop if ESC key is pressed
    # cv2.waitKey(0)

    self.publish_workspace_image(self._current_frame, workspace_id)

    return self._current_frame

is_point_visible(world_point, camera_to_gripper_transform=np.eye(4))

Determines whether a given world point is visible in the camera's field of view.

Parameters:

Name Type Description Default
world_point array

A 3D point in world coordinates as a NumPy array of shape (3,).

required
camera_to_gripper_transform array

A 4x4 transformation matrix that defines the relationship between the camera and the gripper frame. Defaults to the identity matrix.

eye(4)

Returns:

Name Type Description
bool bool

True if the point is visible in the camera's view, False otherwise.

Explanation
  1. Transforms the world_point from world coordinates to the gripper frame using the gripper's pose transformation matrix.
  2. Transforms the point to the camera frame using the provided camera-to-gripper transformation.
  3. Projects the point onto the image plane using the camera's intrinsic matrix.
  4. Checks if the projected point lies within the camera's image bounds (640x480 pixels).
  5. Optionally undistorts the projected point to account for lens distortion.
Notes
  • This method assumes the camera intrinsic matrix (self._mtx) and distortion coefficients (self._dist) are precomputed and valid for a resolution of 640x480 pixels.
  • Points behind the camera (with z <= 0 in the camera frame) are not visible.
Source code in robot_environment/camera/niryo_framegrabber.py
def is_point_visible(self, world_point: np.array, camera_to_gripper_transform=np.eye(4)) -> bool:
    """
    Determines whether a given world point is visible in the camera's field of view.

    Args:
        world_point (np.array): A 3D point in world coordinates as a NumPy array of shape (3,).
        camera_to_gripper_transform (np.array): A 4x4 transformation matrix that defines
            the relationship between the camera and the gripper frame. Defaults to the identity matrix.

    Returns:
        bool: True if the point is visible in the camera's view, False otherwise.

    Explanation:
        1. Transforms the `world_point` from world coordinates to the gripper frame using the
           gripper's pose transformation matrix.
        2. Transforms the point to the camera frame using the provided camera-to-gripper transformation.
        3. Projects the point onto the image plane using the camera's intrinsic matrix.
        4. Checks if the projected point lies within the camera's image bounds (640x480 pixels).
        5. Optionally undistorts the projected point to account for lens distortion.

    Notes:
        - This method assumes the camera intrinsic matrix (`self._mtx`) and distortion coefficients (`self._dist`)
          are precomputed and valid for a resolution of 640x480 pixels.
        - Points behind the camera (with z <= 0 in the camera frame) are not visible.
        - NOTE(review): marked in the calling code as never having worked; see
          the hedged review notes inline before trusting the result.
    """
    # print(self._mtx, self._dist)
    # Unpack camera intrinsics
    K = np.array(self._mtx)
    distortion = np.array(self._dist)

    # img_width, img_height = super().get_current_frame_width_height()
    # Must work with 640 x 480 pixels here, because the camera intrinsics
    # (u0, v0) are only valid for that resolution.
    img_width, img_height = 640, 480

    gripper_pose = self._robot.get_pose()
    gripper_pose = PoseObjectPNP.convert_niryo_pose_object2pose_object(gripper_pose)

    # Transform the world point to the gripper frame
    world_to_gripper_transform = gripper_pose.to_transformation_matrix()
    # Convert `world_point` to homogeneous coordinates by appending a 1
    world_point_homogeneous = np.append(world_point, 1)

    # print(world_to_gripper_transform)
    # print(gripper_pose)

    # Transform `world_point` to the gripper frame
    # NOTE(review): if to_transformation_matrix() maps gripper -> world (the
    # usual convention for a pose matrix), the inverse — as in the
    # commented-out line below — would be required here; confirm the
    # convention before relying on this method.
    # gripper_frame_point = np.linalg.inv(world_to_gripper_transform) @ world_point_homogeneous
    gripper_frame_point = np.dot(world_to_gripper_transform, world_point_homogeneous)

    # Transform the point to the camera frame
    camera_point = np.dot(camera_to_gripper_transform, gripper_frame_point)

    # print(camera_point)

    # Check if the point is in front of the camera
    if camera_point[2] <= 0:
        return False

    # Project the point to the image plane
    pixel_coords = np.dot(K, camera_point[:3] / camera_point[2])

    # Normalize homogeneous coordinates
    u, v = pixel_coords[:2] / pixel_coords[2]

    # print(u, v)

    # Check if the point is within the image bounds
    if 0 <= u < img_width and 0 <= v < img_height:
        # Optional: Account for lens distortion
        # NOTE(review): cv2.undistortPoints called without a P matrix returns
        # *normalized* camera coordinates, not pixels, so comparing them
        # against pixel bounds below looks wrong — confirm.
        undistorted_points = cv2.undistortPoints(np.array([[u, v]], dtype=np.float32), K, distortion)
        u, v = undistorted_points[0][0]
        return 0 <= u < img_width and 0 <= v < img_height

    return False

publish_workspace_image(image, workspace_id, robot_pose=None)

Publish workspace image with robot context via Redis.

Parameters:

Name Type Description Default
image ndarray

The image to publish.

required
workspace_id str

ID of the workspace shown in the image.

required
robot_pose Optional[Dict[str, float]]

Optional robot pose metadata.

None

Returns:

Name Type Description
str str

The Redis stream ID of the published image.

Source code in robot_environment/camera/niryo_framegrabber.py
def publish_workspace_image(
    self, image: np.ndarray, workspace_id: str, robot_pose: Optional[Dict[str, float]] = None
) -> str:
    """
    Publish a workspace image together with robot context via Redis.

    Args:
        image: The image to publish.
        workspace_id: ID of the workspace shown in the image.
        robot_pose: Optional robot pose metadata.

    Returns:
        str: The Redis stream ID of the published image.
    """
    # Attach the robot context so consumers can interpret the frame.
    context = {
        "workspace_id": workspace_id,
        "frame_id": self.frame_counter,
        "robot_pose": robot_pose if robot_pose else {},
        "image_source": "robot_mounted_camera",
    }

    result = self.streamer.publish_image(image, metadata=context, compress_jpeg=True, quality=85)

    self.frame_counter += 1
    return result

WidowXFrameGrabber

robot_environment.camera.widowx_framegrabber.WidowXFrameGrabber

Bases: FrameGrabber

A class implementing a framegrabber for the WidowX robot arm — here an Intel RealSense camera is used as a third-person camera.

Source code in robot_environment/camera/widowx_framegrabber.py
class WidowXFrameGrabber(FrameGrabber):
    """
    A framegrabber for the WidowX robot arm, using an Intel RealSense camera
    as a third-person camera.
    """

    # *** CONSTRUCTORS ***
    def __init__(self, environment: "Environment", verbose: bool = False):
        """
        Construct the WidowX framegrabber by delegating to the base class.

        Args:
            environment: Environment object this FrameGrabber is installed in.
            verbose: Enable verbose logging.
        """
        super().__init__(environment, verbose)

    # *** PUBLIC GET methods ***

    # *** PUBLIC methods ***

    def get_current_frame(self) -> np.ndarray:
        """
        Captures an image of the robot's workspace, ensuring proper undistortion in BGR.

        Returns:
            numpy.ndarray: Raw image captured from the robot's camera.
        """
        # TODO: capture a frame from the camera and set _current_frame
        frame = self._current_frame
        return frame

Functions

__init__(environment, verbose=False)

Initialize the WidowX framegrabber.

Parameters:

Name Type Description Default
environment Environment

Environment object this FrameGrabber is installed in.

required
verbose bool

Enable verbose logging.

False
Source code in robot_environment/camera/widowx_framegrabber.py
def __init__(self, environment: "Environment", verbose: bool = False):
    """
    Construct the WidowX framegrabber by delegating to the base class.

    Args:
        environment: Environment object this FrameGrabber is installed in.
        verbose: Enable verbose logging.
    """
    super().__init__(environment, verbose)

get_current_frame()

Captures an image of the robot's workspace, ensuring proper undistortion in BGR.

Args:

Returns:

Type Description
ndarray

numpy.ndarray: Raw image captured from the robot's camera.

Source code in robot_environment/camera/widowx_framegrabber.py
def get_current_frame(self) -> np.ndarray:
    """
    Captures an image of the robot's workspace, ensuring proper undistortion in BGR.

    Returns:
        numpy.ndarray: Raw image captured from the robot's camera.
    """
    # TODO: capture a frame from the camera and set _current_frame
    frame = self._current_frame
    return frame