Skip to content

Environment

robot_environment.environment.Environment

Environment class for robotic pick-and-place operations.

Coordinates between: - Robot control - Vision system - Workspace management - Object memory tracking

Source code in robot_environment/environment.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
class Environment:
    """
    Environment class for robotic pick-and-place operations.

    Coordinates between:
    - Robot control
    - Vision system
    - Workspace management
    - Object memory tracking
    """

    # *** CONSTRUCTORS ***
    def __init__(
        self,
        el_api_key: str,
        use_simulation: bool,
        robot_id: str,
        verbose: bool = False,
        start_camera_thread: bool = True,
        enable_performance_monitoring: bool = True,
        performance_log_interval: float = 60.0,
    ):
        """
        Creates environment object.

        Args:
            el_api_key: ElevenLabs API Key for text-to-speech
            use_simulation: If True, simulate the robot, else use real robot
            robot_id: Robot identifier ("niryo" or "widowx")
            verbose: Enable verbose logging
            start_camera_thread: If True, start camera update thread
                                Set to False for MCP server!
            enable_performance_monitoring: Enable performance metrics tracking
            performance_log_interval: Interval in seconds for performance logging
        """
        self._use_simulation = use_simulation
        self._verbose = verbose
        self._logger = get_package_logger(__name__, verbose)

        self._logger.info("Initializing Environment")
        self._logger.debug(
            f"Configuration: simulation={use_simulation}, "
            f"robot_id={robot_id}, camera_thread={start_camera_thread}, "
            f"metrics={enable_performance_monitoring}"
        )

        # Initialize performance metrics
        self._metrics = PerformanceMetrics(history_size=100, verbose=verbose) if enable_performance_monitoring else None

        self._performance_monitor: Optional[PerformanceMonitor] = None
        if enable_performance_monitoring:
            self._performance_monitor = PerformanceMonitor(
                self._metrics, interval_seconds=performance_log_interval, verbose=verbose
            )

        # Initialize robot (must come before framegrabber and workspaces)
        self._robot = Robot(self, use_simulation, robot_id, verbose)

        # Initialize robot-specific components
        if isinstance(self.get_robot_controller(), NiryoRobotController):
            self._framegrabber = NiryoFrameGrabber(self, verbose=verbose)
            self._workspaces = NiryoWorkspaces(self, verbose)
            self._logger.debug(f"Home workspace: {self._workspaces.get_home_workspace()}")
        elif isinstance(self.get_robot_controller(), WidowXRobotController):
            self._framegrabber = WidowXFrameGrabber(self, verbose=verbose)
            self._workspaces = WidowXWorkspaces(self, verbose)
        else:
            self._logger.error(f"Unknown robot controller type: {self.get_robot_controller()}")

        # Initialize text-to-speech
        self._oralcom = Text2Speech(
            el_api_key,
            verbose=verbose,
            enable_queue=True,  # Enable built-in audio queue
            max_queue_size=50,
            duplicate_timeout=2.0,
        )

        # Thread control
        self._stop_event = threading.Event()

        # Initialize ObjectMemoryManager
        self._memory_manager = ObjectMemoryManager(manual_update_timeout=5.0, position_tolerance=0.05, verbose=verbose)

        # Initialize workspaces in memory manager
        if hasattr(self._workspaces, "__iter__"):
            try:
                for workspace in self._workspaces:
                    workspace_id = workspace.id()
                    self._memory_manager.initialize_workspace(workspace_id)
                    self._logger.debug(f"Initialized memory for workspace: {workspace_id}")
            except Exception as e:
                self._logger.warning(f"Could not iterate workspaces: {e}")
                if hasattr(self._workspaces, "get_workspace_home_id"):
                    default_ws_id = self._workspaces.get_workspace_home_id()
                    self._memory_manager.initialize_workspace(default_ws_id)

        # Current workspace tracking
        # self._current_workspace_id: Optional[str] = None
        if self._workspaces is not None:
            self._current_workspace_id = self._workspaces.get_workspace_home_id()
            self._logger.debug(f"Set initial workspace to: {self._current_workspace_id}")

        # Redis-based communication
        self._object_broker = RedisMessageBroker()
        self._label_manager = RedisLabelManager()

        # Verify Redis connection
        try:
            if hasattr(self._object_broker, "test_connection") and not self._object_broker.test_connection():
                self._logger.warning("Could not connect to Redis server. Object detection will not work.")
        except Exception as e:
            self._logger.warning(f"Error checking Redis connection: {e}")

        # Start performance monitor if enabled
        if self._performance_monitor:
            self._performance_monitor.start()
            self._logger.info("Performance monitoring started")

        # Start camera thread if requested
        if start_camera_thread:
            self._logger.info("Starting camera update thread...")
            self.start_camera_updates(visualize=False)
        else:
            self._logger.info("Camera thread disabled (manual control)")

    def __del__(self):
        """Destructor."""
        if hasattr(self, "_stop_event"):
            self._logger.debug("Shutting down environment in destructor...")
            self._stop_event.set()

        if hasattr(self, "_performance_monitor") and self._performance_monitor:
            self._performance_monitor.stop()

    def cleanup(self):
        """
        Explicit cleanup method - call when done with the object.
        More reliable than relying on __del__.
        """
        if hasattr(self, "_stop_event"):
            self._logger.info("Shutting down environment...")
            self._stop_event.set()

        if hasattr(self, "_performance_monitor") and self._performance_monitor:
            self._performance_monitor.stop()

        if hasattr(self, "_oralcom"):
            self._oralcom.shutdown(timeout=5.0)

    # *** PUBLIC METHODS ***

    def start_camera_updates(self, visualize: bool = False) -> threading.Thread:
        """
        Start the background camera update thread.

        Args:
            visualize: If True, show the camera feed (requires GUI).

        Returns:
            The started threading.Thread object.
        """

        def loop():
            for _ in self.update_camera_and_objects(visualize=visualize):
                pass

        t = threading.Thread(target=loop, daemon=True)
        t.start()
        return t

    def update_camera_and_objects(self, visualize: bool = False):
        """
        Continuously updates the camera and detected objects.

        Args:
            visualize: If True, displays the updated camera feed
        """
        if self._workspaces is None:
            self._logger.error("Workspaces not initialized - cannot update camera and objects")
            return

        # FIX: Get home workspace ID and set it as current
        home_workspace_id = self._workspaces.get_workspace_home_id()

        if self._current_workspace_id is None:
            self._current_workspace_id = home_workspace_id  # Set before moving

        self.robot_move2observation_pose(home_workspace_id)

        while not self._stop_event.is_set():
            loop_start = time.perf_counter()

            # Get current frame with timing
            if self._metrics:
                with self._metrics.timer("frame_capture"):
                    img = self.get_current_frame()
            else:
                img = self.get_current_frame()

            time.sleep(0.1)

            # Get detected objects from Redis
            # Get detected objects from Redis with timing
            if self._metrics:
                with self._metrics.timer("object_fetch_redis"):
                    detected_objects = self.get_detected_objects()

                # Record object count
                self._metrics.increment_counter("objects_detected", len(detected_objects))
            else:
                detected_objects = self.get_detected_objects()

            # Update memory using ObjectMemoryManager
            if self._current_workspace_id:  # This should now always be True
                at_observation = self.is_any_workspace_visible()
                robot_moving = self.get_robot_in_motion()

                if self._metrics:
                    mem_start = time.perf_counter()

                objects_added, objects_updated = self._memory_manager.update(
                    workspace_id=self._current_workspace_id,
                    detected_objects=detected_objects,
                    at_observation_pose=at_observation,
                    robot_in_motion=robot_moving,
                )

                if self._metrics:
                    mem_duration = (time.perf_counter() - mem_start) * 1000
                    self._metrics.record_memory_update(mem_duration, objects_added, objects_updated)

                self._logger.debug(
                    f"Memory update for '{self._current_workspace_id}': " f"added={objects_added}, updated={objects_updated}"
                )
            else:
                # This should never happen now, but log it if it does
                self._logger.error("Current workspace ID is None - memory not updated!")

            # Record loop iteration time
            if self._metrics:
                loop_duration = (time.perf_counter() - loop_start) * 1000
                self._metrics.record_timing("camera_loop_iteration", loop_duration)

            # Log memory stats
            if self._verbose:
                stats = self._memory_manager.get_memory_stats()
                if self._current_workspace_id in stats:
                    ws_stats = stats[self._current_workspace_id]
                    self._logger.debug(
                        f"Memory: {ws_stats['object_count']} objects, "
                        f"manual_updates={ws_stats['manual_updates']}, "
                        f"visible={ws_stats['visible']}\n"
                    )

            yield img

            if self.get_robot_in_motion():
                time.sleep(0.05)
            else:
                time.sleep(0.05)

    def clear_memory(self) -> None:
        """
        Manually clear all objects from memory.
        Useful when workspace has changed significantly.
        """
        self._logger.warning("Clearing memory of all objects")

        if self._metrics:
            with self._metrics.timer("memory_clear"):
                self._memory_manager.clear()
            self._metrics.increment_counter("memory_clears")
        else:
            self._memory_manager.clear()

    def get_detected_objects_from_memory(self) -> Objects:
        """
        Get a copy of the object memory for current workspace.

        Returns:
            Objects: Copy of objects currently in memory
        """
        if self._metrics:
            with self._metrics.timer("memory_get"):
                result = self._get_memory_internal()
            return result
        else:
            return self._get_memory_internal()

    def _get_memory_internal(self) -> Objects:
        """Internal method for getting memory."""
        if self._current_workspace_id:
            return self._memory_manager.get(self._current_workspace_id)

        home_ws_id = self._workspaces.get_workspace_home_id()
        return self._memory_manager.get(home_ws_id)

    def remove_object_from_memory(self, object_label: str, coordinate: List[float]) -> None:
        """
        Remove an object from memory after manipulation.

        Args:
            object_label: Label of the object to remove
            coordinate: Last known coordinate [x, y]
        """
        if not self._current_workspace_id:
            self._logger.warning("No current workspace set")
            return

        self._memory_manager.remove_object(
            workspace_id=self._current_workspace_id, object_label=object_label, coordinate=coordinate
        )

    def update_object_in_memory(self, object_label: str, old_coordinate: List[float], new_pose: "PoseObjectPNP") -> None:
        """
        Update an object's position in memory after movement.

        Args:
            object_label: Label of the object
            old_coordinate: Previous coordinate [x, y]
            new_pose: New pose after movement
        """
        if not self._current_workspace_id:
            self._logger.warning("No current workspace set")
            return

        self._memory_manager.mark_manual_update(
            workspace_id=self._current_workspace_id,
            object_label=object_label,
            old_coordinate=old_coordinate,
            new_pose=new_pose,
        )

    # Performance metrics methods

    def get_performance_metrics(self) -> Optional[PerformanceMetrics]:
        """
        Get the performance metrics tracker.

        Returns:
            PerformanceMetrics instance or None if disabled
        """
        return self._metrics

    def get_performance_stats(self) -> Optional[Dict]:
        """
        Get current performance statistics.

        Returns:
            Dictionary with performance stats or None if disabled
        """
        if self._metrics:
            return self._metrics.get_stats()
        return None

    def print_performance_summary(self) -> None:
        """Print a human-readable performance summary."""
        if self._metrics:
            print(self._metrics.get_summary())
        else:
            print("Performance monitoring is disabled")

    def export_performance_metrics(self, filepath: str) -> None:
        """
        Export performance metrics to JSON file.

        Args:
            filepath: Path to output file
        """
        if self._metrics:
            self._metrics.export_json(filepath)
            self._logger.info(f"Performance metrics exported to {filepath}")
        else:
            self._logger.warning("Performance monitoring is disabled")

    def reset_performance_metrics(self) -> None:
        """Reset all performance metrics."""
        if self._metrics:
            self._metrics.reset()
            self._logger.info("Performance metrics reset")

    # Multi-workspace memory methods

    def get_current_workspace_id(self) -> Optional[str]:
        """Get the ID of the currently observed workspace."""
        return self._current_workspace_id

    def set_current_workspace(self, workspace_id: str) -> None:
        """Set the current workspace being observed."""
        self._current_workspace_id = workspace_id
        self._logger.debug(f"Current workspace set to: {workspace_id}")

    def get_detected_objects_from_workspace(self, workspace_id: str) -> Objects:
        """
        Get objects from a specific workspace memory.

        Args:
            workspace_id: ID of the workspace

        Returns:
            Objects: Copy of objects in that workspace's memory
        """
        return self._memory_manager.get(workspace_id)

    def get_all_workspace_objects(self) -> Dict[str, Objects]:
        """
        Get objects from all workspaces.

        Returns:
            Dict mapping workspace_id to Objects collection
        """
        return self._memory_manager.get_all()

    def clear_workspace_memory(self, workspace_id: str) -> None:
        """Clear memory for a specific workspace."""
        self._memory_manager.clear(workspace_id)

    def remove_object_from_workspace(self, workspace_id: str, object_label: str, coordinate: list) -> None:
        """Remove an object from a specific workspace's memory."""
        self._memory_manager.remove_object(workspace_id=workspace_id, object_label=object_label, coordinate=coordinate)

    def update_object_in_workspace(
        self, source_workspace_id: str, target_workspace_id: str, object_label: str, old_coordinate: list, new_coordinate: list
    ) -> None:
        """
        Move an object from one workspace to another in memory.

        Args:
            source_workspace_id: ID of workspace where object currently is
            target_workspace_id: ID of workspace where object will be placed
            object_label: Label of the object
            old_coordinate: Current coordinate in source workspace
            new_coordinate: New coordinate in target workspace
        """
        self._memory_manager.move_object(
            source_workspace_id=source_workspace_id,
            target_workspace_id=target_workspace_id,
            object_label=object_label,
            old_coordinate=old_coordinate,
            new_coordinate=new_coordinate,
        )

    # *** PUBLIC SET METHODS ***

    def add_object_name2object_labels(self, object_name: str) -> str:
        """
        Add a new object to the list of recognizable objects via Redis.

        Args:
            object_name: Name of the object to add

        Returns:
            str: Status message
        """
        # Add label to Redis stream
        success = self._label_manager.add_label(object_name)

        if success:
            mymessage = f"Added {object_name} to the list of recognizable objects."
        else:
            mymessage = f"{object_name} is already in the list of recognizable objects."

        # Provide audio feedback
        thread_oral = self._oralcom.call_text2speech_async(mymessage)
        thread_oral.join()

        return mymessage

    def stop_camera_updates(self) -> None:
        """Stop camera update thread."""
        self._stop_event.set()

    def oralcom_call_text2speech_async(self, text: str, priority: int = 0) -> bool:
        """
        Asynchronously call text-to-speech API.

        Args:
            text: Message for text-to-speech
            priority: Priority (0-10, higher = more urgent)

        Returns:
            True if queued successfully (or dummy thread for compatibility)
        """
        return self._oralcom.speak(text, priority=priority, blocking=False)

    # *** PUBLIC GET METHODS ***

    def get_largest_free_space_with_center(self, workspace_id: Optional[str] = None) -> Tuple[float, float, float]:
        """
        Determines the largest free space in the workspace in square metres and its center coordinate in metres.
        This method can be used to determine at which location an object can be placed safely.

        Args:
            workspace_id: Optional ID of the workspace to analyze. If None, the home workspace is used.

        Returns:
            tuple: (largest_free_area_m2, center_x, center_y) where:
                - largest_area_m2 (float): Largest free area in square meters.
                - center_x (float): X-coordinate of the center of the largest free area in meters.
                - center_y (float): Y-coordinate of the center of the largest free area in meters.
        """
        if workspace_id:
            workspace = self.get_workspace_by_id(workspace_id)
        else:
            workspace = self.get_workspace(0)

        if workspace is None:
            self._logger.error(f"Workspace not found: {workspace_id}")
            return 0.0, 0.0, 0.0

        detected_objects = self.get_detected_objects()

        return calculate_largest_free_space(
            workspace=workspace, detected_objects=detected_objects, visualize=self.verbose, logger=self._logger
        )

    def get_workspace_coordinate_from_point(self, workspace_id: str, point: str) -> Optional[List[float]]:
        """
        Get the world coordinate of a special point of the given workspace.

        Args:
            workspace_id (str): ID of workspace.
            point (str): description of point. Possible values are:
            - 'upper left corner': Returns the world coordinate of the upper left corner of the workspace.
            - 'upper right corner': Returns the world coordinate of the upper right corner of the workspace.
            - 'lower left corner': Returns the world coordinate of the lower left corner of the workspace.
            - 'lower right corner': Returns the world coordinate of the lower right corner of the workspace.
            - 'center point': Returns the world coordinate of the center of the workspace.

        Returns:
            List[float]: (x,y) world coordinate of the point on the workspace that was specified by the argument point.
        """
        if point == "upper left corner":
            return self.get_workspace_by_id(workspace_id).xy_ul_wc().xy_coordinate()
        elif point == "upper right corner":
            return self.get_workspace_by_id(workspace_id).xy_ur_wc().xy_coordinate()
        elif point == "lower left corner":
            return self.get_workspace_by_id(workspace_id).xy_ll_wc().xy_coordinate()
        elif point == "lower right corner":
            return self.get_workspace_by_id(workspace_id).xy_lr_wc().xy_coordinate()
        elif point == "center point":
            return self.get_workspace_by_id(workspace_id).xy_center_wc().xy_coordinate()
        else:
            self._logger.error(f"Unknown point type: {point}")
            return None

    # GET methods from Workspaces

    def get_workspace(self, index: int = 0) -> Workspace:
        """
        Return the workspace at the given position index in the list of workspaces.

        Args:
            index: 0-based index in the list of workspaces.

        Returns:

        """
        return self._workspaces.get_workspace(index)

    def get_workspace_by_id(self, workspace_id: str) -> Optional[Workspace]:
        """
        Return the Workspace object with the given id, if existent, else None is returned.

        Args:
            id: workspace ID

        Returns:
            Workspace or None, if no workspace with the given id exists.
        """
        return self._workspaces.get_workspace_by_id(workspace_id)

    def get_workspace_home_id(self) -> str:
        """
        Returns the ID of the workspace at index 0.

        Returns:
            the ID of the workspace at index 0.
        """
        return self._workspaces.get_workspace_home_id()

    def get_workspace_id(self, index: int) -> str:
        """
        Return the id of the workspace at the given position index in the list of workspaces.

        Args:
            index: 0-based index in the list of workspaces.

        Returns:
            str: id of the workspace at the given position index in the list of workspaces.
        """
        return self._workspaces.get_workspace_id(index)

    @log_start_end_cls()
    def get_visible_workspace(self, camera_pose: PoseObjectPNP) -> Workspace:
        """Get visible workspace from camera pose."""
        return self._workspaces.get_visible_workspace(camera_pose)

    def is_any_workspace_visible(self) -> bool:
        """Check if any workspace is currently visible."""
        pose = self.get_robot_pose()
        return self.get_visible_workspace(pose) is not None

    def get_observation_pose(self, workspace_id: str) -> PoseObjectPNP:
        """
        Return the observation pose of the given workspace id

        Args:
            workspace_id: id of the workspace

        Returns:
            PoseObjectPNP: observation pose of the gripper where it can observe the workspace given by workspace_id
        """
        return self._workspaces.get_observation_pose(workspace_id)

    # Camera-related GET methods

    def get_current_frame(self) -> np.ndarray:
        """
        Capture image from robot's camera.

        Returns:
            numpy.ndarray: Camera image
        """
        frame = self._framegrabber.get_current_frame()

        if self._metrics and frame is not None:
            self._metrics.increment_counter("frames_captured")

        return frame

    def get_current_frame_width_height(self) -> tuple[int, int]:
        """
        Returns width and height of current frame in pixels.

        Returns:
            width and height of current frame in pixels.
        """
        return self._framegrabber.get_current_frame_width_height()

    # Robot-related GET methods

    def get_robot_controller(self) -> RobotController:
        """

        Returns:
            RobotController: object that controls the robot.
        """
        return self._robot.robot()

    @log_start_end_cls()
    def get_robot_in_motion(self) -> bool:
        """
        Check if robot is in motion.

        Returns:
            bool: True if robot is moving, False otherwise
        """
        return self._robot.robot_in_motion()

    def get_robot_pose(self) -> PoseObjectPNP:
        """
        Get current pose of gripper of robot.

        Returns:
            current pose of gripper of robot.
        """
        if self._metrics:
            with self._metrics.timer("robot_get_pose"):
                return self._robot.get_pose()
        else:
            return self._robot.get_pose()

    @log_start_end_cls()
    def get_robot_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> PoseObjectPNP:
        """
        Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw),
        calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the
        given coordinate an object that has the given orientation. For this method to work, it is important that
        only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot
        this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

        Args:
            workspace_id: id of the workspace
            u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
            v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
            yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

        Returns:
            PoseObjectPNP: Pose in world coordinates
        """
        return self._robot.get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

    # Vision-related GET methods

    def get_object_labels_as_string(self) -> str:
        """
        Return detectable object labels as comma-separated string.

        Returns:
            str: Comma-separated list of objects
        """
        object_labels = self.get_object_labels()

        if not object_labels or not object_labels[0]:
            return "No detectable objects configured."

        return f"I can recognize these objects: {', '.join(object_labels[0])}"

    def get_detected_objects(self) -> Objects:
        """
        Get detected objects from Redis stream.

        Returns:
            Objects: Collection of detected objects
        """
        # Get latest objects from Redis (published by vision_detect_segment)
        objects_dict_list = self._object_broker.get_latest_objects(max_age_seconds=2.0)

        if not objects_dict_list:
            if self.verbose:
                self._logger.info(
                    "No fresh object detections from Redis. "
                    "Ensure 'vision_detect_segment' is running and publishing to the correct channel."
                )
            return Objects()

        # Convert dictionaries to Object instances
        # Use current workspace for coordinate transformation if available, else use home workspace
        workspace = (
            self.get_workspace_by_id(self._current_workspace_id) if self._current_workspace_id else self.get_workspace(0)
        )
        return Objects.dict_list_to_objects(objects_dict_list, workspace)

    def get_object_labels(self) -> List[List[str]]:
        """
        Get list of detectable object labels from Redis.

        Returns:
            List of lists of detectable strings
        """
        # Get latest labels from Redis (published by vision_detect_segment)
        labels = self._label_manager.get_latest_labels(timeout_seconds=60.0)

        if labels is None:
            if self.verbose:
                print("No labels from Redis, using empty list")
            return [[]]

        return [labels]

    # Robot control methods

    def robot_move2home_observation_pose(self) -> None:
        """Move robot to home workspace observation pose."""
        workspace_id = self.get_workspace_home_id()
        self.robot_move2observation_pose(workspace_id)

    def robot_move2observation_pose(self, workspace_id: str) -> None:
        """
        Move robot to observation pose for given workspace.

        Args:
            workspace_id: ID of workspace
        """
        if self._metrics:
            with self._metrics.timer("robot_move_observation"):
                self._robot.move2observation_pose(workspace_id)
        else:
            self._robot.move2observation_pose(workspace_id)

        self._current_workspace_id = workspace_id
        self._logger.debug(f"Set current workspace to: {workspace_id}")

    # *** PUBLIC STATIC/CLASS GET methods ***

    # *** PRIVATE methods ***

    # *** PUBLIC properties ***

    def workspaces(self) -> Workspaces:
        """Return workspaces object."""
        return self._workspaces

    def framegrabber(self) -> FrameGrabber:
        """Return framegrabber object."""
        return self._framegrabber

    def robot(self) -> Robot:
        """Return robot object."""
        return self._robot

    def use_simulation(self) -> bool:
        """Check if using simulation."""
        return self._use_simulation

    def metrics(self) -> Optional[PerformanceMetrics]:
        """Get performance metrics tracker."""
        return self._metrics

    @property
    def verbose(self) -> bool:
        """Check if verbose logging enabled."""
        return self._logger.isEnabledFor(logging.DEBUG)

    @verbose.setter
    def verbose(self, value: bool):
        """Set verbose logging on/off."""
        from .common.logger_config import set_verbose

        set_verbose(self._logger, value)

    # *** PRIVATE VARIABLES ***

    # Workspaces object
    _workspaces = None

    # FrameGraber object
    _framegrabber = None

    # Robot object
    _robot = None
    _use_simulation = False
    _verbose = False
    _memory_manager: Optional[ObjectMemoryManager] = None
    _current_workspace_id: Optional[str] = None

