Zum Inhalt

Utilities

Workspace Utils

robot_environment.utils.workspace_utils

Workspace utility functions for free space calculation and coordinate transformations.

Functions

calculate_largest_free_space(workspace, detected_objects, grid_resolution=100, visualize=False, logger=None)

Determines the largest free space in the workspace in square metres and its center coordinate in metres.

Parameters:

Name Type Description Default
workspace Workspace

The workspace object to analyze.

required
detected_objects Objects

Collection of objects detected in the workspace.

required
grid_resolution int

Resolution of the workspace grid (default: 100x100).

100
visualize bool

If True, displays the grid visualization (requires GUI).

False
logger Optional[Logger]

Optional logger for debug information.

None

Returns:

Name Type Description
tuple Tuple[float, float, float]

(largest_area_m2, center_x, center_y)

Source code in robot_environment/utils/workspace_utils.py
def calculate_largest_free_space(
    workspace: Workspace,
    detected_objects: Objects,
    grid_resolution: int = 100,
    visualize: bool = False,
    logger: Optional[logging.Logger] = None,
) -> Tuple[float, float, float]:
    """
    Determines the largest free space in the workspace in square metres and its center coordinate in metres.

    Args:
        workspace: The workspace object to analyze.
        detected_objects: Collection of objects detected in the workspace.
        grid_resolution: Resolution of the workspace grid (default: 100x100).
        visualize: If True, displays the grid visualization (requires GUI).
        logger: Optional logger for debug information.

    Returns:
        tuple: (largest_area_m2, center_x, center_y)
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    workspace_top_left = workspace.xy_ul_wc()
    workspace_bottom_right = workspace.xy_lr_wc()

    x_max, y_max = workspace_top_left.x, workspace_top_left.y
    x_min, y_min = workspace_bottom_right.x, workspace_bottom_right.y

    logger.debug(f"Workspace bounds: x=[{x_min}, {x_max}], y=[{y_min}, {y_max}]")

    workspace_width = abs(y_max - y_min)
    workspace_height = abs(x_max - x_min)

    # Create a grid to represent the workspace
    grid = np.zeros((grid_resolution, grid_resolution), dtype=int)

    # Map world coordinates to grid indices
    def to_grid_coords(x: float, y: float) -> Tuple[int, int]:
        """
        Map world coordinates to grid indices.

        Args:
            x: World x-coordinate.
            y: World y-coordinate.

        Returns:
            Tuple[int, int]: (u, v) grid indices.
        """
        v = int((x_max - x) / workspace_height * grid_resolution)
        u = int((y_max - y) / workspace_width * grid_resolution)
        # Clip to ensure indices are within grid bounds
        v = max(0, min(v, grid_resolution - 1))
        u = max(0, min(u, grid_resolution - 1))
        return u, v

    # Map grid indices back to world coordinates
    def to_world_coords(u: int, v: int) -> Tuple[float, float]:
        """
        Map grid indices back to world coordinates.

        Args:
            u: Grid u-index.
            v: Grid v-index.

        Returns:
            Tuple[float, float]: (x, y) world coordinates.
        """
        x = x_max - (v + 0.5) * (workspace_height / grid_resolution)
        y = y_max - (u + 0.5) * (workspace_width / grid_resolution)
        return x, y

    # Mark the grid cells occupied by objects
    for obj in detected_objects:
        x_start = obj.x_com() - obj.height_m() / 2
        x_end = obj.x_com() + obj.height_m() / 2
        y_start = obj.y_com() - obj.width_m() / 2
        y_end = obj.y_com() + obj.width_m() / 2

        # Convert object bounds to grid indices
        u_end, v_end = to_grid_coords(x_start, y_start)
        u_start, v_start = to_grid_coords(x_end, y_end)

        logger.debug(f"Object bounds: x=[{x_start}, {x_end}], y=[{y_start}, {y_end}]")
        logger.debug(f"Grid coords: u=[{u_start}, {u_end}], v=[{v_start}, {v_end}]")

        # Mark grid cells as occupied (ensuring correct order for slicing)
        v_min_idx = min(v_start, v_end)
        v_max_idx = max(v_start, v_end)
        u_min_idx = min(u_start, u_end)
        u_max_idx = max(u_start, u_end)
        grid[v_min_idx : v_max_idx + 1, u_min_idx : u_max_idx + 1] = 1

    # Find the largest rectangle of zeros in the grid
    largest_area_cells, (v_start_rect, u_start_rect), (v_end_rect, u_end_rect) = _max_rectangle_area(grid)
    largest_area_m2 = (largest_area_cells / (grid_resolution**2)) * (workspace_width * workspace_height)

    # Calculate the center of the largest rectangle in grid coordinates
    v_center = (v_start_rect + v_end_rect) // 2
    u_center = (u_start_rect + u_end_rect) // 2

    # Map the center to world coordinates
    center_x, center_y = to_world_coords(u_center, v_center)

    if visualize:
        try:
            # Mark center in the grid for visualization
            grid_vis = grid.copy()
            grid_vis[v_center : v_center + 1, u_center : u_center + 1] = 2
            # Normalize grid to 0–255 for visualization
            grid_visual = (grid_vis * 255 // 2).astype(np.uint8)
            cv2.imshow("Largest Free Space Grid", grid_visual)
            cv2.waitKey(1)  # Use 1 instead of 0 to avoid blocking in non-interactive mode
        except Exception as e:
            logger.warning(f"Could not visualize free space grid: {e}")

    logger.info(f"Largest free area: {largest_area_m2:.4f} square meters")
    logger.info(f"Center: ({center_x:.4f}, {center_y:.4f}) meters")

    return largest_area_m2, center_x, center_y

Object Memory Manager

robot_environment.object_memory_manager.ObjectMemoryManager

Manages object position memory with workspace tracking.

Features: - Multi-workspace memory management - Manual update tracking (for pick/place operations) - Thread-safe operations - Workspace visibility state tracking - Intelligent memory clearing based on robot state

Example

manager = ObjectMemoryManager(verbose=True)

Initialize workspaces

manager.initialize_workspace("niryo_ws") manager.initialize_workspace("niryo_ws_left")

Update memory with detected objects

manager.update("niryo_ws", detected_objects, at_observation=True)

Get objects from memory

objects = manager.get("niryo_ws")

Manual update after placing object

manager.mark_manual_update("niryo_ws", "cube", old_coord=[0.2, 0.0], new_pose=new_pose)

Source code in robot_environment/object_memory_manager.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
class ObjectMemoryManager:
    """
    Manages object position memory with workspace tracking.

    Features:
    - Multi-workspace memory management
    - Manual update tracking (for pick/place operations)
    - Thread-safe operations
    - Workspace visibility state tracking
    - Intelligent memory clearing based on robot state

    Example:
        manager = ObjectMemoryManager(verbose=True)

        # Initialize workspaces
        manager.initialize_workspace("niryo_ws")
        manager.initialize_workspace("niryo_ws_left")

        # Update memory with detected objects
        manager.update("niryo_ws", detected_objects, at_observation=True)

        # Get objects from memory
        objects = manager.get("niryo_ws")

        # Manual update after placing object
        manager.mark_manual_update("niryo_ws", "cube",
                                   old_coord=[0.2, 0.0],
                                   new_pose=new_pose)
    """

    def __init__(self, manual_update_timeout: float = 5.0, position_tolerance: float = 0.05, verbose: bool = False):
        """
        Initialize the Object Memory Manager.

        Args:
            manual_update_timeout: Seconds to keep manual updates (default: 5.0)
            position_tolerance: Distance threshold for duplicate detection in meters (default: 0.05)
            verbose: Enable verbose logging (default: False)
        """
        # Workspace memories
        self._memories: Dict[str, Objects] = {}

        # Thread safety (using RLock to allow reentrant calls from update/move_object to initialize_workspace)
        self._lock = threading.RLock()

        # Manual updates tracking: {workspace_id: {object_label: timestamp}}
        self._manual_updates: Dict[str, Dict[str, float]] = {}

        # Configuration
        self._manual_update_timeout = manual_update_timeout
        self._position_tolerance = position_tolerance

        # State tracking
        self._workspace_visibility: Dict[str, bool] = {}
        self._workspace_was_lost: Dict[str, bool] = {}

        # Logging
        self._verbose = verbose
        self._logger = logging.getLogger(__name__)
        if verbose:
            self._logger.setLevel(logging.DEBUG)

    def initialize_workspace(self, workspace_id: str) -> None:
        """
        Initialize memory for a new workspace.

        Args:
            workspace_id: ID of the workspace to initialize
        """
        with self._lock:
            if workspace_id not in self._memories:
                self._memories[workspace_id] = Objects()
                self._manual_updates[workspace_id] = {}
                self._workspace_visibility[workspace_id] = False
                self._workspace_was_lost[workspace_id] = False

                self._logger.debug(f"Initialized memory for workspace: {workspace_id}")

    def update(
        self, workspace_id: str, detected_objects: Objects, at_observation_pose: bool, robot_in_motion: bool
    ) -> Tuple[int, int]:
        """
        Update memory with newly detected objects.

        Only updates when conditions are appropriate (at observation pose, not moving).
        Respects manual updates from pick/place operations.

        Args:
            workspace_id: ID of the workspace
            detected_objects: Objects detected in current frame
            at_observation_pose: Whether robot is at observation pose
            robot_in_motion: Whether robot is currently moving

        Returns:
            Tuple of (objects_added, objects_updated)
        """
        with self._lock:
            # Ensure workspace is initialized
            if workspace_id not in self._memories:
                self.initialize_workspace(workspace_id)

            # Check if we should clear memory first
            if self._should_clear_memory(workspace_id, at_observation_pose, robot_in_motion):
                self._clear_workspace_internal(workspace_id)

            # Update visibility tracking
            self._update_visibility_state(workspace_id, at_observation_pose, robot_in_motion)

            # Only update when at observation pose
            if not self._should_update_memory(at_observation_pose, robot_in_motion):
                self._logger.debug(f"Skipping memory update for {workspace_id} - conditions not met")
                return 0, 0

            # Clean up expired manual updates
            self._cleanup_expired_manual_updates(workspace_id)

            # Update memory with new detections
            objects_added, objects_updated = self._merge_detections(workspace_id, detected_objects)

            if objects_added > 0 or objects_updated > 0:
                self._logger.debug(
                    f"Memory update for '{workspace_id}': "
                    f"added={objects_added}, updated={objects_updated}, "
                    f"total={len(self._memories[workspace_id])}"
                )

            return objects_added, objects_updated

    def get(self, workspace_id: str) -> Objects:
        """
        Get a copy of objects from workspace memory.

        Args:
            workspace_id: ID of the workspace

        Returns:
            Copy of objects in memory for this workspace
        """
        with self._lock:
            if workspace_id not in self._memories:
                self._logger.warning(f"Workspace {workspace_id} not initialized")
                return Objects()

            # Return a copy to avoid external modifications
            return Objects(list(self._memories[workspace_id]))

    def get_all(self) -> Dict[str, Objects]:
        """
        Get objects from all workspaces.

        Returns:
            Dictionary mapping workspace_id to Objects collection
        """
        with self._lock:
            return {ws_id: Objects(list(objects)) for ws_id, objects in self._memories.items()}

    def clear(self, workspace_id: Optional[str] = None) -> None:
        """
        Clear memory for specific workspace or all workspaces.

        Args:
            workspace_id: ID of workspace to clear, or None to clear all
        """
        with self._lock:
            if workspace_id is None:
                # Clear all workspaces
                for ws_id in self._memories:
                    self._clear_workspace_internal(ws_id)
                self._logger.info("Cleared memory for all workspaces")
            else:
                if workspace_id in self._memories:
                    self._clear_workspace_internal(workspace_id)
                    self._logger.info(f"Cleared memory for workspace: {workspace_id}")
                else:
                    self._logger.warning(f"Workspace {workspace_id} not found")

    def remove_object(self, workspace_id: str, object_label: str, coordinate: List[float]) -> bool:
        """
        Remove an object from workspace memory.

        Args:
            workspace_id: ID of the workspace
            object_label: Label of the object to remove
            coordinate: Last known coordinate [x, y]

        Returns:
            True if object was found and removed, False otherwise
        """
        with self._lock:
            if workspace_id not in self._memories:
                self._logger.warning(f"Workspace {workspace_id} not found")
                return False

            workspace_objects = self._memories[workspace_id]

            for i, obj in enumerate(workspace_objects):
                if obj.label() == object_label and self._is_same_position(obj, coordinate):

                    del workspace_objects[i]

                    # Clear manual update tracking
                    if workspace_id in self._manual_updates:
                        self._manual_updates[workspace_id].pop(object_label, None)

                    self._logger.info(f"Removed {object_label} from {workspace_id} at {coordinate}")
                    return True

            self._logger.warning(f"Could not find {object_label} at {coordinate} in {workspace_id}")
            return False

    def mark_manual_update(
        self, workspace_id: str, object_label: str, old_coordinate: List[float], new_pose: PoseObjectPNP
    ) -> bool:
        """
        Update an object's position after manual manipulation.

        Args:
            workspace_id: ID of the workspace
            object_label: Label of the object
            old_coordinate: Previous coordinate [x, y]
            new_pose: New pose after movement

        Returns:
            True if object was found and updated, False otherwise
        """
        with self._lock:
            if workspace_id not in self._memories:
                self._logger.warning(f"Workspace {workspace_id} not found")
                return False

            workspace_objects = self._memories[workspace_id]

            for obj in workspace_objects:
                if obj.label() == object_label and self._is_same_position(obj, old_coordinate):

                    # Update position
                    obj.set_pose_com(new_pose)

                    # Track manual update
                    if workspace_id not in self._manual_updates:
                        self._manual_updates[workspace_id] = {}
                    self._manual_updates[workspace_id][object_label] = time.time()

                    self._logger.info(
                        f"Updated {object_label} in {workspace_id}: "
                        f"{old_coordinate} -> [{new_pose.x:.3f}, {new_pose.y:.3f}]"
                    )
                    return True

            self._logger.warning(f"Could not find {object_label} at {old_coordinate} in {workspace_id}")
            return False

    def move_object(
        self,
        source_workspace_id: str,
        target_workspace_id: str,
        object_label: str,
        old_coordinate: List[float],
        new_coordinate: List[float],
    ) -> bool:
        """
        Move an object from one workspace to another in memory.

        Args:
            source_workspace_id: ID of source workspace
            target_workspace_id: ID of target workspace
            object_label: Label of the object
            old_coordinate: Current coordinate in source workspace
            new_coordinate: New coordinate in target workspace

        Returns:
            True if object was found and moved, False otherwise
        """
        with self._lock:
            # Validate workspaces
            if source_workspace_id not in self._memories:
                self._logger.warning(f"Source workspace {source_workspace_id} not found")
                return False

            if target_workspace_id not in self._memories:
                self.initialize_workspace(target_workspace_id)

            # Find and remove from source
            source_objects = self._memories[source_workspace_id]
            obj_to_move = None

            for i, obj in enumerate(source_objects):
                if obj.label() == object_label and self._is_same_position(obj, old_coordinate):

                    obj_to_move = obj
                    del source_objects[i]
                    break

            if obj_to_move is None:
                self._logger.warning(f"Could not find {object_label} at {old_coordinate} " f"in {source_workspace_id}")
                return False

            # Update object's position
            obj_to_move._x_com = new_coordinate[0]
            obj_to_move._y_com = new_coordinate[1]

            # Add to target workspace
            self._memories[target_workspace_id].append(obj_to_move)

            # Track manual update in target workspace
            if target_workspace_id not in self._manual_updates:
                self._manual_updates[target_workspace_id] = {}
            self._manual_updates[target_workspace_id][object_label] = time.time()

            self._logger.info(f"Moved {object_label} from {source_workspace_id} " f"to {target_workspace_id}")
            return True

    def get_memory_stats(self) -> Dict[str, Dict]:
        """
        Get statistics about memory contents.

        Returns:
            Dictionary with stats for each workspace
        """
        with self._lock:
            stats = {}
            for ws_id, objects in self._memories.items():
                manual_updates = self._manual_updates.get(ws_id, {})
                stats[ws_id] = {
                    "object_count": len(objects),
                    "manual_updates": len(manual_updates),
                    "visible": self._workspace_visibility.get(ws_id, False),
                    "was_lost": self._workspace_was_lost.get(ws_id, False),
                    "objects": [
                        {
                            "label": obj.label(),
                            "position": [obj.x_com(), obj.y_com()],
                            "manually_updated": obj.label() in manual_updates,
                        }
                        for obj in objects
                    ],
                }
            return stats

    # Private helper methods

    def _should_update_memory(self, at_observation_pose: bool, robot_in_motion: bool) -> bool:
        """Determine if memory should be updated."""
        return at_observation_pose and not robot_in_motion

    def _should_clear_memory(self, workspace_id: str, at_observation_pose: bool, robot_in_motion: bool) -> bool:
        """Determine if memory should be cleared."""
        was_lost = self._workspace_was_lost.get(workspace_id, False)
        now_visible = at_observation_pose and not robot_in_motion

        # Clear memory when workspace becomes visible again after being lost
        should_clear = was_lost and now_visible

        if should_clear:
            self._workspace_was_lost[workspace_id] = False

        return should_clear

    def _update_visibility_state(self, workspace_id: str, at_observation_pose: bool, robot_in_motion: bool) -> None:
        """Update workspace visibility tracking."""
        was_visible = self._workspace_visibility.get(workspace_id, False)
        now_visible = at_observation_pose and not robot_in_motion

        self._workspace_visibility[workspace_id] = now_visible

        # Detect when workspace is lost
        if was_visible and not now_visible:
            self._workspace_was_lost[workspace_id] = True
            self._logger.debug(f"Workspace {workspace_id} lost - robot moved")

        # Clear lost flag when workspace becomes visible again
        if now_visible and self._workspace_was_lost.get(workspace_id, False):
            self._logger.debug(f"Workspace {workspace_id} visible again - will clear memory on next update")

    def _cleanup_expired_manual_updates(self, workspace_id: str) -> None:
        """Remove expired manual updates."""
        if workspace_id not in self._manual_updates:
            return

        current_time = time.time()
        manual_updates = self._manual_updates[workspace_id]

        expired_labels = [
            label for label, timestamp in manual_updates.items() if current_time - timestamp > self._manual_update_timeout
        ]

        for label in expired_labels:
            del manual_updates[label]
            self._logger.debug(f"Manual update expired for {label} in {workspace_id}")

    def _merge_detections(self, workspace_id: str, detected_objects: Objects) -> Tuple[int, int]:
        """
        Merge new detections into memory.

        Returns:
            Tuple of (objects_added, objects_updated)
        """
        workspace_memory = self._memories[workspace_id]
        manual_updates = self._manual_updates.get(workspace_id, {})

        objects_added = 0
        objects_updated = 0

        for obj in detected_objects:
            x_center, y_center = obj.xy_com()
            label = obj.label()

            # Check if this object has a recent manual update
            if label in manual_updates:
                # Find the manually updated object
                found_manual = False
                for memory_obj in workspace_memory:
                    if memory_obj.label() == label:
                        manual_dist = ((memory_obj.x_com() - x_center) ** 2 + (memory_obj.y_com() - y_center) ** 2) ** 0.5

                        if manual_dist > self._position_tolerance:
                            # Keep manual update, ignore detection
                            self._logger.debug(f"Keeping manual update for {label} " f"(distance: {manual_dist:.3f}m)")
                            found_manual = True
                            break
                        else:
                            # Detection confirms manual update
                            memory_obj._x_com = x_center
                            memory_obj._y_com = y_center
                            objects_updated += 1
                            found_manual = True
                            self._logger.debug(f"Detection confirms manual update for {label}")
                            break

                if found_manual:
                    continue

            # Check if object already exists in memory
            is_duplicate = False
            for memory_obj in workspace_memory:
                if memory_obj.label() == label and self._is_same_position(memory_obj, [x_center, y_center]):
                    is_duplicate = True
                    break

            if not is_duplicate:
                workspace_memory.append(obj)
                objects_added += 1

        return objects_added, objects_updated

    def _is_same_position(self, obj: Object, coordinate: List[float]) -> bool:
        """Check if object is at the same position within tolerance."""
        return (
            abs(obj.x_com() - coordinate[0]) <= self._position_tolerance
            and abs(obj.y_com() - coordinate[1]) <= self._position_tolerance
        )

    def _clear_workspace_internal(self, workspace_id: str) -> None:
        """Internal method to clear workspace memory (assumes lock is held)."""
        if workspace_id in self._memories:
            count = len(self._memories[workspace_id])
            self._memories[workspace_id].clear()

            if workspace_id in self._manual_updates:
                self._manual_updates[workspace_id].clear()

            self._workspace_was_lost[workspace_id] = False

            self._logger.debug(f"Cleared {count} objects from {workspace_id}")

Functions

__init__(manual_update_timeout=5.0, position_tolerance=0.05, verbose=False)

Initialize the Object Memory Manager.

Parameters:

Name Type Description Default
manual_update_timeout float

Seconds to keep manual updates (default: 5.0)

5.0
position_tolerance float

Distance threshold for duplicate detection in meters (default: 0.05)

0.05
verbose bool

Enable verbose logging (default: False)

False
Source code in robot_environment/object_memory_manager.py
def __init__(self, manual_update_timeout: float = 5.0, position_tolerance: float = 0.05, verbose: bool = False):
    """
    Initialize the Object Memory Manager.

    Args:
        manual_update_timeout: Seconds to keep manual updates (default: 5.0)
        position_tolerance: Distance threshold for duplicate detection in meters (default: 0.05)
        verbose: Enable verbose logging (default: False)
    """
    # Workspace memories
    self._memories: Dict[str, Objects] = {}

    # Thread safety (using RLock to allow reentrant calls from update/move_object to initialize_workspace)
    self._lock = threading.RLock()

    # Manual updates tracking: {workspace_id: {object_label: timestamp}}
    self._manual_updates: Dict[str, Dict[str, float]] = {}

    # Configuration
    self._manual_update_timeout = manual_update_timeout
    self._position_tolerance = position_tolerance

    # State tracking
    self._workspace_visibility: Dict[str, bool] = {}
    self._workspace_was_lost: Dict[str, bool] = {}

    # Logging
    self._verbose = verbose
    self._logger = logging.getLogger(__name__)
    if verbose:
        self._logger.setLevel(logging.DEBUG)

clear(workspace_id=None)

Clear memory for specific workspace or all workspaces.

Parameters:

Name Type Description Default
workspace_id Optional[str]

ID of workspace to clear, or None to clear all

None
Source code in robot_environment/object_memory_manager.py
def clear(self, workspace_id: Optional[str] = None) -> None:
    """
    Clear memory for specific workspace or all workspaces.

    Args:
        workspace_id: ID of workspace to clear, or None to clear all
    """
    with self._lock:
        if workspace_id is None:
            # Clear all workspaces
            for ws_id in self._memories:
                self._clear_workspace_internal(ws_id)
            self._logger.info("Cleared memory for all workspaces")
        else:
            if workspace_id in self._memories:
                self._clear_workspace_internal(workspace_id)
                self._logger.info(f"Cleared memory for workspace: {workspace_id}")
            else:
                self._logger.warning(f"Workspace {workspace_id} not found")

get(workspace_id)

Get a copy of objects from workspace memory.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace

required

Returns:

Type Description
Objects

Copy of objects in memory for this workspace

Source code in robot_environment/object_memory_manager.py
def get(self, workspace_id: str) -> Objects:
    """
    Get a copy of objects from workspace memory.

    Args:
        workspace_id: ID of the workspace

    Returns:
        Copy of objects in memory for this workspace
    """
    with self._lock:
        if workspace_id not in self._memories:
            self._logger.warning(f"Workspace {workspace_id} not initialized")
            return Objects()

        # Return a copy to avoid external modifications
        return Objects(list(self._memories[workspace_id]))

get_all()

Get objects from all workspaces.

Returns:

Type Description
Dict[str, Objects]

Dictionary mapping workspace_id to Objects collection

Source code in robot_environment/object_memory_manager.py
def get_all(self) -> Dict[str, Objects]:
    """
    Get objects from all workspaces.

    Returns:
        Dictionary mapping workspace_id to Objects collection
    """
    with self._lock:
        return {ws_id: Objects(list(objects)) for ws_id, objects in self._memories.items()}

get_memory_stats()

Get statistics about memory contents.

Returns:

Type Description
Dict[str, Dict]

Dictionary with stats for each workspace

Source code in robot_environment/object_memory_manager.py
def get_memory_stats(self) -> Dict[str, Dict]:
    """
    Get statistics about memory contents.

    Returns:
        Dictionary with stats for each workspace
    """
    with self._lock:
        stats = {}
        for ws_id, objects in self._memories.items():
            manual_updates = self._manual_updates.get(ws_id, {})
            stats[ws_id] = {
                "object_count": len(objects),
                "manual_updates": len(manual_updates),
                "visible": self._workspace_visibility.get(ws_id, False),
                "was_lost": self._workspace_was_lost.get(ws_id, False),
                "objects": [
                    {
                        "label": obj.label(),
                        "position": [obj.x_com(), obj.y_com()],
                        "manually_updated": obj.label() in manual_updates,
                    }
                    for obj in objects
                ],
            }
        return stats

initialize_workspace(workspace_id)

Initialize memory for a new workspace.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace to initialize

required
Source code in robot_environment/object_memory_manager.py
def initialize_workspace(self, workspace_id: str) -> None:
    """
    Initialize memory for a new workspace.

    Args:
        workspace_id: ID of the workspace to initialize
    """
    with self._lock:
        if workspace_id not in self._memories:
            self._memories[workspace_id] = Objects()
            self._manual_updates[workspace_id] = {}
            self._workspace_visibility[workspace_id] = False
            self._workspace_was_lost[workspace_id] = False

            self._logger.debug(f"Initialized memory for workspace: {workspace_id}")

mark_manual_update(workspace_id, object_label, old_coordinate, new_pose)

Update an object's position after manual manipulation.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace

required
object_label str

Label of the object

required
old_coordinate List[float]

Previous coordinate [x, y]

required
new_pose PoseObjectPNP

New pose after movement

required

Returns:

Type Description
bool

True if object was found and updated, False otherwise

Source code in robot_environment/object_memory_manager.py
def mark_manual_update(
    self, workspace_id: str, object_label: str, old_coordinate: List[float], new_pose: PoseObjectPNP
) -> bool:
    """
    Update an object's position after manual manipulation.

    Args:
        workspace_id: ID of the workspace
        object_label: Label of the object
        old_coordinate: Previous coordinate [x, y]
        new_pose: New pose after movement

    Returns:
        True if object was found and updated, False otherwise
    """
    with self._lock:
        if workspace_id not in self._memories:
            self._logger.warning(f"Workspace {workspace_id} not found")
            return False

        workspace_objects = self._memories[workspace_id]

        for obj in workspace_objects:
            if obj.label() == object_label and self._is_same_position(obj, old_coordinate):

                # Update position
                obj.set_pose_com(new_pose)

                # Track manual update
                if workspace_id not in self._manual_updates:
                    self._manual_updates[workspace_id] = {}
                self._manual_updates[workspace_id][object_label] = time.time()

                self._logger.info(
                    f"Updated {object_label} in {workspace_id}: "
                    f"{old_coordinate} -> [{new_pose.x:.3f}, {new_pose.y:.3f}]"
                )
                return True

        self._logger.warning(f"Could not find {object_label} at {old_coordinate} in {workspace_id}")
        return False

move_object(source_workspace_id, target_workspace_id, object_label, old_coordinate, new_coordinate)

Move an object from one workspace to another in memory.

Parameters:

Name Type Description Default
source_workspace_id str

ID of source workspace

required
target_workspace_id str

ID of target workspace

required
object_label str

Label of the object

required
old_coordinate List[float]

Current coordinate in source workspace

required
new_coordinate List[float]

New coordinate in target workspace

required

Returns:

Type Description
bool

True if object was found and moved, False otherwise

Source code in robot_environment/object_memory_manager.py
def move_object(
    self,
    source_workspace_id: str,
    target_workspace_id: str,
    object_label: str,
    old_coordinate: List[float],
    new_coordinate: List[float],
) -> bool:
    """
    Move an object from one workspace to another in memory.

    Args:
        source_workspace_id: ID of source workspace
        target_workspace_id: ID of target workspace
        object_label: Label of the object
        old_coordinate: Current coordinate in source workspace
        new_coordinate: New coordinate in target workspace

    Returns:
        True if object was found and moved, False otherwise
    """
    with self._lock:
        # Validate workspaces
        if source_workspace_id not in self._memories:
            self._logger.warning(f"Source workspace {source_workspace_id} not found")
            return False

        if target_workspace_id not in self._memories:
            self.initialize_workspace(target_workspace_id)

        # Find and remove from source
        source_objects = self._memories[source_workspace_id]
        obj_to_move = None

        for i, obj in enumerate(source_objects):
            if obj.label() == object_label and self._is_same_position(obj, old_coordinate):

                obj_to_move = obj
                del source_objects[i]
                break

        if obj_to_move is None:
            self._logger.warning(f"Could not find {object_label} at {old_coordinate} " f"in {source_workspace_id}")
            return False

        # Update object's position
        obj_to_move._x_com = new_coordinate[0]
        obj_to_move._y_com = new_coordinate[1]

        # Add to target workspace
        self._memories[target_workspace_id].append(obj_to_move)

        # Track manual update in target workspace
        if target_workspace_id not in self._manual_updates:
            self._manual_updates[target_workspace_id] = {}
        self._manual_updates[target_workspace_id][object_label] = time.time()

        self._logger.info(f"Moved {object_label} from {source_workspace_id} " f"to {target_workspace_id}")
        return True

remove_object(workspace_id, object_label, coordinate)

Remove an object from workspace memory.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace

required
object_label str

Label of the object to remove

required
coordinate List[float]

Last known coordinate [x, y]

required

Returns:

Type Description
bool

True if object was found and removed, False otherwise

Source code in robot_environment/object_memory_manager.py
def remove_object(self, workspace_id: str, object_label: str, coordinate: List[float]) -> bool:
    """
    Remove an object from workspace memory.

    Args:
        workspace_id: ID of the workspace
        object_label: Label of the object to remove
        coordinate: Last known coordinate [x, y]

    Returns:
        True if object was found and removed, False otherwise
    """
    with self._lock:
        if workspace_id not in self._memories:
            self._logger.warning(f"Workspace {workspace_id} not found")
            return False

        workspace_objects = self._memories[workspace_id]

        for i, obj in enumerate(workspace_objects):
            if obj.label() == object_label and self._is_same_position(obj, coordinate):

                del workspace_objects[i]

                # Clear manual update tracking
                if workspace_id in self._manual_updates:
                    self._manual_updates[workspace_id].pop(object_label, None)

                self._logger.info(f"Removed {object_label} from {workspace_id} at {coordinate}")
                return True

        self._logger.warning(f"Could not find {object_label} at {coordinate} in {workspace_id}")
        return False

update(workspace_id, detected_objects, at_observation_pose, robot_in_motion)

Update memory with newly detected objects.

Only updates when conditions are appropriate (at observation pose, not moving). Respects manual updates from pick/place operations.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace

required
detected_objects Objects

Objects detected in current frame

required
at_observation_pose bool

Whether robot is at observation pose

required
robot_in_motion bool

Whether robot is currently moving

required

Returns:

Type Description
Tuple[int, int]

Tuple of (objects_added, objects_updated)

Source code in robot_environment/object_memory_manager.py
def update(
    self, workspace_id: str, detected_objects: Objects, at_observation_pose: bool, robot_in_motion: bool
) -> Tuple[int, int]:
    """
    Update memory with newly detected objects.

    Only updates when conditions are appropriate (at observation pose, not moving).
    Respects manual updates from pick/place operations.

    Args:
        workspace_id: ID of the workspace
        detected_objects: Objects detected in current frame
        at_observation_pose: Whether robot is at observation pose
        robot_in_motion: Whether robot is currently moving

    Returns:
        Tuple of (objects_added, objects_updated)
    """
    with self._lock:
        # Ensure workspace is initialized
        if workspace_id not in self._memories:
            self.initialize_workspace(workspace_id)

        # Check if we should clear memory first
        if self._should_clear_memory(workspace_id, at_observation_pose, robot_in_motion):
            self._clear_workspace_internal(workspace_id)

        # Update visibility tracking
        self._update_visibility_state(workspace_id, at_observation_pose, robot_in_motion)

        # Only update when at observation pose
        if not self._should_update_memory(at_observation_pose, robot_in_motion):
            self._logger.debug(f"Skipping memory update for {workspace_id} - conditions not met")
            return 0, 0

        # Clean up expired manual updates
        self._cleanup_expired_manual_updates(workspace_id)

        # Update memory with new detections
        objects_added, objects_updated = self._merge_detections(workspace_id, detected_objects)

        if objects_added > 0 or objects_updated > 0:
            self._logger.debug(
                f"Memory update for '{workspace_id}': "
                f"added={objects_added}, updated={objects_updated}, "
                f"total={len(self._memories[workspace_id])}"
            )

        return objects_added, objects_updated

Performance Metrics

robot_environment.performance_metrics.PerformanceMetrics

Centralized performance monitoring for robot_environment.

Tracks timing and throughput metrics for all major operations: - Camera frame capture - Object detection - Robot movements (pick, place, push) - Memory updates - Redis communication

Example

metrics = PerformanceMetrics(history_size=100)

Using context manager

with metrics.timer('frame_capture'): frame = capture_frame()

Manual recording

metrics.record_timing('detection', 45.2)

Get statistics

stats = metrics.get_stats() print(f"Average FPS: {stats['camera']['fps']:.1f}")

Source code in robot_environment/performance_metrics.py
class PerformanceMetrics:
    """
    Centralized performance monitoring for robot_environment.

    Tracks timing and throughput metrics for all major operations:
    - Camera frame capture
    - Object detection
    - Robot movements (pick, place, push)
    - Memory updates
    - Redis communication

    Example:
        metrics = PerformanceMetrics(history_size=100)

        # Using context manager
        with metrics.timer('frame_capture'):
            frame = capture_frame()

        # Manual recording
        metrics.record_timing('detection', 45.2)

        # Get statistics
        stats = metrics.get_stats()
        print(f"Average FPS: {stats['camera']['fps']:.1f}")
    """

    def __init__(self, history_size: int = 100, verbose: bool = False):
        """
        Initialize performance metrics tracker.

        Args:
            history_size: Number of samples to keep for each metric
            verbose: Enable verbose logging of metrics
        """
        self._history_size = history_size
        self._verbose = verbose
        self._logger = logging.getLogger(__name__)

        # Thread safety
        self._lock = threading.Lock()

        # Timing metrics (stored as deques for efficient rolling windows)
        self._timings: Dict[str, deque] = {
            # Camera operations
            "frame_capture": deque(maxlen=history_size),
            "frame_publish": deque(maxlen=history_size),
            # Vision operations
            "object_detection": deque(maxlen=history_size),
            "object_fetch_redis": deque(maxlen=history_size),
            # Memory operations
            "memory_update": deque(maxlen=history_size),
            "memory_get": deque(maxlen=history_size),
            "memory_clear": deque(maxlen=history_size),
            # Robot operations
            "robot_pick": deque(maxlen=history_size),
            "robot_place": deque(maxlen=history_size),
            "robot_push": deque(maxlen=history_size),
            "robot_move_observation": deque(maxlen=history_size),
            "robot_get_pose": deque(maxlen=history_size),
            # High-level operations
            "pick_place_total": deque(maxlen=history_size),
            "camera_loop_iteration": deque(maxlen=history_size),
            # Communication
            "redis_publish": deque(maxlen=history_size),
            "redis_fetch": deque(maxlen=history_size),
        }

        # Counter metrics
        self._counters: Dict[str, int] = {
            "frames_captured": 0,
            "objects_detected": 0,
            "pick_operations": 0,
            "place_operations": 0,
            "push_operations": 0,
            "pick_successes": 0,
            "place_successes": 0,
            "pick_failures": 0,
            "place_failures": 0,
            "memory_updates": 0,
            "memory_clears": 0,
        }

        # Start time for uptime tracking
        self._start_time = datetime.now()

        # Last values for rate calculations
        self._last_frame_time = None
        self._last_stats_time = perf_counter()

        if verbose:
            self._logger.setLevel(logging.DEBUG)

    def timer(self, metric_name: str) -> PerformanceTimer:
        """
        Create a context manager timer for an operation.

        Args:
            metric_name: Name of the metric to record

        Returns:
            PerformanceTimer context manager

        Example:
            with metrics.timer('frame_capture'):
                frame = camera.get_frame()
        """
        return PerformanceTimer(self, metric_name)

    def record_timing(self, metric_name: str, duration_ms: float) -> None:
        """
        Record a timing measurement.

        Args:
            metric_name: Name of the metric
            duration_ms: Duration in milliseconds
        """
        with self._lock:
            if metric_name in self._timings:
                self._timings[metric_name].append(duration_ms)

                if self._verbose and len(self._timings[metric_name]) % 10 == 0:
                    stats = TimingStats.from_samples(list(self._timings[metric_name]))
                    self._logger.debug(
                        f"{metric_name}: {duration_ms:.1f}ms " f"(avg: {stats.mean:.1f}ms, p95: {stats.p95:.1f}ms)"
                    )
            else:
                # FIX: Create the metric on-the-fly if it doesn't exist
                from collections import deque

                self._timings[metric_name] = deque([duration_ms], maxlen=self._history_size)
                self._logger.debug(f"Created new timing metric: {metric_name}")

    def increment_counter(self, counter_name: str, amount: int = 1) -> None:
        """
        Increment a counter metric.

        Args:
            counter_name: Name of the counter
            amount: Amount to increment by (default: 1)
        """
        with self._lock:
            if counter_name in self._counters:
                self._counters[counter_name] += amount
            else:
                self._logger.warning(f"Unknown counter: {counter_name}")

    def record_frame_captured(self, duration_ms: float) -> None:
        """Record a frame capture event."""
        self.record_timing("frame_capture", duration_ms)
        self.increment_counter("frames_captured")
        self._last_frame_time = perf_counter()

    def record_objects_detected(self, count: int, detection_time_ms: float) -> None:
        """Record object detection results."""
        self.record_timing("object_detection", detection_time_ms)
        self.increment_counter("objects_detected", count)

    def record_pick_operation(self, duration_s: float, success: bool) -> None:
        """Record a pick operation."""
        self.record_timing("robot_pick", duration_s * 1000)
        self.increment_counter("pick_operations")
        if success:
            self.increment_counter("pick_successes")
        else:
            self.increment_counter("pick_failures")

    def record_place_operation(self, duration_s: float, success: bool) -> None:
        """Record a place operation."""
        self.record_timing("robot_place", duration_s * 1000)
        self.increment_counter("place_operations")
        if success:
            self.increment_counter("place_successes")
        else:
            self.increment_counter("place_failures")

    def record_memory_update(self, duration_ms: float, objects_added: int, objects_updated: int) -> None:
        """Record a memory update operation."""
        self.record_timing("memory_update", duration_ms)
        self.increment_counter("memory_updates")

    def get_stats(self) -> Dict[str, Any]:
        """
        Get comprehensive performance statistics.

        Returns:
            Dictionary containing all performance metrics and statistics
        """
        with self._lock:
            # current_time = perf_counter()
            uptime_seconds = (datetime.now() - self._start_time).total_seconds()

            stats = {
                "uptime_seconds": uptime_seconds,
                "timestamp": datetime.now().isoformat(),
                # Camera metrics
                "camera": {
                    "frames_captured": self._counters["frames_captured"],
                    "fps": self._calculate_fps(),
                    "frame_capture": self._get_timing_stats("frame_capture"),
                    "frame_publish": self._get_timing_stats("frame_publish"),
                    "loop_iteration": self._get_timing_stats("camera_loop_iteration"),
                },
                # Vision metrics
                "vision": {
                    "objects_detected": self._counters["objects_detected"],
                    "detection_time": self._get_timing_stats("object_detection"),
                    "redis_fetch_time": self._get_timing_stats("object_fetch_redis"),
                },
                # Memory metrics
                "memory": {
                    "updates": self._counters["memory_updates"],
                    "clears": self._counters["memory_clears"],
                    "update_time": self._get_timing_stats("memory_update"),
                    "get_time": self._get_timing_stats("memory_get"),
                },
                # Robot operation metrics
                "robot": {
                    "operations": {
                        "pick": {
                            "count": self._counters["pick_operations"],
                            "successes": self._counters["pick_successes"],
                            "failures": self._counters["pick_failures"],
                            "success_rate": self._calculate_success_rate("pick"),
                            "duration": self._get_timing_stats("robot_pick"),
                        },
                        "place": {
                            "count": self._counters["place_operations"],
                            "successes": self._counters["place_successes"],
                            "failures": self._counters["place_failures"],
                            "success_rate": self._calculate_success_rate("place"),
                            "duration": self._get_timing_stats("robot_place"),
                        },
                        "push": {
                            "count": self._counters["push_operations"],
                            "duration": self._get_timing_stats("robot_push"),
                        },
                    },
                    "movement": {
                        "observation_pose": self._get_timing_stats("robot_move_observation"),
                        "get_pose": self._get_timing_stats("robot_get_pose"),
                    },
                    "pick_place_total": self._get_timing_stats("pick_place_total"),
                },
                # Communication metrics
                "communication": {
                    "redis_publish": self._get_timing_stats("redis_publish"),
                    "redis_fetch": self._get_timing_stats("redis_fetch"),
                },
            }

            return stats

    def get_summary(self) -> str:
        """
        Get a human-readable summary of performance metrics.

        Returns:
            Formatted string with key performance indicators
        """
        stats = self.get_stats()

        lines = [
            "=" * 70,
            "PERFORMANCE METRICS SUMMARY",
            "=" * 70,
            f"Uptime: {stats['uptime_seconds']:.1f}s",
            "",
            "CAMERA:",
            f"  Frames captured: {stats['camera']['frames_captured']}",
            f"  Current FPS: {stats['camera']['fps']:.1f}",
            f"  Frame capture time: {stats['camera']['frame_capture']['mean']:.1f}ms "
            f"(p95: {stats['camera']['frame_capture']['p95']:.1f}ms)",
            "",
            "VISION:",
            f"  Objects detected: {stats['vision']['objects_detected']}",
            f"  Detection time: {stats['vision']['detection_time']['mean']:.1f}ms "
            f"(p95: {stats['vision']['detection_time']['p95']:.1f}ms)",
            "",
            "ROBOT OPERATIONS:",
            f"  Pick operations: {stats['robot']['operations']['pick']['count']} "
            f"(success rate: {stats['robot']['operations']['pick']['success_rate']:.1f}%)",
            f"  Pick duration: {stats['robot']['operations']['pick']['duration']['mean']:.0f}ms",
            f"  Place operations: {stats['robot']['operations']['place']['count']} "
            f"(success rate: {stats['robot']['operations']['place']['success_rate']:.1f}%)",
            f"  Place duration: {stats['robot']['operations']['place']['duration']['mean']:.0f}ms",
            "",
            "MEMORY:",
            f"  Updates: {stats['memory']['updates']}",
            f"  Update time: {stats['memory']['update_time']['mean']:.1f}ms",
            "=" * 70,
        ]

        return "\n".join(lines)

    def reset(self) -> None:
        """Reset all metrics."""
        with self._lock:
            for timing_deque in self._timings.values():
                timing_deque.clear()

            for counter_name in self._counters:
                self._counters[counter_name] = 0

            self._start_time = datetime.now()
            self._last_frame_time = None

            self._logger.info("Performance metrics reset")

    def export_json(self, filepath: str) -> None:
        """
        Export metrics to JSON file.

        Args:
            filepath: Path to output file
        """
        stats = self.get_stats()
        with open(filepath, "w") as f:
            json.dump(stats, f, indent=2)

        self._logger.info(f"Metrics exported to {filepath}")

    # Private helper methods

    def _get_timing_stats(self, metric_name: str) -> Dict[str, float]:
        """Get timing statistics for a metric."""
        if metric_name not in self._timings:
            return TimingStats().to_dict()

        samples = list(self._timings[metric_name])
        return TimingStats.from_samples(samples).to_dict()

    def _calculate_fps(self) -> float:
        """Calculate current frames per second."""
        frame_times = list(self._timings["frame_capture"])
        if not frame_times:
            return 0.0

        # Use recent samples for more accurate instantaneous FPS
        recent_samples = frame_times[-10:] if len(frame_times) >= 10 else frame_times
        avg_time_ms = np.mean(recent_samples)

        if avg_time_ms > 0:
            return 1000.0 / avg_time_ms
        return 0.0

    def _calculate_success_rate(self, operation: str) -> float:
        """Calculate success rate for an operation."""
        total = self._counters.get(f"{operation}_operations", 0)
        if total == 0:
            return 0.0

        successes = self._counters.get(f"{operation}_successes", 0)
        return (successes / total) * 100.0

Functions

__init__(history_size=100, verbose=False)

Initialize performance metrics tracker.

Parameters:

Name Type Description Default
history_size int

Number of samples to keep for each metric

100
verbose bool

Enable verbose logging of metrics

False
Source code in robot_environment/performance_metrics.py
def __init__(self, history_size: int = 100, verbose: bool = False):
    """
    Initialize performance metrics tracker.

    Args:
        history_size: Number of samples to keep for each metric
        verbose: Enable verbose logging of metrics
    """
    self._history_size = history_size
    self._verbose = verbose
    self._logger = logging.getLogger(__name__)

    # Thread safety
    self._lock = threading.Lock()

    # Timing metrics (stored as deques for efficient rolling windows)
    self._timings: Dict[str, deque] = {
        # Camera operations
        "frame_capture": deque(maxlen=history_size),
        "frame_publish": deque(maxlen=history_size),
        # Vision operations
        "object_detection": deque(maxlen=history_size),
        "object_fetch_redis": deque(maxlen=history_size),
        # Memory operations
        "memory_update": deque(maxlen=history_size),
        "memory_get": deque(maxlen=history_size),
        "memory_clear": deque(maxlen=history_size),
        # Robot operations
        "robot_pick": deque(maxlen=history_size),
        "robot_place": deque(maxlen=history_size),
        "robot_push": deque(maxlen=history_size),
        "robot_move_observation": deque(maxlen=history_size),
        "robot_get_pose": deque(maxlen=history_size),
        # High-level operations
        "pick_place_total": deque(maxlen=history_size),
        "camera_loop_iteration": deque(maxlen=history_size),
        # Communication
        "redis_publish": deque(maxlen=history_size),
        "redis_fetch": deque(maxlen=history_size),
    }

    # Counter metrics
    self._counters: Dict[str, int] = {
        "frames_captured": 0,
        "objects_detected": 0,
        "pick_operations": 0,
        "place_operations": 0,
        "push_operations": 0,
        "pick_successes": 0,
        "place_successes": 0,
        "pick_failures": 0,
        "place_failures": 0,
        "memory_updates": 0,
        "memory_clears": 0,
    }

    # Start time for uptime tracking
    self._start_time = datetime.now()

    # Last values for rate calculations
    self._last_frame_time = None
    self._last_stats_time = perf_counter()

    if verbose:
        self._logger.setLevel(logging.DEBUG)

export_json(filepath)

Export metrics to JSON file.

Parameters:

Name Type Description Default
filepath str

Path to output file

required
Source code in robot_environment/performance_metrics.py
def export_json(self, filepath: str) -> None:
    """
    Export metrics to JSON file.

    Args:
        filepath: Path to output file
    """
    stats = self.get_stats()
    with open(filepath, "w") as f:
        json.dump(stats, f, indent=2)

    self._logger.info(f"Metrics exported to {filepath}")

get_stats()

Get comprehensive performance statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary containing all performance metrics and statistics

Source code in robot_environment/performance_metrics.py
def get_stats(self) -> Dict[str, Any]:
    """
    Get comprehensive performance statistics.

    Returns:
        Dictionary containing all performance metrics and statistics
    """
    with self._lock:
        # current_time = perf_counter()
        uptime_seconds = (datetime.now() - self._start_time).total_seconds()

        stats = {
            "uptime_seconds": uptime_seconds,
            "timestamp": datetime.now().isoformat(),
            # Camera metrics
            "camera": {
                "frames_captured": self._counters["frames_captured"],
                "fps": self._calculate_fps(),
                "frame_capture": self._get_timing_stats("frame_capture"),
                "frame_publish": self._get_timing_stats("frame_publish"),
                "loop_iteration": self._get_timing_stats("camera_loop_iteration"),
            },
            # Vision metrics
            "vision": {
                "objects_detected": self._counters["objects_detected"],
                "detection_time": self._get_timing_stats("object_detection"),
                "redis_fetch_time": self._get_timing_stats("object_fetch_redis"),
            },
            # Memory metrics
            "memory": {
                "updates": self._counters["memory_updates"],
                "clears": self._counters["memory_clears"],
                "update_time": self._get_timing_stats("memory_update"),
                "get_time": self._get_timing_stats("memory_get"),
            },
            # Robot operation metrics
            "robot": {
                "operations": {
                    "pick": {
                        "count": self._counters["pick_operations"],
                        "successes": self._counters["pick_successes"],
                        "failures": self._counters["pick_failures"],
                        "success_rate": self._calculate_success_rate("pick"),
                        "duration": self._get_timing_stats("robot_pick"),
                    },
                    "place": {
                        "count": self._counters["place_operations"],
                        "successes": self._counters["place_successes"],
                        "failures": self._counters["place_failures"],
                        "success_rate": self._calculate_success_rate("place"),
                        "duration": self._get_timing_stats("robot_place"),
                    },
                    "push": {
                        "count": self._counters["push_operations"],
                        "duration": self._get_timing_stats("robot_push"),
                    },
                },
                "movement": {
                    "observation_pose": self._get_timing_stats("robot_move_observation"),
                    "get_pose": self._get_timing_stats("robot_get_pose"),
                },
                "pick_place_total": self._get_timing_stats("pick_place_total"),
            },
            # Communication metrics
            "communication": {
                "redis_publish": self._get_timing_stats("redis_publish"),
                "redis_fetch": self._get_timing_stats("redis_fetch"),
            },
        }

        return stats

get_summary()

Get a human-readable summary of performance metrics.

Returns:

Type Description
str

Formatted string with key performance indicators

Source code in robot_environment/performance_metrics.py
def get_summary(self) -> str:
    """
    Get a human-readable summary of performance metrics.

    Returns:
        Formatted string with key performance indicators
    """
    stats = self.get_stats()

    lines = [
        "=" * 70,
        "PERFORMANCE METRICS SUMMARY",
        "=" * 70,
        f"Uptime: {stats['uptime_seconds']:.1f}s",
        "",
        "CAMERA:",
        f"  Frames captured: {stats['camera']['frames_captured']}",
        f"  Current FPS: {stats['camera']['fps']:.1f}",
        f"  Frame capture time: {stats['camera']['frame_capture']['mean']:.1f}ms "
        f"(p95: {stats['camera']['frame_capture']['p95']:.1f}ms)",
        "",
        "VISION:",
        f"  Objects detected: {stats['vision']['objects_detected']}",
        f"  Detection time: {stats['vision']['detection_time']['mean']:.1f}ms "
        f"(p95: {stats['vision']['detection_time']['p95']:.1f}ms)",
        "",
        "ROBOT OPERATIONS:",
        f"  Pick operations: {stats['robot']['operations']['pick']['count']} "
        f"(success rate: {stats['robot']['operations']['pick']['success_rate']:.1f}%)",
        f"  Pick duration: {stats['robot']['operations']['pick']['duration']['mean']:.0f}ms",
        f"  Place operations: {stats['robot']['operations']['place']['count']} "
        f"(success rate: {stats['robot']['operations']['place']['success_rate']:.1f}%)",
        f"  Place duration: {stats['robot']['operations']['place']['duration']['mean']:.0f}ms",
        "",
        "MEMORY:",
        f"  Updates: {stats['memory']['updates']}",
        f"  Update time: {stats['memory']['update_time']['mean']:.1f}ms",
        "=" * 70,
    ]

    return "\n".join(lines)

increment_counter(counter_name, amount=1)

Increment a counter metric.

Parameters:

Name Type Description Default
counter_name str

Name of the counter

required
amount int

Amount to increment by (default: 1)

1
Source code in robot_environment/performance_metrics.py
def increment_counter(self, counter_name: str, amount: int = 1) -> None:
    """
    Increment a counter metric.

    Args:
        counter_name: Name of the counter
        amount: Amount to increment by (default: 1)
    """
    with self._lock:
        if counter_name in self._counters:
            self._counters[counter_name] += amount
        else:
            self._logger.warning(f"Unknown counter: {counter_name}")

record_frame_captured(duration_ms)

Record a frame capture event.

Source code in robot_environment/performance_metrics.py
def record_frame_captured(self, duration_ms: float) -> None:
    """Record a frame capture event."""
    self.record_timing("frame_capture", duration_ms)
    self.increment_counter("frames_captured")
    self._last_frame_time = perf_counter()

record_memory_update(duration_ms, objects_added, objects_updated)

Record a memory update operation.

Source code in robot_environment/performance_metrics.py
def record_memory_update(self, duration_ms: float, objects_added: int, objects_updated: int) -> None:
    """Record a memory update operation."""
    self.record_timing("memory_update", duration_ms)
    self.increment_counter("memory_updates")

record_objects_detected(count, detection_time_ms)

Record object detection results.

Source code in robot_environment/performance_metrics.py
def record_objects_detected(self, count: int, detection_time_ms: float) -> None:
    """Record object detection results."""
    self.record_timing("object_detection", detection_time_ms)
    self.increment_counter("objects_detected", count)

record_pick_operation(duration_s, success)

Record a pick operation.

Source code in robot_environment/performance_metrics.py
def record_pick_operation(self, duration_s: float, success: bool) -> None:
    """Record a pick operation."""
    self.record_timing("robot_pick", duration_s * 1000)
    self.increment_counter("pick_operations")
    if success:
        self.increment_counter("pick_successes")
    else:
        self.increment_counter("pick_failures")

record_place_operation(duration_s, success)

Record a place operation.

Source code in robot_environment/performance_metrics.py
def record_place_operation(self, duration_s: float, success: bool) -> None:
    """Record a place operation."""
    self.record_timing("robot_place", duration_s * 1000)
    self.increment_counter("place_operations")
    if success:
        self.increment_counter("place_successes")
    else:
        self.increment_counter("place_failures")

record_timing(metric_name, duration_ms)

Record a timing measurement.

Parameters:

Name Type Description Default
metric_name str

Name of the metric

required
duration_ms float

Duration in milliseconds

required
Source code in robot_environment/performance_metrics.py
def record_timing(self, metric_name: str, duration_ms: float) -> None:
    """
    Record a timing measurement.

    Args:
        metric_name: Name of the metric
        duration_ms: Duration in milliseconds
    """
    with self._lock:
        if metric_name in self._timings:
            self._timings[metric_name].append(duration_ms)

            if self._verbose and len(self._timings[metric_name]) % 10 == 0:
                stats = TimingStats.from_samples(list(self._timings[metric_name]))
                self._logger.debug(
                    f"{metric_name}: {duration_ms:.1f}ms " f"(avg: {stats.mean:.1f}ms, p95: {stats.p95:.1f}ms)"
                )
        else:
            # FIX: Create the metric on-the-fly if it doesn't exist
            from collections import deque

            self._timings[metric_name] = deque([duration_ms], maxlen=self._history_size)
            self._logger.debug(f"Created new timing metric: {metric_name}")

reset()

Reset all metrics.

Source code in robot_environment/performance_metrics.py
def reset(self) -> None:
    """Reset all metrics."""
    with self._lock:
        for timing_deque in self._timings.values():
            timing_deque.clear()

        for counter_name in self._counters:
            self._counters[counter_name] = 0

        self._start_time = datetime.now()
        self._last_frame_time = None

        self._logger.info("Performance metrics reset")

timer(metric_name)

Create a context manager timer for an operation.

Parameters:

Name Type Description Default
metric_name str

Name of the metric to record

required

Returns:

Type Description
PerformanceTimer

PerformanceTimer context manager

Example

with metrics.timer('frame_capture'): frame = camera.get_frame()

Source code in robot_environment/performance_metrics.py
def timer(self, metric_name: str) -> PerformanceTimer:
    """
    Create a context manager timer for an operation.

    Args:
        metric_name: Name of the metric to record

    Returns:
        PerformanceTimer context manager

    Example:
        with metrics.timer('frame_capture'):
            frame = camera.get_frame()
    """
    return PerformanceTimer(self, metric_name)

robot_environment.performance_metrics.PerformanceMonitor

Background monitor that periodically logs performance metrics.

Example

metrics = PerformanceMetrics() monitor = PerformanceMonitor(metrics, interval_seconds=30) monitor.start()

... do work ...

monitor.stop()

Source code in robot_environment/performance_metrics.py
class PerformanceMonitor:
    """
    Background monitor that periodically logs performance metrics.

    Example:
        metrics = PerformanceMetrics()
        monitor = PerformanceMonitor(metrics, interval_seconds=30)
        monitor.start()

        # ... do work ...

        monitor.stop()
    """

    def __init__(self, metrics: PerformanceMetrics, interval_seconds: float = 60.0, verbose: bool = True):
        """
        Initialize performance monitor.

        Args:
            metrics: PerformanceMetrics instance to monitor
            interval_seconds: Logging interval in seconds
            verbose: Enable verbose logging
        """
        self.metrics = metrics
        self.interval_seconds = interval_seconds
        self.verbose = verbose
        self._logger = logging.getLogger(__name__)

        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Start the monitoring thread."""
        if self._thread is not None and self._thread.is_alive():
            self._logger.warning("Monitor already running")
            return

        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()

        self._logger.info(f"Performance monitor started (interval: {self.interval_seconds}s)")

    def stop(self) -> None:
        """Stop the monitoring thread."""
        if self._thread is None:
            return

        self._stop_event.set()
        self._thread.join(timeout=5.0)

        self._logger.info("Performance monitor stopped")

    def _monitor_loop(self) -> None:
        """Main monitoring loop."""
        while not self._stop_event.wait(self.interval_seconds):
            try:
                if self.verbose:
                    summary = self.metrics.get_summary()
                    self._logger.info(f"\n{summary}")
                else:
                    stats = self.metrics.get_stats()
                    self._logger.info(
                        f"Performance: FPS={stats['camera']['fps']:.1f}, "
                        f"Pick rate={stats['robot']['operations']['pick']['success_rate']:.0f}%, "
                        f"Objects={stats['vision']['objects_detected']}"
                    )
            except Exception as e:
                self._logger.error(f"Error in monitor loop: {e}", exc_info=True)

