Zum Inhalt

RedisImageStreamer

redis_robot_comm.redis_image_streamer.RedisImageStreamer

Ein Redis-basierter Stream, der OpenCV-Bilder beliebiger Größe veröffentlichen und empfangen kann.

A Redis-backed stream that can publish and consume OpenCV images of arbitrary size.

Die Klasse serialisiert ein Bild (entweder als Rohdaten oder JPEG) und speichert es in einem Redis-Stream. Jeder Eintrag enthält Metadaten wie die Bildform, den Datentyp und optionale benutzerdefinierte Felder.

The class serializes an image (either as raw bytes or JPEG) and stores it in a Redis stream. Each entry contains metadata such as the image shape, data type, and optional custom fields.

Source code in redis_robot_comm/redis_image_streamer.py
class RedisImageStreamer:
    """Publish and consume OpenCV images of arbitrary size through a Redis stream.

    Every frame is serialized (raw bytes or JPEG), base64-encoded and stored as
    one stream entry together with its width, height, channel count, dtype,
    payload sizes, a timestamp and optional JSON-encoded custom metadata.

    Set ``verbose`` to ``True`` on an instance to enable the informational
    publish/subscribe log lines and error-level logging of decode failures.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        stream_name: str = "robot_camera",
        config: Optional[RedisConfig] = None,
        stream_config: Optional[ImageStreamConfig] = None,
    ) -> None:
        """Create the Redis client and verify the connection with a PING.

        Args:
            host: Redis server hostname or IP address. Falls back to
                ``config.host`` when omitted (or falsy).
            port: Redis server port. Falls back to ``config.port`` when
                omitted (or falsy).
            stream_name: Name of the stream that will hold the image frames.
                It is passed through ``validate_stream_name`` before use.
            config: Redis connection settings; ``get_redis_config()`` is used
                when omitted.
            stream_config: Image stream settings (quality bounds, default
                quality, maximum stream length); a default
                ``ImageStreamConfig()`` is used when omitted.

        Raises:
            RedisConnectionError: If connecting to or pinging Redis fails.
        """
        redis_cfg = get_redis_config() if config is None else config

        # Explicit arguments take precedence over the configuration object.
        redis_host = host or redis_cfg.host
        redis_port = port or redis_cfg.port

        self.stream_config = stream_config or ImageStreamConfig()
        validate_stream_name(stream_name)
        self.stream_name: str = stream_name
        self.verbose: bool = False
        try:
            self.client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_cfg.db,
                password=redis_cfg.password,
                socket_timeout=redis_cfg.socket_timeout,
                socket_connect_timeout=redis_cfg.socket_connect_timeout,
                retry_on_timeout=redis_cfg.retry_on_timeout,
                max_connections=redis_cfg.max_connections,
                # Payloads are base64 text, so string responses are sufficient.
                decode_responses=True,
            )
            self.client.ping()
        except RedisError as e:
            raise RedisConnectionError(f"Failed to connect to Redis: {e}") from e

    @retry_on_connection_error(max_attempts=3, delay=0.5)
    def publish_image(
        self,
        image: ImageArray,
        metadata: Optional[ImageMetadata] = None,
        compress_jpeg: bool = True,
        quality: Optional[int] = None,
        maxlen: Optional[int] = None,
    ) -> StreamID:
        """Publish a single image frame to the Redis stream.

        Args:
            image: OpenCV image array (HxWxC, or HxW for grayscale).
            metadata: Arbitrary JSON-serializable metadata stored alongside
                the frame. Omitted from the entry when empty or ``None``.
            compress_jpeg: Encode the frame as JPEG when true, otherwise store
                the raw array bytes.
            quality: JPEG quality. Defaults to
                ``stream_config.default_quality``; it is range-checked against
                the configured bounds even when ``compress_jpeg`` is false.
            maxlen: Maximum number of entries to keep in the stream. Defaults
                to ``stream_config.max_length``.

        Returns:
            StreamID: The Redis entry ID of the published frame, as a string.

        Raises:
            InvalidImageError: If the image fails validation or JPEG encoding.
            ValueError: If ``quality`` is outside the configured bounds.
            RedisPublishError: If writing to Redis fails.
        """
        try:
            validate_image(image)
        except InvalidImageError as e:
            logger.error(f"Image validation failed: {e}")
            raise

        if quality is None:
            quality = self.stream_config.default_quality

        if not (self.stream_config.min_quality <= quality <= self.stream_config.max_quality):
            raise ValueError(f"Quality must be between {self.stream_config.min_quality} and {self.stream_config.max_quality}")

        if maxlen is None:
            maxlen = self.stream_config.max_length

        timestamp = time.time()

        # Dimensions travel with every entry so frames of any size can be mixed.
        dims = image.shape
        height, width = dims[:2]
        channels = dims[2] if len(dims) == 3 else 1

        if compress_jpeg:
            success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality])
            if not success:
                raise InvalidImageError("Failed to compress image to JPEG")
            payload = buffer
            format_type = "jpeg"
            compressed_size = len(buffer)
        else:
            payload = image.tobytes()
            format_type = "raw"
            compressed_size = image.nbytes
        image_data = base64.b64encode(payload).decode("utf-8")

        # Redis stream fields are flat strings, hence the str() conversions.
        message = {
            "timestamp": str(timestamp),
            "image_data": image_data,
            "format": format_type,
            "width": str(width),
            "height": str(height),
            "channels": str(channels),
            "dtype": str(image.dtype),
            "compressed_size": str(compressed_size),
            "original_size": str(image.nbytes),
        }

        if metadata:
            message["metadata"] = json.dumps(metadata)

        try:
            stream_id = self.client.xadd(self.stream_name, message, maxlen=maxlen)
            if self.verbose:
                logger.info(f"Published {width}x{height} image ({compressed_size} bytes)")
            return str(stream_id)
        except RedisError as e:
            logger.error(f"Failed to publish image to Redis: {e}")
            raise RedisPublishError(f"Failed to publish image: {e}") from e

    def get_latest_image(self) -> Optional[Tuple[ImageArray, ImageMetadata]]:
        """Retrieve the newest frame from the stream.

        Returns:
            Optional[Tuple[ImageArray, ImageMetadata]]: ``(image, metadata)``
            for the newest entry, or ``None`` when the stream has no entries
            or the newest entry cannot be decoded.

        Raises:
            RedisRetrievalError: If reading from Redis fails.
        """
        try:
            newest = self.client.xrevrange(self.stream_name, count=1)
            if newest:
                _entry_id, fields = newest[0]
                return self._decode_variable_image(fields)
            return None

        except RedisError as e:
            logger.error(f"Error getting latest image from Redis: {e}")
            raise RedisRetrievalError(f"Failed to retrieve latest image: {e}") from e
        except Exception as e:
            logger.error(f"Unexpected error getting latest image: {e}")
            raise RedisRetrievalError(f"Unexpected error during image retrieval: {e}") from e

    def subscribe_variable_images(
        self,
        callback: Callable[[ImageArray, ImageMetadata, Dict[str, Any]], None],
        block_ms: int = 1000,
        start_after: str = "$",
    ) -> None:
        """Continuously listen for new frames and invoke ``callback`` for each.

        The loop runs until a ``KeyboardInterrupt`` is received or Redis
        reports an error. Entries that cannot be decoded are skipped, and an
        exception raised while handling one entry (including inside
        ``callback``) is logged without stopping the subscription. Any other
        unexpected error is logged and the read is retried after a 0.1 s
        pause.

        Args:
            callback: Function receiving ``(image, metadata, image_info)``,
                where ``image_info`` holds ``width``, ``height``, ``channels``,
                ``timestamp``, ``compressed_size`` and ``original_size``.
            block_ms: Blocking timeout in milliseconds for each ``xread``.
            start_after: Stream ID after which to start reading; ``"$"`` means
                only entries added from now on.

        Raises:
            RedisRetrievalError: If Redis reports an error while reading.
        """
        last_id = start_after
        if self.verbose:
            logger.info(f"Subscribing to image stream {self.stream_name}...")

        try:
            while True:
                # The error handling lives inside the loop so that the backoff
                # below is followed by another read instead of ending the
                # subscription silently.
                try:
                    messages = self.client.xread({self.stream_name: last_id}, block=block_ms, count=1)

                    for _stream, entries in messages:
                        for msg_id, fields in entries:
                            # Advance the cursor first so a bad entry is never re-read.
                            last_id = msg_id
                            try:
                                result = self._decode_variable_image(fields)
                                if result:
                                    image, metadata = result

                                    image_info = {
                                        "width": image.shape[1],
                                        "height": image.shape[0],
                                        "channels": (image.shape[2] if len(image.shape) == 3 else 1),
                                        "timestamp": float(fields.get("timestamp", "0")),
                                        "compressed_size": int(fields.get("compressed_size", "0")),
                                        "original_size": int(fields.get("original_size", "0")),
                                    }

                                    callback(image, metadata, image_info)
                            except Exception as e:
                                logger.error(f"Error processing image message {msg_id}: {e}")
                except RedisError as e:
                    logger.error(f"Redis error in image subscription: {e}")
                    raise RedisRetrievalError(f"Image subscription failed: {e}") from e
                except Exception as e:
                    logger.error(f"Unexpected error in image subscription: {e}")
                    time.sleep(0.1)

        except KeyboardInterrupt:
            logger.info("Stopped subscribing to images")

    def _decode_variable_image(self, fields: Dict[str, Any]) -> Optional[Tuple[ImageArray, ImageMetadata]]:
        """Decode a Redis stream entry that contains an image.

        Args:
            fields: Key/value pairs of one stream entry as written by
                ``publish_image``.

        Returns:
            Optional[Tuple[ImageArray, ImageMetadata]]: ``(image, metadata)``,
            or ``None`` if the entry cannot be decoded. ``metadata`` is an
            empty dict when the entry carries none.
        """
        try:
            width = int(fields["width"])
            height = int(fields["height"])
            channels = int(fields["channels"])
            format_type = fields["format"]

            image_data = base64.b64decode(fields["image_data"])

            if format_type == "jpeg":
                nparr = np.frombuffer(image_data, np.uint8)
                # Honour the published channel count: IMREAD_COLOR would turn a
                # single-channel frame into a 3-channel BGR image.
                read_flag = cv2.IMREAD_GRAYSCALE if channels == 1 else cv2.IMREAD_COLOR
                image = cv2.imdecode(nparr, read_flag)
                if image is None:
                    raise RuntimeError("JPEG decoding returned None")
            else:
                dtype = fields["dtype"]
                shape: Tuple[int, ...]
                if channels == 1:
                    shape = (height, width)
                else:
                    shape = (height, width, channels)
                # np.frombuffer over a bytes object yields a read-only array;
                # copy so callers can modify or draw on the frame.
                image = np.frombuffer(image_data, dtype=dtype).reshape(shape).copy()

            metadata = {}
            if "metadata" in fields:
                metadata = json.loads(fields["metadata"])

            return image, metadata

        except Exception as e:
            if self.verbose:
                logger.error(f"Error decoding variable image: {e}")
            else:
                # Keep a trace of the failure even when verbose logging is off.
                logger.debug(f"Error decoding variable image: {e}")
            return None

    def get_stream_stats(self) -> Dict[str, Any]:
        """Return bookkeeping information about the Redis stream.

        Returns:
            Dict[str, Any]: ``total_messages`` (stream length),
            ``first_entry_id`` and ``last_entry_id``. The two IDs are ``None``
            when the stream reports no first/last entry, e.g. for a stream
            that exists but is currently empty.

        Raises:
            RedisRetrievalError: If the stream information cannot be
                retrieved (for example because the stream does not exist).
        """
        try:
            info = self.client.xinfo_stream(self.stream_name)
            # An empty stream reports first-entry/last-entry as None, so a
            # .get() default alone does not protect the [0] lookup.
            first_entry = info.get("first-entry")
            last_entry = info.get("last-entry")
            return {
                "total_messages": info.get("length", 0),
                "first_entry_id": first_entry[0] if first_entry else None,
                "last_entry_id": last_entry[0] if last_entry else None,
            }
        except Exception as e:
            logger.error(f"Error getting stream stats: {e}")
            raise RedisRetrievalError(f"Stream not found or empty: {e}") from e

Functions

__init__(host=None, port=None, stream_name='robot_camera', config=None, stream_config=None)

Initialisiert den Redis Image Streamer.

Initialize the Redis image streamer.

Parameters:

Name Type Description Default
host Optional[str]

Hostname oder IP-Adresse des Redis-Servers. (Redis server hostname or IP address).

None
port Optional[int]

Port des Redis-Servers. (Redis server port).

None
stream_name str

Name des Streams, der die Bild-Frames enthalten wird. (Name of the stream that will hold the image frames).

'robot_camera'
config Optional[RedisConfig]

Optionale RedisConfig-Instanz. (Optional RedisConfig instance).

None
stream_config Optional[ImageStreamConfig]

Optionale ImageStreamConfig-Instanz. (Optional ImageStreamConfig instance).

None

Raises:

Type Description
RedisConnectionError

Wenn die Verbindung zu Redis fehlschlägt. (If connection to Redis fails).

Source code in redis_robot_comm/redis_image_streamer.py
def __init__(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    stream_name: str = "robot_camera",
    config: Optional[RedisConfig] = None,
    stream_config: Optional[ImageStreamConfig] = None,
) -> None:
    """Create the Redis client and verify the connection with a PING.

    Args:
        host: Redis server hostname or IP address. Falls back to
            ``config.host`` when omitted (or falsy).
        port: Redis server port. Falls back to ``config.port`` when omitted
            (or falsy).
        stream_name: Name of the stream that will hold the image frames. It
            is passed through ``validate_stream_name`` before use.
        config: Redis connection settings; ``get_redis_config()`` is used
            when omitted.
        stream_config: Image stream settings; a default
            ``ImageStreamConfig()`` is used when omitted.

    Raises:
        RedisConnectionError: If connecting to or pinging Redis fails.
    """
    redis_cfg = get_redis_config() if config is None else config

    # Explicit arguments take precedence over the configuration object.
    redis_host = host or redis_cfg.host
    redis_port = port or redis_cfg.port

    self.stream_config = stream_config or ImageStreamConfig()
    validate_stream_name(stream_name)
    self.stream_name: str = stream_name
    self.verbose: bool = False

    try:
        self.client = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_cfg.db,
            password=redis_cfg.password,
            socket_timeout=redis_cfg.socket_timeout,
            socket_connect_timeout=redis_cfg.socket_connect_timeout,
            retry_on_timeout=redis_cfg.retry_on_timeout,
            max_connections=redis_cfg.max_connections,
            decode_responses=True,
        )
        # Fail fast: the client connects lazily, so force a round trip here.
        self.client.ping()
    except RedisError as e:
        raise RedisConnectionError(f"Failed to connect to Redis: {e}") from e

get_latest_image()

Ruft den neuesten Frame aus dem Stream ab.

Retrieve the newest frame from the stream.

Returns:

Type Description
Optional[Tuple[ImageArray, ImageMetadata]]

Optional[Tuple[ImageArray, ImageMetadata]]: Ein Tupel aus (image_array, metadata_dict), falls ein Frame vorhanden ist, andernfalls None. (A tuple of (image_array, metadata_dict) if a frame is present, otherwise None).

Raises:

Type Description
RedisRetrievalError

Wenn der Abruf von Redis fehlschlägt. (If retrieval from Redis fails).

Source code in redis_robot_comm/redis_image_streamer.py
def get_latest_image(self) -> Optional[Tuple[ImageArray, ImageMetadata]]:
    """Retrieve the newest frame from the stream.

    Returns:
        Optional[Tuple[ImageArray, ImageMetadata]]: Whatever
        ``_decode_variable_image`` produces for the newest entry, or ``None``
        when the stream has no entries.

    Raises:
        RedisRetrievalError: If reading from Redis fails.
    """
    try:
        # Reverse range with count=1 yields at most the most recent entry.
        newest = self.client.xrevrange(self.stream_name, count=1)
        if newest:
            _entry_id, entry_fields = newest[0]
            return self._decode_variable_image(entry_fields)
        return None

    except RedisError as e:
        logger.error(f"Error getting latest image from Redis: {e}")
        raise RedisRetrievalError(f"Failed to retrieve latest image: {e}") from e
    except Exception as e:
        logger.error(f"Unexpected error getting latest image: {e}")
        raise RedisRetrievalError(f"Unexpected error during image retrieval: {e}") from e

get_stream_stats()

Ruft Buchhaltungsinformationen über den Redis-Stream ab.

Retrieve bookkeeping information about the Redis stream.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary mit Stream-Statistiken. (Dictionary with stream statistics).

Raises:

Type Description
RedisRetrievalError

Wenn der Abruf der Stream-Statistiken fehlschlägt. (If retrieval of stream statistics fails).

Source code in redis_robot_comm/redis_image_streamer.py
def get_stream_stats(self) -> Dict[str, Any]:
    """Return bookkeeping information about the Redis stream.

    Returns:
        Dict[str, Any]: ``total_messages`` (stream length), ``first_entry_id``
        and ``last_entry_id``. The two IDs are ``None`` when the stream
        reports no first/last entry, e.g. for a stream that exists but is
        currently empty.

    Raises:
        RedisRetrievalError: If the stream information cannot be retrieved
            (for example because the stream does not exist).
    """
    try:
        info = self.client.xinfo_stream(self.stream_name)
        # An empty stream reports first-entry/last-entry as None, so a .get()
        # default alone does not protect the [0] lookup.
        first_entry = info.get("first-entry")
        last_entry = info.get("last-entry")
        return {
            "total_messages": info.get("length", 0),
            "first_entry_id": first_entry[0] if first_entry else None,
            "last_entry_id": last_entry[0] if last_entry else None,
        }
    except Exception as e:
        logger.error(f"Error getting stream stats: {e}")
        raise RedisRetrievalError(f"Stream not found or empty: {e}") from e

publish_image(image, metadata=None, compress_jpeg=True, quality=None, maxlen=None)

Veröffentlicht einen einzelnen Bild-Frame im Redis-Stream.

Publish a single image frame to the Redis stream.

Parameters:

Name Type Description Default
image ImageArray

OpenCV-Bild-Array (HxWxC oder HxW für Graustufen). (OpenCV image array (H×W×C or H×W for grayscale)).

required
metadata Optional[ImageMetadata]

Beliebige Metadaten, die zusammen mit dem Frame gespeichert werden. (Arbitrary metadata stored alongside the frame).

None
compress_jpeg bool

Gibt an, ob das Bild in das JPEG-Format komprimiert werden soll. (Whether to compress the image to JPEG).

True
quality Optional[int]

JPEG-Kompressionsqualität (1-100). (JPEG compression quality (1-100)).

None
maxlen Optional[int]

Maximale Anzahl von Einträgen, die im Stream gehalten werden sollen. (Maximum number of entries to keep in the stream).

None

Returns:

Name Type Description
StreamID StreamID

Die eindeutige Redis-Eintrags-ID. (The unique Redis entry ID).

Raises:

Type Description
InvalidImageError

Wenn das bereitgestellte Bild ungültig ist. (If the supplied image is invalid).

RedisPublishError

Wenn die Veröffentlichung bei Redis fehlschlägt. (If publishing to Redis fails).

Source code in redis_robot_comm/redis_image_streamer.py
@retry_on_connection_error(max_attempts=3, delay=0.5)
def publish_image(
    self,
    image: ImageArray,
    metadata: Optional[ImageMetadata] = None,
    compress_jpeg: bool = True,
    quality: Optional[int] = None,
    maxlen: Optional[int] = None,
) -> StreamID:
    """Publish a single image frame to the Redis stream.

    Args:
        image: OpenCV image array (HxWxC, or HxW for grayscale).
        metadata: Arbitrary JSON-serializable metadata stored alongside the
            frame. Omitted from the entry when empty or ``None``.
        compress_jpeg: Encode the frame as JPEG when true, otherwise store
            the raw array bytes.
        quality: JPEG quality. Defaults to ``stream_config.default_quality``;
            it is range-checked against the configured bounds even when
            ``compress_jpeg`` is false.
        maxlen: Maximum number of entries to keep in the stream. Defaults to
            ``stream_config.max_length``.

    Returns:
        StreamID: The Redis entry ID of the published frame, as a string.

    Raises:
        InvalidImageError: If the image fails validation or JPEG encoding.
        ValueError: If ``quality`` is outside the configured bounds.
        RedisPublishError: If writing to Redis fails.
    """
    try:
        validate_image(image)
    except InvalidImageError as e:
        logger.error(f"Image validation failed: {e}")
        raise

    # Fill in configured defaults for anything the caller left unspecified.
    if quality is None:
        quality = self.stream_config.default_quality

    in_range = self.stream_config.min_quality <= quality <= self.stream_config.max_quality
    if not in_range:
        raise ValueError(f"Quality must be between {self.stream_config.min_quality} and {self.stream_config.max_quality}")

    if maxlen is None:
        maxlen = self.stream_config.max_length

    timestamp = time.time()

    # Dimensions travel with every entry so frames of any size can be mixed.
    dims = image.shape
    height, width = dims[:2]
    channels = dims[2] if len(dims) == 3 else 1

    if compress_jpeg:
        success, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality])
        if not success:
            raise InvalidImageError("Failed to compress image to JPEG")
        payload = buffer
        format_type = "jpeg"
        compressed_size = len(buffer)
    else:
        payload = image.tobytes()
        format_type = "raw"
        compressed_size = image.nbytes
    image_data = base64.b64encode(payload).decode("utf-8")

    # Redis stream fields are flat strings, hence the str() conversions.
    message = {
        "timestamp": str(timestamp),
        "image_data": image_data,
        "format": format_type,
        "width": str(width),
        "height": str(height),
        "channels": str(channels),
        "dtype": str(image.dtype),
        "compressed_size": str(compressed_size),
        "original_size": str(image.nbytes),
    }

    if metadata:
        message["metadata"] = json.dumps(metadata)

    try:
        stream_id = self.client.xadd(self.stream_name, message, maxlen=maxlen)
        if self.verbose:
            logger.info(f"Published {width}x{height} image ({compressed_size} bytes)")
        return str(stream_id)
    except RedisError as e:
        logger.error(f"Failed to publish image to Redis: {e}")
        raise RedisPublishError(f"Failed to publish image: {e}") from e

subscribe_variable_images(callback, block_ms=1000, start_after='$')

Hört kontinuierlich auf neue Frames und ruft für jeden Frame einen Callback auf.

Continuously listen for new frames and invoke callback for each one.

Parameters:

Name Type Description Default
callback Callable[[ImageArray, ImageMetadata, Dict[str, Any]], None]

Funktion, die (image, metadata, image_info) erhält. (Function receiving (image, metadata, image_info)).

required
block_ms int

Zeitüberschreitung für das zugrunde liegende Redis xread. (Timeout for the underlying Redis xread).

1000
start_after str

Redis-Stream-ID, nach der mit dem Lesen begonnen werden soll. (Redis stream ID after which to start reading).

'$'

Raises:

Type Description
RedisRetrievalError

Wenn das Abonnement fehlschlägt. (If subscription fails).

Source code in redis_robot_comm/redis_image_streamer.py
def subscribe_variable_images(
    self,
    callback: Callable[[ImageArray, ImageMetadata, Dict[str, Any]], None],
    block_ms: int = 1000,
    start_after: str = "$",
) -> None:
    """Continuously listen for new frames and invoke ``callback`` for each.

    The loop runs until a ``KeyboardInterrupt`` is received or Redis reports
    an error. Entries that cannot be decoded are skipped, and an exception
    raised while handling one entry (including inside ``callback``) is logged
    without stopping the subscription. Any other unexpected error is logged
    and the read is retried after a 0.1 s pause.

    Args:
        callback: Function receiving ``(image, metadata, image_info)``, where
            ``image_info`` holds ``width``, ``height``, ``channels``,
            ``timestamp``, ``compressed_size`` and ``original_size``.
        block_ms: Blocking timeout in milliseconds for each ``xread``.
        start_after: Stream ID after which to start reading; ``"$"`` means
            only entries added from now on.

    Raises:
        RedisRetrievalError: If Redis reports an error while reading.
    """
    last_id = start_after
    if self.verbose:
        logger.info(f"Subscribing to image stream {self.stream_name}...")

    try:
        while True:
            # The error handling lives inside the loop so that the backoff
            # below is followed by another read instead of ending the
            # subscription silently.
            try:
                messages = self.client.xread({self.stream_name: last_id}, block=block_ms, count=1)

                for _stream, entries in messages:
                    for msg_id, fields in entries:
                        # Advance the cursor first so a bad entry is never re-read.
                        last_id = msg_id
                        try:
                            result = self._decode_variable_image(fields)
                            if result:
                                image, metadata = result

                                image_info = {
                                    "width": image.shape[1],
                                    "height": image.shape[0],
                                    "channels": (image.shape[2] if len(image.shape) == 3 else 1),
                                    "timestamp": float(fields.get("timestamp", "0")),
                                    "compressed_size": int(fields.get("compressed_size", "0")),
                                    "original_size": int(fields.get("original_size", "0")),
                                }

                                callback(image, metadata, image_info)
                        except Exception as e:
                            logger.error(f"Error processing image message {msg_id}: {e}")
            except RedisError as e:
                logger.error(f"Redis error in image subscription: {e}")
                raise RedisRetrievalError(f"Image subscription failed: {e}") from e
            except Exception as e:
                logger.error(f"Unexpected error in image subscription: {e}")
                time.sleep(0.1)

    except KeyboardInterrupt:
        logger.info("Stopped subscribing to images")