Attributes

verbose property writable

Check if verbose logging enabled.

Functions

__del__()

Destructor.

Source code in robot_environment/environment.py
def __del__(self):
    """Destructor."""
    if hasattr(self, "_stop_event"):
        self._logger.debug("Shutting down environment in destructor...")
        self._stop_event.set()

    if hasattr(self, "_performance_monitor") and self._performance_monitor:
        self._performance_monitor.stop()

__init__(el_api_key, use_simulation, robot_id, verbose=False, start_camera_thread=True, enable_performance_monitoring=True, performance_log_interval=60.0)

Creates environment object.

Parameters:

Name Type Description Default
el_api_key str

ElevenLabs API Key for text-to-speech

required
use_simulation bool

If True, simulate the robot, else use real robot

required
robot_id str

Robot identifier ("niryo" or "widowx")

required
verbose bool

Enable verbose logging

False
start_camera_thread bool

If True, start camera update thread Set to False for MCP server!

True
enable_performance_monitoring bool

Enable performance metrics tracking

True
performance_log_interval float

Interval in seconds for performance logging

60.0
Source code in robot_environment/environment.py
def __init__(
    self,
    el_api_key: str,
    use_simulation: bool,
    robot_id: str,
    verbose: bool = False,
    start_camera_thread: bool = True,
    enable_performance_monitoring: bool = True,
    performance_log_interval: float = 60.0,
):
    """
    Creates environment object.

    Args:
        el_api_key: ElevenLabs API Key for text-to-speech
        use_simulation: If True, simulate the robot, else use real robot
        robot_id: Robot identifier ("niryo" or "widowx")
        verbose: Enable verbose logging
        start_camera_thread: If True, start camera update thread
                            Set to False for MCP server!
        enable_performance_monitoring: Enable performance metrics tracking
        performance_log_interval: Interval in seconds for performance logging
    """
    self._use_simulation = use_simulation
    self._verbose = verbose
    self._logger = get_package_logger(__name__, verbose)

    self._logger.info("Initializing Environment")
    self._logger.debug(
        f"Configuration: simulation={use_simulation}, "
        f"robot_id={robot_id}, camera_thread={start_camera_thread}, "
        f"metrics={enable_performance_monitoring}"
    )

    # Initialize performance metrics
    self._metrics = PerformanceMetrics(history_size=100, verbose=verbose) if enable_performance_monitoring else None

    self._performance_monitor: Optional[PerformanceMonitor] = None
    if enable_performance_monitoring:
        self._performance_monitor = PerformanceMonitor(
            self._metrics, interval_seconds=performance_log_interval, verbose=verbose
        )

    # Initialize robot (must come before framegrabber and workspaces)
    self._robot = Robot(self, use_simulation, robot_id, verbose)

    # Initialize robot-specific components
    if isinstance(self.get_robot_controller(), NiryoRobotController):
        self._framegrabber = NiryoFrameGrabber(self, verbose=verbose)
        self._workspaces = NiryoWorkspaces(self, verbose)
        self._logger.debug(f"Home workspace: {self._workspaces.get_home_workspace()}")
    elif isinstance(self.get_robot_controller(), WidowXRobotController):
        self._framegrabber = WidowXFrameGrabber(self, verbose=verbose)
        self._workspaces = WidowXWorkspaces(self, verbose)
    else:
        self._logger.error(f"Unknown robot controller type: {self.get_robot_controller()}")

    # Initialize text-to-speech
    self._oralcom = Text2Speech(
        el_api_key,
        verbose=verbose,
        enable_queue=True,  # Enable built-in audio queue
        max_queue_size=50,
        duplicate_timeout=2.0,
    )

    # Thread control
    self._stop_event = threading.Event()

    # Initialize ObjectMemoryManager
    self._memory_manager = ObjectMemoryManager(manual_update_timeout=5.0, position_tolerance=0.05, verbose=verbose)

    # Initialize workspaces in memory manager
    if hasattr(self._workspaces, "__iter__"):
        try:
            for workspace in self._workspaces:
                workspace_id = workspace.id()
                self._memory_manager.initialize_workspace(workspace_id)
                self._logger.debug(f"Initialized memory for workspace: {workspace_id}")
        except Exception as e:
            self._logger.warning(f"Could not iterate workspaces: {e}")
            if hasattr(self._workspaces, "get_workspace_home_id"):
                default_ws_id = self._workspaces.get_workspace_home_id()
                self._memory_manager.initialize_workspace(default_ws_id)

    # Current workspace tracking
    # self._current_workspace_id: Optional[str] = None
    if self._workspaces is not None:
        self._current_workspace_id = self._workspaces.get_workspace_home_id()
        self._logger.debug(f"Set initial workspace to: {self._current_workspace_id}")

    # Redis-based communication
    self._object_broker = RedisMessageBroker()
    self._label_manager = RedisLabelManager()

    # Verify Redis connection
    try:
        if hasattr(self._object_broker, "test_connection") and not self._object_broker.test_connection():
            self._logger.warning("Could not connect to Redis server. Object detection will not work.")
    except Exception as e:
        self._logger.warning(f"Error checking Redis connection: {e}")

    # Start performance monitor if enabled
    if self._performance_monitor:
        self._performance_monitor.start()
        self._logger.info("Performance monitoring started")

    # Start camera thread if requested
    if start_camera_thread:
        self._logger.info("Starting camera update thread...")
        self.start_camera_updates(visualize=False)
    else:
        self._logger.info("Camera thread disabled (manual control)")

