Zum Inhalt

Robot

robot_environment.robot.robot.Robot

Bases: RobotAPI

Source code in robot_environment/robot/robot.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
class Robot(RobotAPI):
    # *** CONSTRUCTORS ***
    @log_start_end_cls()
    def __init__(
        self, environment: "Environment", use_simulation: bool = False, robot_id: str = "niryo", verbose: bool = False
    ):
        """
        Initialize the Robot.

        Args:
            environment: Environment object this Robot is installed in.
            use_simulation: If True, simulate the robot, else use the real robot.
            robot_id: Robot identifier ("niryo" or "widowx").
            verbose: Enable verbose logging.
        """
        super().__init__()

        self._environment = environment
        self._verbose = verbose
        # Object gripped by the last pick_* call; consumed by place_*.
        self._object_last_picked = None

        self._logger = get_package_logger(__name__, verbose)
        self._logger.info(f"Initializing robot: {robot_id}")

        if robot_id == "niryo":
            self._robot = NiryoRobotController(self, use_simulation, verbose)
        else:
            # No controller exists for this robot_id (e.g. "widowx" is documented but
            # not implemented). Log it instead of failing silently: a None controller
            # makes every delegating method raise AttributeError later.
            self._logger.error(f"Unsupported robot_id '{robot_id}'; no robot controller was created.")
            self._robot = None

    def handle_object_detection(self, objects_dict_list: List[Dict[str, Any]]) -> None:
        """Process incoming object detections from Redis."""
        # Rebuild Object instances from their dictionary representation.
        workspace = self.environment().get_workspace(0)
        detections = Objects.dict_list_to_objects(objects_dict_list, workspace)

        # Log each reconstructed detection.
        for detection in detections:
            self._logger.debug(f"Received object: {detection.label()} at {detection.xy_com()}")

    def get_pose(self) -> PoseObjectPNP:
        """
        Return the current pose of the robot's gripper.

        Returns:
            The gripper pose as reported by the underlying robot controller.
        """
        return self._robot.get_pose()

    @log_start_end_cls()
    def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> PoseObjectPNP:
        """
        Convert relative workspace-image coordinates into a world pose.

        Given relative image coordinates [u_rel, v_rel] and optionally an orientation (yaw)
        of an object at that point, compute the corresponding pose in robot world
        coordinates. The yaw is useful when picking an object with that orientation at the
        given coordinate. For this method to work, only the robot's workspace may be
        visible in the image and nothing else (at least for the Niryo robot), so that
        (u_rel, v_rel) = (0, 0) is the upper-left corner of the workspace.

        Args:
            workspace_id: id of the workspace.
            u_rel: horizontal coordinate in the workspace image, normalized between 0 and 1.
            v_rel: vertical coordinate in the workspace image, normalized between 0 and 1.
            yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

        Returns:
            Pose of the point in world coordinates of the robot.
        """
        self._logger.debug(f"robot::get_target_pose_from_rel {workspace_id}, {u_rel}, {v_rel}, {yaw}")
        return self._robot.get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

    # *** PUBLIC methods ***

    def calibrate(self) -> bool:
        """
        Run the robot's calibration routine.

        Returns:
            True if calibration succeeded, otherwise False.
        """
        return self._robot.calibrate()

    @log_start_end_cls()
    def move2observation_pose(self, workspace_id: str) -> None:
        """
        Move the robot to the observation pose of the given workspace.

        In the observation pose the gripper hovers over the workspace identified by
        workspace_id. A robot must move to this pose before it can pick up or place
        objects in the corresponding workspace.

        Args:
            workspace_id: id of the workspace.

        Returns:
            None
        """
        self._robot.move2observation_pose(workspace_id)

    # TODO: the documentation of these pick methods is more up to date than the one in robot_api

    @log_start_end_cls()
    def pick_place_object(
        self,
        object_name: str,
        pick_coordinate: List[float],
        place_coordinate: List[float],
        location: Union["Location", str, None] = None,
        z_offset: float = 0.001,
    ) -> bool:
        """
        Pick a named object and place it somewhere else in one combined operation.

        The gripper moves to 'pick_coordinate', grips the named object, then moves to
        'place_coordinate' and releases it there. When a full pick-and-place is needed,
        call this function instead of robot_pick_object() followed by
        robot_place_object().

        Example call:

        robot.pick_place_object(
            object_name='chocolate bar',
            pick_coordinate=[-0.1, 0.01],
            place_coordinate=[0.1, 0.11],
            location=Location.RIGHT_NEXT_TO
        )
        --> Picks the chocolate bar that is located at world coordinates [-0.1, 0.01] and places it right next to an
        object that exists at world coordinate [0.1, 0.11].

        robot.pick_place_object(
            object_name='cube',
            pick_coordinate=[0.2, 0.05],
            place_coordinate=[0.3, 0.1],
            location=Location.ON_TOP_OF,
            z_offset=0.02
        )
        --> Picks the cube with a 2cm z-offset (useful if it's on top of another object).

        Args:
            object_name (str): Name of the object to pick; must match an object visible
                in the robot's workspace.
            pick_coordinate (List): World coordinates [x, y] where the object is picked;
                used to identify the object's exact position.
            place_coordinate (List): World coordinates [x, y] where the object is placed.
            location: Relative placement position with respect to an object at
                'place_coordinate'. Possible values are defined in the `Location` Enum:
                `Location.LEFT_NEXT_TO`, `Location.RIGHT_NEXT_TO`, `Location.ABOVE`,
                `Location.BELOW`, `Location.ON_TOP_OF`, `Location.INSIDE`, `Location.NONE`.
            z_offset (float): Additional pick height in meters (default: 0.001); useful
                for objects stacked on top of other objects.

        Returns:
            bool: True if successful
        """
        # Place only after a successful pick; otherwise report failure immediately.
        if not self.pick_object(object_name, pick_coordinate, z_offset=z_offset):
            return False
        return self.place_object(place_coordinate, location)

    @log_start_end_cls()
    def pick_object(self, object_name: str, pick_coordinate: List[float], z_offset: float = 0.001) -> bool:
        """
        Pick up a named object with the gripper.

        The gripper moves to the specified 'pick_coordinate' and grips the named object.

        Example call:

        robot.pick_object("pen", [0.01, -0.15])
        --> Picks the pen that is located at world coordinates [0.01, -0.15].

        robot.pick_object("pen", [0.01, -0.15], z_offset=0.02)
        --> Picks the pen with a 2cm offset above its detected position (useful for stacked objects).

        Args:
            object_name (str): Name of the object to pick; must match an object visible
                in the robot's workspace.
            pick_coordinate (List): World coordinates [x, y] where the object is picked;
                used to identify the object's exact position.
            z_offset (float): Additional pick height in meters (default: 0.001); useful
                for objects stacked on top of other objects.
        Returns:
            bool: True
        """
        formatted_coords = "[" + ", ".join(f"{value:.2f}" for value in pick_coordinate) + "]"
        message = f"Going to pick {object_name} at coordinate {formatted_coords}."
        self._logger.info(message)

        self.environment().oralcom_call_text2speech_async(message, priority=8)

        target = self._get_nearest_object(object_name, pick_coordinate)
        if not target:
            return False

        # Remember what we gripped so that place_object() can use it later.
        self._object_last_picked = target

        # Lift the pick pose by z_offset (e.g. when the object sits on another one).
        lifted_pose = target.pose_com().copy_with_offsets(z_offset=z_offset)

        # thread_oral.join()

        return self._robot.robot_pick_object(lifted_pose)

    @log_start_end_cls()
    def place_object(self, place_coordinate: List[float], location: Union["Location", str, None] = None) -> bool:
        """
        Instruct the pick-and-place robot arm to place a picked object at the specified 'place_coordinate'. The
        function moves the gripper to the specified 'place_coordinate' and calculates the exact placement position from
        the given 'location'. Before calling this function you have to call robot_pick_object() to pick an object.

        Example call:

        robot.place_object([0.2, 0.0], "left next to")
        --> Places the already gripped object left next to the world coordinate [0.2, 0.0].

        Args:
            place_coordinate: The world coordinates [x, y] of the target object.
            location: Specifies the relative placement position of the picked object in relation to an object
                being at the 'place_coordinate'. Possible positions: 'left next to', 'right next to', 'above', 'below',
                'on top of', 'inside', or None.
        Returns:
            bool: True
        """
        # Accept both Location enum members and their string spellings.
        location = Location.convert_str2location(location)

        if self._object_last_picked:
            # Remember where the gripped object came from so memory can be updated after placing.
            old_coordinate = [self._object_last_picked.x_com(), self._object_last_picked.y_com()]
            message = (
                f"Going to place {self._object_last_picked.label()} {location} coordinate ["
                f"{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."
            )
        else:
            old_coordinate = None
            message = f"Going to place it {location} coordinate [{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."

        self._logger.info(message)

        self.environment().oralcom_call_text2speech_async(message, priority=8)
        obj_where_to_place = None

        if location is not None and location is not Location.NONE:
            # A relative location was given: look for a reference object near the target coordinate.
            obj_where_to_place = self._get_nearest_object(None, place_coordinate)
            if obj_where_to_place is None:
                # No reference object found: place at a fixed-height default pose at the coordinate.
                place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)
            else:
                place_pose = obj_where_to_place.pose_center()
        else:
            place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)
            self._logger.debug(f"place_object: {place_pose}")

        # Base clearance margins for relative placements, grown below by half the
        # picked object's footprint so the two objects do not collide.
        x_off = 0.02
        y_off = 0.02

        if self._object_last_picked:
            x_off += self._object_last_picked.height_m() / 2
            y_off += self._object_last_picked.width_m() / 2

        if place_pose:
            # TODO: use height of object instead
            if location == Location.ON_TOP_OF:
                place_pose.z += 0.02
            elif location == Location.INSIDE:
                place_pose.z += 0.01
            elif location == Location.RIGHT_NEXT_TO and obj_where_to_place:
                place_pose.y -= obj_where_to_place.width_m() / 2 + y_off
            elif location == Location.LEFT_NEXT_TO and obj_where_to_place:
                place_pose.y += obj_where_to_place.width_m() / 2 + y_off
            elif location == Location.BELOW and obj_where_to_place:
                # print(obj_where_to_place.height_m(), self._object_last_picked.width_m(), x_off)
                # TODO: also use width here, since width is always the larger dimension and is not tied to one
                #  coordinate direction. Instead of width and height there should be sizes dim_x and dim_y that
                #  are bound to the x and y axes. This was fixed in the Object class: width now always runs
                #  along the y-axis. Verify that here.
                place_pose.x -= obj_where_to_place.height_m() / 2 + x_off
            elif location == Location.ABOVE and obj_where_to_place:
                # TODO: also use width here, since width is always the larger dimension and is not tied to one
                #  coordinate direction. This was fixed in the Object class: width now always runs along the
                #  y-axis. Verify that here.
                print(obj_where_to_place.height_m(), self._object_last_picked.width_m(), x_off)
                place_pose.x += obj_where_to_place.height_m() / 2 + x_off
                self._logger.debug(f"{place_pose}")
            elif location is Location.NONE or location is None:
                pass  # Nothing to adjust: the given coordinate itself is where the object is placed.
            else:
                self._logger.error(f"Unknown location: {location} (type: {type(location)})")

            success = self._robot.robot_place_object(place_pose)

            # update position of placed object to the new position
            # Update memory after successful placement
            if success and self._object_last_picked and old_coordinate:
                final_coordinate = [place_pose.x, place_pose.y]
                self._logger.debug(f"final_coordinate: {final_coordinate}")

                self.environment().update_object_in_memory(
                    self._object_last_picked.label(), old_coordinate, new_pose=place_pose
                )

                # Give the memory system a moment to register the update
                import time

                time.sleep(0.1)
        else:
            success = False

        # The gripped object is consumed by this call, whether placing succeeded or not.
        self._object_last_picked = None

        # thread_oral.join()

        return success

    @log_start_end_cls()
    def push_object(self, object_name: str, push_coordinate: List[float], direction: str, distance: float) -> bool:
        """
        Instruct the pick-and-place robot arm to push a specific object to a new position.
        This function should only be called if it is not possible to pick the object.
        An object cannot be picked if its shorter side is larger than the gripper.

        Args:
            object_name: The name of the object to be pushed.
            push_coordinate: The world coordinates [x, y] where the object to push is located.
            direction: The direction in which to push the object ("up", "down", "left", "right").
            distance: The distance (in millimeters) to push the object.

        Returns:
            bool: True if the push was executed, False if the object was not found or the
                direction is unknown.
        """
        message = f"Calling push with {object_name} and {direction}"
        self._logger.info(message)

        self.environment().oralcom_call_text2speech_async(message, priority=8)

        obj_to_push = self._get_nearest_object(object_name, push_coordinate)

        # Bug fix: previously obj_to_push.pose_com() was called before the None check,
        # raising AttributeError whenever the object was not found.
        if obj_to_push is None:
            return False

        push_pose = obj_to_push.pose_com()

        # It is certainly better when pushing up to move under the object with a closed gripper so we can
        # actually push up. Same for the other directions.
        if direction == "up":
            push_pose.x -= obj_to_push.height_m() / 2.0
            # gripper 90° rotated. TODO: I have to test these orientations
            push_pose.yaw = math.pi / 2.0
        elif direction == "down":
            push_pose.x += obj_to_push.height_m() / 2.0
            # gripper 90° rotated. TODO: I have to test these orientations
            push_pose.yaw = math.pi / 2.0
        elif direction == "left":
            push_pose.y += obj_to_push.width_m() / 2.0
            # gripper 0° rotated. TODO: I have to test these orientations
            push_pose.yaw = 0.0
        elif direction == "right":
            push_pose.y -= obj_to_push.width_m() / 2.0
            # gripper 0° rotated. TODO: I have to test these orientations
            push_pose.yaw = 0.0
        else:
            # Do not push with an unadjusted pose/yaw when the direction is unknown.
            self._logger.error(f"Unknown direction: {direction}")
            return False

        # thread_oral.join()

        return self._robot.robot_push_object(push_pose, direction, distance)

    def pick_place_object_across_workspaces(
        self,
        object_name: str,
        pick_workspace_id: str,
        pick_coordinate: List[float],
        place_workspace_id: str,
        place_coordinate: List[float],
        location: Union["Location", str, None] = None,
        z_offset: float = 0.001,
    ) -> bool:
        """
        Pick an object from one workspace and place it in another workspace.

        Args:
            object_name: Name of the object to pick
            pick_workspace_id: ID of the workspace to pick from
            pick_coordinate: [x, y] coordinate in pick workspace
            place_workspace_id: ID of the workspace to place in
            place_coordinate: [x, y] coordinate in place workspace
            location: Relative placement location (Location enum or string)
            z_offset: Additional height offset in meters when picking (default: 0.001)

        Returns:
            bool: True if successful, False otherwise

        Example:
            robot.pick_place_object_across_workspaces(
                object_name='cube',
                pick_workspace_id='niryo_ws_left',
                pick_coordinate=[0.2, 0.05],
                place_workspace_id='niryo_ws_right',
                place_coordinate=[0.25, -0.05],
                location=Location.RIGHT_NEXT_TO,
                z_offset=0.02
            )
        """
        self._logger.debug(f"Multi-workspace operation: {object_name}")
        self._logger.debug(f"  Pick from: {pick_workspace_id} at {pick_coordinate}")
        self._logger.debug(f"  Place in: {place_workspace_id} at {place_coordinate}")

        # Step 1: Move to pick workspace observation pose
        self.move2observation_pose(pick_workspace_id)
        # NOTE(review): writes a private attribute of Environment directly — consider a
        # setter on Environment instead.
        self.environment()._current_workspace_id = pick_workspace_id

        # Step 2: Pick the object
        success = self.pick_object_from_workspace(object_name, pick_workspace_id, pick_coordinate, z_offset=z_offset)

        if not success:
            self._logger.error(f"Failed to pick {object_name} from {pick_workspace_id}")
            return False

        # Step 3: Move to place workspace observation pose
        self.move2observation_pose(place_workspace_id)
        self.environment()._current_workspace_id = place_workspace_id

        # Step 4: Place the object
        place_success = self.place_object_in_workspace(place_workspace_id, place_coordinate, location)

        if place_success:
            # Update memory: remove from source, add to target
            self.environment().update_object_in_workspace(
                source_workspace_id=pick_workspace_id,
                target_workspace_id=place_workspace_id,
                object_label=object_name,
                old_coordinate=pick_coordinate,
                new_coordinate=place_coordinate,
            )

            self._logger.info(f"Successfully moved {object_name} from {pick_workspace_id} to {place_workspace_id}")

        return place_success

    def pick_object_from_workspace(
        self, object_name: str, workspace_id: str, pick_coordinate: List[float], z_offset: float = 0.001
    ) -> bool:
        """
        Pick an object from a specific workspace.

        Args:
            object_name: Name of the object to pick
            workspace_id: ID of the workspace
            pick_coordinate: [x, y] coordinate in workspace
            z_offset: Additional height offset in meters (default: 0.001)

        Returns:
            bool: True if successful
        """
        formatted_coords = "[" + ", ".join(f"{value:.2f}" for value in pick_coordinate) + "]"
        message = f"Picking {object_name} from workspace {workspace_id} at {formatted_coords}."
        self._logger.info(message)

        self.environment().oralcom_call_text2speech_async(message, priority=8)

        # Search only the memory of the requested workspace.
        target = self._get_nearest_object_in_workspace(object_name, workspace_id, pick_coordinate)
        if not target:
            return False

        # Remember both the gripped object and where it came from for the place step.
        self._object_last_picked = target
        self._object_source_workspace = workspace_id

        # Lift the pick pose by z_offset (e.g. for stacked objects).
        lifted_pose = target.pose_com().copy_with_offsets(z_offset=z_offset)

        # thread_oral.join()
        return self._robot.robot_pick_object(lifted_pose)

    def place_object_in_workspace(
        self, workspace_id: str, place_coordinate: List[float], location: Union["Location", str, None] = None
    ) -> bool:
        """
        Place a picked object in a specific workspace.

        Args:
            workspace_id: ID of the target workspace
            place_coordinate: [x, y] coordinate in workspace
            location: Relative placement location

        Returns:
            bool: True if successful
        """
        # Accept both Location enum members and their string spellings.
        location = Location.convert_str2location(location)

        if self._object_last_picked:
            message = (
                f"Placing {self._object_last_picked.label()} in workspace "
                f"{workspace_id} {location} coordinate "
                f"[{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."
            )
        else:
            message = (
                f"Placing object in workspace {workspace_id} {location} "
                f"coordinate [{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."
            )

        self._logger.info(message)
        self.environment().oralcom_call_text2speech_async(message, priority=8)

        # Get workspace for coordinate transformation
        workspace = self.environment().get_workspace_by_id(workspace_id)
        if workspace is None:
            self._logger.error(f"Workspace {workspace_id} not found")
            # thread_oral.join()
            return False

        # Find reference object in target workspace if location specified
        obj_where_to_place = None
        if location is not None and location is not Location.NONE:
            obj_where_to_place = self._get_nearest_object_in_workspace(None, workspace_id, place_coordinate)

            if obj_where_to_place is None:
                # No reference object found: place at a fixed-height default pose at the coordinate.
                place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)
            else:
                place_pose = obj_where_to_place.pose_center()
        else:
            place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)

        # Calculate placement offset based on location
        if place_pose and obj_where_to_place:
            # Base clearance margins, grown by half the picked object's footprint
            # so the two objects do not collide.
            x_off = 0.02
            y_off = 0.02

            if self._object_last_picked:
                x_off += self._object_last_picked.height_m() / 2
                y_off += self._object_last_picked.width_m() / 2

            if location == Location.ON_TOP_OF:
                place_pose.z += 0.02
            elif location == Location.INSIDE:
                place_pose.z += 0.01
            elif location == Location.RIGHT_NEXT_TO:
                place_pose.y -= obj_where_to_place.width_m() / 2 + y_off
            elif location == Location.LEFT_NEXT_TO:
                place_pose.y += obj_where_to_place.width_m() / 2 + y_off
            elif location == Location.BELOW:
                place_pose.x -= obj_where_to_place.height_m() / 2 + x_off
            elif location == Location.ABOVE:
                place_pose.x += obj_where_to_place.height_m() / 2 + x_off

        success = self._robot.robot_place_object(place_pose)

        # Clear last picked object
        self._object_last_picked = None
        if "_object_source_workspace" in self.__dict__:
            del self._object_source_workspace

        # thread_oral.join()
        return success

    def _get_nearest_object_in_workspace(
        self, label: Union[str, None], workspace_id: str, target_coords: List[float]
    ) -> Optional[Object]:
        """
        Find the nearest object in a specific workspace.

        Args:
            label: Object label to search for (None for any object)
            workspace_id: ID of the workspace
            target_coords: Target coordinates [x, y]

        Returns:
            Object or None
        """
        # Only consider detections stored for this particular workspace.
        detected_objects = self.environment().get_detected_objects_from_workspace(workspace_id)
        self._logger.debug(f"Objects in workspace {workspace_id}: {detected_objects}")

        if target_coords:
            nearest_object, min_distance = detected_objects.get_nearest_detected_object(target_coords, label)
        else:
            # No coordinates supplied: fall back to the first object matching the label.
            nearest_object = next((obj for obj in detected_objects if obj.label() == label), None)
            min_distance = 0

        if nearest_object:
            self._logger.debug(f"Found {nearest_object.label()} at distance {min_distance:.3f}m")
        else:
            self._logger.warning(f"Object {label} not found in workspace {workspace_id}")
            self._logger.info(f"Available objects: {detected_objects.get_detected_objects_as_comma_separated_string()}")

        return nearest_object

    @staticmethod
    def _parse_command(line: str) -> Tuple[Optional[str], Optional[str], List[Any], Dict[str, Any]]:
        """
        Parse a single line of the input into the target object, method, positional arguments, and keyword arguments.

        Deprecated: Use robot_environment.robot.command_processor.parse_robot_command instead.
        """
        import warnings

        # Emit a real deprecation signal instead of only documenting it.
        warnings.warn(
            "Robot._parse_command is deprecated; use "
            "robot_environment.robot.command_processor.parse_robot_command instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return parse_robot_command(line)

    def get_detected_objects(self):
        """Return the latest detected objects from the environment's memory."""
        return self._environment.get_detected_objects_from_memory()

    def _get_nearest_object(self, label: Union[str, None], target_coords: List[float]) -> Optional[Object]:
        """
        Find the detected object with the given label that is nearest to target_coords.

        Args:
            label: Object label to search for (None for any object).
            target_coords: Target coordinates [x, y]; may be empty (see below).

        Returns:
            The nearest matching Object, or None if no match was found.
        """
        detected_objects = self.get_detected_objects()

        self._logger.debug(f"detected_objects: {detected_objects}")

        if len(target_coords) == 0:  # then no target coords are given, true for push method
            nearest_object = next((obj for obj in detected_objects if obj.label() == label), None)
            min_distance = 0
        else:
            nearest_object, min_distance = detected_objects.get_nearest_detected_object(target_coords, label)

        if nearest_object:
            self._logger.debug(f"Nearest object found: {nearest_object} with distance {min_distance}")
        else:
            self._logger.warning(
                f"Object {label} does not exist: " f"{detected_objects.get_detected_objects_as_comma_separated_string()}"
            )

            # add functionality that looks for the most similar object in self.get_detected_objects()
            # and ask user whether this object should be used instead. if answer of user is yes, then set
            # nearest_object to this new object
            # TODO: use get_most_similar_object again
            #  nearest_object_name = self.environment().get_most_similar_object(label)
            # NOTE(review): nearest_object_name is hard-wired to None, so the recovery
            # branch below is currently dead code.
            nearest_object_name = None

            if nearest_object_name is not None:
                self._logger.info(
                    f"I have detected the object {nearest_object_name}. Do you want to handle this object instead?"
                )

                # TODO: wait for the user's answer and check it.
                answer = "yes"

                if answer != "yes":
                    return None
                else:
                    nearest_object = next((obj for obj in detected_objects if obj.label() == nearest_object_name), None)

        return nearest_object

    # *** PUBLIC properties ***

    def environment(self) -> Environment:
        """
        Return the environment this robot is installed in.

        Returns:
            Environment: The environment instance.
        """
        return self._environment

    def robot_in_motion(self) -> bool:
        """
        Report whether the robot is currently moving.

        Returns:
            bool: True while the robot is in motion, otherwise False.
        """
        return self._robot.is_in_motion()

    def robot(self) -> RobotController:
        """
        Returns:
            RobotController: the controller object driving the physical/simulated robot.
        """
        return self._robot

    def verbose(self) -> bool:
        """
        Report the verbosity setting.

        Returns:
            bool: True when verbose logging is enabled, otherwise False.
        """
        return self._verbose

    # *** PRIVATE variables ***

    # Class-level defaults; __init__ overwrites these with per-instance values.

    # Environment this robot is installed in.
    _environment = None

    # RobotController object
    _robot = None

    # True, if robot is in motion and therefore cannot see the workspace markers
    # _robot_in_motion = False

    _object_source_workspace: Optional[str] = None  # Track source workspace

    # Verbosity flag and package logger (both set in __init__).
    _verbose = False
    _logger = None

Functions

__init__(environment, use_simulation=False, robot_id='niryo', verbose=False)

Initialize the Robot.

Parameters:

Name Type Description Default
environment 'Environment'

Environment object this Robot is installed in.

required
use_simulation bool

If True, simulate the robot, else use the real robot.

False
robot_id str

Robot identifier ("niryo" or "widowx").

'niryo'
verbose bool

Enable verbose logging.

False
Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def __init__(
    self, environment: "Environment", use_simulation: bool = False, robot_id: str = "niryo", verbose: bool = False
):
    """
    Initialize the Robot.

    Args:
        environment: Environment object this Robot is installed in.
        use_simulation: If True, simulate the robot, else use the real robot.
        robot_id: Robot identifier ("niryo" or "widowx").
        verbose: Enable verbose logging.
    """
    super().__init__()

    self._environment = environment
    self._verbose = verbose
    # Nothing has been picked yet; set by pick_object()/pick_object_from_workspace().
    self._object_last_picked = None

    self._logger = get_package_logger(__name__, verbose)
    self._logger.info(f"Initializing robot: {robot_id}")

    if robot_id == "niryo":
        self._robot = NiryoRobotController(self, use_simulation, verbose)
    else:
        # NOTE(review): the docstring mentions "widowx", but only "niryo" is
        # handled here; any other id silently leaves self._robot as None —
        # confirm whether this should raise or log an error instead.
        self._robot = None

calibrate()

Calibrates the Robot.

Returns:

Type Description
bool

True, if calibration was successful, else False

Source code in robot_environment/robot/robot.py
def calibrate(self) -> bool:
    """
    Run the robot's calibration routine.

    Returns:
        bool: True when calibration succeeded, otherwise False.
    """
    result = self._robot.calibrate()
    return result

environment()

Returns the environment object.

Returns:

Name Type Description
Environment Environment

The environment instance.

Source code in robot_environment/robot/robot.py
def environment(self) -> Environment:
    """
    Accessor for the Environment this robot is installed in.

    Returns:
        Environment: The environment instance.
    """
    env = self._environment
    return env

get_detected_objects()

Get latest detected objects from memory.

Source code in robot_environment/robot/robot.py
def get_detected_objects(self):
    """Return the most recently detected objects stored in the environment's memory."""
    return self._environment.get_detected_objects_from_memory()

get_pose()

Get current pose of gripper of robot.

Returns:

Type Description
PoseObjectPNP

current pose of gripper of robot.

Source code in robot_environment/robot/robot.py
def get_pose(self) -> PoseObjectPNP:
    """
    Query the current gripper pose of the robot.

    Returns:
        PoseObjectPNP: current pose of the robot's gripper.
    """
    current_pose = self._robot.get_pose()
    return current_pose

get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw), calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the given coordinate an object that has the given orientation. For this method to work, it is important that only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
u_rel float

