はじめに

強化学習によって,OpenAI GymのAtariを攻略します.今回は,強化学習モデルを訓練,実行させるための環境を作り,PONGを描画します.OpenAI Gymの環境に関する詳細は,以下のブログで解説しています.

また,本ブログで仕様しているコード全文や実行環境はGitHubで公開しています.

Gymの基本処理

gymの基本的な動きは,


env = gym.make("環境名")
obs = env.reset()

によって環境を定義,初期状態を観測し,モデルの選択した行動を入力することで次状態,即時報酬,終了判定,デバッグなどの情報を受け取ります.


obs, reward, done, info = env.step(action)

obsは例えば(H,W,C)型の画像フレームです.

Wrapper

Wrapperは環境に対して行う前処理です.gym.Wrapperを継承したクラスを定義することで,env.reset()やenv.step()を実行する際の処理をカスタマイズします.OpenAI Baselinesリポジトリのatari_wrappers.pyにある主要なWrapperを確認します.細かい部分は削除・改変するなどしてシンプルな見た目にしています.

NoopResetEnv

初期状態を多様化させるため,学習時にランダムな回数だけ「何もしない (="Noop") 」という行動を取った後の状態を初期状態とするWrapperです.


class NoopResetEnv(gym.Wrapper):
    def __init__(self, env, noop_max=30):
        gym.Wrapper.__init__(self, env)
        self.noop_max = noop_max
        self.noop_action = 0
        assert env.unwrapped.get_action_meanings()[0] == "NOOP"

    def reset(self, **kwargs):
        self.env.reset(**kwargs)
        noops = self.env.unwrapped.np_random.randint(1, self.noop_max+1)
        obs = None
        for _ in range(noops):
            obs, _, done, _ = self.env.step(self.noop_action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs

    def step(self, action):
        return self.env.step(action)
  • init:noop_maxは,Noopする回数の最大値です.env.unwrapped.get_action_meanings()は,環境envで取ることができる行動リストを取得します.例えばPongNoFrameskip-v4なら,['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']を返します.基本的にNoopは行動リストの0番目なので,self.noop_actionは0にしておき,行動リストの0番目が"NOOP"でない場合に例外を返します.
  • reset:noopsはNoopする回数 (int) です.env.unwrapped.np_random.randintは一様乱数を返します.noops回だけNoopした後の状態を返します.
  • step:NoopResetEnvは初期化に関するWrapperなので,stepメソッドはいじりません.

MaxAndSkipEnv

毎フレームごとに行動選択するのは現実的に早すぎるため,env.stepにおいてskip回同じ行動を自動で繰り返し,skip回ごとに行動選択することにします.


class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip=4):
        gym.Wrapper.__init__(self, env)
        self._obs_buffer = np.zeros((2,)+env.observation_space.shape,
                                    dtype=np.uint8)
        self._skip       = skip

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        total_reward = 0.0
        done = None
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if i == self._skip - 2:
                self._obs_buffer[0] = obs
            if i == self._skip - 1:
                self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                break
        max_frame = self._obs_buffer.max(axis=0)
        return max_frame, total_reward, done, info
  • init:_obs_bufferはskip回後に次状態として観測を返すために状態を保存しておくバッファです.
  • reset:MaxAnd SkipEnvは遷移に関するWrapperなので,resetメソッドはいじりません.
  • step:入力されたactionをskip回繰り返し,最後の2フレームと獲得した報酬和,終了判定と情報を求めます._obs_bufferに最後の2フレームを保存しておいて,値が大きい方を返すことにします.これはよく分かりませんが,Atariの描画仕様に対応するための処理なようです.

EpisodicLifeEnv

Atariには残機が複数あるゲームがあります.EpisodicLifeEnvは,残機が1減るごとにエピソードを終了し,次の機には次のエピソードを割り当てるWrapperです.


class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done  = True

    def reset(self, **kwargs):
        if self.was_real_done:
            obs = self.env.reset(**kwargs)
        else:
            obs, _, _, _ = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            done = True
        self.lives = lives
        return obs, reward, done, info
  • init:livesは残機,was_real_doneは現在ゲームが終了しているか否かを示します.
  • reset:was_real_doneならば,すなわちゲーム開始時点ならば環境を初期化し,前回1機失って別のエピソードを開始する時点ならばstepメソッドを呼び出します.env.unwrapped.ale.lives()は,現在の残機を返します.
  • step:env.step()が返すdoneをwas_real_doneとし,残機が減っている,かつ0ではない場合にdone=Trueとしてエピソードを終了します.

