はじめに

強化学習によって,OpenAI GymのAtariを攻略するシリーズの第二回です.過去のブログは以下の通りです.

今回は,PONG環境を,二重Deep Q Network (DDQN) で攻略します.今回のDDQNまで繋がる強化学習の理論については,こちらをご覧ください.

また,本ブログで仕様しているコード全文や実行環境はGitHubで公開しています.

アルゴリズム全体像

今回実装するDDQNのアルゴリズム全体像は,次の通りです.

  1. network,target network,replay bufferを初期化する.
  2. 各エピソードにおいて,以下を繰り返す.
    a. 初期状態を観測する.
    b. エピソードが終了するまで,以下を繰り返す.
    i. epsilon貪欲方策に従い,行動選択する.
    ii. 行動を環境に入力し,即時報酬と次状態を観測する.
    iii. i~iiの経験をreplay bufferに保存する.
    iv. replay bufferからミニバッチサイズ分だけ経験を乱択する.
    vi. 乱択された経験を元にnetworkを更新する.
    c. target networkをnetworkと同期する.
  3. argmax networkを最適方策として終了する.

優先度付き経験再生

優先度付き経験再生 (Prioritized Replay Buffer) を実装します.アルゴリズムの2.b.iv.で,優先度 (priority) により重み付けされた確率に従い,経験を乱択します.経験とは,「状態obsで行動actionを取ったら報酬rewardを得て,次状態next_obsが観測された」ことを示す,[obs, action, reward, next_obs, done]の五つ組です.これを一単位の経験として,networkの更新に使います.


class PrioritizedReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.index = 0
        self.buffer = []
        self.priorities = np.zeros(buffer_size, dtype=np.float32)
        self.priorities[0] = 1.0

    def __len__(self):
        return len(self.buffer)

    def push(self, experience):
        if len(self.buffer) < self.buffer_size:
            self.buffer.append(experience)
        else:
            self.buffer[self.index] = experience
        self.priorities[self.index] = self.priorities.max()
        self.index = (self.index + 1) % self.buffer_size # 余り

    def sample(self, batch_size, alpha=0.6, beta=0.4):
        if len(self.buffer) < self.buffer_size:
            priorities = self.priorities[:self.index]
        else:
            priorities = self.priorities[:self.buffer_size]
        prob = (priorities ** alpha) / (priorities ** alpha).sum()
        indices = np.random.choice(len(prob), batch_size, p=prob)
        weights = (1 / (len(indices) * prob[indices])) ** beta
        obs, action, reward, next_obs, done = zip(*[self.buffer[i] for i in indices])
        obs = torch.stack(obs, 0).float()
        action = torch.as_tensor(action)
        reward = torch.as_tensor(reward).float()
        next_obs = torch.stack(next_obs, 0).float()
        done = torch.as_tensor(done, dtype=torch.uint8)
        weights = torch.as_tensor(weights).float()
        return (obs, action, reward, next_obs, done, indices,
                weights)

    def update_priorities(self, indices, priorities):
        self.priorities[indices] = priorities + 1e-04
  • init:経験を入れるバッファをbufferと言うリストで定義.prioritiesは,バッファ内の経験に対応する優先度.buffer_sizeはバッファに格納できる経験数の最大値 (int) で,メモリの制限を考慮し決定する.indexは,今回保存した経験のバッファ内のインデックス.
  • push:経験experienceを引数に取り,バッファに格納するメソッド.バッファに空きがある (=経験がbuffer_size分集まっていない) ならば末尾に経験を追加し,なければindexに格納する.新しい経験の優先度は最大値で初期化する.経験を追加したら,インデックスを更新する.
  • sample:バッファからbatch_sizeだけ経験をサンプルするメソッド.バッファに空きがあるならば,経験が集まっているインデックスまでを新たにprioritiesとする.prioritiesをもとに,各経験をサンプリングする確率probを計算する.np.random.choiceにより.確率probに従い経験を乱択する.乱択された経験はtensorとする.また,経験を一様でなく乱択しており,勾配にバイアスが生じるため,これを補正するためのweightsを計算する.
  • update_priorities:経験の優先度を更新するメソッド.sampleされた経験に対し損失を計算し,これを優先度 (損失が大きい=学習が進んでいない経験の採用率を高くする) とすることが多い.また経験が全く採用されないことを防ぐため微少量を足しておく.

