Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python/simulator] First-class multi-robot support. #754

Merged
merged 15 commits into from
Apr 12, 2024
6 changes: 3 additions & 3 deletions build_tools/stub_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _issubclass(cls, class_or_tuple, /):


def extract_boost_python_signature(args: str) -> str:
find_optional_args = re.search('\[(.*?)\]$', args)
find_optional_args = re.search(r'\[(.*?)\]$', args)
if find_optional_args:
optional_args = find_optional_args.group(1)
nominal_args = args.replace("[" + optional_args + "]", "")
Expand All @@ -53,7 +53,7 @@ def extract_boost_python_signature(args: str) -> str:

if nominal_args:
for k, arg in enumerate(nominal_args):
type_name = re.search('\((.*?)\)', arg).group(1)
type_name = re.search(r'\((.*?)\)', arg).group(1)
# `bp::object` can be basically anything, so switching to 'Any'.
if type_name == "object":
type_name = "typing.Any"
Expand All @@ -70,7 +70,7 @@ def extract_boost_python_signature(args: str) -> str:
if optional_args and True:
for k, arg in enumerate(optional_args):
main_arg, *optional_args = map(str.strip, arg.split('=', 1))
type_name = re.search('\((.*?)\)', main_arg).group(1)
type_name = re.search(r'\((.*?)\)', main_arg).group(1)
if type_name == "object":
type_name = "typing.Any"

Expand Down
28 changes: 19 additions & 9 deletions python/gym_jiminy/common/gym_jiminy/common/envs/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ def __init__(self,
environments with multiple inheritance, and to allow
automatic pipeline wrapper generation.
"""
# Make sure that the simulator is single-robot
if tuple(robot.name for robot in simulator.robots) != ("",):
raise ValueError(
"`BaseJiminyEnv` only supports single-robot simulators.")

# Handling of default rendering mode
viewer_backend = (simulator.viewer or Viewer).backend
if render_mode is None:
Expand Down Expand Up @@ -837,9 +842,11 @@ def reset(self, # type: ignore[override]
# Note that the viewer must be reset if available, otherwise it would
# keep using the old robot model for display, which must be avoided.
if self.simulator.is_viewer_available:
self.simulator.viewer._setup(self.robot)
if self.simulator.viewer.has_gui():
self.simulator.viewer.refresh()
viewer = self.simulator.viewer
assert viewer is not None # Assert(s) for type checker
viewer._setup(self.robot) # type: ignore[attr-defined]
if viewer.has_gui():
viewer.refresh()

return obs, deepcopy(self._info)

Expand Down Expand Up @@ -1130,8 +1137,8 @@ def play_interactive(self,

# Make sure viewer gui is open, so that the viewer will shared external
# forces with the robot automatically.
if not (self.simulator.is_viewer_available and
self.simulator.viewer.has_gui()):
viewer = self.simulator.viewer
if viewer is None or not viewer.has_gui():
self.simulator.render(update_ground_profile=False)

# Reset the environnement
Expand All @@ -1141,15 +1148,18 @@ def play_interactive(self,

# Refresh the ground profile
self.simulator.render(update_ground_profile=True)
viewer = self.simulator.viewer
assert viewer is not None # Assert(s) for type checker

# Enable travelling
if enable_travelling is None:
enable_travelling = \
self.simulator.viewer.backend.startswith('panda3d')
backend = viewer.backend
assert backend is not None # Assert(s) for type checker
enable_travelling = backend.startswith('panda3d')
enable_travelling = enable_travelling and self.robot.has_freeflyer
if enable_travelling:
tracked_frame = self.robot.pinocchio_model.frames[2].name
self.simulator.viewer.attach_camera(tracked_frame)
viewer.attach_camera(tracked_frame)

# Refresh the scene once again to update camera placement
self.render()
Expand All @@ -1176,7 +1186,7 @@ def _interact(key: Optional[str] = None) -> bool:

# Disable travelling if it enabled
if enable_travelling:
self.simulator.viewer.detach_camera()
viewer.detach_camera()

# Stop the simulation to unlock the robot.
# It will enable to display contact forces for replay.
Expand Down
21 changes: 20 additions & 1 deletion python/gym_jiminy/common/gym_jiminy/common/envs/locomotion.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Generic environment to learn locomotion skills for legged robots using
Jiminy simulator as physics engine.
"""
import os
import pathlib
from typing import Optional, Dict, Union, Any, Type, Sequence, Tuple

import numpy as np
Expand All @@ -13,6 +15,7 @@
ImuSensor as imu,
PeriodicGaussianProcess,
Robot)
from jiminy_py.robot import BaseJiminyRobot
from jiminy_py.simulator import Simulator