add_object_name2object_labels(object_name)

Add a new object to the list of recognizable objects via Redis.

Parameters:

Name Type Description Default
object_name str

Name of the object to add

required

Returns:

Name Type Description
str str

Status message

Source code in robot_environment/environment.py
def add_object_name2object_labels(self, object_name: str) -> str:
    """
    Add a new object to the list of recognizable objects via Redis.

    Args:
        object_name: Name of the object to add

    Returns:
        str: Status message
    """
    # Add label to Redis stream
    success = self._label_manager.add_label(object_name)

    if success:
        mymessage = f"Added {object_name} to the list of recognizable objects."
    else:
        mymessage = f"{object_name} is already in the list of recognizable objects."

    # Provide audio feedback
    thread_oral = self._oralcom.call_text2speech_async(mymessage)
    thread_oral.join()

    return mymessage

cleanup()

Explicit cleanup method - call when done with the object. More reliable than relying on del.

Source code in robot_environment/environment.py
def cleanup(self):
    """
    Explicit cleanup method - call when done with the object.
    More reliable than relying on __del__.
    """
    if hasattr(self, "_stop_event"):
        self._logger.info("Shutting down environment...")
        self._stop_event.set()

    if hasattr(self, "_performance_monitor") and self._performance_monitor:
        self._performance_monitor.stop()

    if hasattr(self, "_oralcom"):
        self._oralcom.shutdown(timeout=5.0)

