双机器人协同透传

一、简介

本案例使用俩台机器人,其中从机器人跟随主机器人的运动,进行同步运动
需要的脚本文件作为附件置于文末,运行时需要放置在同目录下

二、操作流程

1、将代码与附件章节的代码放置到同一目录下,此时文件夹中应有四份文件
2、确保俩台机器人有充足的运动空间,程序启动时,从属机器人会先运动到主机器人的相同位姿
3、修改代码中两台机器人的IP地址,MASTER_IP是主动机器人的地址,FOLLOW_IP是从动机器人的地址
4、执行下方的python文件,此时从属机器人会监听主机器人的运动,并保持跟随。

三、常见问题

1、Q:为什么从属机器人运动更慢,无法跟上主机器人的运动速度。
A:优先确保俩台机器人的速度滑块设定的全局速度是一致的。如果俩台机器人全局速度一致仍然有滞后,可以再适度加大RTSI_FREQ的值。

四、代码和附件

代码流程图如下图所示:
"""
透传控制:纯 RTSI 直驱,follow 跑脚本监听寄存器 + EMA 外推
master -> PC(30004 read) -> PC(30004 write) -> follow(30001 script reads register)

依赖:rtsi.py, serialize.py(同目录下)
"""

import sys
import os
import time
import struct
import threading
import signal
import math
import socket

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'ServoJ_RTSI联合', 'python'))
from rtsi import rtsi

# ──────────────────────────────────────────────
# 配置
# ──────────────────────────────────────────────
MASTER_IP = "192.168.254.138"
FOLLOW_IP = "172.16.15.45"

RTSI_FREQ = 250.0              # Hz
SERVOJ_TIME = 1.0 / RTSI_FREQ  # = 0.004s

SOFT_START_DURATION = 1.0      # 软启动时间(秒)
SOFT_START_STEPS = 50

_running = True
_latest_joints = None
_lock = threading.Lock()

def _signal_handler(sig, frame):
    global _running
    print("\n[INFO] 收到退出信号,正在停止...")
    _running = False

    # 发送 halt() 停止 Follow 脚本
    stop_follow_script()

# ──────────────────────────────────────────────
# 停止 Follow 脚本(30001 端口发送 halt)
# ──────────────────────────────────────────────
def stop_follow_script():
    """通过 30001 端口发送 halt() 停止 Follow 机器人脚本。"""
    script_port = 30001
    try:
        print(f"[INFO] 发送 halt() 到 Follow ({FOLLOW_IP}:{script_port})...")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5.0)
        sock.connect((FOLLOW_IP, script_port))
        sock.sendall(b"halt()\n")
        sock.close()
    except Exception as e:
        print(f"[WARN] 停止 Follow 脚本失败: {e}")

# ──────────────────────────────────────────────
# 线性插值 & 软启动
# ──────────────────────────────────────────────
def lerp(a, b, t):
    return a + (b - a) * t

def interpolate_joints(start, end, t):
    return [lerp(s, e, t) for s, e in zip(start, end)]

def joints_diff(a, b):
    return max(abs(ai - bi) for ai, bi in zip(a, b))

# ──────────────────────────────────────────────
# Master 读取线程(30004 订阅 actual_joint_positions)
# ──────────────────────────────────────────────
def master_reader_thread():
    """以 RTSI_FREQ Hz 持续读取 master 关节位置,写入 _latest_joints。"""
    global _latest_joints, _running

    print(f"[INFO] 连接 master RTSI ({MASTER_IP})...")
    rt_master = rtsi(MASTER_IP)
    try:
        rt_master.connect()
    except Exception as e:
        print(f"[ERROR] 无法连接 master RTSI: {e}")
        _running = False
        return

    rt_master.version_check()
    version = rt_master.controller_version()
    print(f"[INFO] Master 控制器版本: {version}")

    # 订阅输出:actual_joint_positions(6个关节角)
    out_vars = 'actual_joint_positions'
    out_data = rt_master.output_subscribe(out_vars, RTSI_FREQ)

    rt_master.start()
    print(f"[INFO] Master RTSI 启动,订阅 {out_vars} @ {RTSI_FREQ}Hz")

    interval = 1.0 / RTSI_FREQ
    try:
        while _running:
            t0 = time.perf_counter()

            # 接收 master 数据 (buffer=4 约 16ms 延迟)
            recv_out = rt_master.get_output_data()
            if recv_out and recv_out.actual_joint_positions:
                joints = list(recv_out.actual_joint_positions)
                with _lock:
                    _latest_joints = joints

            elapsed = time.perf_counter() - t0
            sleep_time = interval - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
    finally:
        rt_master.disconnect()
        print("[INFO] Master RTSI 已断开")