self.buffer[i]で取り出されるのは[obs, action, reward, next_obs, done]なる,五つのオブジェクトからなるリストで,sampleメソッドの8行目ではbatch_size分だけ取り出された経験をobs, action, reward, next_obs, doneに分離しています.この時,例えば,obsは要素数batch_sizeの状態が格納されたタプルになっています.これを実現するためにzipを使います.これは,例えば次のように動作します.


l1 = [[1,2,3],[4,5,6],[7,8,9]]
indices = [0,2]
z1, z2, z3 = zip(*[l1[i] for i in indices])
print(z1)
print(z2)
print(z3)

###出力###
(1, 7)
(2, 8)
(3, 9)

衝突ネットワーク

使用するモデルは,衝突ネットワーク (Dueling DQN) です.これは,行動価値関数が価値関数とアドバンテージ関数という,状態のみに依存する要素と,それ以外の要素に分解できる性質を明示的に表現したものです.


class CNNQNetwork(nn.Module):
    def __init__(self,
                state_shape, # (k, c, h, w)
                n_action):
        super(CNNQNetwork, self).__init__()
        k, c, h, w = state_shape
        self.n_action = n_action
        self.conv_layers = nn.Sequential(
            nn.Conv3d(in_channels=c, out_channels=32,
                    kernel_size=4, stride=2, padding=1, bias=False),
            # (b, c, k, h, w) -> (b, 32, k/2, h/2, w/2)
            nn.ReLU(),
            nn.Conv3d(in_channels=32, out_channels=64,
                    kernel_size=4, stride=2, padding=1, bias=False),
            # (b, 32, k/2, h/2, w/2) -> (b, 64, k/4, h/4, w/4)
            nn.ReLU()
        )
        self.fc_in = k*h*w
        self.fc_state = nn.Sequential(
            nn.Linear(self.fc_in, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
        self.fc_advantage = nn.Sequential(
            nn.Linear(self.fc_in, 128),
            nn.ReLU(),
            nn.Linear(128, n_action)
        )

    def forward(self, obs): # (b,k,c,h,w)
        obs = obs.permute((0,2,1,3,4)) # (b,c,k,h,w)
        feature = self.conv_layers(obs)
        feature = feature.contiguous().view(-1, self.fc_in)
        state_values = self.fc_state(feature)
        advantage = self.fc_advantage(feature).view(-1,
                                                self.n_action)
        action_values = state_values + advantage - \
                        torch.mean(advantage, dim=1, keepdim=True)
        return action_values # (b, n_action)

    def act(self, obs, epsilon=0):
        if random.random() < epsilon:
            action = random.randrange(self.n_action)
        else:
            with torch.no_grad():
                obs = obs.unsqueeze(0)
                action = torch.argmax(self.forward(obs)).item()
        return action
  • init:conv_layersはサイズ(k,c,h,w) (k=4,c=1,h=84,w=84) の状態から特徴量featureを抽出する畳み込み層,fc_stateは状態関数を表す全結合層,fc_advantageはアドバンテージ関数を表す全結合層.
  • forward:n_action個それぞれの行動に対応する行動価値を出力するメソッド.obsはバッチ数bも含めサイズ(b,k,c,h,w)だが,nn.Conv3dは(b,c,k,h,w)型である必要があるため,permuteで軸入れ替えを行う.また,実際はstate_values + advantageが行動価値関数だが,安定化のためアドバンテージ関数の平均を引く.
  • act:行動価値関数に従い,イプシロン貪欲に行動を選択するメソッド.obsは (k,c,h,w)型なので,unsqueezeによって軸を追加し,(1,k,c,h,w)とする.forwardメソッドで各行動の価値が得られたら,行動価値の最大値を与える行動をaction (int) として選択する.

訓練

「アルゴリズム全体像」で示した手順に従って,学習を行います.


def train_dqn(env, net, target_net,
            replay_buffer, gamma, batch_size, n_episodes, initial_buffer_size,
            epsilon_, beta_, target_update_interval,
            device, best_save_path, last_save_path, load_path=None):
    print(f"device: {device}")
    if load_path is not None:
        checkpoint = torch.load(load_path,
                                map_location=torch.device("cpu"))
        net.load_state_dict(checkpoint["net_state_dict"])
        target_net.load_state_dict(checkpoint["net_state_dict"])
        current_episode = checkpoint["current_episode"]
        n_episodes += current_episode
        rewards = checkpoint["rewards"]
        episode_losses = checkpoint["episode_losses"]
        best_reward = checkpoint["best_reward"]
    else:
        current_episode = 0
        rewards = []
        episode_losses = []
        best_reward = -1e+10
    #---
    # 1. initialize net, criterion, optimizer
    #---
    criterion = nn.SmoothL1Loss(reduction='none')
    optimizer = optim.AdamW(net.parameters(), lr=1e-06)
    net.to(device)
    target_net.to(device)
    #---
    # 2. episode
    #---
    for episode in range(current_episode, n_episodes):
        #---
        # a. observe initial state
        #---
        obs = env.reset()
        done = False
        episode_reward = 0
        step = 0
        if len(replay_buffer) < initial_buffer_size:
            epsilon = 1
        else:
            epsilon = max(0.01, epsilon_ ** (episode+1))
        beta = max(0.4, 1 - beta_ ** (episode+1))
        episode_loss = 0
        #---
        # b. loop until the episode ends
        #---
        while not done:
            obs = obs.float().to(device)
            #---
            # i. choose action following epsilon greedy policy
            #---
            action = net.act(obs, epsilon)
            #---
            # ii. act and observe next_obs, reward, done
            #---
            next_obs, reward, done, _ = env.step(action)
            obs = obs.cpu()
            next_obs = next_obs.cpu()
            #---
            # iii. push experience to the replay buffer
            #---
            replay_buffer.push([obs, action, reward, next_obs, done])
            obs = next_obs
            if len(replay_buffer) > initial_buffer_size:
                #---
                # iv. sample experience from the replay buffer
                #---
                s_obs, s_action, s_reward, s_next_obs, s_done, s_indices, s_weights = \
                replay_buffer.sample(batch_size, beta=beta)
                s_obs = s_obs.to(device)
                s_action = s_action.to(device)
                s_reward = s_reward.to(device)
                s_next_obs = s_next_obs.to(device)
                s_done = s_done.to(device)
                s_weights = s_weights.to(device)
                #---
                # vi. update network
                #---
                q_values = net(s_obs).gather(1, s_action.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    t_action = torch.argmax(net(s_next_obs), dim=1)
                    q_values_next = \
                    target_net(s_next_obs).gather(1, t_action.unsqueeze(1)).squeeze(1)
                target_q_values = s_reward + gamma * q_values_next * (1-s_done)
                optimizer.zero_grad()
                loss = (s_weights * criterion(q_values, target_q_values)).sum()
                loss.backward()
                optimizer.step()
                episode_loss += loss.item()
                priorities = (target_q_values - q_values).abs().detach().cpu().numpy()
                replay_buffer.update_priorities(s_indices, priorities)
            step += 1
            episode_reward += reward
        #---
        # c. update target network
        #---
        if (episode + 1) % target_update_interval == 0:
            target_net.load_state_dict(net.state_dict())
        episode_loss /= step
        rewards.append(episode_reward)
        episode_losses.append(episode_loss)
        print(f"episode:{episode+1}/{n_episodes} reward:{int(episode_reward)}" +
            f" step:{step} loss:{episode_loss:.4f}" +
            f" epsilon:{epsilon:.4f} beta:{beta:.4f} last action:{action}")
        if best_reward <= episode_reward:
            best_reward = episode_reward
            save_path = best_save_path
        else:
            save_path = last_save_path
        torch.save(
            {
                "net_state_dict": net.state_dict(),
                "current_episode": episode,
                "rewards": rewards,
                "episode_losses": episode_losses,
                "best_reward": best_reward
            },
            str(save_path)
        )
        print(f"net saved to >> {str(save_path)}")
        print()
    return

上のコードですが,最初はiv.とvi.をupdate()関数として別に定義し,


net, replay_buffer, loss = update(net, target_net, replay_buffer, ...)

とすると,netが更新されず,学習が進みませんでした.どうやらPyTorchでupdate()を作ってその中でoptimizer.step()して上のように書いてもnetは更新されないようです.update()関数を書きたい場合は,返り値を書かずに


update(net, target_net, replay_buffer, ...)

としなければなりません.

学習結果

train_dqnを用いてDDQNを学習します.


python trainers/train_dqn.py \
--best_save_name ddqn_best.tar --last_save_name ddqn_last.tar 

300episode学習した時の損失関数と報酬の総和の推移は次のようになりました.

また,前回作ったmake_pong_videoを使って,学習ずみモデルによるPONGゲームのプレイ動画を作成すると,次のようになりました.


python utils/pong_env.py \
--video_name pong_ddqn.mp4 --model_name CNNQNetwork --load_name ddqn_best.tar

まとめ

深層強化学習モデルDDQNを使って,PONGゲームでCPUに勝つことができました.