horizontal coordinate in image of workspace, normalized between 0 and 1

required
v_rel float

vertical coordinate in image of workspace, normalized between 0 and 1

required
yaw float

orientation of an object at the pixel coordinates [u_rel, v_rel].

required

Returns:

Name Type Description
pose_object PoseObjectPNP

Pose of the point in world coordinates of the robot.

Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> PoseObjectPNP:
    """
    Convert workspace-relative image coordinates to a pose in world coordinates.

    Given relative image coordinates [u_rel, v_rel] and an orientation of the point (yaw),
    compute the corresponding world pose. The yaw is useful when an object with that
    orientation should be picked at the coordinate. For this to work, only the robot's
    workspace may be visible in the image and nothing else — important at least for the
    Niryo robot. Hence (u_rel, v_rel) = (0, 0) is the upper-left corner of the workspace.

    Args:
        workspace_id: id of the workspace
        u_rel: horizontal image coordinate of the workspace, normalized to [0, 1]
        v_rel: vertical image coordinate of the workspace, normalized to [0, 1]
        yaw: orientation of an object at the pixel coordinates [u_rel, v_rel]

    Returns:
        PoseObjectPNP: pose of the point in the robot's world coordinates.
    """
    self._logger.debug(f"robot::get_target_pose_from_rel {workspace_id}, {u_rel}, {v_rel}, {yaw}")

    # Delegate the actual transformation to the controller implementation.
    return self._robot.get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

handle_object_detection(objects_dict_list)

Process incoming object detections from Redis

Source code in robot_environment/robot/robot.py
def handle_object_detection(self, objects_dict_list: List[Dict[str, Any]]) -> None:
    """Process incoming object detections from Redis"""
    # Rebuild Object instances from their dict representations, bound to workspace 0.
    restored = Objects.dict_list_to_objects(objects_dict_list, self.environment().get_workspace(0))

    # Log every reconstructed object for debugging.
    for obj in restored:
        self._logger.debug(f"Received object: {obj.label()} at {obj.xy_com()}")

move2observation_pose(workspace_id)

The robot will move to a pose where it can observe (the gripper hovers over) the workspace given by workspace_id. Before a robot can pick up or place an object in a workspace, it must first move to this observation pose of the corresponding workspace.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required

Returns:

Type Description
None

None

Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def move2observation_pose(self, workspace_id: str) -> None:
    """
    Move the robot so its gripper hovers over (observes) the workspace given by
    workspace_id. The robot must move to this observation pose of a workspace
    before it can pick up or place an object there.

    Args:
        workspace_id: id of the workspace

    Returns:
        None
    """
    # Delegate the motion to the controller implementation.
    self._robot.move2observation_pose(workspace_id)

pick_object(object_name, pick_coordinate, z_offset=0.001)

Command the pick-and-place robot arm to pick up a specific object using its gripper. The gripper will move to the specified 'pick_coordinate' and pick the named object.