import pinocchio as pin
Expand Down Expand Up @@ -182,8 +185,24 @@ def __init__(self,
has_freeflyer=True),
**kwargs})
else:
# Instantiate a simulator and load the options
# Instantiate a simulator
simulator = Simulator(robot, viewer_kwargs=viewer_kwargs, **kwargs)

# Load engine and robot options
if config_path is None:
if isinstance(robot, BaseJiminyRobot):
urdf_path = (
robot._urdf_path_orig) # type: ignore[attr-defined]
else:
urdf_path = robot.urdf_path
if not urdf_path:
raise ValueError(
"'config_path' must be provided if the robot is not "
"associated with any URDF.")
config_path = str(pathlib.Path(
urdf_path).with_suffix('')) + '_options.toml'
if not os.path.exists(config_path):
config_path = ""
simulator.import_options(config_path)

# Initialize base class
Expand Down
141 changes: 125 additions & 16 deletions python/jiminy_py/src/jiminy_py/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
reconstructing the robot to reading telemetry variables.
"""
import os
import re
import tempfile
from bisect import bisect_right
from collections import OrderedDict
Expand Down Expand Up @@ -116,8 +117,11 @@ def build_robot_from_log(
log_data: Dict[str, Any],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
*, robot_name: str = ""
20chupin marked this conversation as resolved.
Show resolved Hide resolved
) -> jiminy.Robot:
"""Build robot from log.
"""Create and initialize a robot from a single or
multi robot simulation. If the robot to be built is from a multi-robot
simulation, its name needs to be provided.

.. note::
model options and `robot.pinocchio_model` will be the same as during
Expand All @@ -142,37 +146,46 @@ def build_robot_from_log(
directories to the ones provided by log file. It
may be necessary to specify it to read log
generated on a different environment.
:param robot_name: Name of the robot to build from log.

:returns: Reconstructed robot, and parsed log data as returned by
`jiminy_py.log.read_log` method.
"""
# Prepare robot namespace
robot_namespace = ".".join(filter(None, (ENGINE_NAMESPACE, robot_name)))

# Instantiate empty robot
robot = jiminy.Robot()
robot = jiminy.Robot(robot_name)

# Extract log constants
log_constants = log_data["constants"]

# Extract common info
pinocchio_model = log_constants[
".".join((ENGINE_NAMESPACE, "pinocchio_model"))]
try:
pinocchio_model = log_constants[
".".join((robot_namespace, "pinocchio_model"))]
except KeyError as e:
if robot_name == "":
raise ValueError(
20chupin marked this conversation as resolved.
Show resolved Hide resolved
"No robot with empty name has been found. Specify a name or "
"call `build_robots_from_log`.") from e

try:
# Extract geometry models
collision_model = log_constants[
".".join((ENGINE_NAMESPACE, "collision_model"))]
".".join((robot_namespace, "collision_model"))]
visual_model = log_constants[
".".join((ENGINE_NAMESPACE, "visual_model"))]
".".join((robot_namespace, "visual_model"))]

# Initialize the model
robot.initialize(pinocchio_model, collision_model, visual_model)
except KeyError as e:
# Extract initialization arguments
urdf_data = log_constants[
".".join((ENGINE_NAMESPACE, "urdf_file"))]
".".join((robot_namespace, "urdf_file"))]
has_freeflyer = int(log_constants[
".".join((ENGINE_NAMESPACE, "has_freeflyer"))])
".".join((robot_namespace, "has_freeflyer"))])
mesh_package_dirs = [*mesh_package_dirs, *log_constants.get(
".".join((ENGINE_NAMESPACE, "mesh_package_dirs")), ())]
".".join((robot_namespace, "mesh_package_dirs")), ())]