FireResetEnv

Atariの中には最初に"FIRE"という行動を取らないと始まらないゲームがあるので,環境リセット時に自動でFireするWrapperです.


class FireResetEnv(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.fire_action = 1
        assert env.unwrapped.get_action_meanings()[1] == "FIRE"

    def reset(self, **kwargs):
        self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(self.fire_action)
        if done:
            self.env.reset(**kwargs)
        return obs

    def step(self, action):
        return self.env.step(action)
  • init:基本的にFireは行動リストの1番目なので,self.fire_actionは1にしておき,行動リストの1番目が"FIRE"でない場合に例外を返します.
  • reset:環境を初期化したのち,最初にFireします.
  • step:FireResetEnvは初期化に関するWrapperなので,stepメソッドはいじりません.

WarpFrame

WarpFrameは,gym.ObservationWrapperを継承したWrapperです.これは,env.reset()や,env.step()が実行された時に返される状態観測であるobsの再定義を行います.WarpFrameは,画像のリサイズとグレースケール化を行います.


class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env, width=224, height=224,
                grayscale=True):
        super().__init__(env)
        self._width = width
        self._height = height
        self._grayscale = grayscale
        if self._grayscale:
            num_colors = 1
        else:
            num_colors = 3
        new_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(self._height, self._width, num_colors),
            dtype=np.uint8,
        )
        self.observation_space = new_space

    def observation(self, obs):
        if self._grayscale:
            obs = cv2.cvtColor(obs, cv2.COLOR_BGR2GRAY)
        obs = cv2.resize(
            obs, (self._width, self._height),
            interpolation=cv2.INTER_AREA
        )
        if self._grayscale:
            obs = np.expand_dims(obs, -1)
        return obs