Example call:

robot.pick_object("pen", [0.01, -0.15]) --> Picks the pen that is located at world coordinates [0.01, -0.15].

robot.pick_object("pen", [0.01, -0.15], z_offset=0.02) --> Picks the pen with a 2cm offset above its detected position (useful for stacked objects).

Parameters:

Name Type Description Default
object_name str

The name of the object to be picked up. Ensure this name matches an object visible in the robot's workspace.

required
pick_coordinate List

The world coordinates [x, y] where the object should be picked up. Use these coordinates to identify the object's exact position.

required
z_offset float

Additional height offset in meters to apply when picking (default: 0.001).

0.001
Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def pick_object(self, object_name: str, pick_coordinate: List[float], z_offset: float = 0.001) -> bool:
    """
    Command the pick-and-place robot arm to pick up a specific object using its gripper. The gripper will move to
    the specified 'pick_coordinate' and pick the named object.

    Example call:

    robot.pick_object("pen", [0.01, -0.15])
    --> Picks the pen that is located at world coordinates [0.01, -0.15].

    robot.pick_object("pen", [0.01, -0.15], z_offset=0.02)
    --> Picks the pen with a 2cm offset above its detected position (useful for stacked objects).

    Args:
        object_name (str): The name of the object to be picked up. Ensure this name matches an object visible in
        the robot's workspace.
        pick_coordinate (List): The world coordinates [x, y] where the object should be picked up. Use these
        coordinates to identify the object's exact position.
        z_offset (float): Additional height offset in meters to apply when picking (default: 0.001).
        Useful for picking objects that are stacked on top of other objects.
    Returns:
        bool: True if the pick succeeded, False if no matching object was found.
    """
    coords_str = "[" + ", ".join(f"{x:.2f}" for x in pick_coordinate) + "]"
    message = f"Going to pick {object_name} at coordinate {coords_str}."
    self._logger.info(message)

    # Announce the action via text-to-speech (asynchronous, does not block the pick).
    self.environment().oralcom_call_text2speech_async(message, priority=8)

    # Resolve the detected object closest to the requested coordinate.
    obj_to_pick = self._get_nearest_object(object_name, pick_coordinate)

    if obj_to_pick:
        # Remember the picked object so a subsequent place_object() call can use it.
        self._object_last_picked = obj_to_pick

        # Apply z_offset to the pick pose
        pick_pose = obj_to_pick.pose_com()
        pick_pose = pick_pose.copy_with_offsets(z_offset=z_offset)

        success = self._robot.robot_pick_object(pick_pose)
    else:
        success = False

    # thread_oral.join()

    return success

pick_object_from_workspace(object_name, workspace_id, pick_coordinate, z_offset=0.001)

Pick an object from a specific workspace.

Parameters:

Name Type Description Default
object_name str

Name of the object to pick

required
workspace_id str

ID of the workspace

required
pick_coordinate List[float]

[x, y] coordinate in workspace

required
z_offset float

Additional height offset in meters (default: 0.001)

0.001

Returns:

Name Type Description
bool bool

True if successful

Source code in robot_environment/robot/robot.py
def pick_object_from_workspace(
    self, object_name: str, workspace_id: str, pick_coordinate: List[float], z_offset: float = 0.001
) -> bool:
    """
    Pick an object from a specific workspace.

    Args:
        object_name: Name of the object to pick
        workspace_id: ID of the workspace
        pick_coordinate: [x, y] coordinate in workspace
        z_offset: Additional height offset in meters (default: 0.001)

    Returns:
        bool: True if successful, False if no matching object was found
    """
    coords_str = "[" + ", ".join(f"{x:.2f}" for x in pick_coordinate) + "]"
    message = f"Picking {object_name} from workspace {workspace_id} at {coords_str}."
    self._logger.info(message)

    # Announce the action via text-to-speech (asynchronous, does not block the pick).
    self.environment().oralcom_call_text2speech_async(message, priority=8)

    # Get object from specific workspace memory
    obj_to_pick = self._get_nearest_object_in_workspace(object_name, workspace_id, pick_coordinate)

    if obj_to_pick:
        # Remember what was picked and which workspace it came from, so a later
        # cross-workspace place can update the memory of both workspaces.
        self._object_last_picked = obj_to_pick
        self._object_source_workspace = workspace_id

        # Apply z_offset to the pick pose
        pick_pose = obj_to_pick.pose_com()
        pick_pose = pick_pose.copy_with_offsets(z_offset=z_offset)

        success = self._robot.robot_pick_object(pick_pose)
    else:
        success = False

    # thread_oral.join()
    return success

pick_place_object(object_name, pick_coordinate, place_coordinate, location=None, z_offset=0.001)

Instructs the pick-and-place robot arm to pick a specific object and place it using its gripper. The gripper will move to the specified 'pick_coordinate' and pick the named object. It will then move to the specified 'place_coordinate' and place the object there. If you need to pick-and-place an object, call this function and not robot_pick_object() followed by robot_place_object().

Example call:

robot.pick_place_object( object_name='chocolate bar', pick_coordinate=[-0.1, 0.01], place_coordinate=[0.1, 0.11], location=Location.RIGHT_NEXT_TO ) --> Picks the chocolate bar that is located at world coordinates [-0.1, 0.01] and places it right next to an object that exists at world coordinate [0.1, 0.11].

robot.pick_place_object( object_name='cube', pick_coordinate=[0.2, 0.05], place_coordinate=[0.3, 0.1], location=Location.ON_TOP_OF, z_offset=0.02 ) --> Picks the cube with a 2cm z-offset (useful if it's on top of another object).

Parameters:

Name Type Description Default
object_name str

The name of the object to be picked up. Ensure this name matches an object visible in the robot's workspace.

required
pick_coordinate List

The world coordinates [x, y] where the object should be picked up. Use these coordinates to identify the object's exact position.

required
place_coordinate List

The world coordinates [x, y] where the object should be placed at.

required
location Union['Location', str, None]

Specifies the relative placement position of the picked object with respect to an object being at the 'place_coordinate'. Possible values are defined in the Location Enum: Location.LEFT_NEXT_TO, Location.RIGHT_NEXT_TO, Location.ABOVE, Location.BELOW, Location.ON_TOP_OF, Location.INSIDE, Location.NONE.

None
z_offset float

Additional height offset in meters to apply when picking (default: 0.001).

0.001

Returns:

Name Type Description
bool bool

True if successful

Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def pick_place_object(
    self,
    object_name: str,
    pick_coordinate: List[float],
    place_coordinate: List[float],
    location: Union["Location", str, None] = None,
    z_offset: float = 0.001,
) -> bool:
    """
    Pick a named object and place it somewhere else in one combined operation.

    The gripper first moves to 'pick_coordinate' and picks the named object, then moves
    to 'place_coordinate' and places it there. When a full pick-and-place is needed,
    call this function instead of robot_pick_object() followed by robot_place_object().

    Example calls:

    robot.pick_place_object(
        object_name='chocolate bar',
        pick_coordinate=[-0.1, 0.01],
        place_coordinate=[0.1, 0.11],
        location=Location.RIGHT_NEXT_TO
    )
    --> Picks the chocolate bar at world coordinates [-0.1, 0.01] and places it right
    next to an object that exists at world coordinate [0.1, 0.11].

    robot.pick_place_object(
        object_name='cube',
        pick_coordinate=[0.2, 0.05],
        place_coordinate=[0.3, 0.1],
        location=Location.ON_TOP_OF,
        z_offset=0.02
    )
    --> Picks the cube with a 2cm z-offset (useful if it's on top of another object).

    Args:
        object_name (str): Name of the object to pick; must match an object visible in
        the robot's workspace.
        pick_coordinate (List): World coordinates [x, y] where the object is picked up.
        place_coordinate (List): World coordinates [x, y] where the object is placed.
        location: Relative placement position with respect to an object at
            'place_coordinate'. Possible values are the `Location` enum members:
            `Location.LEFT_NEXT_TO`, `Location.RIGHT_NEXT_TO`, `Location.ABOVE`,
            `Location.BELOW`, `Location.ON_TOP_OF`, `Location.INSIDE`, `Location.NONE`.
        z_offset (float): Additional pick height in meters (default: 0.001), useful
        for objects stacked on top of other objects.

    Returns:
        bool: True if both the pick and the place succeeded
    """
    # Guard clause: without a successful pick there is nothing to place.
    if not self.pick_object(object_name, pick_coordinate, z_offset=z_offset):
        return False

    return self.place_object(place_coordinate, location)

pick_place_object_across_workspaces(object_name, pick_workspace_id, pick_coordinate, place_workspace_id, place_coordinate, location=None, z_offset=0.001)

Pick an object from one workspace and place it in another workspace.

Parameters:

Name Type Description Default
object_name str

Name of the object to pick

required
pick_workspace_id str

ID of the workspace to pick from

required
pick_coordinate List[float]

[x, y] coordinate in pick workspace

required
place_workspace_id str

ID of the workspace to place in

required
place_coordinate List[float]

[x, y] coordinate in place workspace

required
location Union['Location', str, None]

Relative placement location (Location enum or string)

None
z_offset float

Additional height offset in meters when picking (default: 0.001)

0.001

Returns:

Name Type Description
bool bool

True if successful, False otherwise

Example

robot.pick_place_object_across_workspaces( object_name='cube', pick_workspace_id='niryo_ws_left', pick_coordinate=[0.2, 0.05], place_workspace_id='niryo_ws_right', place_coordinate=[0.25, -0.05], location=Location.RIGHT_NEXT_TO, z_offset=0.02 )

Source code in robot_environment/robot/robot.py
def pick_place_object_across_workspaces(
    self,
    object_name: str,
    pick_workspace_id: str,
    pick_coordinate: List[float],
    place_workspace_id: str,
    place_coordinate: List[float],
    location: Union["Location", str, None] = None,
    z_offset: float = 0.001,
) -> bool:
    """
    Pick an object from one workspace and place it in another workspace.

    Args:
        object_name: Name of the object to pick
        pick_workspace_id: ID of the workspace to pick from
        pick_coordinate: [x, y] coordinate in pick workspace
        place_workspace_id: ID of the workspace to place in
        place_coordinate: [x, y] coordinate in place workspace
        location: Relative placement location (Location enum or string)
        z_offset: Additional height offset in meters when picking (default: 0.001)

    Returns:
        bool: True if successful, False otherwise

    Example:
        robot.pick_place_object_across_workspaces(
            object_name='cube',
            pick_workspace_id='niryo_ws_left',
            pick_coordinate=[0.2, 0.05],
            place_workspace_id='niryo_ws_right',
            place_coordinate=[0.25, -0.05],
            location=Location.RIGHT_NEXT_TO,
            z_offset=0.02
        )
    """
    self._logger.debug(f"Multi-workspace operation: {object_name}")
    self._logger.debug(f"  Pick from: {pick_workspace_id} at {pick_coordinate}")
    self._logger.debug(f"  Place in: {place_workspace_id} at {place_coordinate}")

    # Step 1: Move to pick workspace observation pose
    self.move2observation_pose(pick_workspace_id)
    # NOTE(review): writes a private attribute of Environment directly —
    # consider exposing a setter on Environment instead.
    self.environment()._current_workspace_id = pick_workspace_id

    # Step 2: Pick the object
    success = self.pick_object_from_workspace(object_name, pick_workspace_id, pick_coordinate, z_offset=z_offset)

    if not success:
        self._logger.error(f"Failed to pick {object_name} from {pick_workspace_id}")
        return False

    # Step 3: Move to place workspace observation pose
    self.move2observation_pose(place_workspace_id)
    self.environment()._current_workspace_id = place_workspace_id

    # Step 4: Place the object
    place_success = self.place_object_in_workspace(place_workspace_id, place_coordinate, location)

    if place_success:
        # Update memory: remove from source, add to target
        self.environment().update_object_in_workspace(
            source_workspace_id=pick_workspace_id,
            target_workspace_id=place_workspace_id,
            object_label=object_name,
            old_coordinate=pick_coordinate,
            new_coordinate=place_coordinate,
        )

        self._logger.info(f"Successfully moved {object_name} from {pick_workspace_id} to {place_workspace_id}")

    return place_success

place_object(place_coordinate, location=None)

Instruct the pick-and-place robot arm to place a picked object at the specified 'place_coordinate'. The function moves the gripper to the specified 'place_coordinate' and calculates the exact placement position from the given 'location'. Before calling this function you have to call robot_pick_object() to pick an object.

Example call:

robot.place_object([0.2, 0.0], "left next to") --> Places the already gripped object left next to the world coordinate [0.2, 0.0].

Parameters:

Name Type Description Default
place_coordinate List[float]

The world coordinates [x, y] of the target object.

required
location Union['Location', str, None]

Specifies the relative placement position of the picked object in relation to an object being at the 'place_coordinate'. Possible positions: 'left next to', 'right next to', 'above', 'below', 'on top of', 'inside', or None.

None
Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def place_object(self, place_coordinate: List[float], location: Union["Location", str, None] = None) -> bool:
    """
    Instruct the pick-and-place robot arm to place a picked object at the specified 'place_coordinate'. The
    function moves the gripper to the specified 'place_coordinate' and calculates the exact placement position from
    the given 'location'. Before calling this function you have to call robot_pick_object() to pick an object.

    Example call:

    robot.place_object([0.2, 0.0], "left next to")
    --> Places the already gripped object left next to the world coordinate [0.2, 0.0].

    Args:
        place_coordinate: The world coordinates [x, y] of the target object.
        location: Specifies the relative placement position of the picked object in relation to an object
            being at the 'place_coordinate'. Possible positions: 'left next to', 'right next to', 'above', 'below',
            'on top of', 'inside', or None.
    Returns:
        bool: True if the place succeeded, False otherwise.
    """
    # Normalize a string location (e.g. "left next to") to a Location enum member.
    location = Location.convert_str2location(location)

    if self._object_last_picked:
        # Remember where the object came from so its memory entry can be updated later.
        old_coordinate = [self._object_last_picked.x_com(), self._object_last_picked.y_com()]
        message = (
            f"Going to place {self._object_last_picked.label()} {location} coordinate ["
            f"{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."
        )
    else:
        old_coordinate = None
        message = f"Going to place it {location} coordinate [{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."

    self._logger.info(message)

    # Announce the action via text-to-speech (asynchronous, does not block the place).
    self.environment().oralcom_call_text2speech_async(message, priority=8)
    obj_where_to_place = None

    if location is not None and location is not Location.NONE:
        # A relative location needs a reference object near the place coordinate.
        obj_where_to_place = self._get_nearest_object(None, place_coordinate)
        if obj_where_to_place is None:
            # No reference object found: fall back to an absolute pose.
            # NOTE(review): z=0.09 and pitch=1.57 appear to be a default drop
            # height and a downward-facing gripper — confirm.
            place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)
        else:
            place_pose = obj_where_to_place.pose_center()
    else:
        place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)
        self._logger.debug(f"place_object: {place_pose}")

    # Clearance margins between the reference object and the placed object.
    x_off = 0.02
    y_off = 0.02

    if self._object_last_picked:
        # Grow the margins by half the picked object's footprint.
        x_off += self._object_last_picked.height_m() / 2
        y_off += self._object_last_picked.width_m() / 2

    if place_pose:
        # TODO: use height of object instead
        if location == Location.ON_TOP_OF:
            place_pose.z += 0.02
        elif location == Location.INSIDE:
            place_pose.z += 0.01
        elif location == Location.RIGHT_NEXT_TO and obj_where_to_place:
            place_pose.y -= obj_where_to_place.width_m() / 2 + y_off
        elif location == Location.LEFT_NEXT_TO and obj_where_to_place:
            place_pose.y += obj_where_to_place.width_m() / 2 + y_off
        elif location == Location.BELOW and obj_where_to_place:
            # print(obj_where_to_place.height_m(), self._object_last_picked.width_m(), x_off)
            # TODO: also use width here, since width is always the larger dimension and is not tied to a
            #  coordinate direction. Instead of width and height we need sizes dim_x and dim_y that are
            #  bound to the x and y axes. This was fixed in the Object class: width now always runs along
            #  the y-axis. Verify here.
            place_pose.x -= obj_where_to_place.height_m() / 2 + x_off
        elif location == Location.ABOVE and obj_where_to_place:
            # TODO: also use width here, since width is always the larger dimension and is not tied to a
            #  coordinate direction. This was fixed in the Object class: width now always runs along the
            #  y-axis. Verify here.
            print(obj_where_to_place.height_m(), self._object_last_picked.width_m(), x_off)
            place_pose.x += obj_where_to_place.height_m() / 2 + x_off
            self._logger.debug(f"{place_pose}")
        elif location is Location.NONE or location is None:
            pass  # I do not have to do anything as the given location is where to place the object
        else:
            self._logger.error(f"Unknown location: {location} (type: {type(location)})")

        success = self._robot.robot_place_object(place_pose)

        # update position of placed object to the new position
        # Update memory after successful placement
        if success and self._object_last_picked and old_coordinate:
            final_coordinate = [place_pose.x, place_pose.y]
            self._logger.debug(f"final_coordinate: {final_coordinate}")

            self.environment().update_object_in_memory(
                self._object_last_picked.label(), old_coordinate, new_pose=place_pose
            )

            # Give the memory system a moment to register the update
            import time

            time.sleep(0.1)
    else:
        success = False

    # The picked object is considered released regardless of success.
    self._object_last_picked = None

    # thread_oral.join()

    return success

