Add Diffusion Policy for Reinforcement Learning (#9824)
* enable cpu ability
* model creation + comprehensive testing
* training + tests
* all tests working
* remove unneeded files + clarify docs
* update train tests
* update readme.md
* remove data from gitignore
* undo cpu enabled option
* Update README.md
* update readme
* code quality fixes
* diffusion policy example
* update readme
* add pretrained model weights + doc
* add comment
* add documentation
* add docstrings
* update comments
* update readme
* fix code quality
* Update examples/reinforcement_learning/README.md
Co-authored-by: Sayak Paul <spsayakpaul@gmail.com>
* Update examples/reinforcement_learning/diffusion_policy.py
Co-authored-by: Sayak Paul <spsayakpaul@gmail.com>
* suggestions + safe globals for weights_only=True
* suggestions + safe weights loading
* fix code quality
* reformat file
---------
Co-authored-by: Sayak Paul <spsayakpaul@gmail.com>
examples/reinforcement_learning/README.md
@@ -1,4 +1,13 @@
# Overview

## Diffusion-based Policy Learning for RL

`diffusion_policy` implements [Diffusion Policy](https://diffusion-policy.cs.columbia.edu/), a diffusion model that predicts robot action sequences in reinforcement learning tasks.

This example implements a robot control model for pushing a T-shaped block into a target area. The model takes the current state observations as input and outputs a trajectory of subsequent steps to follow.

To execute the script, run `diffusion_policy.py`.
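For reference, a minimal way to call the policy from Python, mirroring the `__main__` block of `diffusion_policy.py` (the import below assumes you run from this directory so the script is importable as a module):

```python
import numpy as np
import torch

from diffusion_policy import DiffusionPolicy

policy = DiffusionPolicy()  # downloads the pretrained weights (dorsar/diffusion_policy) from the Hub
obs = torch.tensor([[256.0, 256.0, 200.0, 300.0, np.pi / 2]])  # [robot_x, robot_y, block_x, block_y, block_angle]
trajectory = policy.predict(obs)  # tensor of shape [1, 16, 2]: sixteen (x, y) waypoints in pixel space
```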

## Diffuser Locomotion

These examples show how to run [Diffuser](https://arxiv.org/abs/2205.09991) in Diffusers.

There are two ways to use the script, `run_diffuser_locomotion.py`.
examples/reinforcement_learning/diffusion_policy.py (new file, 201 lines)
@@ -0,0 +1,201 @@
import numpy as np
import numpy.core.multiarray as multiarray
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download
from torch.serialization import add_safe_globals

from diffusers import DDPMScheduler, UNet1DModel

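# allow-list the NumPy globals referenced inside the checkpoint so that torch.load with
# weights_only=True can safely deserialize them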
add_safe_globals(
    [
        multiarray._reconstruct,
        np.ndarray,
        np.dtype,
        np.dtype(np.float32).type,
        np.dtype(np.float64).type,
        np.dtype(np.int32).type,
        np.dtype(np.int64).type,
        type(np.dtype(np.float32)),
        type(np.dtype(np.float64)),
        type(np.dtype(np.int32)),
        type(np.dtype(np.int64)),
    ]
)

"""
An example of using HuggingFace's diffusers library for diffusion policy,
generating smooth movement trajectories.

This implements a robot control model for pushing a T-shaped block into a target area.
The model takes in the robot arm position, block position, and block angle,
then outputs a sequence of 16 (x,y) positions for the robot arm to follow.
"""


class ObservationEncoder(nn.Module):
    """
    Converts raw robot observations (positions/angles) into a more compact representation

    state_dim (int): Dimension of the input state vector (default: 5)
        [robot_x, robot_y, block_x, block_y, block_angle]

    - Input shape: (batch_size, state_dim)
    - Output shape: (batch_size, 256)
    """

    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(state_dim, 512), nn.ReLU(), nn.Linear(512, 256))

    def forward(self, x):
        return self.net(x)


class ObservationProjection(nn.Module):
    """
    Takes the encoded observation and transforms it into 32 values that represent the current robot/block situation.
    These values are used as additional contextual information during the diffusion model's trajectory generation.

    - Input: 256-dim vector (padded to 512)
      Shape: (batch_size, 256)
    - Output: 32 contextual information values for the diffusion model
      Shape: (batch_size, 32)
    """

    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(32, 512))
        self.bias = nn.Parameter(torch.zeros(32))

    def forward(self, x):  # pad 256-dim input to 512-dim with zeros
        if x.size(-1) == 256:
            x = torch.cat([x, torch.zeros(*x.shape[:-1], 256, device=x.device)], dim=-1)
        return nn.functional.linear(x, self.weight, self.bias)


class DiffusionPolicy:
    """
    Implements diffusion policy for generating robot arm trajectories.
    Uses diffusion to generate sequences of positions for a robot arm, conditioned on
    the current state of the robot and the block it needs to push.

    The model expects observations in pixel coordinates (0-512 range) and block angle in radians.
    It generates trajectories as sequences of (x,y) coordinates also in the 0-512 range.
    """

    def __init__(self, state_dim=5, device="cpu"):
        self.device = device

        # define valid ranges for inputs/outputs
        self.stats = {
            "obs": {"min": torch.zeros(5), "max": torch.tensor([512, 512, 512, 512, 2 * np.pi])},
            "action": {"min": torch.zeros(2), "max": torch.full((2,), 512)},
        }

        self.obs_encoder = ObservationEncoder(state_dim).to(device)
        self.obs_projection = ObservationProjection().to(device)

        # UNet model that performs the denoising process
        # takes in concatenated action (2 channels) and context (32 channels) = 34 channels
        # outputs predicted action (2 channels for x,y coordinates)
        self.model = UNet1DModel(
            sample_size=16,  # length of trajectory sequence
            in_channels=34,
            out_channels=2,
            layers_per_block=2,  # number of layers per each UNet block
            block_out_channels=(128,),  # number of output neurons per layer in each block
            down_block_types=("DownBlock1D",),  # reduce the resolution of data
            up_block_types=("UpBlock1D",),  # increase the resolution of data
        ).to(device)

        # noise scheduler that controls the denoising process
        self.noise_scheduler = DDPMScheduler(
            num_train_timesteps=100,  # number of denoising steps
            beta_schedule="squaredcos_cap_v2",  # type of noise schedule
        )

        # load pre-trained weights from HuggingFace
        checkpoint = torch.load(
            hf_hub_download("dorsar/diffusion_policy", "push_tblock.pt"), weights_only=True, map_location=device
        )
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.obs_encoder.load_state_dict(checkpoint["encoder_state_dict"])
        self.obs_projection.load_state_dict(checkpoint["projection_state_dict"])

    # scales data to [-1, 1] range for neural network processing
    def normalize_data(self, data, stats):
        return ((data - stats["min"]) / (stats["max"] - stats["min"])) * 2 - 1

    # converts normalized data back to original range
    def unnormalize_data(self, ndata, stats):
        return ((ndata + 1) / 2) * (stats["max"] - stats["min"]) + stats["min"]
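    # worked example of the scaling above: with min=0 and max=512, an x coordinate of 256
    # normalizes to (256 / 512) * 2 - 1 = 0.0, and unnormalize_data maps 0.0 back to 256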

    @torch.no_grad()
    def predict(self, observation):
        """
        Generates a trajectory of robot arm positions given the current state.

        Args:
            observation (torch.Tensor): Current state [robot_x, robot_y, block_x, block_y, block_angle]
                Shape: (batch_size, 5)

        Returns:
            torch.Tensor: Sequence of (x,y) positions for the robot arm to follow
                Shape: (batch_size, 16, 2) where:
                - 16 is the number of steps in the trajectory
                - 2 is the (x,y) coordinates in pixel space (0-512)

        The function first encodes the observation, then uses it to condition a diffusion
        process that gradually denoises random trajectories into smooth, purposeful movements.
        """
        observation = observation.to(self.device)
        normalized_obs = self.normalize_data(observation, self.stats["obs"])

        # encode the observation into context values for the diffusion model
        cond = self.obs_projection(self.obs_encoder(normalized_obs))
        # reshape to (batch, 32, 1) and repeat the context values across all 16 trajectory steps
        cond = cond.view(normalized_obs.shape[0], -1, 1).expand(-1, -1, 16)

        # initialize action with noise - random noise that will be refined into a trajectory
        action = torch.randn((observation.shape[0], 2, 16), device=self.device)

        # denoise
        # at each step `t`, the current noisy trajectory (`action`) and the conditioning info (`cond`) are
        # fed into the model to predict a denoised trajectory; self.noise_scheduler.step then applies this
        # prediction and removes a little more noise from `action`

        self.noise_scheduler.set_timesteps(100)
        for t in self.noise_scheduler.timesteps:
            model_output = self.model(torch.cat([action, cond], dim=1), t)
            action = self.noise_scheduler.step(model_output.sample, t, action).prev_sample

        action = action.transpose(1, 2)  # reshape to [batch, 16, 2]
        action = self.unnormalize_data(action, self.stats["action"])  # scale back to coordinates
        return action


if __name__ == "__main__":
    policy = DiffusionPolicy()

    # sample of a single observation
    # robot arm starts in center, block is slightly left and up, rotated 90 degrees
    obs = torch.tensor(
        [
            [
                256.0,  # robot arm x position (middle of screen)
                256.0,  # robot arm y position (middle of screen)
                200.0,  # block x position
                300.0,  # block y position
                np.pi / 2,  # block angle (90 degrees)
            ]
        ]
    )

    action = policy.predict(obs)

    print("Action shape:", action.shape)  # should be [1, 16, 2] - one trajectory of 16 x,y positions
    print("\nPredicted trajectory:")
    for i, (x, y) in enumerate(action[0]):
        print(f"Step {i:2d}: x={x:6.1f}, y={y:6.1f}")
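As an optional follow-up that is not part of the committed script: assuming `matplotlib` is installed, appending a few lines after `action = policy.predict(obs)` in the `__main__` block gives a quick visual sanity check that the 16 waypoints form a smooth path inside the 0-512 pixel workspace:

```python
import matplotlib.pyplot as plt

traj = action[0].cpu().numpy()  # (16, 2) array of (x, y) waypoints
plt.plot(traj[:, 0], traj[:, 1], marker="o", label="predicted trajectory")
plt.scatter([256.0], [256.0], color="green", label="robot arm start")  # from the sample observation
plt.scatter([200.0], [300.0], color="red", label="block")  # from the sample observation
plt.xlim(0, 512)
plt.ylim(0, 512)
plt.legend()
plt.title("Diffusion Policy push trajectory")
plt.show()
```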