0%

DQN代码实现

DQN强化学习方法系列主要是由两篇文章提出,分别是 Playing Atari with Deep Reinforcement LearningHuman-level control through deep reinforcement learning。这两篇文章讲述的具体方法在之前的博客 DQN相关论文笔记中有过介绍,在这篇文章中分析DQN强化学习方法的代码实现细节。

1. 算法为代码

DQN_algorithm

  1. 构建模型。确定在线 Q 网络(也称为估计 Q 网络)的层数和隐藏层神经元个数,并随机初始化权重。目标 Q 网络的架构与在线 Q 网络一致,权重初始化为在线 Q 网络的权重。

  2. 初始化经验回放池 $\mathcal{D}$。

  3. 进行训练。外循环为 M 个 episode 的训练。每个 episode 是智能体从行动开始到任务结束(或)任务超时的过程。

  4. 内循环为 T 个时间步长:

    • 利用 $\epsilon$ -贪婪算法选择随机动作 $a_t$。

    • 在仿真器中执行动作 $a_t$ 获得奖励 $r_t$ 和新的状态 $s_{t+1}$。

    • 将一个 transition $(s_t, a_t, r_t, s_{t+1})$ 存入经验回放池。

    • 对经验回报池进行采样,随机抽取 N 个 transition 构成一个 Mini-batch。

    • 通过梯度下降最小化均方损失函数 $(y_j - Q(s_j, a_j; \theta))^2$ 更新参数 $\theta$。

    • 每隔 $C$ 个时间步长将在线 Q 网络的权重 $\theta$ 赋值给目标 Q 网络的权重 $\theta^-$

2. 代码实现(基于pytorch)

下面用代码实现简单的 DQN 示例, 环境为 gym 中的小游戏 Cartpole-v0(让小车上的木棍保持竖立)。

2.1 构建网络

构建 Q 值网络,采用两层全连接网络。输入是 4 维状态向量,输出是 2 个动作对应的 Q 值。策略采用 $\epsilon-$ 贪婪策略,有 $\epsilon$ 的概率选择随机动作,$1-\epsilon$的概率选择贪婪动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Qnet(nn.Module):
def __init__(self):
super(Qnet, self).__init__()
self.fc1 = nn.Linear(4, 256)
self.fc2 = nn.Linear(256, 2)

def forward(self, x):
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x

def sample_action(self, obs, epsilon):
out = self.forward(obs)
coin = random.random()
if coin < epsilon:
return random.randint(0, 1)
else:
return out.argmax().item()

2.2 构建经验回放池

经验回放池实际上是一个队列,当经验回放池满时,会抛弃旧的经验值,加入新采样的经验值。采样时,从经验回放池中随机抽取batch_size个经验值作为一个transition返回给训练机进行学习,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ReplayBuffer():
def __init__(self):
self.buffer = collections.deque(maxlen=buffer_limit)

def put(self, transition):
self.buffer.append(transition)

def sample(self, n):
mini_batch = random.sample(self.buffer, n)
s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

for transition in mini_batch:
s, a, r, s_prime, done_mask = transition
s_lst.append(s)
a_lst.append([a])
r_lst.append([r])
s_prime_lst.append(s_prime)
done_mask_lst.append([done_mask])

return torch.tensor(s_lst, dtype=torch.float), \
torch.tensor(a_lst), \
torch.tensor(r_lst, dtype=torch.float), \
torch.tensor(s_prime_lst, dtype=torch.float), \
torch.tensor(done_mask_lst)

def size(self):
return len(self.buffer)

2.3 构建训练模型

估计 Q 网络的参数采用随机初始化,目标 Q 网络的参数复制估计 Q 网络的参数。$\epsilon$会随着 episode 的增长而降低,也就是后面的策略越来越靠近贪婪策略。外层循环是 $N$ 个 episode,内层循环是 $T$ 个时间步长。每个时间步长进行动作采样,仿真执行,存储经验这三个步骤。

注意伪代码中每个时间步长都更新一次,这里是每个 episode 更新一次,视具体情况而定,如果每个 episode 包含的时间步长很多,则选择每个时间步长更新一次,也可以是每隔几个时间步长更新一次。

另外还需要注意的是前期不会直接进行更新,而是等到经验回放池中的样本数量超过某个阈值才会开始训练。

