File size: 4,754 Bytes
460072a 5c87b65 460072a a861765 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a 5c87b65 460072a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
from typing import Optional, Sequence, Tuple
import numpy as np
import torch
import torch.nn as nn
from rl_algo_impls.shared.actor import Actor, PiForward, actor_head
from rl_algo_impls.shared.encoder import Encoder
from rl_algo_impls.shared.policy.actor_critic import OnPolicy, Step, clamp_actions
from rl_algo_impls.shared.policy.actor_critic_network import default_hidden_sizes
from rl_algo_impls.shared.policy.critic import CriticHead
from rl_algo_impls.shared.policy.policy import ACTIVATION
from rl_algo_impls.wrappers.vectorable_wrapper import (
VecEnv,
VecEnvObs,
single_action_space,
single_observation_space,
)
# File-name constants. They are not referenced in the visible part of this
# module; NOTE(review): presumably the on-disk names for the saved policy
# ("pi") and value ("v") network weights -- confirm against the save/load code.
PI_FILE_NAME = "pi.pt"
V_FILE_NAME = "v.pt"
class VPGActor(Actor):
    """Actor that chains a feature extractor with an actor head.

    Observations are passed through ``feature_extractor`` and the resulting
    features are handed to ``head`` together with the optional actions.
    """

    def __init__(self, feature_extractor: Encoder, head: Actor) -> None:
        """Store the feature extractor and the head that consumes its output."""
        super().__init__()
        self.feature_extractor = feature_extractor
        self.head = head

    def forward(
        self,
        obs: torch.Tensor,
        a: Optional[torch.Tensor] = None,
    ) -> PiForward:
        """Encode ``obs`` and return the head's output for the features and ``a``."""
        return self.head(self.feature_extractor(obs), a)

    def sample_weights(self, batch_size: int = 1) -> None:
        """Delegate weight sampling to the head with the given ``batch_size``."""
        self.head.sample_weights(batch_size=batch_size)

    @property
    def action_shape(self) -> Tuple[int, ...]:
        """Action shape as reported by the head."""
        return self.head.action_shape
class VPGActorCritic(OnPolicy):
    """Actor-critic policy whose actor and critic each own a separate encoder.

    ``self.pi`` is a ``VPGActor`` (an encoder followed by an actor head) and
    ``self.v`` is an ``nn.Sequential`` of a second encoder and a
    ``CriticHead``.
    """

    def __init__(
        self,
        env: VecEnv,
        hidden_sizes: Optional[Sequence[int]] = None,
        init_layers_orthogonal: bool = True,
        activation_fn: str = "tanh",
        log_std_init: float = -0.5,
        use_sde: bool = False,
        full_std: bool = True,
        squash_output: bool = False,
        **kwargs,
    ) -> None:
        """Build the policy and value networks for ``env``.

        Args:
            env: Vectorized environment; its single observation and action
                spaces are used to size the networks.
            hidden_sizes: Hidden layer sizes passed to both heads. When
                ``None``, ``default_hidden_sizes(obs_space)`` is used.
            init_layers_orthogonal: Forwarded to both encoders and both heads.
            activation_fn: Key into ``ACTIVATION`` selecting the activation.
            log_std_init: Forwarded to ``actor_head``.
            use_sde: Forwarded to ``actor_head`` and stored on the instance.
            full_std: Forwarded to ``actor_head``.
            squash_output: Forwarded to ``actor_head`` and stored on the
                instance for later use by ``clamp_actions``.
            **kwargs: Forwarded to the parent constructor.
        """
        super().__init__(env, **kwargs)
        activation = ACTIVATION[activation_fn]
        obs_space = single_observation_space(env)
        self.action_space = single_action_space(env)
        self.use_sde = use_sde
        self.squash_output = squash_output

        if hidden_sizes is None:
            hidden_sizes = default_hidden_sizes(obs_space)

        def new_encoder() -> Encoder:
            # Actor and critic get independent encoders built the same way.
            return Encoder(
                obs_space, activation, init_layers_orthogonal=init_layers_orthogonal
            )

        # Modules are created in the order: actor encoder, actor head,
        # critic encoder, critic head.
        actor_encoder = new_encoder()
        self.pi = VPGActor(
            actor_encoder,
            actor_head(
                self.action_space,
                actor_encoder.out_dim,
                tuple(hidden_sizes),
                init_layers_orthogonal,
                activation,
                log_std_init=log_std_init,
                use_sde=use_sde,
                full_std=full_std,
                squash_output=squash_output,
            ),
        )

        critic_encoder = new_encoder()
        critic_head = CriticHead(
            critic_encoder.out_dim,
            tuple(hidden_sizes),
            activation=activation,
            init_layers_orthogonal=init_layers_orthogonal,
        )
        self.v = nn.Sequential(critic_encoder, critic_head)

    def value(self, obs: VecEnvObs) -> np.ndarray:
        """Return the critic output for ``obs`` as a NumPy array.

        The forward pass runs with gradient tracking disabled.
        """
        obs_t = self._as_tensor(obs)
        with torch.no_grad():
            values = self.v(obs_t)
        return values.cpu().numpy()

    def step(self, obs: VecEnvObs, action_masks: Optional[np.ndarray] = None) -> Step:
        """Sample actions for ``obs`` and return them with values and log-probs.

        Returns a ``Step`` built from the sampled actions, the critic output,
        the log-probabilities of the sampled actions, and the actions after
        ``clamp_actions``. ``action_masks`` must be ``None``.
        """
        assert (
            action_masks is None
        ), f"action_masks not currently supported in {self.__class__.__name__}"

        obs_t = self._as_tensor(obs)
        with torch.no_grad():
            dist, _, _ = self.pi(obs_t)
            action = dist.sample()
            log_prob = dist.log_prob(action)
            values = self.v(obs_t)

        action_np = action.cpu().numpy()
        clamped_np = clamp_actions(action_np, self.action_space, self.squash_output)
        return Step(
            action_np,
            values.cpu().numpy(),
            log_prob.cpu().numpy(),
            clamped_np,
        )

    def act(
        self,
        obs: np.ndarray,
        deterministic: bool = True,
        action_masks: Optional[np.ndarray] = None,
    ) -> np.ndarray:
        """Return clamped actions for ``obs``.

        With ``deterministic`` set, the distribution's ``mode`` is used;
        otherwise the actions come from ``step``. ``action_masks`` must be
        ``None``.
        """
        assert (
            action_masks is None
        ), f"action_masks not currently supported in {self.__class__.__name__}"

        if deterministic:
            obs_t = self._as_tensor(obs)
            with torch.no_grad():
                dist, _, _ = self.pi(obs_t)
                mode = dist.mode
            return clamp_actions(
                mode.cpu().numpy(), self.action_space, self.squash_output
            )
        return self.step(obs).clamped_a

    def load(self, path: str) -> None:
        """Load via the parent implementation, then call ``reset_noise``."""
        super().load(path)
        self.reset_noise()

    def reset_noise(self, batch_size: Optional[int] = None) -> None:
        """Resample the actor's weights.

        A falsy ``batch_size`` (``None`` or ``0``) falls back to
        ``self.env.num_envs``.
        """
        self.pi.sample_weights(batch_size=batch_size or self.env.num_envs)

    @property
    def action_shape(self) -> Tuple[int, ...]:
        """Action shape as reported by the actor."""
        return self.pi.action_shape
|