import multiprocessing as mp

import numpy as np

from .vec_env import VecEnv, CloudpickleWrapper, clear_mpi_env_vars


class SubprocVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes.
    Recommended to use when num_envs > 1 and step() can be a bottleneck.
    """
    def __init__(self, env_fns, spaces=None, context='spawn', in_series=1):
        """
        Arguments:

        env_fns: iterable of callables - functions that create environments to run in subprocesses. Need to be cloud-pickleable
        in_series: number of environments to run in series in a single process
        (e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series)
        """
        self.waiting = False
        self.closed = False
        self.in_series = in_series
        nenvs = len(env_fns)
        assert nenvs % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
        self.nremotes = nenvs // in_series
        env_fns = np.array_split(env_fns, self.nremotes)
        ctx = mp.get_context(context)
        self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.nremotes)])
        # worker() is defined earlier in subproc_vec_env.py; it runs the per-process env loop
        self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
                   for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
        for p in self.ps:
            p.daemon = True  # if the main process crashes, we should not cause things to hang
            with clear_mpi_env_vars():
                p.start()
        for remote in self.work_remotes:
            remote.close()

        self.remotes[0].send(('get_spaces_spec', None))
        observation_space, action_space, self.spec = self.remotes[0].recv().x
        self.viewer = None
        VecEnv.__init__(self, nenvs, observation_space, action_space)

    def step_async(self, actions):
        self._assert_not_closed()
        actions = np.array_split(actions, self.nremotes)
        for remote, action in zip(self.remotes, actions):
            remote.send(('step', action))
        self.waiting = True

    def step_wait(self):
        self._assert_not_closed()
        results = [remote.recv() for remote in self.remotes]
        results = _flatten_list(results)
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        self._assert_not_closed()
        for remote in self.remotes:
            remote.send(('reset', None))
        obs = [remote.recv() for remote in self.remotes]
        obs = _flatten_list(obs)
        return _flatten_obs(obs)

    def close_extras(self):
        self.closed = True
        if self.waiting:
            for remote in self.remotes:
                remote.recv()
        for remote in self.remotes:
            remote.send(('close', None))
        for p in self.ps:
            p.join()

    def get_images(self):
        self._assert_not_closed()
        for pipe in self.remotes:
            pipe.send(('render', None))
        imgs = [pipe.recv() for pipe in self.remotes]
        imgs = _flatten_list(imgs)
        return imgs

    def _assert_not_closed(self):
        assert not self.closed, "Trying to operate on a SubprocVecEnv after calling close()"

    def __del__(self):
        if not self.closed:
            self.close()


def _flatten_obs(obs):
    # Stack per-env observations into one batch; dict observations are stacked key by key.
    assert isinstance(obs, (list, tuple))
    assert len(obs) > 0
    if isinstance(obs[0], dict):
        keys = obs[0].keys()
        return {k: np.stack([o[k] for o in obs]) for k in keys}
    else:
        return np.stack(obs)


def _flatten_list(l):
    # Flatten a list of per-remote lists (each remote returns in_series results).
    assert isinstance(l, (list, tuple))
    assert len(l) > 0
    assert all([len(l_) > 0 for l_ in l])
    return [l__ for l_ in l for l__ in l_]
This module implements multi-environment interaction across multiple processes. Each worker process is assumed to interact with several environments serially; the number of environments per process is in_series, which defaults to 1.
The number of worker processes is therefore nremotes:
self.nremotes = nenvs // in_series
The environment-constructor functions env_fns are split into nremotes groups, one group per process:
env_fns = np.array_split(env_fns, self.nremotes)
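A quick sanity check of that split, using integers as stand-ins for the constructor callables:

import numpy as np

env_fn_ids = np.arange(12)                 # stand-ins for 12 env constructors
in_series = 3
nremotes = len(env_fn_ids) // in_series    # 4 worker processes
groups = np.array_split(env_fn_ids, nremotes)
print(groups)
# [array([0, 1, 2]), array([3, 4, 5]), array([6, 7, 8]), array([9, 10, 11])]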
Multiprocessing in this module is implemented with Python's multiprocessing package.
Message passing between the processes is done through Pipe objects:
self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.nremotes)])
As the pipe construction shows, there are nremotes subprocesses in total, and a dedicated Pipe is created for each of them.
Every pipe has two ends, denoted here by remotes and work_remotes.
Note that both ends of a pipe can be read from and written to; however, if several processes operate on the same end at the same time, the messages can get corrupted. So when a pipe is fixed to carry messages between exactly two processes, it is safest to assign one end to each process.
The two ends of a Pipe behave just like the two ends of a physical pipe.
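A minimal, self-contained demo of this pattern (the echo_worker function is a hypothetical stand-in, not part of baselines): the parent keeps one end, the child keeps the other, and each side closes the end it does not use.

import multiprocessing as mp

def echo_worker(work_remote, remote):
    remote.close()                 # child only uses its own end
    while True:
        msg = work_remote.recv()
        if msg == 'close':
            work_remote.close()
            break
        work_remote.send(('echo', msg))

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    remote, work_remote = ctx.Pipe()   # two ends of one duplex pipe
    p = ctx.Process(target=echo_worker, args=(work_remote, remote))
    p.daemon = True
    p.start()
    work_remote.close()                # parent only uses its own end
    remote.send('hello')
    print(remote.recv())               # ('echo', 'hello')
    remote.send('close')
    p.join()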
nremotes subprocesses are created; each receives the two ends of its pipe together with its group of env constructors, wrapped in CloudpickleWrapper so they can be pickled across the process boundary:
self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
           for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
The class is designed so that one main process talks to nremotes subprocesses.
The parent process operates only on self.remotes, reading and writing messages through those ends.
Each subprocess operates only on its work_remotes end, reading and writing messages through it.
Because starting a process duplicates the pipe handles, each side closes the end it does not use. The parent closes its copies of the work_remotes ends:
for remote in self.work_remotes:
    remote.close()
and each child (inside worker) closes its copy of the parent's end:
parent_remote.close()
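The worker function itself is defined earlier in subproc_vec_env.py and is not reproduced above. Condensed, its loop looks roughly like this (a sketch based on the baselines source; the real version also auto-resets an env whose episode is done during 'step' and catches KeyboardInterrupt):

def worker(remote, parent_remote, env_fn_wrappers):
    parent_remote.close()                        # child only uses its own pipe end
    envs = [fn() for fn in env_fn_wrappers.x]    # in_series envs, stepped serially
    try:
        while True:
            cmd, data = remote.recv()
            if cmd == 'step':
                remote.send([env.step(a) for env, a in zip(envs, data)])
            elif cmd == 'reset':
                remote.send([env.reset() for env in envs])
            elif cmd == 'render':
                remote.send([env.render(mode='rgb_array') for env in envs])
            elif cmd == 'get_spaces_spec':
                remote.send(CloudpickleWrapper(
                    (envs[0].observation_space, envs[0].action_space, envs[0].spec)))
            elif cmd == 'close':
                remote.close()
                break
    finally:
        for env in envs:
            env.close()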
The parent marks each subprocess as a daemon before starting it, and clears the MPI environment variables while spawning; otherwise, if mpi4py is imported in a child, MPI would treat the child as an MPI process just like the parent and could hang it:
for p in self.ps:
    p.daemon = True  # if the main process crashes, we should not cause things to hang
    with clear_mpi_env_vars():
        p.start()
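clear_mpi_env_vars comes from baselines' vec_env utilities; roughly, it temporarily hides the OMPI_/PMI_ launcher variables so a freshly spawned child is not mistaken for an MPI rank (a condensed sketch from the baselines source, shown here for reference):

import os
import contextlib

@contextlib.contextmanager
def clear_mpi_env_vars():
    # Temporarily remove MPI launcher variables so that `from mpi4py import MPI`
    # in a spawned child does not call MPI_Init and hang.
    removed = {}
    for k, v in list(os.environ.items()):
        if k.startswith(('OMPI_', 'PMI_')):
            removed[k] = v
            del os.environ[k]
    try:
        yield
    finally:
        os.environ.update(removed)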
def step_async(self, actions):
step_async splits the batch of actions with np.array_split and sends each chunk to the matching subprocess over its pipe; step_wait then reads the replies back from the pipes.
The received observations need post-processing: if each observation is a dict, the values are stacked per key across environments while the keys stay unchanged; rewards and dones are likewise stacked into np.arrays, and infos are simply collected in a tuple.
If the per-environment observations are plain np.arrays, they are stacked directly with np.stack.
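A small illustration of what _flatten_obs computes for dict observations (toy data, two envs):

import numpy as np

obs = [{'pos': np.zeros(3), 'goal': np.ones(2)},
       {'pos': np.ones(3),  'goal': np.zeros(2)}]

stacked = {k: np.stack([o[k] for o in obs]) for k in obs[0].keys()}  # what _flatten_obs does
print(stacked['pos'].shape, stacked['goal'].shape)   # (2, 3) (2, 2)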
For rendering, the class inherits the machinery of its parent class in vec_env.py:
def render(self, mode='human'):
imgs = self.get_images()
bigimg = tile_images(imgs)
if mode == 'human':
self.get_viewer().imshow(bigimg)
return self.get_viewer().isopen
elif mode == 'rgb_array':
return bigimg
else:
raise NotImplementedError
def get_viewer(self):
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.SimpleImageViewer()
return self.viewer
and, in subproc_vec_env.py:
def get_images(self):
self._assert_not_closed()
for pipe in self.remotes:
pipe.send(('render', None))
imgs = [pipe.recv() for pipe in self.remotes]
imgs = _flatten_list(imgs)
return imgs
So render, too, works by collecting one frame from every subprocess, tiling them into a single big image with tile_images, and drawing that.
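Putting it all together, here is a typical way to drive SubprocVecEnv (a hypothetical script; it assumes gym and baselines are installed, and uses CartPole-v1 and the make_env helper purely as examples):

import numpy as np
import gym
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv

def make_env(seed):
    def _thunk():
        env = gym.make('CartPole-v1')   # hypothetical example env
        env.seed(seed)
        return env
    return _thunk

if __name__ == '__main__':
    # 8 envs in 4 subprocesses, 2 envs per process run in series
    venv = SubprocVecEnv([make_env(i) for i in range(8)], in_series=2)
    obs = venv.reset()                                   # shape (8, 4) for CartPole
    for _ in range(10):
        actions = np.array([venv.action_space.sample() for _ in range(venv.num_envs)])
        obs, rews, dones, infos = venv.step(actions)     # step_async + step_wait
    venv.close()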