# ──────────────────────────────────────────────
# Follow 写入线程(30004 写 input_double_register)
# ──────────────────────────────────────────────
def follow_writer_thread():
    """以 RTSI_FREQ Hz 将 master 关节位置写入 follow 的寄存器。"""
    global _running

    print(f"[INFO] 连接 follow RTSI ({FOLLOW_IP})...")
    rt_follow = rtsi(FOLLOW_IP)
    try:
        rt_follow.connect()
    except Exception as e:
        print(f"[ERROR] 无法连接 follow RTSI: {e}")
        _running = False
        return

    rt_follow.version_check()
    version = rt_follow.controller_version()
    print(f"[INFO] Follow 控制器版本: {version}")

    # 订阅 follow 的 output_integer_register0(确认 follow 脚本已启动)
    out_follow_data = rt_follow.output_subscribe('output_int_register0', 10)

    # 订阅输入:6个关节角 + 1个 flag,共 7 个 input_double_register
    in_vars = 'input_double_register0,input_double_register1,input_double_register2,input_double_register3,input_double_register4,input_double_register5,input_double_register6'
    in_data = rt_follow.input_subscribe(in_vars)

    rt_follow.start()
    print("[INFO] Follow RTSI 启动")

    # 先等待 follow 脚本启动(output_int_register0 == 1)
    print("[INFO] 等待 follow 脚本启动...")
    deadline = time.time() + 15
    reg_ready = False
    while time.time() < deadline and _running:
        recv = rt_follow.get_output_data_buffered(512)
        if recv and recv.output_int_register0 == 1:
            reg_ready = True
            print("[INFO] Follow 脚本已就绪")
            break
        time.sleep(0.05)

    if not reg_ready:
        print("[WARN] Follow 脚本未就绪,继续尝试写入(可能已等待超时)")

    # 等待 master 读到第一帧
    with _lock:
        master_target = _latest_joints

    if master_target is None:
        print("[ERROR] 无法获取 master 目标位置")
        _running = False
        return

    print(f"[INFO] Master 初始关节: {[round(v, 4) for v in master_target]}")

    # 读取 follow 当前实际关节位置(软启动用)
    rt_master_snapshot = rtsi(FOLLOW_IP)
    rt_master_snapshot.connect()
    rt_master_snapshot.version_check()
    out_follow_snapshot = rt_master_snapshot.output_subscribe('actual_joint_positions', 50)
    rt_master_snapshot.start()
    time.sleep(0.2)
    recv_snap = rt_master_snapshot.get_output_data_buffered(512)
    follow_current = list(recv_snap.actual_joint_positions) if recv_snap else [0.0] * 6
    rt_master_snapshot.disconnect()

    print(f"[INFO] Follow 当前关节: {[round(v, 4) for v in follow_current]}")
    diff = joints_diff(follow_current, master_target)
    print(f"[INFO] 初始位置差: {diff:.4f} rad")

    # ─── 软启动阶段 ───
    print(f"[INFO] 开始软启动 ({SOFT_START_DURATION}s, {SOFT_START_STEPS}步)...")
    soft_interval = SOFT_START_DURATION / SOFT_START_STEPS
    for i in range(SOFT_START_STEPS + 1):
        if not _running:
            break
        t = i / SOFT_START_STEPS
        joints = interpolate_joints(follow_current, master_target, t)

        in_data.input_double_register0 = joints[0]
        in_data.input_double_register1 = joints[1]
        in_data.input_double_register2 = joints[2]
        in_data.input_double_register3 = joints[3]
        in_data.input_double_register4 = joints[4]
        in_data.input_double_register5 = joints[5]
        in_data.input_double_register6 = 1  # flag=1 表示有新数据
        rt_follow.set_input(in_data)

        time.sleep(soft_interval)

    if not _running:
        rt_follow.disconnect()
        return

    print("[INFO] 软启动完成,进入正常透传模式")

    # ─── 正常透传(250Hz)───
    interval = 1.0 / RTSI_FREQ
    last_sent = time.perf_counter()

    try:
        while _running:
            t0 = time.perf_counter()

            with _lock:
                joints = _latest_joints

            if joints is not None:
                in_data.input_double_register0 = joints[0]
                in_data.input_double_register1 = joints[1]
                in_data.input_double_register2 = joints[2]
                in_data.input_double_register3 = joints[3]
                in_data.input_double_register4 = joints[4]
                in_data.input_double_register5 = joints[5]
                in_data.input_double_register6 = 1   # flag=1 → 有新数据
                rt_follow.set_input(in_data)
                last_sent = t0

            elapsed = time.perf_counter() - t0
            sleep_time = interval - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
    finally:
        # 发送停止信号(flag=0,follow 脚本会停在最后位置)
        in_data.input_double_register6 = 0
        rt_follow.set_input(in_data)
        rt_follow.disconnect()
        print("[INFO] Follow RTSI 已断开")

