File size: 1,880 Bytes
d61f7b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
import os
from typing import Optional, Sequence, TypeVar
import numpy as np
import torch
from rl_algo_impls.dqn.q_net import QNetwork
from rl_algo_impls.shared.policy.policy import Policy
from rl_algo_impls.wrappers.vectorable_wrapper import (
VecEnv,
VecEnvObs,
single_action_space,
single_observation_space,
)
DQNPolicySelf = TypeVar("DQNPolicySelf", bound="DQNPolicy")
class DQNPolicy(Policy):
def __init__(
self,
env: VecEnv,
hidden_sizes: Sequence[int] = [],
cnn_flatten_dim: int = 512,
cnn_style: str = "nature",
cnn_layers_init_orthogonal: Optional[bool] = None,
impala_channels: Sequence[int] = (16, 32, 32),
**kwargs,
) -> None:
super().__init__(env, **kwargs)
self.q_net = QNetwork(
single_observation_space(env),
single_action_space(env),
hidden_sizes,
cnn_flatten_dim=cnn_flatten_dim,
cnn_style=cnn_style,
cnn_layers_init_orthogonal=cnn_layers_init_orthogonal,
impala_channels=impala_channels,
)
def act(
self,
obs: VecEnvObs,
eps: float = 0,
deterministic: bool = True,
action_masks: Optional[np.ndarray] = None,
) -> np.ndarray:
assert eps == 0 if deterministic else eps >= 0
assert (
action_masks is None
), f"action_masks not currently supported in {self.__class__.__name__}"
if not deterministic and np.random.random() < eps:
return np.array(
[
single_action_space(self.env).sample()
for _ in range(self.env.num_envs)
]
)
else:
o = self._as_tensor(obs)
with torch.no_grad():
return self.q_net(o).argmax(axis=1).cpu().numpy()
|