clear_memory()

Manually clear all objects from memory. Useful when workspace has changed significantly.

Source code in robot_environment/environment.py
def clear_memory(self) -> None:
    """
    Manually clear all objects from memory.
    Useful when workspace has changed significantly.
    """
    self._logger.warning("Clearing memory of all objects")

    if self._metrics:
        with self._metrics.timer("memory_clear"):
            self._memory_manager.clear()
        self._metrics.increment_counter("memory_clears")
    else:
        self._memory_manager.clear()

clear_workspace_memory(workspace_id)

Clear memory for a specific workspace.

Source code in robot_environment/environment.py
def clear_workspace_memory(self, workspace_id: str) -> None:
    """Clear memory for a specific workspace."""
    self._memory_manager.clear(workspace_id)

export_performance_metrics(filepath)

Export performance metrics to JSON file.

Parameters:

Name Type Description Default
filepath str

Path to output file

required
Source code in robot_environment/environment.py
def export_performance_metrics(self, filepath: str) -> None:
    """
    Export performance metrics to JSON file.

    Args:
        filepath: Path to output file
    """
    if self._metrics:
        self._metrics.export_json(filepath)
        self._logger.info(f"Performance metrics exported to {filepath}")
    else:
        self._logger.warning("Performance monitoring is disabled")

