Commit 90258d12 authored by hzq

add continuous action space

parent 04fa328f
@@ -22,7 +22,7 @@ The original MAPPO code wraps the environment in an overly complicated way; this project wraps the environment directly ...

## Usage

- - The environment part is an empty implementation; the whole environment logic lives in the file `light_mappo/envs/env_wrappers.py`: [Code](https://github.com/tinyzqh/light_mappo/blob/main/envs/env_wrappers.py)
+ - The environment part is an empty implementation; the whole environment logic lives in the file `light_mappo/envs/env_core.py`: [Code](https://github.com/tinyzqh/light_mappo/blob/main/envs/env_wrappers.py)

```python
class Env(object):
@@ -63,7 +63,7 @@ class Env(object):
```

-Just write this part of the code and it plugs into MAPPO seamlessly. This is an initial version; this part will be factored out separately later.
+Just write this part of the code and it plugs into MAPPO seamlessly. On top of env_core.py, two additional files, env_discrete.py and env_continuous.py, wrap the discrete and continuous action spaces respectively. The `elif self.continuous_action:` branch in algorithms/utils/act.py handles the continuous action space, as does the `# TODO` spot in runner/shared/env_runner.py ("adapt this to whatever form your own environment needs").

## Related Efforts
...
@@ -14,11 +14,13 @@ class ACTLayer(nn.Module):
        super(ACTLayer, self).__init__()
        self.mixed_action = False
        self.multi_discrete = False
+       self.continuous_action = False
        if action_space.__class__.__name__ == "Discrete":
            action_dim = action_space.n
            self.action_out = Categorical(inputs_dim, action_dim, use_orthogonal, gain)
        elif action_space.__class__.__name__ == "Box":
+           self.continuous_action = True
            action_dim = action_space.shape[0]
            self.action_out = DiagGaussian(inputs_dim, action_dim, use_orthogonal, gain)
        elif action_space.__class__.__name__ == "MultiBinary":
@@ -49,7 +51,7 @@ class ACTLayer(nn.Module):
        :return actions: (torch.Tensor) actions to take.
        :return action_log_probs: (torch.Tensor) log probabilities of taken actions.
        """
-       if self.mixed_action :
+       if self.mixed_action:
            actions = []
            action_log_probs = []
            for action_out in self.action_outs:
@@ -74,7 +76,16 @@ class ACTLayer(nn.Module):
            actions = torch.cat(actions, -1)
            action_log_probs = torch.cat(action_log_probs, -1)
+       elif self.continuous_action:
+           # single diagonal-Gaussian head: sample (or take the mode) and score the action
+           action_logits = self.action_out(x)
+           actions = action_logits.mode() if deterministic else action_logits.sample()
+           action_log_probs = action_logits.log_probs(actions)
        else:
            action_logits = self.action_out(x, available_actions)
            actions = action_logits.mode() if deterministic else action_logits.sample()
@@ -151,6 +162,27 @@ class ACTLayer(nn.Module):
            action_log_probs = torch.cat(action_log_probs, -1)  # ! could be wrong
            dist_entropy = torch.tensor(dist_entropy).mean()
+       elif self.continuous_action:
+           # single diagonal-Gaussian head: score the given actions and compute the entropy bonus
+           action_logits = self.action_out(x)
+           action_log_probs = action_logits.log_probs(action)
+           if active_masks is not None:
+               # only count entropy for active agents
+               if len(action_logits.entropy().shape) == len(active_masks.shape):
+                   dist_entropy = (action_logits.entropy() * active_masks).sum() / active_masks.sum()
+               else:
+                   dist_entropy = (action_logits.entropy() * active_masks.squeeze(-1)).sum() / active_masks.sum()
+           else:
+               dist_entropy = action_logits.entropy().mean()
        else:
            action_logits = self.action_out(x, available_actions)
            action_log_probs = action_logits.log_probs(action)
...
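For reference, a minimal sketch of what the continuous path above assumes of the `DiagGaussian` head: a state-dependent mean plus a learned log-std forming a diagonal Gaussian, with log-probabilities and entropy reduced over the action dimensions. `SimpleDiagGaussian`, `inputs_dim=64`, and `action_dim=2` are illustrative names and values, not taken from the repository.

```python
import torch
import torch.nn as nn


class SimpleDiagGaussian(nn.Module):
    """Minimal stand-in for the DiagGaussian head used above (illustrative only)."""

    def __init__(self, inputs_dim, action_dim):
        super().__init__()
        self.fc_mean = nn.Linear(inputs_dim, action_dim)
        self.log_std = nn.Parameter(torch.zeros(action_dim))  # state-independent std

    def forward(self, x):
        mean = self.fc_mean(x)
        return torch.distributions.Normal(mean, self.log_std.exp())


# Rough shape of the continuous branch in ACTLayer.forward / evaluate_actions:
head = SimpleDiagGaussian(inputs_dim=64, action_dim=2)
x = torch.randn(5, 64)                                     # batch of 5 actor features
dist = head(x)
actions = dist.sample()                                    # (5, 2): one Gaussian per action dim
log_probs = dist.log_prob(actions).sum(-1, keepdim=True)   # (5, 1): summed over action dims
entropy = dist.entropy().sum(-1).mean()                    # scalar entropy bonus for the PPO loss
```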
@@ -176,7 +176,6 @@ def get_config():
    parser.add_argument("--num_env_steps", type=int, default=10e6,
                        help='Number of environment steps to train (default: 10e6)')
    parser.add_argument("--user_name", type=str, default='marl',help="[for wandb usage], to specify user's name for simply collecting training data.")
-   parser.add_argument("--use_wandb", action='store_false', default=False, help="[for wandb usage], by default True, will log date to wandb server. or else will use tensorboard to log data.")

    # env parameters
    parser.add_argument("--env_name", type=str, default='MyEnv', help="specify the name of environment")
...
"""
# @Time : 2021/7/2 5:22 下午
# @Author : hezhiqiang01
# @Email : hezhiqiang01@baidu.com
# @File : env.py
"""
import numpy as np
class Env(object):
"""
# 环境中的智能体
"""
def __init__(self, i):
self.agent_num = 2 # 设置智能体(小飞机)的个数,这里设置为两个
self.obs_dim = 14 # 设置智能体的观测纬度
self.action_dim = 5 # 设置智能体的动作纬度,这里假定为一个五个纬度的
def reset(self):
"""
# self.agent_num设定为2个智能体时,返回值为一个list,每个list里面为一个shape = (self.obs_dim, )的观测数据
"""
sub_agent_obs = []
for i in range(self.agent_num):
sub_obs = np.random.random(size=(14, ))
sub_agent_obs.append(sub_obs)
return sub_agent_obs
def step(self, actions):
"""
# self.agent_num设定为2个智能体时,actions的输入为一个2纬的list,每个list里面为一个shape = (self.action_dim, )的动作数据
# 默认参数情况下,输入为一个list,里面含有两个元素,因为动作纬度为5,所里每个元素shape = (5, )
"""
sub_agent_obs = []
sub_agent_reward = []
sub_agent_done = []
sub_agent_info = []
for i in range(self.agent_num):
sub_agent_obs.append(np.random.random(size=(14,)))
sub_agent_reward.append([np.random.rand()])
sub_agent_done.append(False)
sub_agent_info.append({})
return [sub_agent_obs, sub_agent_reward, sub_agent_done, sub_agent_info]
\ No newline at end of file
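A quick, hypothetical sanity check of the interface above (this driver code is not part of the repository; it only assumes `Env` is importable from `envs.env`, as in the old wrapper):

```python
import numpy as np
from envs.env import Env

env = Env(0)                           # the constructor takes an (unused) index
obs = env.reset()                      # list with 2 entries, each of shape (14,)
actions = [np.random.random(env.action_dim) for _ in range(env.agent_num)]
obs, rewards, dones, infos = env.step(actions)
print(len(obs), obs[0].shape, dones)   # 2 (14,) [False, False]; rewards is a list of [float] per agent
```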
"""
# @Time : 2021/7/2 5:22 下午
# @Author : hezhiqiang
# @Email : tinyzqh@163.com
# @File : env_discrete.py
"""
import gym
from gym import spaces
import numpy as np
from envs.env_core import EnvCore
class DiscreteActionEnv(object):
"""对于离散动作环境的封装"""
def __init__(self):
self.env = EnvCore()
self.num_agent = self.env.agent_num
self.signal_obs_dim = self.env.obs_dim
self.signal_action_dim = self.env.action_dim
# if true, action is a number 0...N, otherwise action is a one-hot N-dimensional vector
self.discrete_action_input = False
self.movable = True
# configure spaces
self.action_space = []
self.observation_space = []
self.share_observation_space = []
share_obs_dim = 0
for agent in range(self.num_agent):
total_action_space = []
# physical action space
u_action_space = spaces.Discrete(self.signal_action_dim) # 5个离散的动作
if self.movable:
total_action_space.append(u_action_space)
# total action space
if len(total_action_space) > 1:
# all action spaces are discrete, so simplify to MultiDiscrete action space
if all([isinstance(act_space, spaces.Discrete) for act_space in total_action_space]):
act_space = MultiDiscrete([[0, act_space.n - 1] for act_space in total_action_space])
else:
act_space = spaces.Tuple(total_action_space)
self.action_space.append(act_space)
else:
self.action_space.append(total_action_space[0])
# observation space
share_obs_dim += self.signal_obs_dim
self.observation_space.append(spaces.Box(low=-np.inf, high=+np.inf, shape=(self.signal_obs_dim,),
dtype=np.float32)) # [-inf,inf]
self.share_observation_space = [spaces.Box(low=-np.inf, high=+np.inf, shape=(share_obs_dim,),
dtype=np.float32) for _ in range(self.num_agent)]
def step(self, actions):
"""
输入actions纬度假设:
# actions shape = (5, 2, 5)
# 5个线程的环境,里面有2个智能体,每个智能体的动作是一个one_hot的5维编码
"""
results = self.env.step(actions)
obs, rews, dones, infos = results
return np.stack(obs), np.stack(rews), np.stack(dones), infos
def reset(self):
obs = self.env.reset()
return np.stack(obs)
def close(self):
pass
def render(self, mode="rgb_array"):
pass
def seed(self, seed):
pass
class MultiDiscrete(gym.Space):
"""
- The multi-discrete action space consists of a series of discrete action spaces with different parameters
- It can be adapted to both a Discrete action space or a continuous (Box) action space
- It is useful to represent game controllers or keyboards where each key can be represented as a discrete action space
- It is parametrized by passing an array of arrays containing [min, max] for each discrete action space
where the discrete action space can take any integers from `min` to `max` (both inclusive)
Note: A value of 0 always need to represent the NOOP action.
e.g. Nintendo Game Controller
- Can be conceptualized as 3 discrete action spaces:
1) Arrow Keys: Discrete 5 - NOOP[0], UP[1], RIGHT[2], DOWN[3], LEFT[4] - params: min: 0, max: 4
2) Button A: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1
3) Button B: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1
- Can be initialized as
MultiDiscrete([ [0,4], [0,1], [0,1] ])
"""
def __init__(self, array_of_param_array):
super().__init__()
self.low = np.array([x[0] for x in array_of_param_array])
self.high = np.array([x[1] for x in array_of_param_array])
self.num_discrete_space = self.low.shape[0]
self.n = np.sum(self.high) + 2
def sample(self):
""" Returns a array with one sample from each discrete action space """
# For each row: round(random .* (max - min) + min, 0)
random_array = np.random.rand(self.num_discrete_space)
return [int(x) for x in np.floor(np.multiply((self.high - self.low + 1.), random_array) + self.low)]
def contains(self, x):
return len(x) == self.num_discrete_space and (np.array(x) >= self.low).all() and (
np.array(x) <= self.high).all()
@property
def shape(self):
return self.num_discrete_space
def __repr__(self):
return "MultiDiscrete" + str(self.num_discrete_space)
def __eq__(self, other):
return np.array_equal(self.low, other.low) and np.array_equal(self.high, other.high)
if __name__ == "__main__":
DiscreteActionEnv().step(actions=None)
\ No newline at end of file
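The README and train.py reference a companion `envs/env_continuous.py` with a `ContinuousActionEnv` class that is not shown in this diff. Below is a hedged sketch of what such a wrapper presumably looks like, mirroring the discrete wrapper but with a `Box` action space; everything other than the class name and the `EnvCore` import is an assumption.

```python
import numpy as np
from gym import spaces
from envs.env_core import EnvCore


class ContinuousActionEnv(object):
    """Wrapper for environments with a continuous (Box) action space -- illustrative sketch."""

    def __init__(self):
        self.env = EnvCore()
        self.num_agent = self.env.agent_num
        self.signal_obs_dim = self.env.obs_dim
        self.signal_action_dim = self.env.action_dim
        self.u_range = 1.0  # assumed control range for continuous control

        self.action_space = []
        self.observation_space = []
        self.share_observation_space = []
        share_obs_dim = 0
        for agent in range(self.num_agent):
            # each agent acts in [-u_range, +u_range]^action_dim
            self.action_space.append(spaces.Box(low=-self.u_range, high=+self.u_range,
                                                shape=(self.signal_action_dim,), dtype=np.float32))
            share_obs_dim += self.signal_obs_dim
            self.observation_space.append(spaces.Box(low=-np.inf, high=+np.inf,
                                                     shape=(self.signal_obs_dim,), dtype=np.float32))
        self.share_observation_space = [spaces.Box(low=-np.inf, high=+np.inf,
                                                   shape=(share_obs_dim,), dtype=np.float32)
                                        for _ in range(self.num_agent)]

    def step(self, actions):
        obs, rews, dones, infos = self.env.step(actions)
        return np.stack(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        return np.stack(self.env.reset())

    def close(self):
        pass

    def render(self, mode="rgb_array"):
        pass

    def seed(self, seed):
        pass
```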
@@ -7,214 +7,294 @@ Modified from OpenAI Baselines code to work with multi-agent envs
(The file is rewritten: the previous Env(i)-based SubprocVecEnv/DummyVecEnv wrappers and their local copy of MultiDiscrete, now kept in env_discrete.py, are replaced by the vectorized wrappers below.)

"""
Modified from OpenAI Baselines code to work with multi-agent envs
"""
import numpy as np
import torch
from multiprocessing import Process, Pipe
from abc import ABC, abstractmethod


def tile_images(img_nhwc):
    """
    Tile N images into one big PxQ image
    (P,Q) are chosen to be as close as possible, and if N
    is square, then P=Q.
    input: img_nhwc, list or array of images, ndim=4 once turned into array
    n = batch index, h = height, w = width, c = channel
    returns:
        bigim_HWc, ndarray with ndim=3
    """
    img_nhwc = np.asarray(img_nhwc)
    N, h, w, c = img_nhwc.shape
    H = int(np.ceil(np.sqrt(N)))
    W = int(np.ceil(float(N) / H))
    img_nhwc = np.array(list(img_nhwc) + [img_nhwc[0] * 0 for _ in range(N, H * W)])
    img_HWhwc = img_nhwc.reshape(H, W, h, w, c)
    img_HhWwc = img_HWhwc.transpose(0, 2, 1, 3, 4)
    img_Hh_Ww_c = img_HhWwc.reshape(H * h, W * w, c)
    return img_Hh_Ww_c


class CloudpickleWrapper(object):
    """
    Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
    """

    def __init__(self, x):
        self.x = x

    def __getstate__(self):
        import cloudpickle
        return cloudpickle.dumps(self.x)

    def __setstate__(self, ob):
        import pickle
        self.x = pickle.loads(ob)


def worker(remote, parent_remote, env_fn_wrapper):
    parent_remote.close()
    env = env_fn_wrapper.x()
    while True:
        cmd, data = remote.recv()
        if cmd == 'step':
            ob, reward, done, info = env.step(data)
            if 'bool' in done.__class__.__name__:
                if done:
                    ob = env.reset()
            else:
                if np.all(done):
                    ob = env.reset()
            remote.send((ob, reward, done, info))
        elif cmd == 'reset':
            ob = env.reset()
            remote.send(ob)
        elif cmd == 'render':
            if data == "rgb_array":
                fr = env.render(mode=data)
                remote.send(fr)
            elif data == "human":
                env.render(mode=data)
        elif cmd == 'reset_task':
            ob = env.reset_task()
            remote.send(ob)
        elif cmd == 'close':
            env.close()
            remote.close()
            break
        elif cmd == 'get_spaces':
            remote.send((env.observation_space, env.share_observation_space, env.action_space))
        else:
            raise NotImplementedError


class ShareVecEnv(ABC):
    """
    An abstract asynchronous, vectorized environment.
    Used to batch data from multiple copies of an environment, so that
    each observation becomes a batch of observations, and expected action is a batch of actions to
    be applied per-environment.
    """
    closed = False
    viewer = None

    metadata = {
        'render.modes': ['human', 'rgb_array']
    }

    def __init__(self, num_envs, observation_space, share_observation_space, action_space):
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.share_observation_space = share_observation_space
        self.action_space = action_space

    @abstractmethod
    def reset(self):
        """
        Reset all the environments and return an array of
        observations, or a dict of observation arrays.

        If step_async is still doing work, that work will
        be cancelled and step_wait() should not be called
        until step_async() is invoked again.
        """
        pass

    @abstractmethod
    def step_async(self, actions):
        """
        Tell all the environments to start taking a step
        with the given actions.
        Call step_wait() to get the results of the step.

        You should not call this if a step_async run is
        already pending.
        """
        pass

    @abstractmethod
    def step_wait(self):
        """
        Wait for the step taken with step_async().

        Returns (obs, rews, dones, infos):
         - obs: an array of observations, or a dict of
                arrays of observations.
         - rews: an array of rewards
         - dones: an array of "episode done" booleans
         - infos: a sequence of info objects
        """
        pass

    def close_extras(self):
        """
        Clean up the extra resources, beyond what's in this base class.
        Only runs when not self.closed.
        """
        pass

    def close(self):
        if self.closed:
            return
        if self.viewer is not None:
            self.viewer.close()
        self.close_extras()
        self.closed = True

    def step(self, actions):
        """
        Step the environments synchronously.

        This is available for backwards compatibility.
        """
        self.step_async(actions)
        return self.step_wait()

    def render(self, mode='human'):
        imgs = self.get_images()
        bigimg = tile_images(imgs)
        if mode == 'human':
            self.get_viewer().imshow(bigimg)
            return self.get_viewer().isopen
        elif mode == 'rgb_array':
            return bigimg
        else:
            raise NotImplementedError

    def get_images(self):
        """
        Return RGB images from each environment
        """
        raise NotImplementedError

    @property
    def unwrapped(self):
        if isinstance(self, VecEnvWrapper):
            return self.venv.unwrapped
        else:
            return self

    def get_viewer(self):
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.SimpleImageViewer()
        return self.viewer


class SubprocVecEnv(ShareVecEnv):
    def __init__(self, env_fns, spaces=None):
        """
        envs: list of gym environments to run in subprocesses
        """
        self.waiting = False
        self.closed = False
        nenvs = len(env_fns)
        self.remotes, self.work_remotes = zip(*[Pipe() for _ in range(nenvs)])
        self.ps = [Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
                   for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
        for p in self.ps:
            p.daemon = True  # if the main process crashes, we should not cause things to hang
            p.start()
        for remote in self.work_remotes:
            remote.close()

        self.remotes[0].send(('get_spaces', None))
        observation_space, share_observation_space, action_space = self.remotes[0].recv()
        ShareVecEnv.__init__(self, len(env_fns), observation_space,
                             share_observation_space, action_space)

    def step_async(self, actions):
        for remote, action in zip(self.remotes, actions):
            remote.send(('step', action))
        self.waiting = True

    def step_wait(self):
        results = [remote.recv() for remote in self.remotes]
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return np.stack(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        for remote in self.remotes:
            remote.send(('reset', None))
        obs = [remote.recv() for remote in self.remotes]
        return np.stack(obs)

    def reset_task(self):
        for remote in self.remotes:
            remote.send(('reset_task', None))
        return np.stack([remote.recv() for remote in self.remotes])

    def close(self):
        if self.closed:
            return
        if self.waiting:
            for remote in self.remotes:
                remote.recv()
        for remote in self.remotes:
            remote.send(('close', None))
        for p in self.ps:
            p.join()
        self.closed = True

    def render(self, mode="rgb_array"):
        for remote in self.remotes:
            remote.send(('render', mode))
        if mode == "rgb_array":
            frame = [remote.recv() for remote in self.remotes]
            return np.stack(frame)


# single env
class DummyVecEnv(ShareVecEnv):
    def __init__(self, env_fns):
        self.envs = [fn() for fn in env_fns]
        env = self.envs[0]
        ShareVecEnv.__init__(self, len(env_fns), env.observation_space,
                             env.share_observation_space, env.action_space)
        self.actions = None

    def step_async(self, actions):
        self.actions = actions

    def step_wait(self):
        results = [env.step(a) for (a, env) in zip(self.actions, self.envs)]
        obs, rews, dones, infos = map(np.array, zip(*results))
        for (i, done) in enumerate(dones):
            if 'bool' in done.__class__.__name__:
                if done:
                    obs[i] = self.envs[i].reset()
            else:
                if np.all(done):
                    obs[i] = self.envs[i].reset()
        self.actions = None
        return obs, rews, dones, infos

    def reset(self):
        obs = [env.reset() for env in self.envs]
        return np.array(obs)

    def close(self):
        for env in self.envs:
            env.close()

    def render(self, mode="human"):
        if mode == "rgb_array":
            return np.array([env.render(mode=mode) for env in self.envs])
        elif mode == "human":
            for env in self.envs:
                env.render(mode=mode)
        else:
            raise NotImplementedError
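For orientation, a hedged sketch of how these wrappers are typically built from per-rank environment factories. `make_parallel_env` and the single-thread threshold are assumptions; `DiscreteActionEnv`, `DummyVecEnv`, and `SubprocVecEnv` come from the files in this commit.

```python
from envs.env_wrappers import SubprocVecEnv, DummyVecEnv
from envs.env_discrete import DiscreteActionEnv


def make_parallel_env(seed, n_rollout_threads):
    """Build a vectorized env: subprocesses only when several rollout threads are requested."""
    def get_env_fn(rank):
        def init_env():
            env = DiscreteActionEnv()
            env.seed(seed + rank * 1000)
            return env
        return init_env

    if n_rollout_threads == 1:
        return DummyVecEnv([get_env_fn(0)])
    return SubprocVecEnv([get_env_fn(i) for i in range(n_rollout_threads)])
```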
-import wandb
 import os
 import numpy as np
 import torch
@@ -37,7 +36,6 @@ class Runner(object):
        self.n_render_rollout_threads = self.all_args.n_render_rollout_threads
        self.use_linear_lr_decay = self.all_args.use_linear_lr_decay
        self.hidden_size = self.all_args.hidden_size
-       self.use_wandb = self.all_args.use_wandb
        self.use_render = self.all_args.use_render
        self.recurrent_N = self.all_args.recurrent_N
@@ -50,10 +48,6 @@ class Runner(object):
        # dir
        self.model_dir = self.all_args.model_dir

-       if self.use_wandb:
-           self.save_dir = str(wandb.run.dir)
-           self.run_dir = str(wandb.run.dir)
-       else:
        self.run_dir = config["run_dir"]
        self.log_dir = str(self.run_dir / 'logs')
        if not os.path.exists(self.log_dir):
@@ -146,9 +140,6 @@ class Runner(object):
        :param total_num_steps: (int) total number of training env steps.
        """
        for k, v in train_infos.items():
-           if self.use_wandb:
-               wandb.log({k: v}, step=total_num_steps)
-           else:
            self.writter.add_scalars(k, {k: v}, total_num_steps)

    def log_env(self, env_infos, total_num_steps):
@@ -159,7 +150,4 @@ class Runner(object):
        """
        for k, v in env_infos.items():
            if len(v)>0:
-               if self.use_wandb:
-                   wandb.log({k: np.mean(v)}, step=total_num_steps)
-               else:
                self.writter.add_scalars(k, {k: np.mean(v)}, total_num_steps)
@@ -16,7 +16,6 @@ import time
 import numpy as np
 import torch
 from runner.shared.base_runner import Runner
-import wandb
 import imageio
@@ -133,9 +132,12 @@ class EnvRunner(Runner):
            else:
                actions_env = np.concatenate((actions_env, uc_actions_env), axis=2)
        elif self.envs.action_space[0].__class__.__name__ == 'Discrete':
+           # actions --> actions_env : shape [10, 1] --> [5, 2, 5]
            actions_env = np.squeeze(np.eye(self.envs.action_space[0].n)[actions], 2)
        else:
-           raise NotImplementedError
+           # TODO: adapt this branch to whatever action format your own environment needs
+           actions_env = actions
+           # raise NotImplementedError

        return values, actions, action_log_probs, rnn_states, rnn_states_critic, actions_env
...
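To make the shape comment above concrete, a small standalone example of the `np.eye(...)[actions]` one-hot conversion; the 5 threads / 2 agents / 5 actions figures come from the comment, and the action values are random:

```python
import numpy as np

n_threads, n_agents, n_actions = 5, 2, 5
# `actions` as produced by the policy: one integer index per (thread, agent), shape (10, 1)
actions = np.random.randint(n_actions, size=(n_threads * n_agents, 1))
actions = actions.reshape(n_threads, n_agents, 1)            # (5, 2, 1)

# indexing an identity matrix turns indices into one-hot rows, then the singleton axis is squeezed
actions_env = np.squeeze(np.eye(n_actions)[actions], 2)      # (5, 2, 5)
print(actions_env.shape)
```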
""" """
# @Time : 2021/6/30 10:07 下午 # @Time : 2021/6/30 10:07 下午
# @Author : hezhiqiang01 # @Author : hezhiqiang
# @Email : hezhiqiang01@baidu.com # @Email : tinyzqh@163.com
# @File : train.py # @File : train.py
""" """
# !/usr/bin/env python # !/usr/bin/env python
import sys import sys
import os import os
import wandb
import socket import socket
import setproctitle import setproctitle
import numpy as np import numpy as np
...@@ -21,11 +20,27 @@ from envs.env_wrappers import SubprocVecEnv, DummyVecEnv ...@@ -21,11 +20,27 @@ from envs.env_wrappers import SubprocVecEnv, DummyVecEnv
def make_train_env(all_args): def make_train_env(all_args):
return SubprocVecEnv(all_args) def get_env_fn(rank):
def init_env():
# from envs.env_continuous import ContinuousActionEnv
# env = ContinuousActionEnv()
from envs.env_discrete import DiscreteActionEnv
env = DiscreteActionEnv()
env.seed(all_args.seed + rank * 1000)
return env
return init_env
return DummyVecEnv([get_env_fn(i) for i in range(all_args.n_rollout_threads)])
def make_eval_env(all_args): def make_eval_env(all_args):
return DummyVecEnv(all_args) def get_env_fn(rank):
def init_env():
from envs.env_discrete import DiscreteActionEnv
env = DiscreteActionEnv()
env.seed(all_args.seed + rank * 1000)
return env
return init_env
return DummyVecEnv([get_env_fn(i) for i in range(all_args.n_rollout_threads)])
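Since the commit's point is continuous-action support, switching training over is just a matter of building the continuous wrapper inside `init_env`. A sketch, assuming `ContinuousActionEnv` exposes the same `seed` interface as `DiscreteActionEnv`:

```python
from envs.env_wrappers import DummyVecEnv


def make_train_env(all_args):
    def get_env_fn(rank):
        def init_env():
            # assumed counterpart of env_discrete.py, referenced in the commented-out lines above
            from envs.env_continuous import ContinuousActionEnv
            env = ContinuousActionEnv()
            env.seed(all_args.seed + rank * 1000)
            return env
        return init_env
    return DummyVecEnv([get_env_fn(i) for i in range(all_args.n_rollout_threads)])
```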
 def parse_args(args, parser):
@@ -72,20 +87,6 @@ def main(args):
    if not run_dir.exists():
        os.makedirs(str(run_dir))

-   # wandb
-   if all_args.use_wandb:
-       run = wandb.init(config=all_args,
-                        project=all_args.env_name,
-                        entity=all_args.user_name,
-                        notes=socket.gethostname(),
-                        name=str(all_args.algorithm_name) + "_" +
-                             str(all_args.experiment_name) +
-                             "_seed" + str(all_args.seed),
-                        group=all_args.scenario_name,
-                        dir=str(run_dir),
-                        job_type="training",
-                        reinit=True)
-   else:
    if not run_dir.exists():
        curr_run = 'run1'
    else:
@@ -136,9 +137,6 @@ def main(args):
    if all_args.use_eval and eval_envs is not envs:
        eval_envs.close()

-   if all_args.use_wandb:
-       run.finish()
-   else:
    runner.writter.export_scalars_to_json(str(runner.log_dir + '/summary.json'))
    runner.writter.close()
...