Open AI Gym Tutorial (6): Basic Steps for Creating a Custom Environment

Although Open AI Gym ships with a large number of environments to test against, sooner or later we will run into cases that call for a custom environment. This post walks through the basic steps of building one.

We borrow a simple example here: a number guessing game. It resembles the familiar guessing game, with a few differences. The game picks a random floating-point number (its range is not known in advance, although a value space (Spaces) can be given for it), and you have at most 200 guesses. After each guess the game returns one of four possible observations (Observation):

  • 0 - initial value, only returned right after the environment's reset()
  • 1 - the guess is lower than the target
  • 2 - the guess equals the target; since the values are floats, the two are considered equal when they differ by less than 1%
  • 3 - the guess is higher than the target

The rewards are as follows:

  • 1 if the guess hits the target, within the 1% tolerance
  • 0 if the guess misses, i.e. the deviation exceeds 1%

From these rules we can define the value spaces for the environment's Action and Observation. The Observation is simple; an integer type is enough:

observation_space = spaces.Discrete(4)

The Action can be defined in several ways. The simplest is to use the guessed number itself as the action space, for example:

self.bounds = 10000

self.action_space = spaces.Box(low=np.array([-self.bounds]), high=np.array([self.bounds]),
                                       dtype=np.float32)
self.observation_space = spaces.Discrete(4)

Alternatively, a Tuple type could be used, for example Tuple((Discrete(2), Box(low=np.array([0.0]), high=np.array([self.bounds]), dtype=np.float32))), where the first element indicates whether to increase or decrease the previous guess and the second gives the amount of the change. This example uses the first approach.
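For illustration, here is a minimal sketch of constructing and sampling such a Tuple action space (the standalone bounds variable is just a placeholder for this snippet, not part of the environment below):

import numpy as np
from gym import spaces

bounds = 10000
# (direction, amount): direction 0 = decrease, 1 = increase; amount is how much to change by
action_space = spaces.Tuple((
    spaces.Discrete(2),
    spaces.Box(low=np.array([0.0]), high=np.array([bounds]), dtype=np.float32),
))
print(action_space.sample())   # e.g. (1, array([3270.4], dtype=float32))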

With this analysis in place, we can design the custom Gym environment. The project layout of a Gym environment looks like this:

.
├── LICENSE
├── README.md
├── guessing_number
│   ├── __init__.py
│   └── envs
│       ├── __init__.py
│       └── guessing_number_env.py
├── setup.py
└── test.py

The setup.py is usually something like:

from setuptools import find_packages, setup

setup(
    name="guessing_number",
    version="0.0.1",
    install_requires=["gym>=0.2.3", "numpy"],
    packages=find_packages(),
)


It declares the project's name, version, and the packages it depends on.

guessing_number/__init__.py usually looks like this:

from gym.envs.registration import register

register(id="GuessingNumber-v0", entry_point="guessing_number.envs:GuessingNumberEnv")


The environment id has the form GuessingNumber-v0; by convention the v0, v1 suffix after the name denotes different versions of the environment. entry_point points to the environment's main class.

Following Open AI Gym Tutorial (3): Env, we can define GuessingNumberEnv as follows:

import numpy as np

import gym
from gym import spaces
from gym.utils import seeding


class GuessingNumberEnv(gym.Env):
    """Number guessing game

    The object of the game is to guess within 1% of the randomly chosen number
    within 200 time steps

    After each step the agent is provided with one of four possible observations
    which indicate where the guess is in relation to the randomly chosen number

    0 - No guess yet submitted (only after reset)
    1 - Guess is lower than the target
    2 - Guess is equal to the target
    3 - Guess is higher than the target

    The rewards are:
    0 if the agent's guess is outside of 1% of the target
    1 if the agent's guess is inside 1% of the target

    The episode terminates after the agent guesses within 1% of the target or
    200 steps have been taken

    The agent will need to use a memory of previously submitted actions and observations
    in order to efficiently explore the available actions

    The purpose is to have agents optimise their exploration parameters (e.g. how far to
    explore from previous actions) based on previous experience. Because the goal changes
    each episode a state-value or action-value function isn't able to provide any additional
    benefit apart from being able to tell whether to increase or decrease the next guess.

    The perfect agent would likely learn the bounds of the action space (without referring
    to them explicitly) and then follow binary tree style exploration towards to goal number
    """
    def __init__(self):
        self.range = 1000  # Randomly selected number is within +/- this value
        self.bounds = 10000

        self.action_space = spaces.Box(low=np.array([-self.bounds]), high=np.array([self.bounds]),
                                       dtype=np.float32)
        self.observation_space = spaces.Discrete(4)

        self.number = 0
        self.guess_count = 0
        self.guess_max = 200
        self.observation = 0

        self.seed()
        self.reset()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        assert self.action_space.contains(action)

        if action < self.number:
            self.observation = 1

        elif action == self.number:
            self.observation = 2

        elif action > self.number:
            self.observation = 3

        reward = 0
        done = False

        if (self.number - self.range * 0.01) < action < (self.number + self.range * 0.01):
            reward = 1
            done = True

        self.guess_count += 1
        if self.guess_count >= self.guess_max:
            done = True

        return self.observation, reward, done, {"number": self.number, "guesses": self.guess_count}

    def reset(self):
        self.number = self.np_random.uniform(-self.range, self.range)
        self.guess_count = 0
        self.observation = 0
        return self.observation

Once the project is in place, the environment can be installed by running

pip install -e .

from the project root.
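A quick smoke test (a minimal sketch; importing guessing_number triggers the register() call so that gym.make can find the id):

import gym
import guessing_number  # importing the package registers GuessingNumber-v0

env = gym.make('GuessingNumber-v0')
ob = env.reset()
ob, reward, done, info = env.step(env.action_space.sample())
print(ob, reward, done, info)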

Finally, we can design two different agents to play the guessing game. The first one guesses completely at random:

class RandomAgent(object):
    """The world's simplest agent!"""

    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, observation, reward, done):
        return self.action_space.sample()


The second is an improved random guesser that adjusts its next guess based on the observation returned by step (1 means too low, 3 means too high):

class BetterRandomAgent(object):
    """The world's 2nd simplest agent!"""

    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, observation, last_action):
        new_action = last_action
        if observation == 1:
            new_action = last_action + abs(last_action / 2)

        elif observation == 3:
            new_action = last_action - abs(last_action / 2)
        if abs(last_action - new_action) < 1e-1:
            new_action = self.action_space.sample()
        return new_action


With these two agents we can test the environment. Run each for 100 episodes and compare how many times each agent guesses the number and how many guesses it needs on average:

import gym

import guessing_number

class RandomAgent(object):
    """The world's simplest agent!"""

    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, observation, reward, done):
        return self.action_space.sample()


class BetterRandomAgent(object):
    """The world's 2nd simplest agent!"""

    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, observation, last_action):
        new_action = last_action
        if observation == 1:
            new_action = last_action + abs(last_action / 2)

        elif observation == 3:
            new_action = last_action - abs(last_action / 2)
        if abs(last_action - new_action) < 1e-1:
            new_action = self.action_space.sample()
        return new_action


if __name__ == '__main__':

    env = gym.make('GuessingNumber-v0')
    env.seed(0)
    agent = BetterRandomAgent(env.action_space)

    episode_count = 100
    reward = 0
    done = False

    total_reward = 0
    total_guesses = 0
    for i in range(episode_count):
        last_action = env.action_space.sample()
        ob = env.reset()
        while True:
            action = agent.act(ob, last_action)
            ob, reward, done, info = env.step(action)
            last_action = action

            # print(f'count={info["guesses"]},number={info["number"]},guess={action},ob={ob},reward={reward}')
            if done:
                total_reward += reward
                total_guesses += int(info["guesses"])
                break

    print(f'Total better random reward {total_reward}, average guess {round(total_guesses / 100, 1)}')

    env.seed(0)
    agent = RandomAgent(env.action_space)
    reward = 0
    done = False

    total_reward = 0
    total_guesses = 0

    for i in range(episode_count):
        ob = env.reset()
        while True:
            action = agent.act(ob, reward, done)
            ob, reward, done, info = env.step(action)

            if done:
                total_reward += reward
                total_guesses += int(info["guesses"])
                break

    # Close the env and write monitor result info to disk
    env.close()
    print(f'Total random reward {total_reward}, average guess {round(total_guesses / 100, 1)}')


A few sample runs:

Total better random reward 100, average guess 35.9
Total random reward 15, average guess 180.6
-----
Total better random reward 100, average guess 39.2
Total random reward 20, average guess 175.9
--
Total better random reward 100, average guess 38.2
Total random reward 24, average guess 177.2
--
Total better random reward 100, average guess 38.6
Total random reward 18, average guess 180.4


The improved agent hits the target in virtually every episode, needing on average a bit under 40 guesses, while the purely random agent succeeds only about 20 times out of 100 and uses around 180 guesses per episode.

If we wanted a binary-search agent, we would first need to establish the range of the target, which is not known in advance. One way is to start at, say, 100 and double the guess each time until the observation flips from too low to too high. In our example the action space is bounded by (-10000, 10000), so about 8 doublings are enough to bracket the target within [-25600, 25600]; binary search then needs only a few more guesses (on the order of 8 to 12, depending on how tight the bracket is) to get within the 1% tolerance.
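A sketch of such an agent could look like the following (this is illustrative and not part of the tutorial's repository; it assumes the same act(observation, last_action) interface as BetterRandomAgent above):

import numpy as np


class BinarySearchAgent(object):
    """Doubles outward until the target is bracketed, then bisects.

    Create a fresh agent (or clear low/high) at the start of each episode.
    """

    def __init__(self, action_space):
        self.action_space = action_space
        self.low = None    # tightest known lower bound on the target
        self.high = None   # tightest known upper bound on the target

    def act(self, observation, last_action):
        if observation == 1:      # last guess was too low
            self.low = last_action
        elif observation == 3:    # last guess was too high
            self.high = last_action

        if self.low is None and self.high is None:
            guess = 100.0                        # initial probe
        elif self.high is None:
            guess = abs(last_action) * 2         # no upper bracket yet: keep doubling upwards
        elif self.low is None:
            guess = -abs(last_action) * 2        # no lower bracket yet: keep doubling downwards
        else:
            guess = (self.low + self.high) / 2   # bracketed: bisect
        # Stay inside the action space so env.step()'s contains() assertion passes
        guess = np.clip(guess, self.action_space.low, self.action_space.high)
        return np.asarray(guess, dtype=np.float32).reshape(1)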

Source code for this post: https://github.com/guidebee/guessing_number

Open AI Gym Tutorial (5): The Built-in Environment Wrappers

gym.wrappers ships with a variety of ready-made Wrappers that transform Actions, Rewards, and Observations (an Observation Wrapper can be thought of as something like a filter in image processing).


Among them, Monitor can be used to observe the environment's parameters and save videos to local disk:

outdir = '/tmp/random-agent-results'
env = wrappers.Monitor(env, directory=outdir, force=True)

  • TimeLimit caps the maximum number of steps in an episode (see the sketch below).
  • FrameStack combines several Observations into one multi-frame Observation, useful when your algorithm takes several consecutive frames as input.
  • RecordEpisodeStatistics records statistics about episodes.
  • PixelObservationWrapper augments the original Observation with pixel (image) data.
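A minimal sketch of chaining two of these wrappers (the constructor arguments shown are assumptions; exact signatures can vary slightly between gym versions):

import gym
from gym.wrappers import TimeLimit, RecordEpisodeStatistics

env = gym.make('CartPole-v1')
env = TimeLimit(env, max_episode_steps=100)   # end every episode after at most 100 steps
env = RecordEpisodeStatistics(env)            # adds episode return/length to the info dict

ob = env.reset()
done = False
while not done:
    ob, reward, done, info = env.step(env.action_space.sample())
print(info.get('episode'))                    # e.g. {'r': ..., 'l': ..., 't': ...}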

In addition, gym.vector defines vectorized environments. An ordinary Gym environment is a single environment; if our algorithm needs to interact with several environments at once, we can use a vectorized environment, in which the actions, rewards, and observations all gain a leading batch dimension, one entry per environment.
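A minimal sketch (gym.vector.make and its asynchronous flag are assumed to be available in your gym version; gym.vector.SyncVectorEnv can be used directly instead):

import gym

# Three CartPole environments stepped in lockstep within the current process
envs = gym.vector.make('CartPole-v1', num_envs=3, asynchronous=False)
observations = envs.reset()                    # shape (3, 4): one row per environment
actions = envs.action_space.sample()           # one action per environment
observations, rewards, dones, infos = envs.step(actions)
print(rewards.shape, dones.shape)              # (3,), (3,)
envs.close()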

Open AI Gym Tutorial (4): Wrapper-Encapsulated Envs

core.py defines a Wrapper class, a subclass of the Env environment, which enables modular transformations, such as rescaling an environment's reward or converting color image Observations to grayscale to improve computational efficiency.

class Wrapper(Env):
    r"""Wraps the environment to allow a modular transformation.

    This class is the base class for all wrappers. The subclass could override
    some methods to change the behavior of the original environment without touching the
    original code.

    .. note::

        Don't forget to call ``super().__init__(env)`` if the subclass overrides :meth:`__init__`.

    """
    def __init__(self, env):
        self.env = env
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space
        self.reward_range = self.env.reward_range
        self.metadata = self.env.metadata

    def __getattr__(self, name):
        if name.startswith('_'):
            raise AttributeError("attempted to get missing private attribute '{}'".format(name))
        return getattr(self.env, name)

    @property
    def spec(self):
        return self.env.spec

    @classmethod
    def class_name(cls):
        return cls.__name__

    def step(self, action):
        return self.env.step(action)

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def render(self, mode='human', **kwargs):
        return self.env.render(mode, **kwargs)

    def close(self):
        return self.env.close()

    def seed(self, seed=None):
        return self.env.seed(seed)

    def compute_reward(self, achieved_goal, desired_goal, info):
        return self.env.compute_reward(achieved_goal, desired_goal, info)

    def __str__(self):
        return '<{}{}>'.format(type(self).__name__, self.env)

    def __repr__(self):
        return str(self)

    @property
    def unwrapped(self):
        return self.env.unwrapped


core.py also defines three subclasses: ObservationWrapper, RewardWrapper, and ActionWrapper.

class ObservationWrapper(Wrapper):
    def reset(self, **kwargs):
        observation = self.env.reset(**kwargs)
        return self.observation(observation)

    def step(self, action):
        observation, reward, done, info = self.env.step(action)
        return self.observation(observation), reward, done, info

    def observation(self, observation):
        raise NotImplementedError


class RewardWrapper(Wrapper):
    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        observation, reward, done, info = self.env.step(action)
        return observation, self.reward(reward), done, info

    def reward(self, reward):
        raise NotImplementedError


class ActionWrapper(Wrapper):
    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        return self.env.step(self.action(action))

    def action(self, action):
        raise NotImplementedError

    def reverse_action(self, action):
        raise NotImplementedError


As the code shows, the action, reward, and observation methods let us transform the original action, reward, or observation before or after it passes through step. These classes are abstract; the concrete implementations live in gym.wrappers, which we will cover in detail later.
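As an illustration of how these base classes are subclassed, here is a minimal sketch of a hypothetical ClipRewardWrapper (not part of gym itself):

import gym


class ClipRewardWrapper(gym.RewardWrapper):
    """Clip every reward into [-1, 1] before handing it to the agent."""

    def reward(self, reward):
        return max(-1.0, min(1.0, reward))


env = ClipRewardWrapper(gym.make('CartPole-v1'))
ob = env.reset()
ob, reward, done, info = env.step(env.action_space.sample())
print(reward)   # never outside [-1, 1]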

Open AI Gym Tutorial (2): Spaces

Aside from the various environment definitions, Gym's core code is actually quite small. Within the source tree, envs takes up most of the space, but the core parts are spaces and core.py. These define the main classes: Env (the environment) and the Spaces (value domains) used to define the Action and Observation spaces. Together they form the basic building blocks of the Action-Environment Loop.

spaces defines several different types of value spaces. Space is the base class and defines the interface of every Space type:

class Space(object):
    """Defines the observation and action spaces, so you can write generic
    code that applies to any Env. For example, you can choose a random
    action.
    """
    def __init__(self, shape=None, dtype=None):
        import numpy as np  # takes about 300-400ms to import, so we load lazily
        self.shape = None if shape is None else tuple(shape)
        self.dtype = None if dtype is None else np.dtype(dtype)
        self.np_random = None
        self.seed()

    def sample(self):
        """Randomly sample an element of this space. Can be 
        uniform or non-uniform sampling based on boundedness of space."""
        raise NotImplementedError

    def seed(self, seed=None):
        """Seed the PRNG of this space. """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def contains(self, x):
        """
        Return boolean specifying if x is a valid
        member of this space
        """
        raise NotImplementedError

    def __contains__(self, x):
        return self.contains(x)

    def to_jsonable(self, sample_n):
        """Convert a batch of samples from this space to a JSONable data type."""
        # By default, assume identity is JSONable
        return sample_n

    def from_jsonable(self, sample_n):
        """Convert a JSONable data type to a batch of samples from this space."""
        # By default, assume identity is JSONable
        return sample_n

The two most important are Discrete and Box.

Discrete

Discrete(n) is the discrete set of integers {0, 1, 2, …, n-1}:

In [1]: from gym.spaces import *                                                                                               

In [2]: space = Discrete(5)                                                                                                    

In [3]: space.sample()                                                                                                         
Out[3]: 2

In [4]: space.sample()                                                                                                         
Out[4]: 1

In [5]: space.sample()                                                                                                         
Out[5]: 4

Box

Box represents a box in the continuous multi-dimensional real space R^n, i.e. the Cartesian product of n closed intervals. Each interval can take one of the forms

[a, b], (-oo, b], [a, oo), or (-oo, oo)

Box is commonly used in two ways. In the first, every dimension has the same range:

In [1]: from gym.spaces import *                                                                                               

In [2]: import numpy as np                                                                                                     

In [3]: space = Box(low=-1.0, high=2.0, shape=(3, 4), dtype=np.float32)                                                        

In [4]: space.sample()                                                                                                         
Out[4]: 
array([[-0.49652585,  0.9263435 ,  0.38507813,  0.783846  ],
       [ 0.85791075,  1.8828201 , -0.9763712 , -0.7506176 ],
       [ 0.5605676 ,  0.58183783, -0.43566808,  0.50398904]],
      dtype=float32)

In [5]: space.sample()                                                                                                         
Out[5]: 
array([[ 1.0351197 , -0.26707068,  1.2349498 , -0.03579823],
       [ 0.35440695,  1.6972734 , -0.94597757,  0.43317792],
       [ 1.7264552 ,  0.7422606 , -0.641941  ,  1.9083056 ]],
      dtype=float32)

In the second, each dimension has its own range:

In [7]: space = Box(low=np.array([-1.0, -2.0]), high=np.array([2.0, 4.0]), dtype=np.float32)                                   


In [8]: space.sample()                                                                                                         
Out[8]: array([-0.01725261, -1.8928218 ], dtype=float32)

In [9]: space.sample()                                                                                                         
Out[9]: array([-0.37487462, -0.4833201 ], dtype=float32)

In [10]: space.sample()                                                                                                        
Out[10]: array([-0.2957047,  1.2446854], dtype=float32)

MultiDiscrete

MultiDiscrete is a multi-dimensional set of discrete integers, which is especially useful for game-style controls. For example, we can model a Nintendo game controller as a multi-dimensional discrete set:

In most environments, 0 is used to represent the no-op action (NOOP).

1) Arrow Keys: Discrete 5  - NOOP[0], UP[1], RIGHT[2], DOWN[3], LEFT[4]  - params: min: 0, max: 4
2) Button A:   Discrete 2  - NOOP[0], Pressed[1] - params: min: 0, max: 1
3) Button B:   Discrete 2  - NOOP[0], Pressed[1] - params: min: 0, max: 1

This can be represented as MultiDiscrete([ 5, 2, 2 ]):

In [11]: space = MultiDiscrete([ 5, 2, 2 ])                                                                                    

In [12]: space.sample()                                                                                                        
Out[12]: array([2, 1, 0])

In [13]: space.sample()                                                                                                        
Out[13]: array([2, 1, 1])

In [14]: space.sample()                                                                                                        
Out[14]: array([3, 0, 0])

The other Space types are essentially combinations of Discrete and Box. For example, MultiBinary is very similar to MultiDiscrete except that each entry is binary, 0 or 1; Dict and Tuple correspond to dictionary and tuple types respectively:

In [16]: space = Dict({"position": Discrete(2), "velocity": Discrete(3)})                                                      

In [17]: space.sample()                                                                                                        
Out[17]: OrderedDict([('position', 1), ('velocity', 2)])

In [18]: space.sample()                                                                                                        
Out[18]: OrderedDict([('position', 0), ('velocity', 0)])

In [19]: space = Tuple((Discrete(2), Discrete(3)))                                                                             

In [20]: space.sample()                                                                                                        
Out[20]: (0, 1)

In [21]: space.sample()                                                                                                        
Out[21]: (1, 0)

In [22]: space.sample()                                                                                                        
Out[22]: (1, 2)

In [23]: space = MultiBinary(5)                                                                                                

In [24]: space.sample()                                                                                                        
Out[24]: array([0, 1, 1, 1, 1], dtype=int8)

In [25]: space.sample()                                                                                                        
Out[25]: array([1, 0, 1, 0, 1], dtype=int8)

In [26]: space.sample()                                                                                                        
Out[26]: array([1, 0, 1, 0, 1], dtype=int8)