一、简介
本案例基于用户坐标系下笛卡尔坐标位姿,计算基坐标系的关节姿态,并调用servoj移动到目标点
需要的脚本文件作为附件置于文末,运行时需要放置在同目录下
需要的脚本文件作为附件置于文末,运行时需要放置在同目录下
二、操作流程
1、将代码与附件章节的代码放置到同一目录下,此时文件夹中应有四份文件
2、修改代码中DEFAULT_ROBOT_IP 为机器人真实IP,USER_X,USER_Y,USER_Z,USER_RX,USER_RY,USER_RZ设置为坐标系偏移值,单位是mm和deg
3、示例里设置了5个点位,写在targets里,请将点位修改为需要的点位,并且如果存在自己的线程,可以删除main函数里的循环代码,只在需要运动的时候调用servo.set_target即可
4、执行代码文件,机器人基于用户坐标系运动
三、常见问题
1、Q:为什么机器人没有运动,然后打印里有“逆解失败”的字样。
A:这基本都是因为逆解的目标是奇异点,导致机器人无法到达,自然逆解失败,机器人没有运动。这一点可以通过示教器的日志看到,会打印“目标位姿为奇异位姿,机器人不可达”。解决方法首先要确认传递的点位是否正确,单位是否一致。
2、Q:为什么寄存器的值一直在变?
A:因为实现逻辑会通过端口修改机器人浮点数寄存器0-5的值,分别对应机器人六个关节的位姿,所以如果有修改寄存器的需求,请跳过前五个浮点数寄存器。
四、代码和附件
"""
用户坐标系下的 servoj 运动控制(持续线程模式)
架构:
PC 端:计算 IK → 写 input_double_register0~5(关节角)+ register6(flag=1)
机器人端:跑脚本读寄存器 → servoj 执行
依赖:rtsi.py, serialize.py(同目录下)
用法:
from user_coord_servo import UserCoordServo
servo = UserCoordServo(robot_ip="192.168.254.138")
servo.start()
servo.set_target(ux=100, uy=200, uz=50, urx=0, ury=0, urz=0)
servo.stop()
"""
import sys
import os
import time
import socket
import threading
import math
import atexit
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'ServoJ_RTSI联合', 'python'))
from rtsi import rtsi
# ──────────────────────────────────────────────
# 配置
# ──────────────────────────────────────────────
DEFAULT_ROBOT_IP = "192.168.254.138"
RTSI_FREQ = 250.0
SERVOJ_TIME = 1.0 / RTSI_FREQ
# 用户坐标系参数
USER_X = -300.0 # mm
USER_Y = 600.0 # mm
USER_Z = -300.0 # mm
USER_RX = 0.0 # deg
USER_RY = -45.0 # deg
USER_RZ = 0.0 # deg
# 默认目标点(用户坐标系下,单位mm/rad)
DEFAULT_TARGET = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
# ──────────────────────────────────────────────
# 40011 明文请求
# ──────────────────────────────────────────────
_req_lock = threading.Lock()
_req_id = 1
def send_40011(ip, cmd, timeout=5.0):
"""向 40011 端口发送请求,返回 (result_list, error_str)"""
global _req_id
with _req_lock:
cur_id = _req_id
_req_id += 1
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((ip, 40011))
sock.sendall(f"req {cur_id}{cmd}\n".encode())
resp = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
resp += chunk
if b"\n" in resp:
break
finally:
sock.close()
resp_str = resp.decode("utf-8", errors="replace").strip()
if "failure" in resp_str.lower():
return None, resp_str
parts = resp_str.split(":", 1)
if len(parts) < 2:
return None, f"unexpected response: {resp_str}"
val_str = parts[1].strip()
try:
val = eval(val_str)
if isinstance(val, list):
if len(val) == 1 and isinstance(val[0], list):
return [float(x) for x in val[0]], None
return [float(x) for x in val], None
return float(val), None
except Exception:
return None, f"parse error: {val_str}"
# ──────────────────────────────────────────────
# 位姿变换工具
# ──────────────────────────────────────────────
def pose_inv(ip, p):
p_native = [float(x) for x in p]
return send_40011(ip, f"pose_inv({p_native})")
def pose_trans(ip, p1, p2):
p1_native = [float(x) for x in p1]
p2_native = [float(x) for x in p2]
return send_40011(ip, f"pose_trans({p1_native},{p2_native})")
def deg_to_rad(deg):
return deg * math.pi / 180.0
def build_user_frame():
"""构建用户坐标系位姿(m, rad)"""
return [
USER_X / 1000.0,
USER_Y / 1000.0,
USER_Z / 1000.0,
deg_to_rad(USER_RX),
deg_to_rad(USER_RY),
deg_to_rad(USER_RZ),
]
def user_to_base(ip, user_frame, target):
"""
用户坐标系位姿 → 基坐标系位姿
target: [ux, uy, uz, urx, ury, urz] (mm, rad)
返回: [x, y, z, Rx, Ry, Rz] (m, rad)
"""
ux, uy, uz, urx, ury, urz = target
user_target = [ux / 1000.0, uy / 1000.0, uz / 1000.0, urx, ury, urz]
base_pose, err = pose_trans(ip, user_frame, user_target)
if err:
return None, f"pose_trans failed: {err}"
return base_pose, None
def pose_to_joints(ip, base_pose, ref_joints):
"""基坐标系位姿 → 关节角度"""
pose_native = [float(x) for x in base_pose]
ref_native = [float(x) for x in ref_joints]
return send_40011(ip, f"get_inverse_kin({pose_native}, {ref_native})")
# ──────────────────────────────────────────────
# 主类
# ──────────────────────────────────────────────
class UserCoordServo:
"""
用户坐标系 servoj 控制类
架构:
1. PC 通过 30001 发送 follow 脚本(读寄存器 → servoj)
2. PC 通过 RTSI 30004 写 input_double_register0~6
- register0~5: 目标关节角 (rad)
- register6: flag (1=有新数据, 0=停止)
3. 机器人端脚本持续读寄存器执行 servoj
用法:
servo = UserCoordServo(robot_ip="192.168.254.138")
servo.start()
servo.set_target(ux=100, uy=200, uz=50)
servo.stop()
"""
def __init__(
self,
robot_ip=DEFAULT_ROBOT_IP,
user_frame=None,
freq_hz=250,
):
self.robot_ip = robot_ip
self.user_frame = user_frame if user_frame is not None else build_user_frame()
self.freq_hz = freq_hz
self._running = False
self._target = list(DEFAULT_TARGET) # [ux, uy, uz, urx, ury, urz]
self._target_lock = threading.Lock()
self._target_joints = None # 当前生效的目标关节角
self._target_joints_lock = threading.Lock()
self._latest_joints = None # 机器人当前实际关节角
self._latest_joints_lock = threading.Lock()
self._rtsi_reader_thread = None
self._rtsi_reader_running = False
self._rtsi_writer_thread = None
self._rtsi_writer_running = False
# RTSI 写入连接(writer 线程专用)
self._rt_writer = None
self._in_data = None
self._ik_dirty = True
atexit.register(self.stop)
# ──────────────────────────────────────────
# 公共接口
# ──────────────────────────────────────────
def set_target(self, ux=None, uy=None, uz=None, urx=None, ury=None, urz=None):
"""
设置目标点(用户坐标系下,位置mm / 姿态rad)
线程安全,可随时调用。只传需要修改的分量。
"""
with self._target_lock:
t = self._target
if ux is not None: t[0] = float(ux)
if uy is not None: t[1] = float(uy)
if uz is not None: t[2] = float(uz)
if urx is not None: t[3] = float(urx)
if ury is not None: t[4] = float(ury)
if urz is not None: t[5] = float(urz)
self._ik_dirty = True
def get_target(self):
"""获取当前目标点(用户坐标系下,位置mm / 姿态rad)"""
with self._target_lock:
return self._target[:]
def start(self):
"""启动控制"""
if self._running:
print("[WARN] 已在运行")
return
print(f"[INFO] 启动 UserCoordServo,robot={self.robot_ip}, freq={self.freq_hz}Hz")
print(f"[INFO] 用户坐标系: x={USER_X}, y={USER_Y}, z={USER_Z}, rx={USER_RX}°, ry={USER_RY}°, rz={USER_RZ}°")
# Step 1: 通过 30001 发送 follow 脚本
self._send_follow_script()
time.sleep(0.5)
# Step 2: 启动 RTSI reader 线程
self._rtsi_reader_running = True
self._rtsi_reader_thread = threading.Thread(target=self._rtsi_reader_loop, daemon=True, name="rtsi-reader")
self._rtsi_reader_thread.start()
# 等待第一帧
deadline = time.time() + 10
while self._latest_joints is None and self._rtsi_reader_running:
if time.time() > deadline:
raise RuntimeError("等待机器人数据超时")
time.sleep(0.05)
print(f"[INFO] 机器人已就绪,当前关节: {[round(v, 4) for v in self._latest_joints]}")
# Step 3: 建立 RTSI writer 连接
self._setup_rtsi_writer()
# Step 4: 等待 follow 脚本就绪
self._wait_follow_script_ready()
# Step 5: 计算初始 IK 并直接写入
if self._compute_and_set_target_joints():
# 初始位置:直接写一次寄存器,让机器人从当前位置出发
with self._target_joints_lock:
joints = self._target_joints
if joints is not None and self._in_data is not None:
self._in_data.input_double_register0 = joints[0]
self._in_data.input_double_register1 = joints[1]
self._in_data.input_double_register2 = joints[2]
self._in_data.input_double_register3 = joints[3]
self._in_data.input_double_register4 = joints[4]
self._in_data.input_double_register5 = joints[5]
self._in_data.input_double_register6 = 1
self._rt_writer.set_input(self._in_data)
print(f"[INFO] 初始目标关节: {[round(v, 4) for v in joints]}")
# Step 6: 启动 RTSI writer 线程
self._rtsi_writer_running = True
self._rtsi_writer_thread = threading.Thread(target=self._rtsi_writer_loop, daemon=True, name="rtsi-writer")
self._rtsi_writer_thread.start()
self._running = True
print("[INFO] 控制已启动")
def stop(self):
"""停止控制"""
self._running = False
self._rtsi_reader_running = False
self._rtsi_writer_running = False
# flag=0 告诉机器人脚本停住
if self._in_data is not None and self._rt_writer is not None:
try:
self._in_data.input_double_register6 = 0
self._rt_writer.set_input(self._in_data)
except Exception:
pass
# 停止 follow 脚本
self._halt_follow_script()
if self._rtsi_reader_thread and self._rtsi_reader_thread.is_alive():
self._rtsi_reader_thread.join(timeout=3)
if self._rtsi_writer_thread and self._rtsi_writer_thread.is_alive():
self._rtsi_writer_thread.join(timeout=5)
# 断开 writer
if self._rt_writer:
try:
self._rt_writer.disconnect()
except Exception:
pass
self._rt_writer = None
print("[INFO] UserCoordServo 已停止")
# ──────────────────────────────────────────
# 内部方法
# ──────────────────────────────────────────
def _send_follow_script(self):
"""通过 30001 发送 follow 脚本到机器人"""
port_dir = os.path.dirname(__file__)
script_path = os.path.abspath(os.path.join(port_dir, 'follow_ema.script'))
try:
with open(script_path, 'r', encoding='utf-8') as f:
script_content = f.read()
except Exception as e:
raise RuntimeError(f"读取 follow 脚本失败: {e}")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10.0)
sock.connect((self.robot_ip, 30001))
sock.sendall(script_content.encode('utf-8'))
sock.close()
print(f"[INFO] Follow 脚本已发送 ({len(script_content)} 字节)")
except Exception as e:
raise RuntimeError(f"发送 follow 脚本失败: {e}")
def _halt_follow_script(self):
"""发送 halt() 停止机器人脚本"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5.0)
sock.connect((self.robot_ip, 30001))
sock.sendall(b"halt()\n")
sock.close()
print("[INFO] 已发送 halt()")
except Exception as e:
print(f"[WARN] 发送 halt() 失败: {e}")
def _setup_rtsi_writer(self):
"""建立 RTSI 写入连接"""
rt = rtsi(self.robot_ip)
try:
rt.connect()
except Exception as e:
raise RuntimeError(f"RTSI writer 连接失败: {e}")
rt.version_check()
version = rt.controller_version()
print(f"[INFO] RTSI writer 控制器版本: {version}")
rt.output_subscribe('output_int_register0', 10)
in_vars = (
'input_double_register0,'
'input_double_register1,'
'input_double_register2,'
'input_double_register3,'
'input_double_register4,'
'input_double_register5,'
'input_double_register6'
)
self._in_data = rt.input_subscribe(in_vars)
rt.start()
self._rt_writer = rt
print("[INFO] RTSI writer 已启动")
def _wait_follow_script_ready(self):
"""等待 follow 脚本就绪(output_int_register0 == 1)"""
print("[INFO] 等待 follow 脚本就绪...")
deadline = time.time() + 15
while time.time() < deadline:
try:
recv = self._rt_writer.get_output_data()
if recv and recv.output_int_register0 == 1:
print("[INFO] Follow 脚本已就绪")
return
except Exception:
pass
time.sleep(0.05)
def _rtsi_reader_loop(self):
"""RTSI 读取线程:持续获取机器人当前关节位置"""
rt = rtsi(self.robot_ip)
try:
rt.connect()
except Exception as e:
print(f"[ERROR] RTSI reader 连接失败: {e}")
self._rtsi_reader_running = False
return
rt.version_check()
rt.output_subscribe("actual_joint_positions", RTSI_FREQ)
rt.start()
interval = 1.0 / RTSI_FREQ
try:
while self._rtsi_reader_running:
t0 = time.perf_counter()
recv = rt.get_output_data()
if recv and recv.actual_joint_positions:
with self._latest_joints_lock:
self._latest_joints = list(recv.actual_joint_positions)
elapsed = time.perf_counter() - t0
sleep_time = interval - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
finally:
rt.disconnect()
print("[INFO] RTSI reader 已断开")
def _rtsi_writer_loop(self):
"""RTSI 写入线程:持续将目标关节角写入寄存器"""
interval = 1.0 / self.freq_hz
try:
while self._rtsi_writer_running:
t0 = time.perf_counter()
# ── IK 更新 ──
if self._ik_dirty:
self._compute_and_set_target_joints()
self._ik_dirty = False
# ── 写寄存器 ──
with self._target_joints_lock:
joints = self._target_joints
if joints is not None and self._in_data is not None:
self._in_data.input_double_register0 = joints[0]
self._in_data.input_double_register1 = joints[1]
self._in_data.input_double_register2 = joints[2]
self._in_data.input_double_register3 = joints[3]
self._in_data.input_double_register4 = joints[4]
self._in_data.input_double_register5 = joints[5]
self._in_data.input_double_register6 = 1 # flag=1
self._rt_writer.set_input(self._in_data)
# ── 定时 ──
elapsed = time.perf_counter() - t0
sleep_time = interval - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
finally:
# 退出时 flag=0
if self._in_data is not None and self._rt_writer is not None:
try:
self._in_data.input_double_register6 = 0
self._rt_writer.set_input(self._in_data)
except Exception:
pass
def _compute_and_set_target_joints(self):
"""
计算目标关节角度(用户坐标 → 基坐标 → IK)
优先用上一次成功的 target_joints 做 IK 参考,
失败则回退到当前实际关节做参考。
IK 失败时保留旧目标,不覆盖。
"""
with self._target_lock:
target = self._target[:]
# 优先用上一次成功的 target_joints 做 IK 参考(比 actual 更稳定)
with self._target_joints_lock:
ref_joints = self._target_joints
if ref_joints is None:
with self._latest_joints_lock:
ref_joints = self._latest_joints[:]
base_pose, err = user_to_base(self.robot_ip, self.user_frame, target)
if err:
print(f"[ERROR] 坐标变换失败: {err}")
return False
ik_joints, err = pose_to_joints(self.robot_ip, base_pose, ref_joints)
if err:
# IK 失败:尝试用 actual joints 再算一次
with self._latest_joints_lock:
ref_joints_actual = self._latest_joints[:]
if ref_joints_actual != ref_joints:
print(f"[WARN] IK 失败(用 target_joints 参考),尝试用 actual joints 重算...")
ik_joints, err = pose_to_joints(self.robot_ip, base_pose, ref_joints_actual)
if err:
print(f"[ERROR] 逆解失败: {err},保留旧目标")
return False
else:
print(f"[ERROR] 逆解失败: {err},保留旧目标")
return False
with self._target_joints_lock:
self._target_joints = ik_joints
return True
# ──────────────────────────────────────────────
# 便捷函数
# ──────────────────────────────────────────────
_instance = None
def start(robot_ip=DEFAULT_ROBOT_IP):
"""快捷启动:启动全局单例"""
global _instance
if _instance is None:
_instance = UserCoordServo(robot_ip=robot_ip)
_instance.start()
return _instance
def set_target(ux=None, uy=None, uz=None, urx=None, ury=None, urz=None):
"""快捷接口:设置目标点(用户坐标系,位置mm / 姿态rad)"""
if _instance is None:
raise RuntimeError("请先调用 user_coord_servo.start()")
_instance.set_target(ux=ux, uy=uy, uz=uz, urx=urx, ury=ury, urz=urz)
def stop():
"""快捷停止"""
global _instance
if _instance:
_instance.stop()
_instance = None
# ──────────────────────────────────────────────
# 演示
# ──────────────────────────────────────────────
if __name__ == "__main__":
servo = UserCoordServo(robot_ip="192.168.254.138")
servo.start()
targets = [ #点位需要密集,间距要小,单位是mm 和 rad
(0, 0, 0, 0, 0, 0),
(90, 80, 166, 0.138, 0.154, -0.138),
(0, 100, 0, 0, 0, 0),
(-50, 50, 100, 0, 0, 0),
(0, 0, 0, 0, 0, 0),
]
for i, target in enumerate(targets):
ux, uy, uz, urx, ury, urz = target
print(f"\n[演示] 设置目标点 {i+1}: ({ux}, {uy}, {uz}, {urx}, {ury}, {urz}) mm/rad")
servo.set_target(ux=ux, uy=uy, uz=uz, urx=urx, ury=ury, urz=urz)
time.sleep(1)
servo.stop()14.6 KB
1.3 KB
5.3 KB