place_object_in_workspace(workspace_id, place_coordinate, location=None)

Place a picked object in a specific workspace.

Parameters:

Name Type Description Default
workspace_id str

ID of the target workspace

required
place_coordinate List[float]

[x, y] coordinate in workspace

required
location Union['Location', str, None]

Relative placement location

None

Returns:

Name Type Description
bool bool

True if successful

Source code in robot_environment/robot/robot.py
def place_object_in_workspace(
    self, workspace_id: str, place_coordinate: List[float], location: Union["Location", str, None] = None
) -> bool:
    """
    Place a picked object in a specific workspace.

    Args:
        workspace_id: ID of the target workspace
        place_coordinate: [x, y] coordinate in workspace
        location: Relative placement location

    Returns:
        bool: True if successful
    """
    # Normalize a string location (e.g. "left next to") to a Location enum member.
    location = Location.convert_str2location(location)

    if self._object_last_picked:
        message = (
            f"Placing {self._object_last_picked.label()} in workspace "
            f"{workspace_id} {location} coordinate "
            f"[{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."
        )
    else:
        message = (
            f"Placing object in workspace {workspace_id} {location} "
            f"coordinate [{place_coordinate[0]:.2f}, {place_coordinate[1]:.2f}]."
        )

    self._logger.info(message)
    # Announce the action via text-to-speech (asynchronous, does not block the place).
    self.environment().oralcom_call_text2speech_async(message, priority=8)

    # Get workspace for coordinate transformation
    workspace = self.environment().get_workspace_by_id(workspace_id)
    if workspace is None:
        self._logger.error(f"Workspace {workspace_id} not found")
        # thread_oral.join()
        return False

    # Find reference object in target workspace if location specified
    obj_where_to_place = None
    if location is not None and location is not Location.NONE:
        obj_where_to_place = self._get_nearest_object_in_workspace(None, workspace_id, place_coordinate)

        if obj_where_to_place is None:
            # No reference object found: fall back to an absolute default pose.
            place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)
        else:
            place_pose = obj_where_to_place.pose_center()
    else:
        place_pose = PoseObjectPNP(place_coordinate[0], place_coordinate[1], 0.09, 0.0, 1.57, 0.0)

    # Calculate placement offset based on location
    if place_pose and obj_where_to_place:
        # Clearance margins between the reference object and the placed object.
        x_off = 0.02
        y_off = 0.02

        if self._object_last_picked:
            # Grow the margins by half the picked object's footprint.
            x_off += self._object_last_picked.height_m() / 2
            y_off += self._object_last_picked.width_m() / 2

        if location == Location.ON_TOP_OF:
            place_pose.z += 0.02
        elif location == Location.INSIDE:
            place_pose.z += 0.01
        elif location == Location.RIGHT_NEXT_TO:
            place_pose.y -= obj_where_to_place.width_m() / 2 + y_off
        elif location == Location.LEFT_NEXT_TO:
            place_pose.y += obj_where_to_place.width_m() / 2 + y_off
        elif location == Location.BELOW:
            place_pose.x -= obj_where_to_place.height_m() / 2 + x_off
        elif location == Location.ABOVE:
            place_pose.x += obj_where_to_place.height_m() / 2 + x_off

    success = self._robot.robot_place_object(place_pose)

    # Clear last picked object
    self._object_last_picked = None
    # NOTE(review): this only deletes an instance attribute shadowing the class-level
    # _object_source_workspace default; the class attribute remains — confirm whether
    # assigning None instead would be the clearer reset.
    if "_object_source_workspace" in self.__dict__:
        del self._object_source_workspace

    # thread_oral.join()
    return success

push_object(object_name, push_coordinate, direction, distance)

Instruct the pick-and-place robot arm to push a specific object to a new position. This function should only be called if it is not possible to pick the object. An object cannot be picked if its shorter side is larger than the gripper.

Parameters:

Name Type Description Default
object_name str

The name of the object to be pushed.

required
push_coordinate List[float]

The world coordinates [x, y] where the object to push is located.

required
direction str

The direction in which to push the object ("up", "down", "left", "right").

required
distance float

The distance (in millimeters) to push the object.

required

Returns:

Name Type Description
bool bool

True

Source code in robot_environment/robot/robot.py
@log_start_end_cls()
def push_object(self, object_name: str, push_coordinate: List[float], direction: str, distance: float) -> bool:
    """
    Instruct the pick-and-place robot arm to push a specific object to a new position.
    This function should only be called if it is not possible to pick the object.
    An object cannot be picked if its shorter side is larger than the gripper.

    Args:
        object_name: The name of the object to be pushed.
        push_coordinate: The world coordinates [x, y] where the object to push is located.
        direction: The direction in which to push the object ("up", "down", "left", "right").
        distance: The distance (in millimeters) to push the object.

    Returns:
        bool: True if the push was executed, False if no matching object was found.
    """
    message = f"Calling push with {object_name} and {direction}"
    self._logger.info(message)

    # Announce the action via text-to-speech (asynchronous, does not block the push).
    self.environment().oralcom_call_text2speech_async(message, priority=8)

    obj_to_push = self._get_nearest_object(object_name, push_coordinate)

    # Bug fix: the original called obj_to_push.pose_com() BEFORE checking for None,
    # which raised AttributeError when no matching object was detected. Check first.
    if obj_to_push is None:
        self._logger.error(f"No object named {object_name} found near {push_coordinate}")
        return False

    push_pose = obj_to_push.pose_com()

    # Approach from the side opposite to the push direction, with the gripper yaw
    # oriented across the push axis. It is certainly better when pushing to move
    # beside the object with a closed gripper so we can actually push it.
    # TODO: these gripper orientations still need to be tested.
    if direction == "up":
        push_pose.x -= obj_to_push.height_m() / 2.0
        # gripper rotated 90 degrees
        push_pose.yaw = math.pi / 2.0
    elif direction == "down":
        push_pose.x += obj_to_push.height_m() / 2.0
        # gripper rotated 90 degrees
        push_pose.yaw = math.pi / 2.0
    elif direction == "left":
        push_pose.y += obj_to_push.width_m() / 2.0
        # gripper at 0 degrees
        push_pose.yaw = 0.0
    elif direction == "right":
        push_pose.y -= obj_to_push.width_m() / 2.0
        # gripper at 0 degrees
        push_pose.yaw = 0.0
    else:
        self._logger.error(f"Unknown direction: {direction}")

    return self._robot.robot_push_object(push_pose, direction, distance)

robot()

Returns:

Name Type Description
RobotController RobotController

object that controls the robot.

Source code in robot_environment/robot/robot.py
def robot(self) -> RobotController:
    """
    Accessor for the low-level robot controller.

    Returns:
        RobotController: object that controls the robot.
    """
    controller = self._robot
    return controller

robot_in_motion()

Check if the robot is currently in motion.

Returns:

Name Type Description
bool bool

True if the robot is in motion, False otherwise.

Source code in robot_environment/robot/robot.py
def robot_in_motion(self) -> bool:
    """
    Report whether the robot is currently executing a motion.

    Returns:
        bool: True if the robot is in motion, False otherwise.
    """
    controller = self._robot
    return controller.is_in_motion()

verbose()

Returns the verbosity status.

Returns:

Name Type Description
bool bool

True if verbose is on, else False.

Source code in robot_environment/robot/robot.py
def verbose(self) -> bool:
    """
    Report the verbosity setting.

    Returns:
        bool: True if verbose output is enabled, else False.
    """
    enabled = self._verbose
    return enabled

Controllers

RobotController

robot_environment.robot.robot_controller.RobotController

Bases: ABC

An abstract class for the pick-and-place robot that provides the primitive tasks of the robot like pick and place operations.

Source code in robot_environment/robot/robot_controller.py
class RobotController(ABC):
    """
    Abstract base class for a pick-and-place robot.

    Declares the robot's primitive tasks (pick, place, push, pose queries)
    that concrete hardware back-ends must implement.
    """

    # *** CONSTRUCTORS ***
    def __init__(self, robot: "Robot", use_simulation: bool, verbose: bool = False):
        """
        Initializes the robot (connects to it and calibrates it).

        Args:
            robot: object of the Robot class.
            use_simulation: True, if working with a simulation model of the robot,
            else False if we work with a real robot.
            verbose: enable verbose logging.
        """
        super().__init__()

        # The underlying robot interface is not thread safe; every call into
        # it must be serialized through this lock.
        self._lock = threading.Lock()

        self._verbose = verbose
        self._robot = robot
        self._in_motion = False

        self._init_robot(use_simulation)

    # Deleting (Calling destructor)
    def __del__(self):
        """Destructor for the RobotController; nothing to tear down here."""
        pass

    # *** PUBLIC SET methods ***

    # *** PUBLIC GET methods ***

    @abstractmethod
    def get_pose(self) -> "PoseObjectPNP":
        """
        Get current pose of gripper of robot.

        Returns:
            current pose of gripper of robot.
        """
        pass

    # *** PUBLIC methods ***

    @abstractmethod
    def calibrate(self) -> bool:
        """
        Calibrates the Robot.

        Returns:
            True, if calibration was successful, else False
        """
        pass

    # TODO: an Object (rather than only its PoseObject) could be passed here;
    #  an object may expose several pick positions, letting the robot retry
    #  the pick at each of them.
    @abstractmethod
    def robot_pick_object(self, pick_pose: "PoseObjectPNP") -> bool:
        """
        Calls the pick command of the self._robot_ctrl to pick an object at the given pose.

        Args:
            pick_pose: Pose where the object should be picked (includes z-offset if needed)

        Returns:
            True, if pick was successful, else False
        """
        return False

    @abstractmethod
    def robot_place_object(self, place_pose: "PoseObjectPNP") -> bool:
        """
        Places an already picked object at the given place_pose.

        Args:
            place_pose: Pose where to place the already picked object

        Returns:
            True, if place was successful, else False
        """
        return False

    @abstractmethod
    def robot_push_object(self, push_pose: "PoseObjectPNP", direction: str, distance: float) -> bool:
        """
        Push given object (its Pose) into the given direction by the given distance.

        Args:
            push_pose: the Pose of the object that should be pushed.
            direction: "up", "down", "left", "right"
            distance: distance in millimeters

        Returns:
            True, if push was successful, else False
        """
        pass

    @abstractmethod
    def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> "PoseObjectPNP":
        """
        Convert relative image coordinates [u_rel, v_rel] (plus an optional
        orientation yaw) into a pose in robot world coordinates. yaw is useful
        when picking an object with that orientation at the given coordinate.
        Only the robot's workspace may be visible in the image (required at
        least for the Niryo robot), so (u_rel, v_rel) = (0, 0) is the upper
        left corner of the workspace.

        Args:
            workspace_id: id of the workspace
            u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
            v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
            yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

        Returns:
            pose_object: Pose of the point in world coordinates of the robot.
        """
        pass

    @abstractmethod
    def move2observation_pose(self, workspace_id: str) -> None:
        """
        The robot should move to a pose where it can observe the workspace given by workspace_id.

        Args:
            workspace_id: id of the workspace
        """
        pass

    # *** PUBLIC STATIC/CLASS GET methods ***

    # *** PRIVATE methods ***

    @abstractmethod
    def _init_robot(self, use_simulation: bool) -> bool:
        """
        Initialize the robot; afterwards it must be ready to receive commands.

        Args:
            use_simulation: True, if working with a simulation model of the robot,
            else False if we work with a real robot.

        Returns:
            bool: True, if initialization was successful, else False
        """
        pass

    def _set_in_motion(self, in_motion: bool):
        """Set the robot motion state."""
        self._in_motion = in_motion

    # *** PUBLIC properties ***

    def is_in_motion(self) -> bool:
        """Check if robot is currently in motion."""
        return self._in_motion

    def robot_ctrl(self):
        """
        Returns:
            the object of the underlying robot. For the Niryo Ned2 it is an
            object of the NiryoRobot class.
        """
        return self._robot_ctrl

    def robot(self) -> "Robot":
        """
        Returns the robot object.

        Returns:
            Robot: The robot instance.
        """
        return self._robot

    def lock(self) -> threading.Lock:
        """
        Returns:
            the lock used so the robot interface is only ever called by one
            method at a time; for Niryo the interface is not thread safe.
        """
        return self._lock

    def verbose(self) -> bool:
        """
        Returns: True, if verbose is on, else False
        """
        return self._verbose

    # *** PRIVATE variables ***

    # the object of the underlying robot (a NiryoRobot instance for the Ned2).
    _robot_ctrl = None

    # object of the Robot class
    _robot = None

    # lock serializing all calls into the (non-thread-safe) robot interface.
    _lock = None

    _verbose = False

Functions

__del__()

Destructor for the RobotController.

Source code in robot_environment/robot/robot_controller.py
def __del__(self):
    """Destructor for the RobotController; no teardown is required here."""
    pass

__init__(robot, use_simulation, verbose=False)

Initializes the robot (connects to it and calibrates it).

Parameters:

Name Type Description Default
robot Robot

object of the Robot class.

required
use_simulation bool

True, if working with a simulation model of the robot,
else False if we work with a real robot.
required
verbose bool
False
Source code in robot_environment/robot/robot_controller.py
def __init__(self, robot: "Robot", use_simulation: bool, verbose: bool = False):
    """
    Initializes the controller and brings up the robot (connect + calibrate).

    Args:
        robot: object of the Robot class.
        use_simulation: True, if working with a simulation model of the robot,
        else False if we work with a real robot.
        verbose: enable verbose logging.
    """
    super().__init__()

    # The robot interface is not thread safe; this lock serializes access.
    self._lock = threading.Lock()

    self._verbose = verbose
    self._robot = robot
    self._in_motion = False

    self._init_robot(use_simulation)