最后就是每隔 $C$ 个 episode 将估计 Q 网络的参数复制给在线 Q 网络。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def train():
env = gym.make('CartPole-v1')
q = Qnet()
q_target = Qnet()
q_target.load_state_dict(q.state_dict())
memory = ReplayBuffer()
optimizer = optim.Adam(q.parameters(), lr=learning_rate)
print_interval = 20
score = 0.0

for n_epi in range(N_episode):
# epsilon = max(min_epsilon, max_epsilon - 0.01 * (n_epi / 200))
epsilon = max(min_epsilon, max_epsilon - (max_epsilon - min_epsilon) * (n_epi / N_episode))
s = env.reset()

for t in range(T):
a = q.sample_action(torch.from_numpy(s).float(), epsilon)
s_prime, r, done, info = env.step(a)
done_mask = 0.0 if done else 1.0
memory.put((s,a,r,s_prime,done_mask))
s = s_prime

score += r
if done:
break

if memory.size() > 2000:
update(q, q_target, memory, optimizer)

if n_epi%copy_time == 0:
q_target.load_state_dict(q.state_dict())

if n_epi%print_interval==0 and n_epi!=0:
print("# of episode :{}, avg score : {:.1f}, buffer size : {}, epsilon : {:.1f}%".format(
n_epi, score/print_interval, memory.size(), epsilon*100))
score = 0.0

env.close()
torch.save(
{
'q_state_dict': q.state_dict(),
'q_target_state_dict': q_target.state_dict(),
'optim_state_dict': optimizer.state_dict()
}, SAVE_PATH)

2.4 梯度下降更新参数

采用adam优化器,对估计网络的参数 $\theta^Q$ 和 $\theta^\mu$ 进行mini-batch梯度下降优化。
通过最小化均方损失函数,求 $\theta^Q$ 的梯度,利用adam优化器更新参数 $\theta^Q$。均方损失函数如下:

其中 $\theta_i^-$ 是经过 $C$ 个步长从估计 Q 网络中复制而来,在上面的构建训练模型中有体现。

更新过程是从经验回放池中抽取 batch_size 个训练样本,利用梯度下降法进行反向传播更新参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
def update(q, q_target, memory, optimizer):
for i in range(update_epoch):
s, a, r, s_prime, done_mask = memory.sample(batch_size)

q_out = q(s)
q_a = q_out.gather(1,a)
max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)
target = r + gamma * max_q_prime * done_mask
loss = F.smooth_l1_loss(q_a, target)

optimizer.zero_grad()
loss.backward()
optimizer.step()

2.6 运行DDPG

训练好模型之后,将网络的参数保存,然后在评测的时候重新载入模型参数。采用的策略也算是 $\epsilon-$ 贪婪策略,不过 $\epsilon$ 值采用最小值。代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def evaluate():
q = Qnet()
checkpoint = torch.load(SAVE_PATH)
q.load_state_dict(checkpoint['q_state_dict'])
q.eval()
env = gym.make('CartPole-v1')
while(1):
s = env.reset()
done = False
while not done:
a = q.sample_action(torch.from_numpy(s).float(), min_epsilon)
s_prime, r, done, info = env.step(a)
env.render()
s = s_prime

env.close()

2.7 超参数的设置

超参数的设置很重要,但没有特定的方法指导,所以全凭经验。简单总结一些经验:

  1. 确定学习步长一般先确定它的数量级,比如学习步长的数量级为 $10^{-4}$,然后再对系数进行微调。在不影响后期收敛速度的情况下,学习步长选择低一点的。

  2. batch_size 的大小对学习的稳定性有影响。如果 batch_size 太小,模型可能一开始取得较优值,后面训练的性能却开始下降,这是因为模型过拟合。所以 batch_size 不能太小。但是也不能太大,否则收敛速度过慢,训练过程太久。

  3. copy_time 太大模型会陷入过拟合,太小则模型收敛速度过慢。

1
2
3
4
5
6
7
8
9
10
11
12
learning_rate = 0.0002 #0.0001
N_episode = 10000
max_epsilon = 0.08
min_epsilon = 0.001 #0.01
T = 600
train_threshold = 2000
update_epoch = 10
copy_time = 50
SAVE_PATH = 'model/dqn.pt'
batch_size = 64 #32
buffer_limit = 50000
gamma = 0.99