# Make sure urdf data is available
if len(urdf_data) <= 1:
Expand Down Expand Up @@ -201,7 +214,9 @@ def build_robot_from_log(

# Load the options
all_options = log_constants[".".join((ENGINE_NAMESPACE, "options"))]
robot.set_options(all_options["robot"])
# Load the options
robot.set_options(
all_options[".".join(filter(None, (robot_name, "robot")))])

# Update model in-place.
# Note that `__setstate__` re-allocates memory instead of just calling
Expand All @@ -218,8 +233,53 @@ def build_robot_from_log(
return robot


def build_robots_from_log(
log_data: Dict[str, Any],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
) -> Sequence[jiminy.Robot]:
"""Build all the robots in the log of the simulation.

.. note::
20chupin marked this conversation as resolved.
Show resolved Hide resolved
This function calls `build_robot_from_log` to build each robot.
Refer to `build_robot_from_log` for more information.

:param log_data: Logged data (constants and variables) as a dictionary.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It which may be necessary to read log generated on a
different environment.
:param mesh_package_dirs: Prepend custom mesh package search path
directories to the ones provided by log file. It
may be necessary to specify it to read log
generated on a different environment.

:returns: Sequence of reconstructed robots.
"""
# Extract log constants
log_constants = log_data["constants"]

robots_names = []
for key in log_constants.keys():
if key == "HighLevelController.has_freeflyer":
robots_names.append("")
else:
m = re.findall(r"HighLevelController.(\w+).has_freeflyer", key)
if len(m) == 1:
robots_names.append(m[0])

robots = []
for robot_name in robots_names:
robot = build_robot_from_log(
log_data, mesh_path_dir, mesh_package_dirs, robot_name=robot_name)

robots.append(robot)

return robots


def extract_trajectory_from_log(log_data: Dict[str, Any],
robot: Optional[jiminy.Model] = None
robot: Optional[jiminy.Model] = None,
*, robot_name: Optional[str] = None
) -> TrajectoryDataType:
"""Extract the minimal required information from raw log data in order to
replay the simulation in a viewer.
Expand All @@ -234,27 +294,41 @@ def extract_trajectory_from_log(log_data: Dict[str, Any],
:param robot: Jiminy robot associated with the logged trajectory.
Optional: None by default. If None, then it will be
reconstructed from 'log_data' using `build_robot_from_log`.
:param robot_name: Name of the robot to be constructed in the log. If it
is not known, call `build_robot_from_log`.

:returns: Trajectory dictionary. The actual trajectory corresponds to the
field "evolution_robot" and it is a list of State object. The
other fields are additional information.
"""
# Prepare robot namespace
if robot is not None:
if robot_name is None:
robot_name = robot.name
elif robot_name != robot.name:
raise ValueError(
"The name specified in `robot_name` and the name of the robot "
"are differents.")
elif robot_name is None:
robot_name = ""

robot_namespace = ".".join(filter(None, (ENGINE_NAMESPACE, robot_name)))
# Handling of default argument(s)
if robot is None:
robot = build_robot_from_log(log_data)
robot = build_robot_from_log(log_data, robot_name=robot_name)

# Define some proxies for convenience
log_vars = log_data["variables"]

# Extract the joint positions, velocities and external forces over time
positions = np.stack([
log_vars.get(".".join((ENGINE_NAMESPACE, field)), [])
log_vars.get(".".join((robot_namespace, field)), [])
for field in robot.log_position_fieldnames], axis=-1)
velocities = np.stack([
log_vars.get(".".join((ENGINE_NAMESPACE, field)), [])
log_vars.get(".".join((robot_namespace, field)), [])
for field in robot.log_velocity_fieldnames], axis=-1)
forces = np.stack([
log_vars.get(".".join((ENGINE_NAMESPACE, field)), [])
log_vars.get(".".join((robot_namespace, field)), [])
for field in robot.log_f_external_fieldnames], axis=-1)

# Determine which optional data are available
Expand All @@ -281,6 +355,41 @@ def extract_trajectory_from_log(log_data: Dict[str, Any],
"use_theoretical_model": False}


def extract_trajectories_from_log(
log_data: Dict[str, Any],
robots: Optional[Sequence[jiminy.Model]] = None
) -> Dict[str, TrajectoryDataType]:
"""Extract the minimal required information from raw log data in order to
replay the simulation in a viewer.

.. note::
20chupin marked this conversation as resolved.
Show resolved Hide resolved
This function calls `extract_trajectory_from_log` to extract each
robot's trajectory. Refer to `extract_trajectory_from_log` for more
information.

:param log_data: Logged data (constants and variables) as a dictionary.
:param robots: Sequend of Jiminy robots associated with the logged
trajectory.
Optional: None by default. If None, then it will be
reconstructed from 'log_data' using `build_robots_from_log`.

:returns: Dictonary mapping each robot name to its corresponding
trajectory.
"""
trajectories = {}

# Handling of default argument(s)
if robots is None:
robots = build_robots_from_log(log_data)

for robot in robots:
trajectory = extract_trajectory_from_log(
log_data, robot, robot_name=robot.name)

20chupin marked this conversation as resolved.
Show resolved Hide resolved
trajectories[robot.name] = trajectory
return trajectories


def update_sensor_measurements_from_log(
log_data: Dict[str, Any],
robot: jiminy.Model
Expand Down
7 changes: 5 additions & 2 deletions python/jiminy_py/src/jiminy_py/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,12 +865,15 @@ class BaseJiminyRobot(jiminy.Robot):
Hardware description files within the same directory and having the
name than the URDF file will be detected automatically without
requiring to manually specify its path.

:param name: Name of the robot to be created.
Optional: Empty string by default.
"""
def __init__(self) -> None:
def __init__(self, name: Optional[str] = "") -> None:
self.extra_info: Dict[str, Any] = {}
self.hardware_path: Optional[str] = None
self._urdf_path_orig: Optional[str] = None
super().__init__()
super().__init__(name)

def initialize(self,
urdf_path: str,
Expand Down
Loading
Loading