framegrabber()

Return framegrabber object.

Source code in robot_environment/environment.py
def framegrabber(self) -> FrameGrabber:
    """Return framegrabber object."""
    return self._framegrabber

get_all_workspace_objects()

Get objects from all workspaces.

Returns:

Type Description
Dict[str, Objects]

Dict mapping workspace_id to Objects collection

Source code in robot_environment/environment.py
def get_all_workspace_objects(self) -> Dict[str, Objects]:
    """
    Get objects from all workspaces.

    Returns:
        Dict mapping workspace_id to Objects collection
    """
    return self._memory_manager.get_all()

get_current_frame()

Capture image from robot's camera.

Returns:

Type Description
ndarray

numpy.ndarray: Camera image

Source code in robot_environment/environment.py
def get_current_frame(self) -> np.ndarray:
    """
    Capture image from robot's camera.

    Returns:
        numpy.ndarray: Camera image
    """
    frame = self._framegrabber.get_current_frame()

    if self._metrics and frame is not None:
        self._metrics.increment_counter("frames_captured")

    return frame

get_current_frame_width_height()

Returns width and height of current frame in pixels.

Returns:

Type Description
tuple[int, int]

width and height of current frame in pixels.

Source code in robot_environment/environment.py
def get_current_frame_width_height(self) -> tuple[int, int]:
    """
    Returns width and height of current frame in pixels.

    Returns:
        width and height of current frame in pixels.
    """
    return self._framegrabber.get_current_frame_width_height()

get_current_workspace_id()

Get the ID of the currently observed workspace.

Source code in robot_environment/environment.py
def get_current_workspace_id(self) -> Optional[str]:
    """Get the ID of the currently observed workspace."""
    return self._current_workspace_id

get_detected_objects()

Get detected objects from Redis stream.

Returns:

Name Type Description
Objects Objects

Collection of detected objects

Source code in robot_environment/environment.py
def get_detected_objects(self) -> Objects:
    """
    Get detected objects from Redis stream.

    Returns:
        Objects: Collection of detected objects
    """
    # Get latest objects from Redis (published by vision_detect_segment)
    objects_dict_list = self._object_broker.get_latest_objects(max_age_seconds=2.0)

    if not objects_dict_list:
        if self.verbose:
            self._logger.info(
                "No fresh object detections from Redis. "
                "Ensure 'vision_detect_segment' is running and publishing to the correct channel."
            )
        return Objects()

    # Convert dictionaries to Object instances
    # Use current workspace for coordinate transformation if available, else use home workspace
    workspace = (
        self.get_workspace_by_id(self._current_workspace_id) if self._current_workspace_id else self.get_workspace(0)
    )
    return Objects.dict_list_to_objects(objects_dict_list, workspace)

get_detected_objects_from_memory()

Get a copy of the object memory for current workspace.

Returns:

Name Type Description
Objects Objects

Copy of objects currently in memory

Source code in robot_environment/environment.py
def get_detected_objects_from_memory(self) -> Objects:
    """
    Get a copy of the object memory for current workspace.

    Returns:
        Objects: Copy of objects currently in memory
    """
    if self._metrics:
        with self._metrics.timer("memory_get"):
            result = self._get_memory_internal()
        return result
    else:
        return self._get_memory_internal()

get_detected_objects_from_workspace(workspace_id)

Get objects from a specific workspace memory.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace

required

Returns:

Name Type Description
Objects Objects

Copy of objects in that workspace's memory