calibrate() abstractmethod

Calibrates the Robot.

Returns:

Type Description
bool

True, if calibration was successful, else False

Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def calibrate(self) -> bool:
    """
    Run the robot's calibration routine.

    Returns:
        True, if calibration was successful, else False
    """
    pass

get_pose() abstractmethod

Get current pose of gripper of robot.

Returns:

Type Description
PoseObjectPNP

current pose of gripper of robot.

Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def get_pose(self) -> "PoseObjectPNP":
    """
    Query the current pose of the robot's gripper.

    Returns:
        current pose of gripper of robot.
    """
    pass

get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw) abstractmethod

Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw), calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the given coordinate an object that has the given orientation. For this method to work, it is important that only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
u_rel float

horizontal coordinate in image of workspace, normalized between 0 and 1

required
v_rel float

vertical coordinate in image of workspace, normalized between 0 and 1

required
yaw float

orientation of an object at the pixel coordinates [u_rel, v_rel].

required

Returns:

Name Type Description
pose_object PoseObjectPNP

Pose of the point in world coordinates of the robot.

Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> "PoseObjectPNP":
    """
    Convert relative image coordinates [u_rel, v_rel] (plus an optional
    orientation yaw) into a pose in robot world coordinates. yaw is useful
    when picking an object with that orientation at the given coordinate.
    Only the robot's workspace may be visible in the image (required at least
    for the Niryo robot), so (u_rel, v_rel) = (0, 0) is the upper left corner
    of the workspace.

    Args:
        workspace_id: id of the workspace
        u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
        v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
        yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

    Returns:
        pose_object: Pose of the point in world coordinates of the robot.
    """
    pass

is_in_motion()

Check if robot is currently in motion.

Source code in robot_environment/robot/robot_controller.py
def is_in_motion(self) -> bool:
    """Return True while the robot is executing a motion, else False."""
    moving = self._in_motion
    return moving

lock()

Returns:

Type Description
Lock

a lock, to only call the robot interface with one method and not many methods in parallel, because for Niryo

Lock

the interface is not thread safe.

Source code in robot_environment/robot/robot_controller.py
def lock(self) -> threading.Lock:
    """
    Return the lock that serializes all calls into the robot interface
    (the Niryo interface is not thread safe).
    """
    return self._lock

move2observation_pose(workspace_id) abstractmethod

The robot should move to a pose where it can observe the workspace given by workspace_id.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def move2observation_pose(self, workspace_id: str) -> None:
    """
    Move the robot to a pose from which it can observe the given workspace.

    Args:
        workspace_id: id of the workspace
    """
    pass

robot()

Returns the robot object.

Returns:

Name Type Description
Robot Robot

The robot instance.

Source code in robot_environment/robot/robot_controller.py
def robot(self) -> "Robot":
    """
    Return the owning Robot instance.

    Returns:
        Robot: The robot instance.
    """
    owner = self._robot
    return owner

robot_ctrl()

Returns:

Type Description

the object of the underlying robot. For the Niryo Ned2 it is an object of the NiryoRobot class.

Source code in robot_environment/robot/robot_controller.py
def robot_ctrl(self):
    """
    Return the low-level robot object (for the Niryo Ned2 this is a
    NiryoRobot instance).
    """
    return self._robot_ctrl

robot_pick_object(pick_pose) abstractmethod

Calls the pick command of the self._robot_ctrl to pick an object at the given pose.

Parameters:

Name Type Description Default
pick_pose PoseObjectPNP

Pose where the object should be picked (includes z-offset if needed)

required

Returns:

Type Description
bool

True, if pick was successful, else False

Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def robot_pick_object(self, pick_pose: "PoseObjectPNP") -> bool:
    """
    Pick an object at the given pose via the underlying robot controller.

    Args:
        pick_pose: Pose where the object should be picked (includes z-offset if needed)

    Returns:
        True, if pick was successful, else False
    """
    return False

robot_place_object(place_pose) abstractmethod

Places an already picked object at the given place_pose.

Parameters:

Name Type Description Default
place_pose PoseObjectPNP

Pose where to place the already picked object

required

Returns:

Type Description
bool

True, if place was successful, else False

Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def robot_place_object(self, place_pose: "PoseObjectPNP") -> bool:
    """
    Place the currently held object at the given pose.

    Args:
        place_pose: Pose where to place the already picked object

    Returns:
        True, if place was successful, else False
    """
    return False

robot_push_object(push_pose, direction, distance) abstractmethod

Push given object (its Pose) into the given direction by the given distance.

Parameters:

Name Type Description Default
push_pose PoseObjectPNP

the Pose of the object that should be pushed.

required
direction str

"up", "down", "left", "right"

required
distance float

distance in millimeters

required

Returns:

Type Description
bool

True, if push was successful, else False

Source code in robot_environment/robot/robot_controller.py
@abstractmethod
def robot_push_object(self, push_pose: "PoseObjectPNP", direction: str, distance: float) -> bool:
    """
    Push the object at the given pose in the given direction by the given distance.

    Args:
        push_pose: the Pose of the object that should be pushed.
        direction: "up", "down", "left", "right"
        distance: distance in millimeters

    Returns:
        True, if push was successful, else False
    """
    pass

verbose()

Source code in robot_environment/robot/robot_controller.py
def verbose(self) -> bool:
    """Return True if verbose output is enabled, else False."""
    enabled = self._verbose
    return enabled

NiryoRobotController

robot_environment.robot.niryo_robot_controller.NiryoRobotController

Bases: RobotController

Class for the pick-and-place niryo ned2 robot that provides the primitive tasks of the robot like pick and place operations.

