Source code for robotics_api.utils.kinova_move

import sys
import json
import time
import warnings
import threading
import numpy as np
from argparse import Namespace
from kortex_api.autogen.messages import Base_pb2
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient

from robotics_api.actions.db_manipulations import VialStatus
from robotics_api.utils import kinova_utils as utilities
from robotics_api.utils.kinova_gripper import GripperMove
from robotics_api.settings import *

# Maximum allowed waiting time during actions (in seconds)
TIMEOUT_DURATION = 20
VERBOSE = 1


# Create closure to set an event after an END or an ABORT
[docs] def check_for_end_or_abort(e): """ Return a closure checking for END or ABORT notifications Arguments: e: event to signal when the action is completed (will be set when an END or ABORT occurs) """ def check(notification, e=e): if VERBOSE > 3: print("EVENT : " + Base_pb2.ActionEvent.Name(notification.action_event)) if notification.action_event == Base_pb2.ACTION_END or notification.action_event == Base_pb2.ACTION_ABORT: e.set() return check
[docs] def snapshot_move_angular(base: BaseClient, joint_angle_values): if VERBOSE > 3: print("Starting angular action movement ...") action = Base_pb2.Action() action.name = "Example angular action movement" action.application_data = "" # might be better to use this instead of len(joint_angle_values) for following conditional actuator_count = base.GetActuatorCount() # Place arm according to joint angle values in JSON for joint_id in range(len(joint_angle_values)): joint_angle = action.reach_joint_angles.joint_angles.joint_angles.add() joint_angle.joint_identifier = joint_id joint_angle.value = joint_angle_values[joint_id]["value"] e = threading.Event() notification_handle = base.OnNotificationActionTopic( check_for_end_or_abort(e), Base_pb2.NotificationOptions() ) if VERBOSE > 3: print("Executing action") base.ExecuteAction(action) if VERBOSE > 3: print("Waiting for movement to finish ...") finished = e.wait(TIMEOUT_DURATION) base.Unsubscribe(notification_handle) if finished: if VERBOSE > 3: print("Angular movement completed") else: if VERBOSE > 3: print("Timeout on action notification wait") return finished
[docs] def snapshot_move_cartesian(base: BaseClient, coordinate_values: dict): if VERBOSE > 3: print("Starting Cartesian action movement ...") action = Base_pb2.Action() action.name = "Example Cartesian action movement" action.application_data = "" cartesian_pose = action.reach_pose.target_pose cartesian_pose.x = coordinate_values["x"] # (meters) cartesian_pose.y = coordinate_values["y"] # (meters) cartesian_pose.z = coordinate_values["z"] # (meters) cartesian_pose.theta_x = coordinate_values["thetaX"] # (degrees) cartesian_pose.theta_y = coordinate_values["thetaY"] # (degrees) cartesian_pose.theta_z = coordinate_values["thetaZ"] # (degrees) e = threading.Event() notification_handle = base.OnNotificationActionTopic( check_for_end_or_abort(e), Base_pb2.NotificationOptions() ) if VERBOSE > 3: print("Executing action") base.ExecuteAction(action) if VERBOSE > 3: print("Waiting for movement to finish ...") finished = e.wait(TIMEOUT_DURATION) base.Unsubscribe(notification_handle) if finished: if VERBOSE > 1: print("Cartesian movement completed") else: if VERBOSE > 1: print("Timeout on action notification wait") return finished
[docs] def move_gripper(target_position=None, max_gripper_attempts=5): """ Move gripper :param target_position: target position for the gripper: open, closed, or percentage closed (e.g., 90) :return: bool, True if action a success """ # Create connection to the device and get the router connector = Namespace(ip=KINOVA_01_IP, username="admin", password="admin") def _try_gripper(conn=connector, target=target_position): finished = True with utilities.DeviceConnection.createTcpConnection(conn) as router: # Create connection to the device and get the router with utilities.DeviceConnection.createUdpConnection(conn) as router_real_time: action = GripperMove(router, router_real_time, 2) if target == 'open': if VERBOSE > 2: print("Moving gripper open...") finished &= action.gripper_move(OPEN_GRIP_TARGET) action.cleanup() elif target == 'closed': if VERBOSE > 2: print("Moving gripper closed...") finished &= action.gripper_move(90) action.cleanup() elif target: if VERBOSE > 2: print("Moving gripper to {}...".format(target)) finished &= action.gripper_move(target) action.cleanup() return finished gripper_tries = 0 while True: try: finished = _try_gripper() break except Exception as e: if gripper_tries > max_gripper_attempts: raise e print(f"WARNING. Gripper movement {gripper_tries} ended in error: ", e) gripper_tries += 1 print("Gripper movement successfully executed!" if finished else "Error! Gripper was not successfully moved.") return finished
[docs] def move_hand(linear_x: float = 0, linear_y: float = 0, linear_z: float = 0, angular_x: float = 0, angular_y: float = 0, angular_z: float = 0): """ Function to move the robotic hand by linear and angular x, y, and z Args: linear_x (float): increment of linear x movement linear_y (float): increment of linear y movement linear_z (float): increment of linear z movement angular_x (float): increment of angular x movement angular_y (float): increment of angular y movement angular_z (float): increment of angular z movement Returns: boolean indicating success of action """ # Create connection to the device and get the router connector = Namespace(ip=KINOVA_01_IP, username="admin", password="admin") with utilities.DeviceConnection.createTcpConnection(connector) as router: # Create required services base = BaseClient(router) command = Base_pb2.TwistCommand() command.reference_frame = Base_pb2.CARTESIAN_REFERENCE_FRAME_TOOL command.duration = 0 twist = command.twist twist.linear_x = linear_x twist.linear_y = linear_y twist.linear_z = linear_z twist.angular_x = angular_x twist.angular_y = angular_y twist.angular_z = angular_z try: e = threading.Event() notification_handle = base.OnNotificationActionTopic( check_for_end_or_abort(e), Base_pb2.NotificationOptions() ) if VERBOSE > 2: print("Executing twist action") base.SendTwistCommand(command) if VERBOSE > 2: print("Waiting for twist to finish ...") finished = e.wait(TIMEOUT_DURATION) base.Unsubscribe(notification_handle) except Exception: finished = False if VERBOSE > 2: print("Stopping the robot...") base.Stop() time.sleep(1) return finished
[docs] def get_zone(angle, zone_dividers=ZONE_DIVIDERS): """ Moves a given joint a specified range (in degrees). Args: angle (float): The angle in degrees. zone_dividers (list): A list of zone divider angles in ascending order. Returns: int: The zone index (starting from 1) that the angle belongs to. """ # Make sure zone_dividers starts with 0 zone_dividers = list(set([0] + zone_dividers)) zone_dividers.sort() print("ZONES: ", angle, zone_dividers) # Check if angle is above the last divider if angle > zone_dividers[-1]: return 1 # Belongs to Zone 1 # Find the zone for idx in range(len(zone_dividers)): if zone_dividers[idx] <= angle < (zone_dividers[(idx + 1) % len(zone_dividers)]): return idx + 1 # Zone index starts from 1 raise ValueError(f"Zone not found for angle {angle} and dividers {zone_dividers}.")
[docs] def get_current_zone(zone_dividers=ZONE_DIVIDERS): """ Moves a given joint a specified range (in degrees). Args: zone_dividers (list): A list of zone divider angles in ascending order. Returns: int: The zone index (starting from 1) that the angle belongs to. """ connector = Namespace(ip=KINOVA_01_IP, username="admin", password="admin") with utilities.DeviceConnection.createTcpConnection(connector) as router: # Create required services base = BaseClient(router) current_joint_angles = base.GetMeasuredJointAngles() joint_1_angle = current_joint_angles.joint_angles[0].value return get_zone(joint_1_angle, zone_dividers=zone_dividers)
[docs] def snapshot_zone(snapshot_file, zone_dividers=ZONE_DIVIDERS): """ Moves a given joint a specified range (in degrees). Args: snapshot_file (str): A list of zone divider angles in ascending order. zone_dividers (list): A list of zone divider angles in ascending order. Returns: int: The zone index (starting from 1) that the angle belongs to. """ with open(snapshot_file, 'r') as fn: master_data = json.load(fn) if "jointAnglesGroup" in master_data.keys(): joint_angle_values = master_data["jointAnglesGroup"]["jointAngles"][0]["reachJointAngles"]["jointAngles"][ "jointAngles"] theta = joint_angle_values[0]["value"] elif "poses" in master_data.keys(): coordinate_values = master_data["poses"]["pose"][0]["reachPose"]["targetPose"] x = coordinate_values["x"] y = coordinate_values["y"] theta = -np.degrees(np.arctan2(y, x)) % 360 return get_zone(theta, zone_dividers=zone_dividers)
[docs] def snapshot_move(snapshot_file: str = None, target_position: str or int = None, raise_error: bool = True, angle_error: float = 0.2, position_error: float = 0.1): """ :param snapshot_file: str, path to snapshot file (JSON) :param target_position: target position for the gripper: open, closed, or percentage closed (e.g., 90) :param raise_error: :param angle_error: :param position_error: :return: bool, True if movement was a success Args: angle_error: position_error: """ if not RUN_ROBOT: warnings.warn("Robot NOT run because RUN_ROBOT is set to False.") return True finished = False # Create connection to the device and get the router connector = Namespace(ip=KINOVA_01_IP, username="admin", password="admin") if snapshot_file: try: with utilities.DeviceConnection.createTcpConnection(connector) as router: # Create required services base = BaseClient(router) with open(snapshot_file, 'r') as snapshot_file: # converts JSON to nested dictionary snapshot_file.dict = json.load(snapshot_file) if "jointAnglesGroup" in snapshot_file.dict: # grabs the list of joint angle values joint_angle_values = \ snapshot_file.dict["jointAnglesGroup"]["jointAngles"][0]["reachJointAngles"]["jointAngles"][ "jointAngles"] finished = snapshot_move_angular(base, joint_angle_values) if finished: current_joint_angles_raw = base.GetMeasuredJointAngles() current_joint_angles = {i.joint_identifier: i.value % 360 for i in current_joint_angles_raw.joint_angles} joint_angle_values_dict = {i["jointIdentifier"]: i["value"] for i in joint_angle_values} angle_diffs = [abs(current_joint_angles.get(k, 0) - joint_angle_values_dict.get(k, 0)) for k in joint_angle_values_dict] if not all([d < angle_error for d in angle_diffs]): error_diffs = [d for d in angle_diffs if d > angle_error] if not all([abs(d - 360) < angle_error for d in error_diffs]): finished = False if raise_error: raise SystemError("Error: Robot did not reach the desired joint angles: ", angle_diffs) elif "poses" in snapshot_file.dict: # grabs the dictionary of coordinate values coordinate_values = snapshot_file.dict["poses"]["pose"][0]["reachPose"]["targetPose"] finished = snapshot_move_cartesian(base, coordinate_values) if finished: current_pose_raw = base.GetMeasuredCartesianPose() current_pose = {"x": current_pose_raw.x, "y": current_pose_raw.y, "z": current_pose_raw.z, "thetaX": current_pose_raw.theta_x, "thetaY": current_pose_raw.theta_y, "thetaZ": current_pose_raw.theta_z} pose_diffs = [abs(current_pose.get(k, 0) - coordinate_values.get(k, 0)) for k in coordinate_values] if not all([d < position_error for d in pose_diffs]): if raise_error: raise SystemError(f"Error: Robot did not reach the desired Cartesian pose " f"({[coordinate_values.get(k, 0) for k in coordinate_values]}):" f" {pose_diffs}") finished = False else: if raise_error: raise SystemError("Snapshot file type not suitable for robot movement") except Exception as e: raise Exception(e) if target_position: finished = move_gripper(target_position) print("Snapshot successfully executed!" if finished else "Error! Snapshot was not successfully executed.") return finished
[docs] def perturb_angular(reverse=False, wait_time=None, **joint_deltas): """ Moves a given joint a specified range (in degrees). Args: reverse : Reverse all joint deltas if True wait_time : Wait time after move joint_deltas : Key word arguments Returns: bool: True if the movement completed successfully, False otherwise. """ connector = Namespace(ip=KINOVA_01_IP, username="admin", password="admin") with utilities.DeviceConnection.createTcpConnection(connector) as router: # Create required services base = BaseClient(router) current_joint_angles = base.GetMeasuredJointAngles() joint_angles = [] for joint in current_joint_angles.joint_angles: i = joint.joint_identifier delta_value = joint_deltas.get(f"j{i + 1}", 0) * (-1 if reverse else 1) final_angle = (joint.value + delta_value) % 360 if delta_value: print(f"Moving joint {i + 1} {delta_value} degrees to {final_angle}") joint_angles.append({"joint_identifier": i, "value": final_angle}) if wait_time: time.sleep(wait_time) return snapshot_move_angular(base, joint_angles)
[docs] def perturbed_snapshot(snapshot_file, perturb_amount: float = PERTURB_AMOUNT, axis="z"): """ Creates a perturbed snapshot by modifying the specified axis position in the given snapshot file. Args: snapshot_file (str): Path to the snapshot JSON file. perturb_amount (float): Amount to perturb the position along the specified axis (default is PERTURB_AMOUNT). axis (str): Axis to apply the perturbation to, defaults to "z". Returns: str: Path to the new perturbed snapshot file. """ with open(snapshot_file, 'r') as fn: master_data = json.load(fn) # Generate temporary perturbed file new_height = master_data["poses"]["pose"][0]["reachPose"]["targetPose"][axis] + perturb_amount master_data["poses"]["pose"][0]["reachPose"]["targetPose"][axis] = new_height with open(os.path.join(SNAPSHOT_DIR, "_temp_perturbed.json"), "w+") as fn: json.dump(master_data, fn, indent=2) return os.path.join(SNAPSHOT_DIR, "_temp_perturbed.json")
[docs] def get_place_vial(station, action_type="get", go=True, leave=True, release_vial=True, raise_error=True, pre_position_only=False, move_to_zone=True): """ Executes an action to get or place a vial using snapshot movements. Args: station (StationStatus): action_type (str): Action type, either 'get' (to retrieve the vial) or 'place' (to place the vial). go (bool): Whether to move to the snapshot file location (default is True). leave (bool): Whether to leave the snapshot file location after the action (default is True). release_vial (bool): Whether to release the vial after placing (default is True). raise_error (bool): Whether to raise an error if movement fails (default is True). pre_position_only (bool): Only move to "above" position if True (default is True). move_to_zone (bool): Returns: bool: True if the action was successful, False otherwise. Raises: Exception: If the movement fails and raise_error is True. """ # Check if robot operation is enabled if not RUN_ROBOT: warnings.warn("Robot NOT run because RUN_ROBOT is set to False.") return True raise_amount = station.raise_amount if isinstance(station, VialStatus): snapshot_file = station.home_snapshot pre_position_file = None else: snapshot_file = station.location_snapshot pre_position_file = station.pre_location_snapshot snapshot_file_above = perturbed_snapshot(snapshot_file, perturb_amount=raise_amount) success = True if go: # Start open if getting a vial success &= snapshot_move(target_position='open') if action_type == "get" else True # If move_to_zone, go to the correct zone target_zone = snapshot_zone(snapshot_file) current_zone = get_current_zone() if current_zone != target_zone: print(f"--------- Moving from zone {current_zone} to zone {target_zone} ---------") success &= snapshot_move(os.path.join(SNAPSHOT_DIR, f"zone_{current_zone:02d}.json")) success &= snapshot_move(os.path.join(SNAPSHOT_DIR, f"zone_{target_zone:02d}.json")) # If pre-position, go there if pre_position_file: success &= snapshot_move(pre_position_file) if (not success) and raise_error: raise Exception(f"Failed to move robot arm pre-position snapshot {pre_position_file} before target.") # Go to above target position before target success &= snapshot_move(snapshot_file_above) print("ABOVE: ", snapshot_file_above) if (not success) and raise_error: raise Exception(f"Failed to move robot arm to {raise_amount} above target before snapshot {snapshot_file}.") # Go to target position if not pre_position_only: target = VIAL_GRIP_TARGET if action_type == "get" else 'open' if release_vial else VIAL_GRIP_TARGET success &= snapshot_move(snapshot_file, target_position=target) if (not success) and raise_error: raise Exception(f"Failed to move robot arm to snapshot {snapshot_file}.") if leave: # Go to above target position after target success &= snapshot_move(snapshot_file_above) if (not success) and raise_error: raise Exception(f"Failed to move robot arm to {raise_amount} above target after snapshot {snapshot_file}.") # If pre-position, go there if pre_position_file: success &= snapshot_move(pre_position_file) if (not success) and raise_error: raise Exception(f"Failed to move robot arm pre-position snapshot {pre_position_file} after target.") return success
[docs] def screw_lid(screw=True, starting_position="vial-screw_test.json", linear_z=0.00003, angular_z=120): """ Screws or unscrews the vial lid by controlling the robot arm's movement. Args: screw (bool): True to screw the lid on, False to unscrew it (default is True). starting_position (str): Path to the starting snapshot position file. linear_z (float): Amount of linear movement along the z-axis per iteration (default is 0.00003). angular_z (float): Amount of rotational movement along the z-axis in degrees (default is 120). Returns: bool: True if the action was successful, False otherwise. """ snapshot_start = os.path.join(SNAPSHOT_DIR, starting_position) snapshot_top = perturbed_snapshot(snapshot_start, perturb_amount=0.02) if screw: success = snapshot_move(snapshot_top) snapshot_start = perturbed_snapshot(snapshot_start, perturb_amount=0.0005) else: success = snapshot_move(target_position='open') success &= snapshot_move(snapshot_start) success &= snapshot_move(target_position=VIAL_GRIP_TARGET + 4) rotation_increment = (angular_z / 20) if screw else (-angular_z / 20) raise_height = linear_z if screw else -linear_z for _ in range(5): success &= move_hand(linear_z=raise_height, angular_z=rotation_increment) if screw: success = snapshot_move(target_position='open') else: print("Done unscrewing. ") success &= snapshot_move(snapshot_top) return success
if __name__ == "__main__": print(snapshot_zone(SNAPSHOT_DIR / "cvUM_potentiostat_A_01.json")) print(snapshot_zone(SNAPSHOT_DIR / "ca_potentiostat_C_01.json")) print(snapshot_zone(SNAPSHOT_DIR / "VialHome_A_01.json")) print(snapshot_zone(SNAPSHOT_DIR / "solvent_01.json")) print(snapshot_zone(SNAPSHOT_DIR / "balance_01.json"))