Source code in robot_environment/environment.py
def get_detected_objects_from_workspace(self, workspace_id: str) -> Objects:
    """
    Get objects from a specific workspace memory.

    Args:
        workspace_id: ID of the workspace

    Returns:
        Objects: Copy of objects in that workspace's memory
    """
    return self._memory_manager.get(workspace_id)

get_largest_free_space_with_center(workspace_id=None)

Determines the largest free space in the workspace in square metres and its center coordinate in metres. This method can be used to determine at which location an object can be placed safely.

Parameters:

Name Type Description Default
workspace_id Optional[str]

Optional ID of the workspace to analyze. If None, the home workspace is used.

None

Returns:

Name Type Description
tuple Tuple[float, float, float]

(largest_free_area_m2, center_x, center_y) where: - largest_area_m2 (float): Largest free area in square meters. - center_x (float): X-coordinate of the center of the largest free area in meters. - center_y (float): Y-coordinate of the center of the largest free area in meters.

Source code in robot_environment/environment.py
def get_largest_free_space_with_center(self, workspace_id: Optional[str] = None) -> Tuple[float, float, float]:
    """
    Determines the largest free space in the workspace in square metres and its center coordinate in metres.
    This method can be used to determine at which location an object can be placed safely.

    Args:
        workspace_id: Optional ID of the workspace to analyze. If None, the home workspace is used.

    Returns:
        tuple: (largest_free_area_m2, center_x, center_y) where:
            - largest_area_m2 (float): Largest free area in square meters.
            - center_x (float): X-coordinate of the center of the largest free area in meters.
            - center_y (float): Y-coordinate of the center of the largest free area in meters.
    """
    if workspace_id:
        workspace = self.get_workspace_by_id(workspace_id)
    else:
        workspace = self.get_workspace(0)

    if workspace is None:
        self._logger.error(f"Workspace not found: {workspace_id}")
        return 0.0, 0.0, 0.0

    detected_objects = self.get_detected_objects()

    return calculate_largest_free_space(
        workspace=workspace, detected_objects=detected_objects, visualize=self.verbose, logger=self._logger
    )

get_object_labels()

Get list of detectable object labels from Redis.

Returns:

Type Description
List[List[str]]

List of lists of detectable strings

Source code in robot_environment/environment.py
def get_object_labels(self) -> List[List[str]]:
    """
    Get list of detectable object labels from Redis.

    Returns:
        List of lists of detectable strings
    """
    # Get latest labels from Redis (published by vision_detect_segment)
    labels = self._label_manager.get_latest_labels(timeout_seconds=60.0)

    if labels is None:
        if self.verbose:
            print("No labels from Redis, using empty list")
        return [[]]

    return [labels]

get_object_labels_as_string()

Return detectable object labels as comma-separated string.

Returns:

Name Type Description
str str

Comma-separated list of objects

Source code in robot_environment/environment.py
def get_object_labels_as_string(self) -> str:
    """
    Return detectable object labels as comma-separated string.

    Returns:
        str: Comma-separated list of objects
    """
    object_labels = self.get_object_labels()

    if not object_labels or not object_labels[0]:
        return "No detectable objects configured."

    return f"I can recognize these objects: {', '.join(object_labels[0])}"

get_observation_pose(workspace_id)

Return the observation pose of the given workspace id

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required

Returns:

Name Type Description
PoseObjectPNP PoseObjectPNP

observation pose of the gripper where it can observe the workspace given by workspace_id

Source code in robot_environment/environment.py
def get_observation_pose(self, workspace_id: str) -> PoseObjectPNP:
    """
    Return the observation pose of the given workspace id

    Args:
        workspace_id: id of the workspace

    Returns:
        PoseObjectPNP: observation pose of the gripper where it can observe the workspace given by workspace_id
    """
    return self._workspaces.get_observation_pose(workspace_id)

get_performance_metrics()

Get the performance metrics tracker.

Returns:

Type Description
Optional[PerformanceMetrics]

PerformanceMetrics instance or None if disabled

Source code in robot_environment/environment.py
def get_performance_metrics(self) -> Optional[PerformanceMetrics]:
    """
    Get the performance metrics tracker.

    Returns:
        PerformanceMetrics instance or None if disabled
    """
    return self._metrics

get_performance_stats()

Get current performance statistics.

Returns:

Type Description
Optional[Dict]

Dictionary with performance stats or None if disabled

Source code in robot_environment/environment.py
def get_performance_stats(self) -> Optional[Dict]:
    """
    Get current performance statistics.

    Returns:
        Dictionary with performance stats or None if disabled
    """
    if self._metrics:
        return self._metrics.get_stats()
    return None

get_robot_controller()

Returns:

Name Type Description
RobotController RobotController

object that controls the robot.

Source code in robot_environment/environment.py
def get_robot_controller(self) -> RobotController:
    """

    Returns:
        RobotController: object that controls the robot.
    """
    return self._robot.robot()

get_robot_in_motion()

Check if robot is in motion.

Returns:

Name Type Description
bool bool

True if robot is moving, False otherwise

Source code in robot_environment/environment.py
@log_start_end_cls()
def get_robot_in_motion(self) -> bool:
    """
    Check if robot is in motion.

    Returns:
        bool: True if robot is moving, False otherwise
    """
    return self._robot.robot_in_motion()

get_robot_pose()

Get current pose of gripper of robot.

Returns:

Type Description
PoseObjectPNP

current pose of gripper of robot.

Source code in robot_environment/environment.py
def get_robot_pose(self) -> PoseObjectPNP:
    """
    Get current pose of gripper of robot.

    Returns:
        current pose of gripper of robot.
    """
    if self._metrics:
        with self._metrics.timer("robot_get_pose"):
            return self._robot.get_pose()
    else:
        return self._robot.get_pose()

get_robot_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw), calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the given coordinate an object that has the given orientation. For this method to work, it is important that only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
u_rel float

horizontal coordinate in image of workspace, normalized between 0 and 1

required
v_rel float

vertical coordinate in image of workspace, normalized between 0 and 1

required
yaw float

orientation of an object at the pixel coordinates [u_rel, v_rel].

required

Returns:

Name Type Description
PoseObjectPNP PoseObjectPNP

Pose in world coordinates

Source code in robot_environment/environment.py
@log_start_end_cls()
def get_robot_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> PoseObjectPNP:
    """
    Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw),
    calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the
    given coordinate an object that has the given orientation. For this method to work, it is important that
    only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot
    this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

    Args:
        workspace_id: id of the workspace
        u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
        v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
        yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

    Returns:
        PoseObjectPNP: Pose in world coordinates
    """
    return self._robot.get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

get_visible_workspace(camera_pose)

Get visible workspace from camera pose.

Source code in robot_environment/environment.py
@log_start_end_cls()
def get_visible_workspace(self, camera_pose: PoseObjectPNP) -> Workspace:
    """Get visible workspace from camera pose."""
    return self._workspaces.get_visible_workspace(camera_pose)

get_workspace(index=0)

Return the workspace at the given position index in the list of workspaces.

Parameters:

Name Type Description Default
index int

0-based index in the list of workspaces.

0
Source code in robot_environment/environment.py
def get_workspace(self, index: int = 0) -> Workspace:
    """
    Return the workspace at the given position index in the list of workspaces.

    Args:
        index: 0-based index in the list of workspaces.

    Returns:

    """
    return self._workspaces.get_workspace(index)