Source code in robot_environment/robot/niryo_robot_controller.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
class NiryoRobotController(RobotController):
    """
    Class for the pick-and-place niryo ned2 robot that provides the primitive tasks of the robot like
    pick and place operations.

    Supports two client libraries; the ``pyniryo_v`` flag (defined outside this
    class) selects between the "pyniryo2" API (namespaced sub-objects like
    ``arm``/``vision``/``tool``) and the flat legacy API.
    """

    # *** CONSTRUCTORS ***
    @log_start_end_cls()
    def __init__(self, robot: "Robot", use_simulation: bool, verbose: bool = False):
        """
        Initializes the robot (connects to it and calibrates it).

        Args:
            robot: object of the Robot class.
            use_simulation: True, if working with a simulation model of the robot,
            else False if we work with a real robot.
            verbose:
        """
        # Single-worker executor: used to run robot calls with a timeout
        # (see get_target_pose_from_rel_timeout) without overlapping commands.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._shutdown_v = False
        self._logger = get_package_logger(__name__, verbose)

        super().__init__(robot, use_simulation, verbose)

    # Deleting (Calling destructor)
    def __del__(self):
        # NOTE(review): this destructor performs hardware I/O (_shutdown) while
        # holding the lock, AFTER the executor is already shut down; destructor
        # timing is not guaranteed in Python — cleanup() is the reliable path.
        super().__del__()

        if hasattr(self, "_executor") and self._executor:
            self._logger.debug("Shutting down ThreadPoolExecutor in destructor...")
            self._executor.shutdown(wait=True)

        self._logger.debug("Destructor called, Robot deleted.")
        with self._lock:
            self._logger.debug("Destructor called, Robot deleted.2")
            self._shutdown()

    def cleanup(self):
        """
        Explicit cleanup method - call this when you're done with the object.
        This is more reliable than relying on __del__.
        """
        if hasattr(self, "_executor") and self._executor:
            self._logger.info("Shutting down ThreadPoolExecutor...")
            self._shutdown_v = True
            self._executor.shutdown(wait=True)
            self._executor = None

    # *** PUBLIC SET methods ***

    # *** PUBLIC GET methods ***

    def get_pose(self) -> PoseObjectPNP:
        """
        Get current pose of gripper of robot.

        Returns:
            current pose of gripper of robot.
        """
        # The Niryo interface is not thread safe, so each call is locked.
        with self._lock:
            if pyniryo_v == "pyniryo2":
                pose = self._robot_ctrl.arm.get_pose()
            else:
                pose = self._robot_ctrl.get_pose()

        return PoseObjectPNP.convert_niryo_pose_object2pose_object(pose)

    def get_camera_intrinsics(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get camera intrinsic matrix and distortion coefficients.

        Returns:
            Tuple[np.ndarray, np.ndarray]: (mtx, dist)
        """
        # All calls into the NiryoRobot object are locked because its
        # interface is not thread safe.
        with self._lock:
            if pyniryo_v == "pyniryo2":
                mtx, dist = self._robot_ctrl.vision.get_camera_intrinsics()
            else:
                mtx, dist = self._robot_ctrl.get_camera_intrinsics()

        return mtx, dist

    def get_img_compressed(self) -> np.ndarray:
        """
        Get compressed image from the robot's camera.

        Returns:
            np.ndarray: Compressed image data.
        """
        with self._lock:
            if pyniryo_v == "pyniryo2":
                img_compressed = self._robot_ctrl.vision.get_img_compressed()
            else:
                img_compressed = self._robot_ctrl.get_img_compressed()

        return img_compressed

    # *** PUBLIC methods ***

    def calibrate(self) -> bool:
        """
        Calibrates the NiryoRobot.

        Returns:
            True, if calibration was successful, else False
        """
        # _calibrate_auto does not report failure, so this always returns True.
        self._calibrate_auto()
        return True

    def reset_connection(self) -> None:
        """
        Reset the connection to the robot by safely disconnecting and reconnecting.
        """
        self._logger.info("Resetting the robot connection...")
        try:
            # Attempt to close the connection safely
            if self._robot_ctrl is not None:
                with self._lock:
                    self._shutdown()

        except Exception as e:
            self._logger.error(f"Error while closing connection: {e}", exc_info=True)

        # Reinitialize the connection
        try:
            self._create_robot()
            self._logger.info("Connection successfully reset.")
        except Exception as e:
            self._logger.error(f"Failed to reconnect to the robot: {e}", exc_info=True)
            # NOTE(review): this clears self._robot (the Robot back-reference),
            # not self._robot_ctrl (the connection object) — verify which
            # attribute was intended to be reset on failure.
            self._robot = None

    @log_start_end_cls()
    def robot_pick_object(self, pick_pose: "PoseObjectPNP") -> bool:
        """
        Calls the pick command of the self._robot_ctrl to pick the object at the given pose.

        Args:
            pick_pose: Pose where to pick the object (z-offset already applied if needed)

        Returns:
            True, if pick was successful, else False
        """
        # Convert to Niryo format
        pick_pose_niryo = PoseObjectPNP.convert_pose_object2niryo_pose_object(pick_pose)

        # Motion flag is set around the hardware call so robot_in_motion()
        # reflects reality; always cleared in the finally block.
        self._set_in_motion(True)
        try:
            with self._lock:
                if pyniryo_v == "pyniryo2":
                    self._robot_ctrl.pick_place.pick_from_pose(pick_pose_niryo)
                else:
                    self._robot_ctrl.pick_from_pose(pick_pose_niryo)

            self._logger.info("Finished pick_from_pose")
            return True
            # TODO: in newest version available
            # return not self._robot_ctrl.collision_detected

        except Exception as e:
            self._logger.error(f"Pick failed: {e}")
            return False
        finally:
            self._set_in_motion(False)

    @log_start_end_cls()
    def robot_place_object(self, place_pose: "PoseObjectPNP") -> bool:
        """
        Places an already picked object at the given place_pose.

        Args:
            place_pose: Pose where to place the already picked object

        Returns:
            True, if place was successful, else False
        """
        place_pose = PoseObjectPNP.convert_pose_object2niryo_pose_object(place_pose)
        # Small fixed z-offset (5 mm) so the gripper releases slightly above
        # the target surface rather than pressing into it.
        place_pose = place_pose.copy_with_offsets(z_offset=0.005)

        self._logger.debug(f"Place pose: {place_pose}")

        self._set_in_motion(True)
        try:
            with self._lock:
                if pyniryo_v == "pyniryo2":
                    self._robot_ctrl.pick_place.place_from_pose(place_pose)
                else:
                    self._robot_ctrl.place_from_pose(place_pose)

            return True
            # TODO: in newest version available
            # return not self._robot_ctrl.collision_detected
        except Exception as e:
            self._logger.error(f"Place failed: {e}")
            return False
        finally:
            self._set_in_motion(False)

    @log_start_end_cls()
    def robot_push_object(self, push_pose: "PoseObjectPNP", direction: str, distance: float) -> bool:
        """
        Push given object (its Pose) into the given direction by the given distance.

        Args:
            push_pose: the Pose of the object that should be pushed.
            direction: "up", "down", "left", "right"
            distance: distance in millimeters

        Returns:
            True, if push was successful, else False
        """
        push_pose = PoseObjectPNP.convert_pose_object2niryo_pose_object(push_pose)

        self._logger.debug(f"Push pose: {push_pose}")

        self._set_in_motion(True)
        try:
            with self._lock:
                # Close the gripper first so the robot pushes with a closed
                # hand, then move to the object and shift along one axis.
                if pyniryo_v == "pyniryo2":
                    self._robot_ctrl.tool.close_gripper()
                else:
                    self._robot_ctrl.close_gripper()

                self._move_pose(push_pose)

                # Direction-to-axis mapping: up/down = +/-X, left/right = +/-Y.
                if direction == "up":
                    self._shift_pose(RobotAxis.X, distance)
                elif direction == "down":
                    self._shift_pose(RobotAxis.X, -distance)
                elif direction == "left":
                    self._shift_pose(RobotAxis.Y, distance)
                elif direction == "right":
                    self._shift_pose(RobotAxis.Y, -distance)
                else:
                    # NOTE(review): an unknown direction only logs an error but
                    # still returns True below — confirm that is intended.
                    self._logger.error(f"Unknown direction: {direction}")

            return True
            # TODO: in newest version available
            # return not self._robot_ctrl.collision_detected
        except Exception as e:
            self._logger.error(f"Push failed: {e}")
            return False
        finally:
            self._set_in_motion(False)

    def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> "PoseObjectPNP":
        """
        Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw),
        calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the
        given coordinate an object that has the given orientation. For this method to work, it is important that
        only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot
        this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

        Args:
            workspace_id: id of the workspace
            u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
            v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
            yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

        Returns:
            pose_object: Pose of the point in world coordinates of the robot.
        """
        self._logger.debug(f"Thread {threading.current_thread().name}: {workspace_id}, {u_rel}, {v_rel}, {yaw}")

        # Use the asyncio lock for thread-safe access
        with self._lock:
            self._logger.debug(f"Thread {threading.current_thread().name} acquired lock")

            try:
                # Clamp coordinates into [0, 1] before handing them to the robot.
                x_rel = max(0.0, min(u_rel, 1.0))
                y_rel = max(0.0, min(v_rel, 1.0))

                if pyniryo_v == "pyniryo2":
                    obj_coords = self._robot_ctrl.vision.get_target_pose_from_rel(workspace_id, 0.0, x_rel, y_rel, yaw)
                else:
                    obj_coords = self._robot_ctrl.get_target_pose_from_rel(workspace_id, 0.0, x_rel, y_rel, yaw)

                obj_coords = PoseObjectPNP.convert_niryo_pose_object2pose_object(obj_coords)

            except (NiryoRobotException, UnicodeDecodeError, SyntaxError, TcpCommandException) as e:
                # A missing workspace is expected and only warned about; every
                # other error is logged. Both cases fall back to a zero pose.
                if "does not exist" in str(e):
                    self._logger.warning(f"Workspace '{workspace_id}' does not exist on the robot. Returning zero pose.")
                else:
                    self._logger.error(f"Thread {threading.current_thread().name} Error: {e}", exc_info=True)
                obj_coords = PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
            finally:
                self._logger.debug(f"Thread {threading.current_thread().name} releasing lock")

        self._logger.debug(f"Thread {threading.current_thread().name} exiting: {obj_coords}")

        return obj_coords

    def get_target_pose_from_rel_timeout(
        self, workspace_id: str, x_rel: float, y_rel: float, yaw: float, timeout: float = 0.75
    ) -> "PoseObjectPNP":
        """
        Calculate world coordinates from relative image coordinates with a timeout.

        Args:
            workspace_id: ID of the workspace.
            x_rel: Relative x-coordinate (0-1).
            y_rel: Relative y-coordinate (0-1).
            yaw: Orientation in radians.
            timeout: Timeout in seconds.

        Returns:
            PoseObjectPNP: Target pose in world coordinates.
        """
        self._logger.debug(f"Thread {threading.current_thread().name} entering: {workspace_id}, {x_rel}, {y_rel}, {yaw}")

        if not self._lock.acquire(timeout=timeout):
            self._logger.error(f"Thread {threading.current_thread().name} failed to acquire lock within timeout")
            # NOTE(review): this early-exit path returns a raw Niryo PoseObject
            # — unlike all other paths, which convert to PoseObjectPNP below.
            return PoseObject(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

        try:
            self._logger.debug(f"Thread {threading.current_thread().name} acquired lock")
            # Run the blocking robot call on the single-worker executor so we
            # can bound its duration with future.result(timeout=...).
            future = self._executor.submit(self._robot_ctrl.get_target_pose_from_rel, workspace_id, 0.0, x_rel, y_rel, yaw)

            try:
                obj_coords = future.result(timeout=timeout)
            except FuturesTimeoutError:
                # NOTE(review): uses print() instead of self._logger here.
                print(f"Thread {threading.current_thread().name} timeout waiting for robot response")
                # TODO: We cannot simply reset the connection here, because the
                #  robot is also accessed from other threads elsewhere.
                # self.reset_connection()
                obj_coords = PoseObject(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
                future.cancel()  # Attempt to cancel the task if it is still running

        except (NiryoRobotException, UnicodeDecodeError, SyntaxError, TcpCommandException) as e:
            self._logger.error(f"Thread {threading.current_thread().name} Error: {e}", exc_info=True)
            obj_coords = PoseObject(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
        finally:
            self._logger.debug(f"Thread {threading.current_thread().name} releasing lock")
            self._lock.release()

        self._logger.debug(f"Thread {threading.current_thread().name} exiting: {obj_coords}")
        obj_coords = PoseObjectPNP.convert_niryo_pose_object2pose_object(obj_coords)

        return obj_coords

    @log_start_end_cls()
    def move2observation_pose(self, workspace_id: str) -> None:
        """
        The robot moves to a pose where it can observe the workspace given by workspace_id.

        Args:
            workspace_id: id of the workspace
        """
        observation_pose = self._robot.environment().get_observation_pose(workspace_id)

        if observation_pose is None:
            self._logger.warning(f"observation_pose is None for workspace: {workspace_id}")
            return

        # Only convert if we have a valid pose
        observation_pose = PoseObjectPNP.convert_pose_object2niryo_pose_object(observation_pose)

        self._set_in_motion(True)
        try:
            with self._lock:
                self._move_pose(observation_pose)
        except UnicodeDecodeError as e:
            self._logger.error(f"move2observation_pose error: {e}, pose: {observation_pose}", exc_info=True)
        finally:
            self._set_in_motion(False)

        self._logger.debug(f"move_pose finished, current: {self.get_pose()}, target: {observation_pose}")

    def _shutdown(self) -> None:
        """
        Closes connection to NiryoRobot.
        """
        # Caller is expected to hold self._lock.
        if pyniryo_v == "pyniryo2":
            # End Robot Connection
            self._robot_ctrl.end()
        else:
            self._robot_ctrl.close_connection()

    def _shift_pose(self, axis: RobotAxis, distance: float) -> None:
        """
        Shifts the gripper along the given axis for the specified distance.

        Args:
            axis (RobotAxis): axis of the robot to shift the gripper along
            distance: distance in millimeters to shift the gripper along
        """
        # Niryo expects meters; callers pass millimeters, hence the /1000.
        if pyniryo_v == "pyniryo2":
            self._robot_ctrl.arm.shift_pose(axis, distance / 1000)
        else:
            self._robot_ctrl.shift_pose(axis, distance / 1000)

    def _move_pose(self, pose: PoseObject) -> None:
        """
        Move gripper of robot to given pose.

        Args:
            pose (PoseObject): pose of gripper
        """
        # Caller is expected to hold self._lock.
        if pyniryo_v == "pyniryo2":
            self._robot_ctrl.arm.move_pose(pose)
        else:
            self._robot_ctrl.move_pose(pose)

    @log_start_end_cls()
    def _create_robot(self) -> None:
        """
        Creates the NiryoRobot object and calibrates the robot.
        """
        with self._lock:
            self._robot_ctrl = NiryoRobot(self._robot_ip_address)
        self._calibrate_auto()

    def _calibrate_auto(self) -> None:
        """
        Calibrates the NiryoRobot.
        """
        with self._lock:
            if pyniryo_v == "pyniryo2":
                self._robot_ctrl.tool.update_tool()
                self._robot_ctrl.arm.calibrate_auto()
            else:
                self._robot_ctrl.update_tool()
                self._robot_ctrl.calibrate_auto()

    @log_start_end_cls()
    def _init_robot(self, use_simulation: bool) -> bool:
        """
        Creates the NiryoRobot object and connects to it.

        Args:
            use_simulation: True for the simulated robot, False for hardware.

        Returns:
            bool: always True (connection errors raise from _create_robot).
        """
        # Connect to Niryo Robot
        # Hard-coded addresses: real hardware vs. local simulation VM.
        if not use_simulation:
            robot_ip_address = "192.168.0.140"
        else:
            robot_ip_address = "192.168.247.128"

        self._robot_ip_address = robot_ip_address

        self._create_robot()

        return True

    # *** PUBLIC properties ***

    # *** PRIVATE variables ***

    # ip address of the robot
    _robot_ip_address = ""
    _logger = None

Functions

__init__(robot, use_simulation, verbose=False)

Initializes the robot (connects to it and calibrates it).

Parameters:

Name Type Description Default
robot 'Robot'

object of the Robot class.

required
use_simulation bool

True, if working with a simulation model of the robot,

required
verbose bool
False
Source code in robot_environment/robot/niryo_robot_controller.py
@log_start_end_cls()
def __init__(self, robot: "Robot", use_simulation: bool, verbose: bool = False):
    """
    Initializes the robot (connects to it and calibrates it).

    Args:
        robot: object of the Robot class.
        use_simulation: True, if working with a simulation model of the robot,
        else False if we work with a real robot.
        verbose: verbosity flag forwarded to the package logger.
    """
    # Single-worker executor used to run blocking robot calls with a timeout
    # (see get_target_pose_from_rel_timeout).
    self._executor = ThreadPoolExecutor(max_workers=1)
    self._shutdown_v = False
    # Create the logger before base-class initialization so it already exists
    # when the rest of the setup runs.
    self._logger = get_package_logger(__name__, verbose)

    super().__init__(robot, use_simulation, verbose)

calibrate()

Calibrates the NiryoRobot.

Returns:

Type Description
bool

True, if calibration was successful, else False

Source code in robot_environment/robot/niryo_robot_controller.py
def calibrate(self) -> bool:
    """
    Run the automatic calibration routine of the NiryoRobot.

    Returns:
        True, if calibration was successful, else False
    """
    # Delegate to the shared auto-calibration helper.
    self._calibrate_auto()
    return True

cleanup()

Explicit cleanup method — call this when you're done with the object. This is more reliable than relying on `__del__`.

Source code in robot_environment/robot/niryo_robot_controller.py
def cleanup(self):
    """
    Release resources held by this controller.

    Call this explicitly when you are finished with the object; it is more
    dependable than relying on ``__del__`` being invoked.
    """
    executor = getattr(self, "_executor", None)
    if executor:
        self._logger.info("Shutting down ThreadPoolExecutor...")
        # Flag the shutdown first so in-flight workers can observe it.
        self._shutdown_v = True
        executor.shutdown(wait=True)
        self._executor = None

get_camera_intrinsics()

Get camera intrinsic matrix and distortion coefficients.

Returns:

Type Description
Tuple[ndarray, ndarray]

Tuple[np.ndarray, np.ndarray]: (mtx, dist)

Source code in robot_environment/robot/niryo_robot_controller.py
def get_camera_intrinsics(self) -> Tuple[np.ndarray, np.ndarray]:
    """
    Get camera intrinsic matrix and distortion coefficients.

    Returns:
        Tuple[np.ndarray, np.ndarray]: (mtx, dist)
    """
    # All calls into the NiryoRobot object are serialized through the lock,
    # because the underlying client is not thread safe.
    with self._lock:
        vision_api = self._robot_ctrl.vision if pyniryo_v == "pyniryo2" else self._robot_ctrl
        mtx, dist = vision_api.get_camera_intrinsics()

    return mtx, dist

get_img_compressed()

Get compressed image from the robot's camera.

Returns:

Type Description
ndarray

np.ndarray: Compressed image data.

Source code in robot_environment/robot/niryo_robot_controller.py
def get_img_compressed(self) -> np.ndarray:
    """
    Fetch a compressed image from the robot's camera.

    Returns:
        np.ndarray: Compressed image data.
    """
    with self._lock:
        # pyniryo2 nests the camera API under ``vision``.
        source = self._robot_ctrl.vision if pyniryo_v == "pyniryo2" else self._robot_ctrl
        img_compressed = source.get_img_compressed()

    return img_compressed

get_pose()

Get current pose of gripper of robot.

Returns:

Type Description
PoseObjectPNP

current pose of gripper of robot.

Source code in robot_environment/robot/niryo_robot_controller.py
def get_pose(self) -> PoseObjectPNP:
    """
    Get current pose of gripper of robot.

    Returns:
        current pose of gripper of robot.
    """
    with self._lock:
        arm_api = self._robot_ctrl.arm if pyniryo_v == "pyniryo2" else self._robot_ctrl
        niryo_pose = arm_api.get_pose()

    # Convert the library-specific pose into the project-wide pose type.
    return PoseObjectPNP.convert_niryo_pose_object2pose_object(niryo_pose)

get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw), calculate the corresponding pose in world coordinates. The parameter yaw is useful, if we want to pick at the given coordinate an object that has the given orientation. For this method to work, it is important that only the workspace of the robot is visible in the image and nothing else. At least for the Niryo robot this is important. This means, (u_rel, v_rel) = (0, 0), is the upper left corner of the workspace.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
u_rel float

horizontal coordinate in image of workspace, normalized between 0 and 1

required
v_rel float

vertical coordinate in image of workspace, normalized between 0 and 1

required
yaw float

orientation of an object at the pixel coordinates [u_rel, v_rel].

required

Returns:

Name Type Description
pose_object 'PoseObjectPNP'

Pose of the point in world coordinates of the robot.

Source code in robot_environment/robot/niryo_robot_controller.py
def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> "PoseObjectPNP":
    """
    Convert relative workspace-image coordinates (plus an optional yaw) into
    a pose in world coordinates. The yaw is useful when picking an object
    with a known orientation at that image point. The camera image must show
    the robot workspace and nothing else (required at least for the Niryo
    robot), so that (u_rel, v_rel) = (0, 0) is the workspace's upper left
    corner.

    Args:
        workspace_id: id of the workspace
        u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
        v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
        yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

    Returns:
        pose_object: Pose of the point in world coordinates of the robot.
    """
    thread_name = threading.current_thread().name
    self._logger.debug(f"Thread {thread_name}: {workspace_id}, {u_rel}, {v_rel}, {yaw}")

    # Serialize access to the robot client, which is not thread safe.
    with self._lock:
        self._logger.debug(f"Thread {thread_name} acquired lock")

        try:
            # Clamp the relative coordinates into the valid [0, 1] range.
            u_clamped = min(max(u_rel, 0.0), 1.0)
            v_clamped = min(max(v_rel, 0.0), 1.0)

            if pyniryo_v == "pyniryo2":
                raw_pose = self._robot_ctrl.vision.get_target_pose_from_rel(workspace_id, 0.0, u_clamped, v_clamped, yaw)
            else:
                raw_pose = self._robot_ctrl.get_target_pose_from_rel(workspace_id, 0.0, u_clamped, v_clamped, yaw)

            obj_coords = PoseObjectPNP.convert_niryo_pose_object2pose_object(raw_pose)

        except (NiryoRobotException, UnicodeDecodeError, SyntaxError, TcpCommandException) as e:
            # A missing workspace is expected and only worth a warning;
            # everything else is a real error.
            if "does not exist" in str(e):
                self._logger.warning(f"Workspace '{workspace_id}' does not exist on the robot. Returning zero pose.")
            else:
                self._logger.error(f"Thread {thread_name} Error: {e}", exc_info=True)
            obj_coords = PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
        finally:
            self._logger.debug(f"Thread {thread_name} releasing lock")

    self._logger.debug(f"Thread {thread_name} exiting: {obj_coords}")

    return obj_coords

get_target_pose_from_rel_timeout(workspace_id, x_rel, y_rel, yaw, timeout=0.75)

Calculate world coordinates from relative image coordinates with a timeout.

Parameters:

Name Type Description Default
workspace_id str

ID of the workspace.

required
x_rel float

Relative x-coordinate (0-1).

required
y_rel float

Relative y-coordinate (0-1).

required
yaw float

Orientation in radians.

required
timeout float

Timeout in seconds.

0.75

Returns:

Name Type Description
PoseObjectPNP 'PoseObjectPNP'

Target pose in world coordinates.

Source code in robot_environment/robot/niryo_robot_controller.py
def get_target_pose_from_rel_timeout(
    self, workspace_id: str, x_rel: float, y_rel: float, yaw: float, timeout: float = 0.75
) -> "PoseObjectPNP":
    """
    Calculate world coordinates from relative image coordinates with a timeout.

    Args:
        workspace_id: ID of the workspace.
        x_rel: Relative x-coordinate (0-1).
        y_rel: Relative y-coordinate (0-1).
        yaw: Orientation in radians.
        timeout: Timeout in seconds (used both for acquiring the lock and for
            waiting on the robot's answer).

    Returns:
        PoseObjectPNP: Target pose in world coordinates; a zero pose on
        timeout or error.
    """
    self._logger.debug(f"Thread {threading.current_thread().name} entering: {workspace_id}, {x_rel}, {y_rel}, {yaw}")

    if not self._lock.acquire(timeout=timeout):
        self._logger.error(f"Thread {threading.current_thread().name} failed to acquire lock within timeout")
        # Bugfix: return the project-wide pose type (the declared return
        # type), not the raw Niryo PoseObject, which every other code path
        # converts before returning.
        return PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

    try:
        self._logger.debug(f"Thread {threading.current_thread().name} acquired lock")
        # Run the blocking robot call on the single-worker executor so we can
        # bound the wait with a timeout.
        # NOTE(review): unlike get_target_pose_from_rel, this always uses the
        # pyniryo-style API; confirm behavior under pyniryo2.
        future = self._executor.submit(self._robot_ctrl.get_target_pose_from_rel, workspace_id, 0.0, x_rel, y_rel, yaw)

        try:
            obj_coords = future.result(timeout=timeout)
        except FuturesTimeoutError:
            # Bugfix: route the message through the logger instead of print().
            self._logger.error(f"Thread {threading.current_thread().name} timeout waiting for robot response")
            # TODO: We cannot simply reset the connection here, because other
            #  threads also access the robot.
            # self.reset_connection()
            obj_coords = PoseObject(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
            future.cancel()  # Attempt to cancel the task if it is still running

    except (NiryoRobotException, UnicodeDecodeError, SyntaxError, TcpCommandException) as e:
        self._logger.error(f"Thread {threading.current_thread().name} Error: {e}", exc_info=True)
        obj_coords = PoseObject(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
    finally:
        self._logger.debug(f"Thread {threading.current_thread().name} releasing lock")
        self._lock.release()

    self._logger.debug(f"Thread {threading.current_thread().name} exiting: {obj_coords}")
    obj_coords = PoseObjectPNP.convert_niryo_pose_object2pose_object(obj_coords)

    return obj_coords

move2observation_pose(workspace_id)

The robot moves to a pose where it can observe the workspace given by workspace_id.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
Source code in robot_environment/robot/niryo_robot_controller.py
@log_start_end_cls()
def move2observation_pose(self, workspace_id: str) -> None:
    """
    Move the robot to a pose from which it can observe the workspace
    identified by workspace_id.

    Args:
        workspace_id: id of the workspace
    """
    raw_pose = self._robot.environment().get_observation_pose(workspace_id)

    # Nothing to do if the environment defines no observation pose.
    if raw_pose is None:
        self._logger.warning(f"observation_pose is None for workspace: {workspace_id}")
        return

    # Translate the project pose into the Niryo pose type the API expects.
    observation_pose = PoseObjectPNP.convert_pose_object2niryo_pose_object(raw_pose)

    self._set_in_motion(True)
    try:
        with self._lock:
            self._move_pose(observation_pose)
    except UnicodeDecodeError as e:
        self._logger.error(f"move2observation_pose error: {e}, pose: {observation_pose}", exc_info=True)
    finally:
        # Always clear the in-motion flag, even when the move failed.
        self._set_in_motion(False)

    self._logger.debug(f"move_pose finished, current: {self.get_pose()}, target: {observation_pose}")

reset_connection()

Reset the connection to the robot by safely disconnecting and reconnecting.

Source code in robot_environment/robot/niryo_robot_controller.py
def reset_connection(self) -> None:
    """
    Reset the connection to the robot by safely disconnecting and reconnecting.
    """
    self._logger.info("Resetting the robot connection...")
    try:
        # Attempt to close the connection safely
        if self._robot_ctrl is not None:
            with self._lock:
                self._shutdown()

    except Exception as e:
        self._logger.error(f"Error while closing connection: {e}", exc_info=True)

    # Reinitialize the connection
    try:
        self._create_robot()
        self._logger.info("Connection successfully reset.")
    except Exception as e:
        self._logger.error(f"Failed to reconnect to the robot: {e}", exc_info=True)
        # Bugfix: clear the controller handle (_robot_ctrl), not the parent
        # Robot reference (_robot) which other methods still need; this also
        # matches WidowXRobotController.reset_connection.
        self._robot_ctrl = None

robot_pick_object(pick_pose)

Calls the pick command of the self._robot_ctrl to pick the object at the given pose.

Parameters:

Name Type Description Default
pick_pose 'PoseObjectPNP'

Pose where to pick the object (z-offset already applied if needed)

required

Returns:

Type Description
bool

True, if pick was successful, else False

Source code in robot_environment/robot/niryo_robot_controller.py
@log_start_end_cls()
def robot_pick_object(self, pick_pose: "PoseObjectPNP") -> bool:
    """
    Pick the object at the given pose via the controller's pick command.

    Args:
        pick_pose: Pose where to pick the object (z-offset already applied if needed)

    Returns:
        True, if pick was successful, else False
    """
    # Convert to Niryo format
    pick_pose_niryo = PoseObjectPNP.convert_pose_object2niryo_pose_object(pick_pose)

    self._set_in_motion(True)
    try:
        with self._lock:
            picker = self._robot_ctrl.pick_place if pyniryo_v == "pyniryo2" else self._robot_ctrl
            picker.pick_from_pose(pick_pose_niryo)

        self._logger.info("Finished pick_from_pose")
        return True
        # TODO: in newest version available
        # return not self._robot_ctrl.collision_detected

    except Exception as e:
        self._logger.error(f"Pick failed: {e}")
        return False
    finally:
        self._set_in_motion(False)

robot_place_object(place_pose)

Places an already picked object at the given place_pose.

Parameters:

Name Type Description Default
place_pose 'PoseObjectPNP'

Pose where to place the already picked object

required

Returns:

Type Description
bool

True, if place was successful, else False

Source code in robot_environment/robot/niryo_robot_controller.py
@log_start_end_cls()
def robot_place_object(self, place_pose: "PoseObjectPNP") -> bool:
    """
    Place an already picked object at the given place_pose.

    Args:
        place_pose: Pose where to place the already picked object

    Returns:
        True, if place was successful, else False
    """
    # Convert into the Niryo pose type and release slightly above the target.
    niryo_pose = PoseObjectPNP.convert_pose_object2niryo_pose_object(place_pose)
    place_pose = niryo_pose.copy_with_offsets(z_offset=0.005)

    self._logger.debug(f"Place pose: {place_pose}")

    self._set_in_motion(True)
    try:
        with self._lock:
            placer = self._robot_ctrl.pick_place if pyniryo_v == "pyniryo2" else self._robot_ctrl
            placer.place_from_pose(place_pose)

        return True
        # TODO: in newest version available
        # return not self._robot_ctrl.collision_detected
    except Exception as e:
        self._logger.error(f"Place failed: {e}")
        return False
    finally:
        self._set_in_motion(False)

robot_push_object(push_pose, direction, distance)

Push given object (its Pose) into the given direction by the given distance.

Parameters:

Name Type Description Default
push_pose 'PoseObjectPNP'

the Pose of the object that should be pushed.

required
direction str

"up", "down", "left", "right"

required
distance float

distance in millimeters

required

Returns:

Type Description
bool

True, if push was successful, else False

Source code in robot_environment/robot/niryo_robot_controller.py
@log_start_end_cls()
def robot_push_object(self, push_pose: "PoseObjectPNP", direction: str, distance: float) -> bool:
    """
    Push given object (its Pose) into the given direction by the given distance.

    Args:
        push_pose: the Pose of the object that should be pushed.
        direction: "up", "down", "left", "right"
        distance: distance in millimeters

    Returns:
        True, if push was successful, else False
    """
    push_pose = PoseObjectPNP.convert_pose_object2niryo_pose_object(push_pose)

    self._logger.debug(f"Push pose: {push_pose}")

    # Map each push direction onto the robot axis and signed shift distance.
    shifts = {
        "up": (RobotAxis.X, distance),
        "down": (RobotAxis.X, -distance),
        "left": (RobotAxis.Y, distance),
        "right": (RobotAxis.Y, -distance),
    }

    self._set_in_motion(True)
    try:
        with self._lock:
            # Close the gripper so the tool tip can push the object.
            if pyniryo_v == "pyniryo2":
                self._robot_ctrl.tool.close_gripper()
            else:
                self._robot_ctrl.close_gripper()

            self._move_pose(push_pose)

            if direction not in shifts:
                # Bugfix: an unknown direction used to log an error and then
                # fall through to `return True`; report failure instead
                # (consistent with WidowXRobotController.robot_push_object).
                self._logger.error(f"Unknown direction: {direction}")
                return False

            axis, shift = shifts[direction]
            self._shift_pose(axis, shift)

        return True
        # TODO: in newest version available
        # return not self._robot_ctrl.collision_detected
    except Exception as e:
        self._logger.error(f"Push failed: {e}")
        return False
    finally:
        self._set_in_motion(False)

WidowXRobotController

robot_environment.robot.widowx_robot_controller.WidowXRobotController

Bases: RobotController

Class for the pick-and-place WidowX robot that provides the primitive tasks of the robot like pick and place operations using the InterbotixManipulatorXS interface.

Source code in robot_environment/robot/widowx_robot_controller.py
class WidowXRobotController(RobotController):
    """
    Class for the pick-and-place WidowX robot that provides the primitive tasks of the robot like
    pick and place operations using the InterbotixManipulatorXS interface.
    """

    # Default pose used when the actual robot pose cannot be determined
    # Typically corresponds to the home position
    DEFAULT_HOME_POSE = PoseObjectPNP(0.3, 0.0, 0.2, 0.0, 1.57, 0.0)

    # *** CONSTRUCTORS ***
    @log_start_end_cls()
    def __init__(self, robot: "Robot", use_simulation: bool, verbose: bool = False):
        """
        Initializes the robot (connects to it and sets up the interface).

        Args:
            robot: object of the Robot class.
            use_simulation: True, if working with a simulation model of the robot,
                else False if we work with a real robot.
            verbose: enable verbose output

        Raises:
            ImportError: if interbotix_xs_modules is not installed.
        """
        # Fail fast when the ROS/Interbotix stack is missing; nothing below
        # can work without it.
        if not INTERBOTIX_AVAILABLE:
            raise ImportError("interbotix_xs_modules is required for WidowX controller")

        super().__init__(robot, use_simulation, verbose)

    # Deleting (Calling destructor)
    def __del__(self):
        super().__del__()
        # Only shut the hardware interface down if it was actually created.
        if hasattr(self, "_robot_ctrl") and self._robot_ctrl is not None:
            if self.verbose():
                print("Shutting down WidowX robot controller...")
            with self._lock:
                self._shutdown()

    # *** PUBLIC GET methods ***

    def get_pose(self) -> PoseObjectPNP:
        """
        Get current pose of gripper of robot.

        Returns:
            current pose of gripper of robot.
        """
        with self._lock:
            try:
                # InterbotixManipulatorXS stores current pose internally
                # This is a simplified version - in practice you'd use the arm's FK
                # or track the last commanded pose
                if hasattr(self, "_last_pose") and self._last_pose is not None:
                    return self._last_pose
                else:
                    # Return default home pose as fallback
                    return self.DEFAULT_HOME_POSE
            except Exception as e:
                if self.verbose():
                    print(f"Error getting pose: {e}")
                return self.DEFAULT_HOME_POSE

    def get_camera_intrinsics(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get camera intrinsics for the WidowX camera (if available).

        Returns:
            Tuple[np.ndarray, np.ndarray]: (camera_matrix, distortion_coefficients)
        """
        # WidowX typically uses external camera (e.g., RealSense)
        # These are placeholder values - should be calibrated for your setup

        # Default camera matrix for Intel RealSense D435 (640x480)
        mtx = np.array([[615.0, 0.0, 320.0], [0.0, 615.0, 240.0], [0.0, 0.0, 1.0]])

        # Distortion coefficients (k1, k2, p1, p2, k3)
        dist = np.array([0.0, 0.0, 0.0, 0.0, 0.0])

        return mtx, dist

    # *** PUBLIC methods ***

    def calibrate(self) -> bool:
        """
        Calibrates the WidowX robot.

        Returns:
            True, if calibration was successful, else False
        """
        # TODO: implement calibration — currently reports success unconditionally.

        return True

    def reset_connection(self) -> None:
        """
        Reset the connection to the robot by safely disconnecting and reconnecting.
        """
        if self.verbose():
            print("Resetting WidowX connection...")
        try:
            if self._robot_ctrl is not None:
                with self._lock:
                    self._shutdown()
        except Exception as e:
            print(f"Error while closing connection: {e}")

        # Reinitialize the connection
        try:
            self._create_robot()
            if self.verbose():
                print("Connection successfully reset.")
        except Exception as e:
            print(f"Failed to reconnect to the robot: {e}")
            self._robot_ctrl = None

    @log_start_end_cls()
    def robot_pick_object(self, pick_pose: "PoseObjectPNP") -> bool:
        """
        Calls the pick command to pick an object at the given pose.

        Args:
            pick_pose: Pose where to pick the object (z-offset already applied if needed)

        Returns:
            True, if pick was successful, else False
        """
        try:
            # Add small z-offset for approach (additional to any z-offset already in pick_pose)
            pick_pose_approach = pick_pose.copy_with_offsets(z_offset=0.05)
            pick_pose_grasp = pick_pose  # Use the pose as-is (z-offset already applied)

            with self._lock:
                # Open gripper
                self._robot_ctrl.gripper.release()

                # Move to approach pose (above object)
                self._move_to_pose(pick_pose_approach)

                # Move down to grasp pose
                self._move_to_pose(pick_pose_grasp)

                # Close gripper to grasp
                self._robot_ctrl.gripper.grasp()

                # Lift object
                lift_pose = pick_pose.copy_with_offsets(z_offset=0.05)
                self._move_to_pose(lift_pose)

            if self.verbose():
                print("Pick operation completed successfully")

            return True

        except Exception as e:
            print(f"Error during pick operation: {e}")
            return False

    @log_start_end_cls()
    def robot_place_object(self, place_pose: "PoseObjectPNP") -> bool:
        """
        Places an already picked object at the given place_pose.

        Args:
            place_pose: Pose where to place the already picked object

        Returns:
            True, if place was successful, else False
        """
        try:
            # Add z-offset for approach
            place_pose_approach = place_pose.copy_with_offsets(z_offset=0.05)
            place_pose_final = place_pose.copy_with_offsets(z_offset=0.005)

            with self._lock:
                # Move to approach pose
                self._move_to_pose(place_pose_approach)

                # Move down to place pose
                self._move_to_pose(place_pose_final)

                # Release gripper
                self._robot_ctrl.gripper.release()

                # Retract
                self._move_to_pose(place_pose_approach)

            if self.verbose():
                print("Place operation completed successfully")

            return True

        except Exception as e:
            print(f"Error during place operation: {e}")
            return False

    @log_start_end_cls()
    def robot_push_object(self, push_pose: "PoseObjectPNP", direction: str, distance: float) -> bool:
        """
        Push given object (its Pose) into the given direction by the given distance.

        Args:
            push_pose: the Pose of the object that should be pushed.
            direction: "up", "down", "left", "right"
            distance: distance in millimeters

        Returns:
            True, if push was successful, else False
        """
        try:
            with self._lock:
                # NOTE(review): the original comment said "Close gripper first",
                # but release() OPENS the gripper — confirm whether grasp() was
                # intended here (the Niryo controller closes its gripper before
                # pushing).
                self._robot_ctrl.gripper.release()

                # Move to push starting position
                self._move_to_pose(push_pose)

                # Calculate push distance in meters
                push_dist_m = distance / 1000.0

                # Perform push based on direction
                if direction == "up":
                    # Push along positive X axis
                    self._robot_ctrl.arm.set_ee_cartesian_trajectory(x=push_dist_m)
                elif direction == "down":
                    # Push along negative X axis
                    self._robot_ctrl.arm.set_ee_cartesian_trajectory(x=-push_dist_m)
                elif direction == "left":
                    # Push along positive Y axis
                    self._robot_ctrl.arm.set_ee_cartesian_trajectory(y=push_dist_m)
                elif direction == "right":
                    # Push along negative Y axis
                    self._robot_ctrl.arm.set_ee_cartesian_trajectory(y=-push_dist_m)
                else:
                    print(f"Unknown direction: {direction}")
                    return False

            if self.verbose():
                print(f"Push operation completed: {direction}, {distance}mm")

            return True

        except Exception as e:
            print(f"Error during push operation: {e}")
            return False

    def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> "PoseObjectPNP":
        """
        Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw),
        calculate the corresponding pose in world coordinates.

        Args:
            workspace_id: id of the workspace
            u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
            v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
            yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

        Returns:
            pose_object: Pose of the point in world coordinates of the robot.
        """
        with self._lock:
            try:
                # Clamp coordinates to [0, 1]
                u_rel = max(0.0, min(u_rel, 1.0))
                v_rel = max(0.0, min(v_rel, 1.0))

                # Get workspace from environment
                workspace = self._robot.environment().get_workspace_by_id(workspace_id)

                if workspace is None:
                    if self.verbose():
                        print(f"Workspace {workspace_id} not found")
                    return PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

                # Use workspace transformation
                pose = workspace.transform_camera2world_coords(workspace_id, u_rel, v_rel, yaw)

                return pose

            except Exception as e:
                if self.verbose():
                    print(f"Error in get_target_pose_from_rel: {e}")
                return PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

    @log_start_end_cls()
    def move2observation_pose(self, workspace_id: str) -> None:
        """
        The robot moves to a pose where it can observe the workspace given by workspace_id.

        Args:
            workspace_id: id of the workspace
        """
        try:
            observation_pose = self._robot.environment().get_observation_pose(workspace_id)

            if observation_pose is None:
                if self.verbose():
                    print(f"No observation pose defined for workspace: {workspace_id}")
                return

            with self._lock:
                self._move_to_pose(observation_pose)

            if self.verbose():
                print(f"Moved to observation pose for workspace: {workspace_id}")

        except Exception as e:
            print(f"Error moving to observation pose: {e}")

    # *** PRIVATE methods ***

    def _shutdown(self) -> None:
        """
        Closes connection to InterbotixManipulatorXS.
        """
        if self._robot_ctrl is not None:
            try:
                # Move to sleep pose before shutdown
                self._robot_ctrl.arm.go_to_sleep_pose()
                # Shutdown the robot interface
                self._robot_ctrl.shutdown()
            except Exception as e:
                print(f"Error during shutdown: {e}")

    def _move_to_pose(self, pose: "PoseObjectPNP") -> None:
        """
        Move gripper of robot to given pose using set_ee_pose_components.

        Args:
            pose: pose of gripper (PoseObjectPNP)

        Raises:
            Exception: re-raises whatever the Interbotix call raised.
        """
        try:
            # Use set_ee_pose_components method
            # Note: InterbotixManipulatorXS uses roll, pitch for orientation
            self._robot_ctrl.arm.set_ee_pose_components(x=pose.x, y=pose.y, z=pose.z, roll=pose.roll, pitch=pose.pitch)

            # Store last commanded pose
            self._last_pose = pose

        except Exception as e:
            if self.verbose():
                print(f"Error moving to pose: {e}")
            raise

    @log_start_end_cls()
    def _create_robot(self) -> None:
        """
        Creates the InterbotixManipulatorXS object and initializes the robot.
        """
        with self._lock:
            # Create robot interface
            self._robot_ctrl = InterbotixManipulatorXS(
                robot_model="wx250s",
                group_name="arm",
                gripper_name="gripper",
            )

            # Move to home pose on initialization
            self._robot_ctrl.arm.go_to_home_pose()

            # Initialize last pose
            self._last_pose = self.DEFAULT_HOME_POSE

    @log_start_end_cls()
    def _init_robot(self, use_simulation: bool) -> bool:
        """
        Creates the InterbotixManipulatorXS object and connects to it.

        Args:
            use_simulation: Currently not used for WidowX (uses ROS parameter)

        Returns:
            bool: True if initialization was successful, else False
        """
        try:
            self._create_robot()

            if self.verbose():
                print("WidowX robot initialized successfully")

            return True

        except Exception as e:
            print(f"Failed to initialize WidowX robot: {e}")
            return False

    # *** PUBLIC properties ***

    # *** PRIVATE variables ***

    # Last commanded pose (for tracking)
    _last_pose = None

Functions

__init__(robot, use_simulation, verbose=False)

Initializes the robot (connects to it and sets up the interface).

Parameters:

Name Type Description Default
robot 'Robot'

object of the Robot class.

required
use_simulation bool

True, if working with a simulation model of the robot, else False if we work with a real robot.

required
verbose bool

enable verbose output

False
Source code in robot_environment/robot/widowx_robot_controller.py
@log_start_end_cls()
def __init__(self, robot: "Robot", use_simulation: bool, verbose: bool = False):
    """
    Initializes the robot (connects to it and sets up the interface).

    Args:
        robot: object of the Robot class.
        use_simulation: True, if working with a simulation model of the robot,
            else False if we work with a real robot.
        verbose: enable verbose output

    Raises:
        ImportError: if interbotix_xs_modules is not installed.
    """
    # Fail fast when the ROS/Interbotix stack is missing; the controller
    # cannot function without it.
    if not INTERBOTIX_AVAILABLE:
        raise ImportError("interbotix_xs_modules is required for WidowX controller")

    super().__init__(robot, use_simulation, verbose)

calibrate()

Calibrates the WidowX robot.

Returns:

Type Description
bool

True, if calibration was successful, else False

Source code in robot_environment/robot/widowx_robot_controller.py
def calibrate(self) -> bool:
    """
    Run the calibration routine for the WidowX robot.

    Returns:
        True, if calibration was successful, else False
    """
    # TODO: implement calibration
    # Placeholder: report success until a real routine exists.
    calibration_ok = True
    return calibration_ok

get_camera_intrinsics()

Get camera intrinsics for the WidowX camera (if available).

Returns:

Type Description
Tuple[ndarray, ndarray]

Tuple[np.ndarray, np.ndarray]: (camera_matrix, distortion_coefficients)

Source code in robot_environment/robot/widowx_robot_controller.py
def get_camera_intrinsics(self) -> Tuple[np.ndarray, np.ndarray]:
    """
    Return camera intrinsics for the WidowX camera (if available).

    Returns:
        Tuple[np.ndarray, np.ndarray]: (camera_matrix, distortion_coefficients)
    """
    # The WidowX setup typically uses an external camera (e.g. RealSense).
    # These are placeholder values for an Intel RealSense D435 at 640x480
    # and should be replaced by a proper calibration for the actual rig.
    fx = fy = 615.0
    cx, cy = 320.0, 240.0
    camera_matrix = np.array(
        [
            [fx, 0.0, cx],
            [0.0, fy, cy],
            [0.0, 0.0, 1.0],
        ]
    )

    # No lens distortion assumed: (k1, k2, p1, p2, k3) all zero.
    distortion = np.zeros(5)

    return camera_matrix, distortion

get_pose()

Get current pose of gripper of robot.

Returns:

Type Description
PoseObjectPNP

current pose of gripper of robot.

Source code in robot_environment/robot/widowx_robot_controller.py
def get_pose(self) -> PoseObjectPNP:
    """
    Get current pose of gripper of robot.

    Returns:
        current pose of gripper of robot.
    """
    with self._lock:
        try:
            # The controller tracks the last commanded pose instead of
            # running forward kinematics; fall back to the default home
            # pose when nothing has been commanded yet.
            last = getattr(self, "_last_pose", None)
            if last is not None:
                return last
            return self.DEFAULT_HOME_POSE
        except Exception as e:
            if self.verbose():
                print(f"Error getting pose: {e}")
            return self.DEFAULT_HOME_POSE

get_target_pose_from_rel(workspace_id, u_rel, v_rel, yaw)

Given relative image coordinates [u_rel, v_rel] and optionally an orientation of the point (yaw), calculate the corresponding pose in world coordinates.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
u_rel float

horizontal coordinate in image of workspace, normalized between 0 and 1

required
v_rel float

vertical coordinate in image of workspace, normalized between 0 and 1

required
yaw float

orientation of an object at the pixel coordinates [u_rel, v_rel].

required

Returns:

Name Type Description
pose_object 'PoseObjectPNP'

Pose of the point in world coordinates of the robot.

Source code in robot_environment/robot/widowx_robot_controller.py
def get_target_pose_from_rel(self, workspace_id: str, u_rel: float, v_rel: float, yaw: float) -> "PoseObjectPNP":
    """
    Convert relative image coordinates [u_rel, v_rel] (plus an optional
    orientation yaw) into the corresponding pose in world coordinates.

    Args:
        workspace_id: id of the workspace
        u_rel: horizontal coordinate in image of workspace, normalized between 0 and 1
        v_rel: vertical coordinate in image of workspace, normalized between 0 and 1
        yaw: orientation of an object at the pixel coordinates [u_rel, v_rel].

    Returns:
        pose_object: Pose of the point in world coordinates of the robot.
    """
    with self._lock:
        try:
            # Keep the normalized coordinates inside [0, 1].
            u_clamped = min(max(u_rel, 0.0), 1.0)
            v_clamped = min(max(v_rel, 0.0), 1.0)

            # Look up the workspace in the environment.
            workspace = self._robot.environment().get_workspace_by_id(workspace_id)
            if workspace is None:
                if self.verbose():
                    print(f"Workspace {workspace_id} not found")
                return PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

            # Delegate the camera->world transformation to the workspace.
            return workspace.transform_camera2world_coords(workspace_id, u_clamped, v_clamped, yaw)

        except Exception as e:
            if self.verbose():
                print(f"Error in get_target_pose_from_rel: {e}")
            return PoseObjectPNP(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

move2observation_pose(workspace_id)

The robot moves to a pose where it can observe the workspace given by workspace_id.

Parameters:

Name Type Description Default
workspace_id str

id of the workspace

required
Source code in robot_environment/robot/widowx_robot_controller.py
@log_start_end_cls()
def move2observation_pose(self, workspace_id: str) -> None:
    """
    Move the robot to a pose from which it can observe the given workspace.

    Args:
        workspace_id: id of the workspace
    """
    try:
        pose = self._robot.environment().get_observation_pose(workspace_id)

        # Without a configured observation pose there is nothing to do.
        if pose is None:
            if self.verbose():
                print(f"No observation pose defined for workspace: {workspace_id}")
            return

        with self._lock:
            self._move_to_pose(pose)

        if self.verbose():
            print(f"Moved to observation pose for workspace: {workspace_id}")

    except Exception as e:
        print(f"Error moving to observation pose: {e}")

reset_connection()

Reset the connection to the robot by safely disconnecting and reconnecting.

Source code in robot_environment/robot/widowx_robot_controller.py
def reset_connection(self) -> None:
    """
    Reset the connection to the robot by safely disconnecting and reconnecting.
    """
    if self.verbose():
        print("Resetting WidowX connection...")

    # Phase 1: tear down any existing connection. Failures are logged but
    # must not prevent the reconnect attempt below.
    try:
        if self._robot_ctrl is not None:
            with self._lock:
                self._shutdown()
    except Exception as e:
        print(f"Error while closing connection: {e}")

    # Phase 2: rebuild the connection from scratch; on failure mark the
    # controller handle as unusable.
    try:
        self._create_robot()
        if self.verbose():
            print("Connection successfully reset.")
    except Exception as e:
        print(f"Failed to reconnect to the robot: {e}")
        self._robot_ctrl = None

robot_pick_object(pick_pose)

Calls the pick command to pick an object at the given pose.

Parameters:

Name Type Description Default
pick_pose 'PoseObjectPNP'

Pose where to pick the object (z-offset already applied if needed)

required

Returns:

Type Description
bool

True, if pick was successful, else False

Source code in robot_environment/robot/widowx_robot_controller.py
@log_start_end_cls()
def robot_pick_object(self, pick_pose: "PoseObjectPNP") -> bool:
    """
    Execute a pick sequence for an object at the given pose.

    Args:
        pick_pose: Pose where to pick the object (z-offset already applied if needed)

    Returns:
        True, if pick was successful, else False
    """
    try:
        # Approach from 5 cm above; the grasp itself uses the pose as-is
        # (any required z-offset has already been applied by the caller).
        approach_pose = pick_pose.copy_with_offsets(z_offset=0.05)

        with self._lock:
            self._robot_ctrl.gripper.release()  # open gripper
            self._move_to_pose(approach_pose)   # hover above the object
            self._move_to_pose(pick_pose)       # descend to grasp height
            self._robot_ctrl.gripper.grasp()    # close gripper on the object
            # Lift the object back up to the approach height.
            self._move_to_pose(pick_pose.copy_with_offsets(z_offset=0.05))

        if self.verbose():
            print("Pick operation completed successfully")
        return True

    except Exception as e:
        print(f"Error during pick operation: {e}")
        return False

robot_place_object(place_pose)

Places an already picked object at the given place_pose.

Parameters:

Name Type Description Default
place_pose 'PoseObjectPNP'

Pose where to place the already picked object

required

Returns:

Type Description
bool

True, if place was successful, else False

Source code in robot_environment/robot/widowx_robot_controller.py
@log_start_end_cls()
def robot_place_object(self, place_pose: "PoseObjectPNP") -> bool:
    """
    Place a previously picked object at the given pose.

    Args:
        place_pose: Pose where to place the already picked object

    Returns:
        True, if place was successful, else False
    """
    try:
        # Approach 5 cm above the target, set down 5 mm above it, then
        # retract back to the approach height after releasing.
        approach_pose = place_pose.copy_with_offsets(z_offset=0.05)
        set_down_pose = place_pose.copy_with_offsets(z_offset=0.005)

        with self._lock:
            self._move_to_pose(approach_pose)
            self._move_to_pose(set_down_pose)
            self._robot_ctrl.gripper.release()
            self._move_to_pose(approach_pose)

        if self.verbose():
            print("Place operation completed successfully")
        return True

    except Exception as e:
        print(f"Error during place operation: {e}")
        return False

robot_push_object(push_pose, direction, distance)

Push given object (its Pose) into the given direction by the given distance.

Parameters:

Name Type Description Default
push_pose 'PoseObjectPNP'

the Pose of the object that should be pushed.

required
direction str

"up", "down", "left", "right"

required
distance float

distance in millimeters

required

Returns:

Type Description
bool

True, if push was successful, else False

Source code in robot_environment/robot/widowx_robot_controller.py
@log_start_end_cls()
def robot_push_object(self, push_pose: "PoseObjectPNP", direction: str, distance: float) -> bool:
    """
    Push the object at the given pose in the given direction by the given distance.

    Args:
        push_pose: the Pose of the object that should be pushed.
        direction: "up", "down", "left", "right"
        distance: distance in millimeters

    Returns:
        True, if push was successful, else False
    """
    try:
        with self._lock:
            # Put the gripper in its release state before making contact.
            self._robot_ctrl.gripper.release()

            # Move to the push starting position.
            self._move_to_pose(push_pose)

            push_dist_m = distance / 1000.0  # mm -> m

            # Map the direction onto a signed end-effector offset:
            # "up"/"down" move along +/-X, "left"/"right" along +/-Y.
            offsets = {
                "up": {"x": push_dist_m},
                "down": {"x": -push_dist_m},
                "left": {"y": push_dist_m},
                "right": {"y": -push_dist_m},
            }
            if direction not in offsets:
                print(f"Unknown direction: {direction}")
                return False
            self._robot_ctrl.arm.set_ee_cartesian_trajectory(**offsets[direction])

        if self.verbose():
            print(f"Push operation completed: {direction}, {distance}mm")
        return True

    except Exception as e:
        print(f"Error during push operation: {e}")
        return False