envの行動空間はenv.action_spaceで,状態空間はenv.observation_spaceで定義されています.これらの状態行動空間は,以下のいずれかの型を取ります.

  • Boxクラス:範囲[low,high]の連続値の要素を持つ多次元配列です.例えば,210×160のRGB画像を状態空間とする場合は,Box(low=0, high=255, shape=(210,160,3)です.
  • Discreteクラス:範囲[0,n-1]の整数です.例えば,CartPoleのように「0:左移動,1:右移動」と2つの行動を表現する場合はDiscrete(2).
  • MultiBinaryクラス:0または1の要素を持つ配列.「上,下,左,右」の4つのボタンを同時に押すようなシチュエーションでは,行動空間は長さ4の配列であり,例えば「上,右」ボタンを押す,という行動は[1,0,0,1]と表現され,この場合は行動空間はMultiBinary(4)と定義されます.
  • MultiDiscreteクラス:範囲[0,n-1]の整数の要素を持つ配列です.複数の種類の行動を必要とする場合に用いる.例えば,カーソルの移動を「0:左移動,1:右移動」,書き込む文字を「0:A, 1:B, 2:C, 3:D, 4:E, 5:なし」とするようなシチュエーションでは,行動空間は長さ2の配列であり,「右に移動しながらDを書き込む」行動は[1,3]と表現されます.この場合は行動空間はMultiDiscrete([2,6])と定義されます.

WarpFrameは画像に対するWrapperですから,envの状態空間はBoxクラスを想定しています.

  • init:新しい状態空間new_spaceをBoxクラスで定義します.
  • observation:env.reset()や,env.step()が実行された時,元の状態として観測された画像obsをnew_spaceに合うようリサイズとグレースケール化します.

ClipRewardEnv

ClipRewardEnvは,gym.RewardWrapperを継承したWrapperです.env.step()が実行された時に返される報酬であるrewardの再定義を行います.ClipRewardEnvは,複数のゲームで報酬のスケールが異なるため,これを{-1,0,1}にクリップします.


class ClipRewardEnv(gym.RewardWrapper):
    def __init__(self, env):
        gym.RewardWrapper.__init__(self, env)

    def reward(self, reward):
        return np.sign(reward)
  • reward:np.signは,引数の符号に応じて{-1,0,1}を返します.これを利用して,報酬の値を{-1,0,1}にクリップします.

FrameStack

状態として,現在のフレームだけを使うのは,そのフレームに至るまでの過程を踏まえた行動選択ができないため適切ではありません.そこで,直近k枚のフレームをスタックしたものを状態として再定義します.また,reset,stepの際にNumPy配列をPyTorchのテンソルに変換します.


class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        h, w, c = env.observation_space.shape
        self.observation_space = gym.spaces.Box(low=0, high=255,
                                            shape=(k, c, h, w),
                                            dtype=env.observation_space.dtype)

    def reset(self):
        obs = self.env.reset()
        for _ in range(self.k):
            self.frames.append(obs)
        obs = self._get_obs()
        obs = torch.from_numpy(obs._force().transpose(0,3,1,2))
        return obs

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.frames.append(obs)
        obs = self._get_obs()
        obs = torch.from_numpy(obs._force().transpose(0,3,1,2))
        return obs, reward, done, info

    def _get_obs(self):
        assert len(self.frames) == self.k
        return LazyFrames(list(self.frames))

class LazyFrames:
    def __init__(self, frames):
        self._frames = frames
        self._out = None

    def _force(self):
        if self._out is None:
            self._out = np.stack(self._frames)
            self._frames = None
        return self._out # (k, h, w, c)

    def __array__(self, dtype=None):
        out = self._force()
        if dtype is not None:
            out = out.astype(dtype)
        return out

    def __len__(self):
        return len(self._force())

    def __getitem__(self, i):
        return self._force()[i]

    def count(self):
        frames = self._force()
        return frames.shape[frames.ndim - 1]

    def frame(self, i):
        return self._force()[..., i]
  • init:状態空間を,フレームk枚分に再定義します.フレームを集めたリストをframesとします.framesはdequeで定義します.これは,appendメソッドを呼ばれた時,リストの要素数がkならば先頭の要素を自動で削除し,要素数をkに保ったまま末尾に要素を追加します.
  • reset:環境を初期化して観測したフレームを,k枚framesに格納し,LazyFrameオブジェクトとし,torch.tensorに変換します.LazyFrameは,FrameStackを効率よく行うために定義した,フレームk枚を表すクラスです.
  • step:環境遷移した際に観測した次状態をframesにappendし,LazyFrameオブジェクトとし,torch.tensorに変換します.

環境の描画

以上で定義したWrapperを組み込んだ環境を作成し,ランダムに行動した結果を動画にしてみます.


def make_pong_env(noop_max=30, skip=4, width=224, height=224):
    env = gym.make('PongNoFrameskip-v4')
    env = NoopResetEnv(env, noop_max=noop_max)
    env = MaxAndSkipEnv(env, skip=skip)
    env = EpisodicLifeEnv(env)
    env = FireResetEnv(env)
    env = WarpFrame(env, width=width, height=height, grayscale=False)
    env = ClipRewardEnv(env)
    env = FrameStack(env, skip)
    return env

def make_pong_video(env, video_path, width=224, height=224, reward_font_size=20, model=None):
    obs = env.reset()
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    video = cv2.VideoWriter(str(video_path), fourcc, 15.0, (width, height))
    total_reward = 0
    done = False
    while not done:
        frame = obs[-1].detach().cpu().numpy()
        frame = cv2.putText(frame, f"Reward:{int(total_reward)}", (0, height-reward_font_size),
                           cv2.FONT_HERSHEY_PLAIN, 1.5, (255,255,0), 1, cv2.LINE_AA)
        video.write(frame)
        if model is not None:
            obs = obs.float().to(device)
            action = model.act(obs)
        else:
            action = env.action_space.sample()
        obs, reward, done, _ = env.step(action)
        total_reward += reward
    video.release()
    return

これを実行すると,video_pathに指定した名前の動画が作成されます.今回は次のような動画ができました.


python utils/pong_env.py --video_name pong_random_action.mp4

Warning

現在の実行環境では,


UserWarning: WARN: Custom namespace ALE is being overridden by namespace ALE. If you are developing a plugin you shouldn't specify a namespace in register calls. The namespace is specified through the entry point package metadata.

DeprecationWarning: WARN: Function hash_seed(seed, max_bytes) is marked as deprecated and will be removed in the future. 

UserWarning: WARN: Env check failed with the following message: The environment cannot be reset with options, even though options or kwargs appear in the signature. This should never happen, please report this issue. The error was: reset() got an unexpected keyword argument 'options' You can set disable_env_checker=True to disable this check.

DeprecationWarning: WARN: Function rng.randint(low, [high, size, dtype]) is marked as deprecated and will be removed in the future. Please use rng.integers(low, [high, size, dtype]) instead.

という四つのWarningが出ますが,対処法はわかっていません.バージョンを変更すると別のWarningかErrorに直面しました.

まとめ

今回はモデルが動く環境を作ることができました.今後はDQNをはじめとした深層強化学習モデルを使ってAtariを攻略します.