Source code for robotics_api.utils.kinova_utils

import argparse

from robotics_api.settings import *
from kortex_api.TCPTransport import TCPTransport
from kortex_api.UDPTransport import UDPTransport
from kortex_api.SessionManager import SessionManager
from kortex_api.autogen.messages import Session_pb2
from kortex_api.RouterClient import RouterClient, RouterClientSendOptions


[docs] def parseConnectionArguments(parser=argparse.ArgumentParser()): parser.add_argument("--ip", type=str, help="IP address of destination", default=KINOVA_01_IP) parser.add_argument("-u", "--username", type=str, help="username to login", default="admin") parser.add_argument("-p", "--password", type=str, help="password to login", default="admin") return parser.parse_args()
[docs] class DeviceConnection: TCP_PORT = 10000 UDP_PORT = 10001
[docs] @staticmethod def createTcpConnection(args): """ returns RouterClient required to create services and send requests to device or sub-devices, """ return DeviceConnection(args.ip, port=DeviceConnection.TCP_PORT, credentials=(args.username, args.password))
[docs] @staticmethod def createUdpConnection(args): """ returns RouterClient that allows to create services and send requests to a device or its sub-devices @ 1khz. """ return DeviceConnection(args.ip, port=DeviceConnection.UDP_PORT, credentials=(args.username, args.password))
[docs] def __init__(self, ipAddress, port=TCP_PORT, credentials=("", "")): self.ipAddress = ipAddress self.port = port self.credentials = credentials self.sessionManager = None # Setup API self.transport = TCPTransport() if port == DeviceConnection.TCP_PORT else UDPTransport() self.router = RouterClient(self.transport, RouterClient.basicErrorCallback)
# Called when entering 'with' statement def __enter__(self): self.transport.connect(self.ipAddress, self.port) if (self.credentials[0] != ""): session_info = Session_pb2.CreateSessionInfo() session_info.username = self.credentials[0] session_info.password = self.credentials[1] session_info.session_inactivity_timeout = 10000 # (milliseconds) session_info.connection_inactivity_timeout = 2000 # (milliseconds) self.sessionManager = SessionManager(self.router) print("Logging as", self.credentials[0], "on device", self.ipAddress) self.sessionManager.CreateSession(session_info) return self.router # Called when exiting 'with' statement def __exit__(self, exc_type, exc_value, traceback): if self.sessionManager != None: router_options = RouterClientSendOptions() router_options.timeout_ms = 1000 self.sessionManager.CloseSession(router_options) self.transport.disconnect()