get_workspace_by_id(workspace_id)

Return the Workspace object with the given id, if existent, else None is returned.

Parameters:

Name Type Description Default
id

workspace ID

required

Returns:

Type Description
Optional[Workspace]

Workspace or None, if no workspace with the given id exists.

Source code in robot_environment/environment.py
def get_workspace_by_id(self, workspace_id: str) -> Optional[Workspace]:
    """
    Return the Workspace object with the given id, if existent, else None is returned.

    Args:
        id: workspace ID

    Returns:
        Workspace or None, if no workspace with the given id exists.
    """
    return self._workspaces.get_workspace_by_id(workspace_id)

get_workspace_coordinate_from_point(workspace_id, point)

Get the world coordinate of a special point of the given workspace.

Parameters:

Name Type Description Default
workspace_id str

ID of workspace.

required
point str

description of point. Possible values are:

required
- 'upper left corner'

Returns the world coordinate of the upper left corner of the workspace.

required
- 'upper right corner'

Returns the world coordinate of the upper right corner of the workspace.

required
- 'lower left corner'

Returns the world coordinate of the lower left corner of the workspace.

required
- 'lower right corner'

Returns the world coordinate of the lower right corner of the workspace.

required
- 'center point'

Returns the world coordinate of the center of the workspace.

required

Returns:

Type Description
Optional[List[float]]

List[float]: (x,y) world coordinate of the point on the workspace that was specified by the argument point.

Source code in robot_environment/environment.py
def get_workspace_coordinate_from_point(self, workspace_id: str, point: str) -> Optional[List[float]]:
    """
    Get the world coordinate of a special point of the given workspace.

    Args:
        workspace_id (str): ID of workspace.
        point (str): description of point. Possible values are:
        - 'upper left corner': Returns the world coordinate of the upper left corner of the workspace.
        - 'upper right corner': Returns the world coordinate of the upper right corner of the workspace.
        - 'lower left corner': Returns the world coordinate of the lower left corner of the workspace.
        - 'lower right corner': Returns the world coordinate of the lower right corner of the workspace.
        - 'center point': Returns the world coordinate of the center of the workspace.

    Returns:
        List[float]: (x,y) world coordinate of the point on the workspace that was specified by the argument point.
    """
    if point == "upper left corner":
        return self.get_workspace_by_id(workspace_id).xy_ul_wc().xy_coordinate()
    elif point == "upper right corner":
        return self.get_workspace_by_id(workspace_id).xy_ur_wc().xy_coordinate()
    elif point == "lower left corner":
        return self.get_workspace_by_id(workspace_id).xy_ll_wc().xy_coordinate()
    elif point == "lower right corner":
        return self.get_workspace_by_id(workspace_id).xy_lr_wc().xy_coordinate()
    elif point == "center point":
        return self.get_workspace_by_id(workspace_id).xy_center_wc().xy_coordinate()
    else:
        self._logger.error(f"Unknown point type: {point}")
        return None

get_workspace_home_id()

Returns the ID of the workspace at index 0.

Returns:

Type Description
str

the ID of the workspace at index 0.

Source code in robot_environment/environment.py
def get_workspace_home_id(self) -> str:
    """
    Returns the ID of the workspace at index 0.

    Returns:
        the ID of the workspace at index 0.
    """
    return self._workspaces.get_workspace_home_id()

get_workspace_id(index)

Return the id of the workspace at the given position index in the list of workspaces.

Parameters:

Name Type Description Default
index int

0-based index in the list of workspaces.

required

Returns:

Name Type Description
str str

id of the workspace at the given position index in the list of workspaces.

Source code in robot_environment/environment.py
def get_workspace_id(self, index: int) -> str:
    """
    Return the id of the workspace at the given position index in the list of workspaces.

    Args:
        index: 0-based index in the list of workspaces.

    Returns:
        str: id of the workspace at the given position index in the list of workspaces.
    """
    return self._workspaces.get_workspace_id(index)

is_any_workspace_visible()

Check if any workspace is currently visible.

Source code in robot_environment/environment.py
def is_any_workspace_visible(self) -> bool:
    """Check if any workspace is currently visible."""
    pose = self.get_robot_pose()
    return self.get_visible_workspace(pose) is not None

metrics()

Get performance metrics tracker.

Source code in robot_environment/environment.py
def metrics(self) -> Optional[PerformanceMetrics]:
    """Get performance metrics tracker."""
    return self._metrics

oralcom_call_text2speech_async(text, priority=0)

Asynchronously call text-to-speech API.

Parameters:

Name Type Description Default
text str

Message for text-to-speech

required
priority int

Priority (0-10, higher = more urgent)

0

Returns:

Type Description
bool

True if queued successfully (or dummy thread for compatibility)

Source code in robot_environment/environment.py
def oralcom_call_text2speech_async(self, text: str, priority: int = 0) -> bool:
    """
    Asynchronously call text-to-speech API.

    Args:
        text: Message for text-to-speech
        priority: Priority (0-10, higher = more urgent)

    Returns:
        True if queued successfully (or dummy thread for compatibility)
    """
    return self._oralcom.speak(text, priority=priority, blocking=False)

print_performance_summary()

Print a human-readable performance summary.

Source code in robot_environment/environment.py
def print_performance_summary(self) -> None:
    """Print a human-readable performance summary."""
    if self._metrics:
        print(self._metrics.get_summary())
    else:
        print("Performance monitoring is disabled")

remove_object_from_memory(object_label, coordinate)

Remove an object from memory after manipulation.

Parameters:

Name Type Description Default
object_label str

Label of the object to remove

required
coordinate List[float]

Last known coordinate [x, y]

required
Source code in robot_environment/environment.py
def remove_object_from_memory(self, object_label: str, coordinate: List[float]) -> None:
    """
    Remove an object from memory after manipulation.

    Args:
        object_label: Label of the object to remove
        coordinate: Last known coordinate [x, y]
    """
    if not self._current_workspace_id:
        self._logger.warning("No current workspace set")
        return

    self._memory_manager.remove_object(
        workspace_id=self._current_workspace_id, object_label=object_label, coordinate=coordinate
    )

remove_object_from_workspace(workspace_id, object_label, coordinate)

Remove an object from a specific workspace's memory.

Source code in robot_environment/environment.py
def remove_object_from_workspace(self, workspace_id: str, object_label: str, coordinate: list) -> None:
    """Remove an object from a specific workspace's memory."""
    self._memory_manager.remove_object(workspace_id=workspace_id, object_label=object_label, coordinate=coordinate)

reset_performance_metrics()

Reset all performance metrics.

Source code in robot_environment/environment.py
def reset_performance_metrics(self) -> None:
    """Reset all performance metrics."""
    if self._metrics:
        self._metrics.reset()
        self._logger.info("Performance metrics reset")

robot()

Return robot object.

Source code in robot_environment/environment.py
def robot(self) -> Robot:
    """Return robot object."""
    return self._robot

robot_move2home_observation_pose()

Move robot to home workspace observation pose.

Source code in robot_environment/environment.py
def robot_move2home_observation_pose(self) -> None:
    """Move robot to home workspace observation pose."""
    workspace_id = self.get_workspace_home_id()
    self.robot_move2observation_pose(workspace_id)