Functions

__init__(metrics, interval_seconds=60.0, verbose=True)

Initialize performance monitor.

Parameters:

Name Type Description Default
metrics PerformanceMetrics

PerformanceMetrics instance to monitor

required
interval_seconds float

Logging interval in seconds

60.0
verbose bool

Enable verbose logging

True
Source code in robot_environment/performance_metrics.py
def __init__(self, metrics: PerformanceMetrics, interval_seconds: float = 60.0, verbose: bool = True):
    """
    Initialize performance monitor.

    Args:
        metrics: PerformanceMetrics instance to monitor
        interval_seconds: Logging interval in seconds
        verbose: Enable verbose logging
    """
    self.metrics = metrics
    self.interval_seconds = interval_seconds
    self.verbose = verbose
    self._logger = logging.getLogger(__name__)

    self._stop_event = threading.Event()
    self._thread: Optional[threading.Thread] = None

start()

Start the monitoring thread.

Source code in robot_environment/performance_metrics.py
def start(self) -> None:
    """Start the monitoring thread."""
    if self._thread is not None and self._thread.is_alive():
        self._logger.warning("Monitor already running")
        return

    self._stop_event.clear()
    self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
    self._thread.start()

    self._logger.info(f"Performance monitor started (interval: {self.interval_seconds}s)")

stop()

Stop the monitoring thread.

Source code in robot_environment/performance_metrics.py
def stop(self) -> None:
    """Stop the monitoring thread."""
    if self._thread is None:
        return

    self._stop_event.set()
    self._thread.join(timeout=5.0)

    self._logger.info("Performance monitor stopped")