# ──────────────────────────────────────────────
# 发送脚本到 follow(30001 端口)
# ──────────────────────────────────────────────
def send_script_to_follow(script_path):
    """通过 30001 端口发送脚本到 follow 机器人。"""

    script_port = 30001  # Elite 机器人的脚本端口
    
    # 读取脚本文件
    try:
        with open(script_path, 'r', encoding='utf-8') as f:
            script_content = f.read()
    except Exception as e:
        print(f"[ERROR] 读取脚本失败: {e}")
        return False

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10.0)
        sock.connect((FOLLOW_IP, script_port))
        print(f"[INFO] 已连接 follow 脚本端口")

        # 发送脚本内容
        sock.sendall(script_content.encode('utf-8'))
        print(f"[INFO] 脚本已发送 ({len(script_content)} 字节)")

        # 等待确认
        response = sock.recv(1024)
        print(f"[INFO] Follow 响应: {response.decode('utf-8', errors='ignore').strip()}")

        sock.close()
        return True

    except Exception as e:
        print(f"[ERROR] 发送脚本失败: {e}")
        return False

# ──────────────────────────────────────────────
# 主函数
# ──────────────────────────────────────────────
def main():
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)

    port_dir = os.path.dirname(__file__)
    script_path = os.path.abspath(os.path.join(port_dir, 'follow_ema.script'))

    if not send_script_to_follow(script_path):
        print("[ERROR] 脚本发送失败,退出")
        sys.exit(1)

    # 等待脚本启动
    time.sleep(0.5)

    t_master = threading.Thread(target=master_reader_thread, daemon=True, name="master-reader")
    t_master.start()

    print("[INFO] 等待 master 第一帧数据...")
    deadline = time.time() + 10
    while _latest_joints is None and _running:
        if time.time() > deadline:
            print("[ERROR] 等待 master 数据超时")
            sys.exit(1)
        time.sleep(0.05)

    print(f"[INFO] Master 初始关节: {[round(v, 4) for v in _latest_joints]}")

    t_follow = threading.Thread(target=follow_writer_thread, daemon=True, name="follow-writer")
    t_follow.start()

    try:
        while _running and t_master.is_alive() and t_follow.is_alive():
            time.sleep(0.5)
    except KeyboardInterrupt:
        _signal_handler(None, None)

    stop_follow_script()

    t_master.join(timeout=3)
    t_follow.join(timeout=5)
    print("[INFO] 透传已停止,程序退出")

if __name__ == "__main__":
    main()
14.6 KB