查看原文
其他

如何用令人拍案称奇的强化学习玩小游戏?

井森堡 CSDN 2019-03-31


作者 | 井森堡

责编 | 郭   芮


光阴似箭日月如梭,转眼已经9012年啦,现在哪怕随便在大街上拉个路人,都听过AlphaGo, AlphaGo Zero。


2016年3月,谷歌公布的AlphaGo打败了人类顶尖围棋棋手李世乭,吃瓜群众们纷纷瑟瑟发抖,生怕人工智能要打败人类,占领世界。之后谷歌又推出了AlphaGo Zero,之前的AlphaGo跟它一比简直是个弟弟。


这两个软体的核心算法都是强化学习,这篇文章想要帮助大家粗浅地理一理,一窥强化学习的皮毛。本文会介绍强化学习的基本概念以及最基本入门的算法policy gradient,同时用这个算法去训练一个能玩小游戏的模型。我们会通过细致的讲解让大家了解强化学习,同时附上可读性极高的代码。


强化学习基本知识



强化学习是一种算法,让机器从刚开始时什么都不会,通过不断学习到最后达到某一目标。学习过程中,机器一直不断地试错与学习,环境一直会根据机器的做法给予惩罚或是激励,机器根据环境给它的反馈不断摸索出正确的实现目标的方法。


举个例子,就好比老师教小朋友算术,小朋友刚开始什么都不会,老师不断让小朋友做算术题,小朋友答的好,老师就给他糖,小朋友答得差,老师就严厉地斥责他。但在这个过程中,老师一直没有直接告诉小朋友怎样去答题,小朋友只能根据老师的反应来学习算术,久而久之,小朋友就记住了如何做才能得到糖同时规避斥责,最后达到学会算术的目的。


强化学习的几大因素:agent, environment, state, action, reward。在上面的例子中,小朋友就是agent, 老师就是environment, 小朋友做题的情况就是state, 小朋友采取的做题的行为就是action, 老师给糖还是斥责就是reward。


再回到我们今天要让电脑学会玩的小游戏上,我们今天要玩一个打乒乓球的游戏,游戏环境可以在下面这个网站找到:https://gym.openai.com/envs/#classic_control。下图是一张游戏画面。



游戏里的agent是右边的木板,agent可以采取的action是向上、向下以及不动。最终目标是想让右边的木板在与左边的木板进行乒乓球比赛时能够获胜,observation指的是游戏画面。evironment是游戏环境,每次agent要根据state做出action,然后envrionment会根据当前state和action给出reward和经过action后的state。


更加详细的信息可以查看上文中提到的游戏环境网站。



代码实现



对state进行前处理,state是一张RGB图片,前处理会将图片转化为灰度图片,能加快收敛速度。同时前处理会裁剪图片中无关部分,让训练过程更加稳定。


def prepro(o,image_size=[105,80]):
    y = 0.2126 * o[:, :, 0] + 0.7152 * o[:, :, 1] + 0.0722 * o[:, :, 2]   #gray scale 
    resized = skimage.transform.resize(y, image_size)[17:-8,:]            #delete score board
    return np.expand_dims(resized.astype(np.float32),axis=2)              #shape (height, wodth) -> (1, height, wodth)


搭建agent模型,这里的agent用两层全连接的神经网络来实现,当然也可以用卷积神经网络,但是训练结束时全连接的神经网络和卷积神经网络模型的性能差不多。出于搭建方便与加速收敛的目的,我们采用了全连接神经网络。