robot_move2observation_pose(workspace_id)

Move robot to observation pose for given workspace.

Parameters:

Name Type Description Default
workspace_id str

ID of workspace

required
Source code in robot_environment/environment.py
def robot_move2observation_pose(self, workspace_id: str) -> None:
    """
    Move robot to observation pose for given workspace.

    Args:
        workspace_id: ID of workspace
    """
    if self._metrics:
        with self._metrics.timer("robot_move_observation"):
            self._robot.move2observation_pose(workspace_id)
    else:
        self._robot.move2observation_pose(workspace_id)

    self._current_workspace_id = workspace_id
    self._logger.debug(f"Set current workspace to: {workspace_id}")

set_current_workspace(workspace_id)

Set the current workspace being observed.

Source code in robot_environment/environment.py
def set_current_workspace(self, workspace_id: str) -> None:
    """Set the current workspace being observed."""
    self._current_workspace_id = workspace_id
    self._logger.debug(f"Current workspace set to: {workspace_id}")

start_camera_updates(visualize=False)

Start the background camera update thread.

Parameters:

Name Type Description Default
visualize bool

If True, show the camera feed (requires GUI).

False

Returns:

Type Description
Thread

The started threading.Thread object.

Source code in robot_environment/environment.py
def start_camera_updates(self, visualize: bool = False) -> threading.Thread:
    """
    Start the background camera update thread.

    Args:
        visualize: If True, show the camera feed (requires GUI).

    Returns:
        The started threading.Thread object.
    """

    def loop():
        for _ in self.update_camera_and_objects(visualize=visualize):
            pass

    t = threading.Thread(target=loop, daemon=True)
    t.start()
    return t

stop_camera_updates()

Stop camera update thread.

Source code in robot_environment/environment.py
def stop_camera_updates(self) -> None:
    """Stop camera update thread."""
    self._stop_event.set()

update_camera_and_objects(visualize=False)

Continuously updates the camera and detected objects.

Parameters:

Name Type Description Default
visualize bool

If True, displays the updated camera feed

False
Source code in robot_environment/environment.py
def update_camera_and_objects(self, visualize: bool = False):
    """
    Continuously updates the camera and detected objects.

    Args:
        visualize: If True, displays the updated camera feed
    """
    if self._workspaces is None:
        self._logger.error("Workspaces not initialized - cannot update camera and objects")
        return

    # FIX: Get home workspace ID and set it as current
    home_workspace_id = self._workspaces.get_workspace_home_id()

    if self._current_workspace_id is None:
        self._current_workspace_id = home_workspace_id  # Set before moving

    self.robot_move2observation_pose(home_workspace_id)

    while not self._stop_event.is_set():
        loop_start = time.perf_counter()

        # Get current frame with timing
        if self._metrics:
            with self._metrics.timer("frame_capture"):
                img = self.get_current_frame()
        else:
            img = self.get_current_frame()

        time.sleep(0.1)

        # Get detected objects from Redis
        # Get detected objects from Redis with timing
        if self._metrics:
            with self._metrics.timer("object_fetch_redis"):
                detected_objects = self.get_detected_objects()

            # Record object count
            self._metrics.increment_counter("objects_detected", len(detected_objects))
        else:
            detected_objects = self.get_detected_objects()

        # Update memory using ObjectMemoryManager
        if self._current_workspace_id:  # This should now always be True
            at_observation = self.is_any_workspace_visible()
            robot_moving = self.get_robot_in_motion()

            if self._metrics:
                mem_start = time.perf_counter()

            objects_added, objects_updated = self._memory_manager.update(
                workspace_id=self._current_workspace_id,
                detected_objects=detected_objects,
                at_observation_pose=at_observation,
                robot_in_motion=robot_moving,
            )

            if self._metrics:
                mem_duration = (time.perf_counter() - mem_start) * 1000
                self._metrics.record_memory_update(mem_duration, objects_added, objects_updated)

            self._logger.debug(
                f"Memory update for '{self._current_workspace_id}': " f"added={objects_added}, updated={objects_updated}"
            )
        else:
            # This should never happen now, but log it if it does
            self._logger.error("Current workspace ID is None - memory not updated!")

        # Record loop iteration time
        if self._metrics:
            loop_duration = (time.perf_counter() - loop_start) * 1000
            self._metrics.record_timing("camera_loop_iteration", loop_duration)

        # Log memory stats
        if self._verbose:
            stats = self._memory_manager.get_memory_stats()
            if self._current_workspace_id in stats:
                ws_stats = stats[self._current_workspace_id]
                self._logger.debug(
                    f"Memory: {ws_stats['object_count']} objects, "
                    f"manual_updates={ws_stats['manual_updates']}, "
                    f"visible={ws_stats['visible']}\n"
                )

        yield img

        if self.get_robot_in_motion():
            time.sleep(0.05)
        else:
            time.sleep(0.05)

update_object_in_memory(object_label, old_coordinate, new_pose)

Update an object's position in memory after movement.

Parameters:

Name Type Description Default
object_label str

Label of the object

required
old_coordinate List[float]

Previous coordinate [x, y]

required
new_pose 'PoseObjectPNP'

New pose after movement

required
Source code in robot_environment/environment.py
def update_object_in_memory(self, object_label: str, old_coordinate: List[float], new_pose: "PoseObjectPNP") -> None:
    """
    Update an object's position in memory after movement.

    Args:
        object_label: Label of the object
        old_coordinate: Previous coordinate [x, y]
        new_pose: New pose after movement
    """
    if not self._current_workspace_id:
        self._logger.warning("No current workspace set")
        return

    self._memory_manager.mark_manual_update(
        workspace_id=self._current_workspace_id,
        object_label=object_label,
        old_coordinate=old_coordinate,
        new_pose=new_pose,
    )

update_object_in_workspace(source_workspace_id, target_workspace_id, object_label, old_coordinate, new_coordinate)

Move an object from one workspace to another in memory.

Parameters:

Name Type Description Default
source_workspace_id str

ID of workspace where object currently is

required
target_workspace_id str

ID of workspace where object will be placed

required
object_label str

Label of the object

required
old_coordinate list

Current coordinate in source workspace

required
new_coordinate list

New coordinate in target workspace

required
Source code in robot_environment/environment.py
def update_object_in_workspace(
    self, source_workspace_id: str, target_workspace_id: str, object_label: str, old_coordinate: list, new_coordinate: list
) -> None:
    """
    Move an object from one workspace to another in memory.

    Args:
        source_workspace_id: ID of workspace where object currently is
        target_workspace_id: ID of workspace where object will be placed
        object_label: Label of the object
        old_coordinate: Current coordinate in source workspace
        new_coordinate: New coordinate in target workspace
    """
    self._memory_manager.move_object(
        source_workspace_id=source_workspace_id,
        target_workspace_id=target_workspace_id,
        object_label=object_label,
        old_coordinate=old_coordinate,
        new_coordinate=new_coordinate,
    )

use_simulation()

Check if using simulation.

Source code in robot_environment/environment.py
def use_simulation(self) -> bool:
    """Check if using simulation."""
    return self._use_simulation

workspaces()

Return workspaces object.

Source code in robot_environment/environment.py
def workspaces(self) -> Workspaces:
    """Return workspaces object."""
    return self._workspaces