代码拉取完成,页面将自动刷新
import numpy as np
import os
import collections
import matplotlib.pyplot as plt
from dm_control import mujoco
from dm_control.rl import control
from dm_control.suite import base
from constants import DT, XML_DIR, START_ARM_POSE,B_2_N,DOF_ARM,DOF_CAP
from constants import PUPPET_GRIPPER_POSITION_UNNORMALIZE_FN,PUPPET_GRIPPER_POSITION_UNNORMALIZE_FN1
from constants import MASTER_GRIPPER_POSITION_NORMALIZE_FN
from constants import PUPPET_GRIPPER_POSITION_NORMALIZE_FN
from constants import PUPPET_GRIPPER_VELOCITY_NORMALIZE_FN
import math
import IPython
e = IPython.embed
import math
from ctypes import *
BOX_POSE = [None] # to be changed from outside
def quaternion_to_euler(q):
"""
Convert a quaternion into euler angles (yaw, pitch, roll)
q: A list of four elements representing a quaternion [q_w, q_x, q_y, q_z]
"""
# Extract the values from the quaternion
w, x, y, z = q
# Yaw (Z-axis rotation)
siny_cosp = 2 * (w * z + x * y)
cosy_cosp = 1 - 2 * (y * y + z * z)
yaw = math.atan2(siny_cosp, cosy_cosp)
# Pitch (Y-axis rotation)
sinp = 2 * (w * y - z * x)
if abs(sinp) >= 1:
pitch = math.copysign(math.pi / 2, sinp) # Use 90 degrees if out of range
else:
pitch = math.asin(sinp)
# Roll (X-axis rotation)
sinr_cosp = 2 * (w * x + y * z)
cosr_cosp = 1 - 2 * (x * x + y * y)
roll = math.atan2(sinr_cosp, cosr_cosp)
return pitch, roll, yaw
def make_sim_env(task_name):
"""
Environment for simulated robot bi-manual manipulation, with joint position control
Action space: [left_arm_qpos (6), # absolute joint position
left_gripper_positions (1), # normalized gripper position (0: close, 1: open)
right_arm_qpos (6), # absolute joint position
right_gripper_positions (1),] # normalized gripper position (0: close, 1: open)
Observation space: {"qpos": Concat[ left_arm_qpos (6), # absolute joint position
left_gripper_position (1), # normalized gripper position (0: close, 1: open)
right_arm_qpos (6), # absolute joint position
right_gripper_qpos (1)] # normalized gripper position (0: close, 1: open)
"qvel": Concat[ left_arm_qvel (6), # absolute joint velocity (rad)
left_gripper_velocity (1), # normalized gripper velocity (pos: opening, neg: closing)
right_arm_qvel (6), # absolute joint velocity (rad)
right_gripper_qvel (1)] # normalized gripper velocity (pos: opening, neg: closing)
"images": {"main": (480x640x3)} # h, w, c, dtype='uint8'
"""
if 'sim_transfer_cube' in task_name:
xml_path = os.path.join(XML_DIR, f'bimanual_viperx_transfer_cube.xml')
physics = mujoco.Physics.from_xml_path(xml_path)
task = TransferCubeTask(random=False)
env = control.Environment(physics, task, time_limit=20, control_timestep=DT,
n_sub_steps=None, flat_observation=False)
elif 'sim_insertion' in task_name:
xml_path = os.path.join(XML_DIR, f'bimanual_viperx_insertion.xml')
physics = mujoco.Physics.from_xml_path(xml_path)
task = InsertionTask(random=False)
env = control.Environment(physics, task, time_limit=20, control_timestep=DT,
n_sub_steps=None, flat_observation=False)
elif 'human_test' == task_name:#---------------------------------------------new doghome
xml_path = os.path.join(XML_DIR, f'human_test.xml')
physics = mujoco.Physics.from_xml_path(xml_path)
task = TransferCubeTask_Human(random=False)
env = control.Environment(physics, task, time_limit=20, control_timestep=DT,
n_sub_steps=None, flat_observation=False)
elif 'human_test1' == task_name:#---------------------------------------------new doghome
xml_path = os.path.join(XML_DIR, f'human_test1.xml')
physics = mujoco.Physics.from_xml_path(xml_path)
task = TransferCubeTask_Human(random=False)
env = control.Environment(physics, task, time_limit=20, control_timestep=DT,
n_sub_steps=None, flat_observation=False)
else:
raise NotImplementedError
return env
class BimanualViperXTask(base.Task):#基础类 被所有任务继承
def __init__(self, random=None):
super().__init__(random=random)
def before_step(self, action, physics):#不同于ee这里直接送入了角度7+7+2
left_arm_action = action[:7]#末端6个位姿
right_arm_action = action[8:8+7]
normalized_left_gripper_action = action[7]#末端行程
normalized_right_gripper_action = action[7+8]
left_gripper_action = PUPPET_GRIPPER_POSITION_UNNORMALIZE_FN1(normalized_left_gripper_action)
right_gripper_action = PUPPET_GRIPPER_POSITION_UNNORMALIZE_FN1(normalized_right_gripper_action)
full_left_gripper_action = [left_gripper_action, -left_gripper_action]
full_right_gripper_action = [right_gripper_action, -right_gripper_action]
#算出来14个关节角,Left: 0:7 +抓1+抓2; right:[9:15]+抓1+抓2 18个自由度
new_qpos = np.concatenate([left_arm_action[:DOF_ARM],[0,0],right_arm_action[:DOF_ARM],[0,0]]) #18
qpos_raw = physics.data.qpos.copy()[:(DOF_ARM+2)*2]
tar_q_flt=np.zeros((DOF_ARM+2)*2)
flt=1
for i in range (0,len(qpos_raw)):
tar_q_flt[i]=new_qpos[i]*flt+(1-flt)*qpos_raw[i]
tar_q_flt[DOF_ARM]=left_gripper_action
tar_q_flt[DOF_ARM+1]=-left_gripper_action#PUPPET_GRIPPER_POSITION_OPEN
tar_q_flt[(DOF_ARM+1)*2]=right_gripper_action
tar_q_flt[(DOF_ARM+1)*2+1]=-right_gripper_action
#env_action = np.concatenate([left_arm_action, full_left_gripper_action, right_arm_action, full_right_gripper_action])
#env_action = np.copyto(physics.data.ctrl, np.concatenate([tar_q_flt,0.0*np.zeros(18)]))
np.copyto(physics.data.ctrl, np.concatenate([tar_q_flt,np.zeros((DOF_ARM+2)*2)]))#输出位置控制器 由于没有DM DJ这里还是采用mojoco执行器或使用扭矩控制代替
#print(physics.data.ctrl)
#super().before_step(env_action, physics)#使用关节电机驱动
return
def initialize_episode(self, physics):
"""Sets the state of the environment at the start of each episode."""
super().initialize_episode(physics)
@staticmethod
def get_qpos(physics):
qpos_raw = physics.data.qpos.copy()
left_qpos_raw = qpos_raw[:(DOF_ARM+2)]
right_qpos_raw = qpos_raw[(DOF_ARM+2):(DOF_ARM+2)*2]
left_arm_qpos = left_qpos_raw[:DOF_ARM]
right_arm_qpos = right_qpos_raw[:DOF_ARM]
left_gripper_qpos = [PUPPET_GRIPPER_POSITION_NORMALIZE_FN(left_qpos_raw[DOF_ARM])]
right_gripper_qpos = [PUPPET_GRIPPER_POSITION_NORMALIZE_FN(right_qpos_raw[DOF_ARM])]#从手夹爪比例需要修改
return np.concatenate([left_arm_qpos, left_gripper_qpos, right_arm_qpos, right_gripper_qpos])
@staticmethod
def get_qvel(physics):
qvel_raw = physics.data.qvel.copy()
left_qvel_raw = qvel_raw[:(DOF_ARM+2)]
right_qvel_raw = qvel_raw[(DOF_ARM+2):(DOF_ARM+2)*2]
left_arm_qvel = left_qvel_raw[:DOF_ARM]
right_arm_qvel = right_qvel_raw[:DOF_ARM]
left_gripper_qvel = [PUPPET_GRIPPER_VELOCITY_NORMALIZE_FN(left_qvel_raw[DOF_ARM])]
right_gripper_qvel = [PUPPET_GRIPPER_VELOCITY_NORMALIZE_FN(right_qvel_raw[DOF_ARM])]
return np.concatenate([left_arm_qvel, left_gripper_qvel, right_arm_qvel, right_gripper_qvel])
@staticmethod
def get_env_state(physics):
raise NotImplementedError
def get_observation(self, physics):
obs = collections.OrderedDict()
obs['qpos'] = self.get_qpos(physics)
obs['qvel'] = self.get_qvel(physics)
obs['env_state'] = self.get_env_state(physics)
obs['images'] = dict()
obs['images']['top'] = physics.render(height=480, width=640, camera_id='top')#图像尺寸
obs['images']['angle'] = physics.render(height=480, width=640, camera_id='angle')
obs['images']['vis'] = physics.render(height=480, width=640, camera_id='front_close')
obs['images']['left_wrist'] = physics.render(height=480, width=640, camera_id='left_wrist')
obs['images']['right_wrist'] = physics.render(height=480, width=640, camera_id='right_wrist')
return obs
def get_reward(self, physics):
# return whether left gripper is holding the box
raise NotImplementedError
class TransferCubeTask_Human(BimanualViperXTask):#--------------------------doghome
def __init__(self, random=None):
super().__init__(random=random)
self.max_reward = 5#-------------------
def initialize_episode(self, physics):
"""Sets the state of the environment at the start of each episode."""
# TODO Notice: this function does not randomize the env configuration. Instead, set BOX_POSE from outside
# reset qpos, control and box position
with physics.reset_context():
box_start_idx = physics.model.name2id('red_box_joint', 'joint')
physics.named.data.qpos[:(DOF_ARM+2)*2] = START_ARM_POSE
#np.copyto(physics.data.ctrl, START_ARM_POSE)
np.copyto(physics.data.ctrl, np.concatenate([START_ARM_POSE,np.zeros((DOF_ARM+2)*2)])) #amao
assert BOX_POSE[0] is not None
#np.copyto(physics.named.data.qpos[box_start_idx : box_start_idx + 7], BOX_POSE[0])
temp=BOX_POSE[0][:7*2]
print(len(BOX_POSE[0]),len(temp))
print(f"{BOX_POSE=}")
np.copyto(physics.named.data.qpos[(DOF_ARM+2)*2:(DOF_ARM+2)*2+7*2], temp)
#physics.named.data.qpos[(DOF_ARM+2)*2:(DOF_ARM+2)*2+7*2] = BOX_POSE[0] # two objects 方块的位姿态 将上一次的方块位置进行复位
super().initialize_episode(physics)
@staticmethod
def get_env_state(physics):#---doghome
env_state = physics.data.qpos.copy()[(DOF_ARM+2)*2:]
return env_state
def get_reward(self, physics):
# return whether left gripper is holding the box
all_contact_pairs = []
for i_contact in range(physics.data.ncon):
id_geom_1 = physics.data.contact[i_contact].geom1
id_geom_2 = physics.data.contact[i_contact].geom2
name_geom_1 = physics.model.id2name(id_geom_1, 'geom')
name_geom_2 = physics.model.id2name(id_geom_2, 'geom')
contact_pair = (name_geom_1, name_geom_2)
all_contact_pairs.append(contact_pair)
touch_left_gripper = ("red_box", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
touch_right_gripper = ("red_box", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
touch_table = ("red_box", "table") in all_contact_pairs
cube_red_place_good = ("red_box", "table") in all_contact_pairs
cube_blue_place_good = ("blue_box", "table") in all_contact_pairs
#print("Reward check",touch_left_gripper,touch_right_gripper,touch_table,cube_red_place_good)
reward = 0
if touch_right_gripper:
reward = 1
if touch_right_gripper and not touch_table: # lifted
reward = 2
if touch_left_gripper: # attempted transfer
reward = 3
if touch_left_gripper and not touch_table: # successful transfer
reward = 4
if cube_red_place_good and cube_blue_place_good: # successful transfer
reward = 5
return reward
class TransferCubeTask(BimanualViperXTask):
def __init__(self, random=None):
super().__init__(random=random)
self.max_reward = 4
def initialize_episode(self, physics):
"""Sets the state of the environment at the start of each episode."""
# TODO Notice: this function does not randomize the env configuration. Instead, set BOX_POSE from outside
# reset qpos, control and box position
with physics.reset_context():
physics.named.data.qpos[:16] = START_ARM_POSE
np.copyto(physics.data.ctrl, START_ARM_POSE)
assert BOX_POSE[0] is not None
physics.named.data.qpos[-7:] = BOX_POSE[0]
# print(f"{BOX_POSE=}")
super().initialize_episode(physics)
@staticmethod
def get_env_state(physics):
env_state = physics.data.qpos.copy()[16:]
return env_state
def get_reward(self, physics):
# return whether left gripper is holding the box
all_contact_pairs = []
for i_contact in range(physics.data.ncon):
id_geom_1 = physics.data.contact[i_contact].geom1
id_geom_2 = physics.data.contact[i_contact].geom2
name_geom_1 = physics.model.id2name(id_geom_1, 'geom')
name_geom_2 = physics.model.id2name(id_geom_2, 'geom')
contact_pair = (name_geom_1, name_geom_2)
all_contact_pairs.append(contact_pair)
touch_left_gripper = ("red_box", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
touch_right_gripper = ("red_box", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
touch_table = ("red_box", "table") in all_contact_pairs
reward = 0
if touch_right_gripper:
reward = 1
if touch_right_gripper and not touch_table: # lifted
reward = 2
if touch_left_gripper: # attempted transfer
reward = 3
if touch_left_gripper and not touch_table: # successful transfer
reward = 4
return reward
class InsertionTask(BimanualViperXTask):
def __init__(self, random=None):
super().__init__(random=random)
self.max_reward = 4
def initialize_episode(self, physics):
"""Sets the state of the environment at the start of each episode."""
# TODO Notice: this function does not randomize the env configuration. Instead, set BOX_POSE from outside
# reset qpos, control and box position
with physics.reset_context():
physics.named.data.qpos[:16] = START_ARM_POSE
np.copyto(physics.data.ctrl, START_ARM_POSE)
assert BOX_POSE[0] is not None
physics.named.data.qpos[-7*2:] = BOX_POSE[0] # two objects
# print(f"{BOX_POSE=}")
super().initialize_episode(physics)
@staticmethod
def get_env_state(physics):
env_state = physics.data.qpos.copy()[16:]
return env_state
def get_reward(self, physics):
# return whether peg touches the pin
all_contact_pairs = []
for i_contact in range(physics.data.ncon):
id_geom_1 = physics.data.contact[i_contact].geom1
id_geom_2 = physics.data.contact[i_contact].geom2
name_geom_1 = physics.model.id2name(id_geom_1, 'geom')
name_geom_2 = physics.model.id2name(id_geom_2, 'geom')
contact_pair = (name_geom_1, name_geom_2)
all_contact_pairs.append(contact_pair)
touch_right_gripper = ("red_peg", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
touch_left_gripper = ("socket-1", "vx300s_left/10_left_gripper_finger") in all_contact_pairs or \
("socket-2", "vx300s_left/10_left_gripper_finger") in all_contact_pairs or \
("socket-3", "vx300s_left/10_left_gripper_finger") in all_contact_pairs or \
("socket-4", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
peg_touch_table = ("red_peg", "table") in all_contact_pairs
socket_touch_table = ("socket-1", "table") in all_contact_pairs or \
("socket-2", "table") in all_contact_pairs or \
("socket-3", "table") in all_contact_pairs or \
("socket-4", "table") in all_contact_pairs
peg_touch_socket = ("red_peg", "socket-1") in all_contact_pairs or \
("red_peg", "socket-2") in all_contact_pairs or \
("red_peg", "socket-3") in all_contact_pairs or \
("red_peg", "socket-4") in all_contact_pairs
pin_touched = ("red_peg", "pin") in all_contact_pairs
reward = 0
if touch_left_gripper and touch_right_gripper: # touch both
reward = 1
if touch_left_gripper and touch_right_gripper and (not peg_touch_table) and (not socket_touch_table): # grasp both
reward = 2
if peg_touch_socket and (not peg_touch_table) and (not socket_touch_table): # peg and socket touching
reward = 3
if pin_touched: # successful insertion
reward = 4
return reward
#-------------------------------------------------------------------------------------
def get_action(master_bot_left, master_bot_right):#实物样机
action = np.zeros(14)
# arm action
action[:6] = master_bot_left.dxl.joint_states.position[:6]
action[7:7+6] = master_bot_right.dxl.joint_states.position[:6]
# gripper action
left_gripper_pos = master_bot_left.dxl.joint_states.position[7]
right_gripper_pos = master_bot_right.dxl.joint_states.position[7]
normalized_left_pos = MASTER_GRIPPER_POSITION_NORMALIZE_FN(left_gripper_pos)
normalized_right_pos = MASTER_GRIPPER_POSITION_NORMALIZE_FN(right_gripper_pos)
action[6] = normalized_left_pos
action[7+6] = normalized_right_pos
return action
def test_sim_teleop():#真机器人
""" Testing teleoperation in sim with ALOHA. Requires hardware and ALOHA repo to work. """
from interbotix_xs_modules.arm import InterbotixManipulatorXS
BOX_POSE[0] = [0.2, 0.5, 0.05, 1, 0, 0, 0]
# source of data
master_bot_left = InterbotixManipulatorXS(robot_model="wx250s", group_name="arm", gripper_name="gripper",
robot_name=f'master_left', init_node=True)
master_bot_right = InterbotixManipulatorXS(robot_model="wx250s", group_name="arm", gripper_name="gripper",
robot_name=f'master_right', init_node=False)
# setup the environment
env = make_sim_env('sim_transfer_cube')
ts = env.reset()
episode = [ts]
# setup plotting
ax = plt.subplot()
plt_img = ax.imshow(ts.observation['images']['angle'])
plt.ion()
for t in range(1000):
action = get_action(master_bot_left, master_bot_right)
ts = env.step(action)
episode.append(ts)
plt_img.set_data(ts.observation['images']['angle'])
plt.pause(0.02)
if __name__ == '__main__':
test_sim_teleop()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。