self.model = nn.Sequential(
                    nn.Linear(80*80*1, 256),
                    nn.ReLU(),
                    nn.Linear(256, 1),
                    nn.Sigmoid()


初始化环境:


def init_game_setting(self):
        self.last_frame = None
        pass


policy gradient的算法实作过程如下:



代码实现如下,看到这么一长串代码130多行的代码不必惊慌,我们在整理GitHub代码的时候,每一个关键变数都给了必要的注释与说明,花上一点时间仔细研读下,相信你们一定能读懂的。在参数更新的时候采取了variance reduction这一窍门,加速了收敛速度。


for episode in range(self.hyper_param['episode']):

                o = self.env.reset()
                self.last_frame = prepro(o) 
                # initial the first frame, otherwise self.last_frame is None
                action = self.env.action_space.sample()
                # random sample an action for initialization
                o, _, _, _ = self.env.step(action)
                episode_r = []

                while True:

                    # make_action return the action to do and its probability.
                    # done means the game is over or not, if it's over, then we update
                    # the parameters of model.                  
                    # When reward != 0, we calculate the advantage function using 
                    # discount factor and baseline. And put it to "cumulated_r" according
                    # to each action.

                    o = prepro(o)
                    residual_state = o - self.last_frame   
                    self.last_frame = o
                    action, p = self.make_action(residual_state, test = False)
                    o, reward, done, _ = self.env.step(action)


                    p_list.append(p)                #left for BCELoss
                    action_list.append(action-2)    #left for BCELoss

                    if reward != 0:
                        # When someone gets point, reset discount parameter,
                        # and record the experience
                        # cumulated_r means experience until someone getting point
                        # also calculate mean and var to do reward normalization

                        # this flag is used to compare with normal policy grad
                        if not self.hyper_param['base']:
                            T = len(cumulated_r)
                            for i in range(T):
                                cumulated_r[i] += (gamma**(T-i))*reward
                            cumulated_r.append(reward)

                            reward_mean += sum(cumulated_r)
                            n_reward += len(cumulated_r)
                            for i in cumulated_r:
                                reward_var += i**2

                            episode_r.append(cumulated_r[-1])
                            total_reward.extend(cumulated_r)
                            cumulated_r = []
                        else:
                            T = len(cumulated_r)
                            cumulated_r = [reward]*(T+1)
                            episode_r.append(cumulated_r[-1])
                            total_reward.extend(cumulated_r)
                            cumulated_r = []
                    else:
                        cumulated_r.append(reward)


                    if done and n_b != batch:
                        n_b +=1
                        self.init_game_setting()
                        if len(lastest_r) < 30:
                            lastest_r.append(np.sum(episode_r))
                        elif len(lastest_r) == 30:
                            lastest_r.pop(0)
                            lastest_r.append(np.sum(episode_r))
                            self.training_curve.append(np.mean(lastest_r))
                            #print("Episode : %d Mean : %4f" % (episode, np.mean(lastest_r)), end = ' ')
                        break
                    elif done and n_b == batch:
                        self.init_game_setting()

                        if len(lastest_r) < 30:
                            lastest_r.append(np.sum(episode_r))
                        elif len(lastest_r) == 30:
                            lastest_r.pop(0)
                            lastest_r.append(np.sum(episode_r))
                            self.training_curve.append(np.mean(lastest_r))
                            print("Episode : %d Mean : %4f" % (episode, np.mean(lastest_r)), end = ' ')


                        #update per episode
                        self.optimizer.zero_grad()
                        loss = 0

                        if not self.hyper_param['base']:
                            reward_mean = reward_mean/n_reward
                            reward_stddev = (reward_var/n_reward - (reward_mean)**2)**0.5
                            total_reward = torch.Tensor(total_reward).to(device)
                            total_reward = (total_reward - reward_mean) / reward_stddev
                        else:
                            total_reward = torch.Tensor(total_reward).to(device)

                        action_list = torch.Tensor(action_list).to(device)
                        p_list = torch.stack(p_list).view(-1)

                        loss = F.binary_cross_entropy(input = p_list,
                                                      target = action_list,
                                                      weight = total_reward,
                                                      reduction = 'sum')
                        loss /= n_b
                        loss.backward()
                        self.optimizer.step()


                        #reset all record
                        n_b = 1
                        n_reward = 0

                        reward_mean = 0
                        reward_var = 0
                        reward_stddev = 0

                        total_reward = []
                        cumulated_r = []
                        p_list = []
                        action_list = []

                        print("Loss : %4f" % (loss.item()), end = '\r')
                        break




                if episode % 500 == 0 and episode != 0:
                    #Testing per 100 episode
                    print("Testing")
                    test_env = Environment('Pong-v0', self.argument, test=True)
                    result = draw_curve(agent = self, env=test_env)

                    if result >= best_result:
                        best_result = result
                    torch.save(self.model, "./log/"+self.hyper_param['model_name']+".pkl"+str(episode))
                    torch.save(self.optimizer.state_dict(),
                               "./log/"+self.hyper_param['model_name']+".optim")
                    np.save("training_curve_"+str(episode)+".npy", np.array(self.training_curve))                   



实验结果



为了让大家直观地看到强化学习到底有多厉害,接下来我们会通过一个动图展示模型的训练结果,视频中右边的木板是我们训练出来的agent。



下图是训练的曲线,reward越高代表模型训练得越好。可以看到没有加variance reduction这一窍门时,模型很难训练起来,而加了variance reduction这一窍门后模型很快就能达到很好的效能了。




参考资料



https://www.bilibili.com/video/av10590361/


本文的代码是上述教学视频的一次作业,对机器学习感兴趣的小伙伴可以看看。讲者是台湾大学网红教授李宏毅大大,个人感觉十分浅显易懂。


最后,万水千山总是情,GitHub给颗星星行不行:https://github.com/sky1456723/Secret-MLDS-2018/blob/master/hw4/agent_dir/agent_pg.py。


作者:井森堡,运营有个人公众号井森堡,欢迎志同道合的小伙伴关注,本公众号会不定期更新机器学习技术文并附上质量佳且可读性高的代码。

声明:本文为作者投稿,未经允许请勿转载。



 热 文 推 荐 

☞ 华为三星折叠手机可看不可摸;小米架构再调整;杨幂 AI 换脸视频制作者回应 | 极客头条

☞ 颠覆游戏开发,引领行业革命,从 Unite 2019 看 Unity 技术亮点

☞ 十年程序员的告诫:千万不要重写代码!

☞ 为什么程序员下班后只关显示器从不关电脑?

☞ 云评测 | OpenStack智能运维解决方案 @文末有福利!

☞ 指纹锁就安全了?防火防盗还得防AI

☞ 当恐怖组织哈马斯玩起了加密货币, 世界又少了多少伟大的灵魂……

☞ 月入5万,程序员夫人们过上"贵妇"生活了吗?

print_r('点个好看吧!');
var_dump('点个好看吧!');
NSLog(@"点个好看吧!");
System.out.println("点个好看吧!");
console.log("点个好看吧!");
print("点个好看吧!");
printf("点个好看吧!\n");
cout << "点个好看吧!" << endl;
Console.WriteLine("点个好看吧!");
fmt.Println("点个好看吧!");
Response.Write("点个好看吧!");
alert("点个好看吧!")
echo "点个好看吧!"

点击阅读原文,输入关键词,即可搜索您想要的 CSDN 文章。

喜欢就点击“好看”吧!

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存