DQN Code Implementation

With the theoretical groundwork of Deep Q-Network in place, the algorithm given at the end of that post can be implemented directly. Jump to the complete code at the end of this post if you want to read it all at once.

The overall program structure is:

# Experience replay memory
class Memory(object):
    def __init__(self, batch_size, max_size):
        ...

    def store_transition(self, s, a, r, s_, done):
        ...

    def get_mini_batches(self):
        ...


class DQN(object):
    def __init__(self, sess, s_dim, a_dim, batch_size, gamma, lr, epsilon, replace_target_iter):
        ...

    # Choose the next action with an epsilon-greedy policy
    def choose_action(self, s):
        ...

    # Build the model
    def _generate_model(self):
        ...

    # Build the two neural networks used by the model
    def _build_net(self, s, scope, trainable):
        ...

    # Store one transition and learn via experience replay
    def store_transition_and_learn(self, s, a, r, s_, done):
        ...

    def _learn(self):
        ...

The main loop:

rl = DQN(
    sess=sess,
    s_dim=env.observation_space.shape[0],  # state dimension
    a_dim=env.action_space.n,              # action dimension (one-hot)
    batch_size=128,
    gamma=0.99,
    lr=0.01,                  # learning rate
    epsilon=0.1,              # epsilon-greedy
    replace_target_iter=300   # update target parameters every C steps
)
tf.global_variables_initializer().run()

for i_episode in range(1000):
    s = env.reset()
    # total reward of one episode
    r_sum = 0
    while True:
        # choose an action
        a = rl.choose_action(s)
        # take the action and observe the next state, reward, and done flag
        s_, r, done, _ = env.step(a)

        rl.store_transition_and_learn(s, a, r, s_, done)

        r_sum += r  # equals +1 per step in CartPole
        if done:
            print(i_episode, r_sum)
            break

        s = s_
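
The snippet above assumes that an environment env and a TensorFlow session sess already exist; the post does not show that setup. A minimal sketch of one way to provide them, assuming OpenAI Gym's CartPole-v0:

import gym
import tensorflow as tf

# Assumed setup (not shown in the post): a CartPole-v0 environment and a
# default TF1 session, so that tf.global_variables_initializer().run() works.
env = gym.make('CartPole-v0')

with tf.Session() as sess:
    # ... the rl = DQN(...) construction and training loop above go here ...
    pass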

Memory

import collections
import random

import numpy as np


class Memory(object):
    def __init__(self, batch_size, max_size):
        self.batch_size = batch_size  # mini-batch size
        self.max_size = max_size      # maximum number of stored transitions
        self._transition_store = collections.deque()

    def store_transition(self, s, a, r, s_, done):
        # drop the oldest transition when the store is full
        if len(self._transition_store) == self.max_size:
            self._transition_store.popleft()

        self._transition_store.append((s, a, r, s_, done))

    def get_mini_batches(self):
        # sample at most batch_size transitions
        n_sample = self.batch_size if len(self._transition_store) >= self.batch_size else len(self._transition_store)
        t = random.sample(self._transition_store, k=n_sample)
        t = list(zip(*t))

        return tuple(np.array(e) for e in t)
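
A quick usage sketch with hypothetical toy values, showing that get_mini_batches returns one NumPy array per stored field (states, one-hot actions, rewards, next states, done flags):

# Toy example (hypothetical 4-dimensional states, 2 one-hot actions).
memory = Memory(batch_size=2, max_size=100)
for i in range(3):
    s = np.full(4, float(i))
    memory.store_transition(s, np.array([1, 0]), [1.0], s + 1, [False])

s, a, r, s_, done = memory.get_mini_batches()
print(s.shape, a.shape, r.shape, s_.shape, done.shape)
# (2, 4) (2, 2) (2, 1) (2, 4) (2, 1)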

DQN

Two networks are built: the evaluation network \(Q\) and the target network \(\hat{Q}\):

self.q_eval_z = self._build_net(self.s, 'eval_net', True)
self.q_target_z = self._build_net(self.s_, 'target_net', False)

\(y_j =\begin{cases} r_j & \quad \text{if episode terminates at step } j+1 \\ r_j+\gamma\max_{a'}Q(\phi_{j+1},a';\theta^-) & \quad \text{otherwise}\end{cases}\)

q_target = self.r + self.gamma * tf.reduce_max(self.q_target_z, axis=1, keepdims=True) * (1 - self.done)
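
Multiplying the bootstrap term by (1 - done) implements the case split above: for a terminal transition the max-Q term is zeroed out and the target collapses to the raw reward. The same arithmetic in plain NumPy with toy values:

import numpy as np

# Toy values: the second transition is terminal (done = 1).
r = np.array([[1.0], [1.0]])
max_q_target = np.array([[5.0], [7.0]])
done = np.array([[0.0], [1.0]])
gamma = 0.99

y = r + gamma * max_q_target * (1 - done)
print(y)  # [[5.95] [1.  ]] -- the terminal row keeps only the reward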

\(L(\theta) = (y_j-Q(\phi_j,a_j; \theta))^2\)

q_eval = tf.reduce_sum(self.a * self.q_eval_z, axis=1, keepdims=True)
self.loss = tf.reduce_mean(tf.squared_difference(q_target, q_eval))
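
Because self.a holds one-hot actions, the element-wise product with the Q-value matrix followed by a sum over the action axis selects \(Q(\phi_j,a_j;\theta)\) for each sample. The same selection in plain NumPy with toy values:

import numpy as np

# Toy Q-values for two samples and two actions.
q_eval_z = np.array([[0.2, 0.8],
                     [0.5, 0.1]])
a = np.array([[0.0, 1.0],   # sample 1 took action 1
              [1.0, 0.0]])  # sample 2 took action 0

q_eval = np.sum(a * q_eval_z, axis=1, keepdims=True)
print(q_eval)  # [[0.8] [0.5]]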

Experience replay: sample a mini-batch from the memory and run one training step on it:

s, a, r, s_, done = self.memory.get_mini_batches()

loss, _ = self.sess.run([self.loss, self.optimizer], feed_dict={
    self.s: s,
    self.a: a,
    self.r: r,
    self.s_: s_,
    self.done: done
})

Every replace_target_iter steps, the parameters of the updated network \(Q\) are copied to the target network \(\hat Q\):

param_target = tf.global_variables(scope='target_net')
param_eval = tf.global_variables(scope='eval_net')
self.target_replace_ops = [tf.assign(t, e) for t, e in zip(param_target, param_eval)]
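
Defining the assign ops only builds them into the graph; the hard update happens each time they are run, which the full code below does every replace_target_iter learning steps. One way to sanity-check the copy after running the ops (assuming the rl instance and sess from the main loop):

# Hypothetical check: after the copy, eval and target parameters match.
sess.run(rl.target_replace_ops)
eval_vals, target_vals = sess.run([tf.global_variables(scope='eval_net'),
                                   tf.global_variables(scope='target_net')])
for e, t in zip(eval_vals, target_vals):
    assert np.allclose(e, t)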

The complete DQN class:

import numpy as np
import tensorflow as tf

initializer_helper = {
    'kernel_initializer': tf.random_normal_initializer(0., 0.3),
    'bias_initializer': tf.constant_initializer(0.1)
}


class DQN(object):
    def __init__(self, sess, s_dim, a_dim, batch_size, gamma, lr, epsilon, replace_target_iter):
        self.sess = sess
        self.s_dim = s_dim  # state dimension
        self.a_dim = a_dim  # action dimension (one-hot)
        self.gamma = gamma
        self.lr = lr  # learning rate
        self.epsilon = epsilon  # epsilon-greedy
        self.replace_target_iter = replace_target_iter  # update target parameters every C steps

        self.memory = Memory(batch_size, 10000)
        self._learn_step_counter = 0
        self._generate_model()

    def choose_action(self, s):
        # epsilon-greedy: explore with probability epsilon, otherwise act greedily
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.a_dim)
        else:
            q_eval_z = self.sess.run(self.q_eval_z, feed_dict={
                self.s: s[np.newaxis, :]
            })
            return q_eval_z.squeeze().argmax()

    def _generate_model(self):
        self.s = tf.placeholder(tf.float32, shape=(None, self.s_dim), name='s')
        self.a = tf.placeholder(tf.float32, shape=(None, self.a_dim), name='a')
        self.r = tf.placeholder(tf.float32, shape=(None, 1), name='r')
        self.s_ = tf.placeholder(tf.float32, shape=(None, self.s_dim), name='s_')
        self.done = tf.placeholder(tf.float32, shape=(None, 1), name='done')

        self.q_eval_z = self._build_net(self.s, 'eval_net', True)
        self.q_target_z = self._build_net(self.s_, 'target_net', False)

        # y = r + gamma * max(q^)
        q_target = self.r + self.gamma * tf.reduce_max(self.q_target_z, axis=1, keepdims=True) * (1 - self.done)

        q_eval = tf.reduce_sum(self.a * self.q_eval_z, axis=1, keepdims=True)
        # a_mask = tf.cast(self.a, tf.bool)
        # q_eval = tf.expand_dims(tf.boolean_mask(self.q_eval_z, a_mask), 1)

        self.loss = tf.reduce_mean(tf.squared_difference(q_target, q_eval))
        self.optimizer = tf.train.AdamOptimizer(self.lr).minimize(self.loss)

        param_target = tf.global_variables(scope='target_net')
        param_eval = tf.global_variables(scope='eval_net')

        # copy the eval network parameters to the target network
        self.target_replace_ops = [tf.assign(t, e) for t, e in zip(param_target, param_eval)]

    def _build_net(self, s, scope, trainable):
        with tf.variable_scope(scope):
            l = tf.layers.dense(s, 20, activation=tf.nn.relu, trainable=trainable, **initializer_helper)
            q_z = tf.layers.dense(l, self.a_dim, trainable=trainable, **initializer_helper)

        return q_z

    def store_transition_and_learn(self, s, a, r, s_, done):
        if self._learn_step_counter % self.replace_target_iter == 0:
            self.sess.run(self.target_replace_ops)

        # convert the action to one-hot form
        one_hot_action = np.zeros(self.a_dim)
        one_hot_action[a] = 1

        self.memory.store_transition(s, one_hot_action, [r], s_, [done])
        self._learn()
        self._learn_step_counter += 1

    def _learn(self):
        s, a, r, s_, done = self.memory.get_mini_batches()

        loss, _ = self.sess.run([self.loss, self.optimizer], feed_dict={
            self.s: s,
            self.a: a,
            self.r: r,
            self.s_: s_,
            self.done: done
        })