My deep Q-learning code for Flappy Bird is as follows:
import gym_ple
import gym
import os
import numpy as np
import cv2
from collections import deque
from datetime import datetime
import tensorflow as tf
from tensorflow.keras.layers import Dense, Conv2D, Flatten, InputLayer, MaxPooling2D, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
gym_ple.main(200)
tf.compat.v1.enable_eager_execution()
env = gym.make('FlappyBird-v0')  # actions: 1 = do nothing; 0 = flap
imagens = []
## Image preprocessing
def image_processing(image):
    if np.count_nonzero(image) != 0:
        # keep only the green colour channel, crop the frame and resize it
        img = cv2.resize(image[:-100, 50:, 1], dsize=(30, 51))
        return img / 255
    else:
        return np.zeros((51, 30))
## Frame stacking
def frame_stack(stacked_frames, state, is_new, n_stack):
    frame = image_processing(state)
    if is_new:
        stacked_frames = deque([np.zeros(frame.shape, dtype=int) for f in range(n_stack)], maxlen=n_stack)
        for f in range(n_stack):
            stacked_frames.append(frame)
        stacked_state = np.stack(stacked_frames, axis=2)
    else:
        stacked_frames.append(frame)
        stacked_state = np.stack(stacked_frames, axis=2)
    return stacked_state, stacked_frames
class Memory:
    def __init__(self, max_size):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        buffer_size = len(self.buffer)
        index = np.random.choice(range(buffer_size), batch_size, replace=False)
        return [self.buffer[i] for i in index]
class DQN:
    def __init__(self, lr, gamma, max_experiences, batch_size, memory,
                 input_dims=[51, 30, 4], actions=np.array([0, 1])):
        self.memory = memory
        self.batch_size = batch_size
        self.actions = actions
        self.max_experiences = max_experiences
        self.num_actions = len(actions)
        self.input_dims = input_dims
        self.gamma = gamma
        self.lr = lr
        self.model = tf.keras.Sequential([
            InputLayer(input_shape=input_dims),
            Conv2D(64, kernel_size=(7, 7), activation='relu'),
            MaxPooling2D(),
            Dropout(0.2),
            Conv2D(96, kernel_size=3, strides=2, activation='relu'),
            MaxPooling2D(),
            Dropout(0.2),
            Conv2D(128, kernel_size=2, strides=1, activation='relu'),
            Dropout(0.2),
            Flatten(),
            Dense(128, activation='linear'),
            Dense(64, activation='linear'),
            Dense(32, activation='linear'),
            Dense(len(actions), activation='linear')])
        self.model.compile(loss='mean_squared_error',
                           optimizer=Adam(learning_rate=self.lr),
                           metrics=['accuracy'])
    def act(self, s, episode_actions, actions=np.array([0, 1]), prob_cair=70,
            exploration=0.5, decay=0.999):
        if exploration < 0.01:
            exploration = 0.01
        if np.random.rand() < exploration:
            # biased random action: prob_cair% "do nothing", the rest "flap"
            probs = [actions[1]] * prob_cair + [actions[0]] * (100 - prob_cair)
            action = probs[np.random.randint(0, len(probs))]
        else:
            # greedy action from the Q-network
            action = int(np.argmax(self.model.predict(s)))
        episode_actions.append(action)
        return action, episode_actions
    def train(self, TargetNet, cb):
        batch = self.memory.sample(self.batch_size)
        states = np.array([each[0] for each in batch], ndmin=3)
        next_states = np.array([each[3] for each in batch], ndmin=3)
        Qs_list = self.model.predict(states)
        Qs2_list = TargetNet.model.predict(next_states)
        X = []
        Y = []
        for index, (s, action, reward, s2, done) in enumerate(batch):
            # Bellman target: r + gamma * max_a' Q_target(s', a'), or just r on terminal states
            if not done:
                max_future_q = np.max(Qs2_list[index])
                new_q = reward + self.gamma * max_future_q
            else:
                new_q = reward
            Qs = Qs_list[index]
            Qs[action] = new_q
            X.append(s)
            Y.append(Qs)
        self.model.fit(x=np.array(X),
                       y=np.array(Y),
                       epochs=1,
                       callbacks=cb,
                       verbose=0)
    def copy_weights(self, TrainNet):
        TrainNet.model.set_weights(self.model.get_weights())
def main(training=True,
         render=False,
         lr=0.00025,
         gamma=0.99,
         batch_size=96,
         max_size=100000,
         n_stack=4,
         actions=[0, 1],
         exploration=1,
         nepisodes=50,
         npretrain=1000,
         max_steps=50000,
         load_path=''
         ):
    states_size = [51, 30, n_stack]
    n_acoes = len(actions)
    ### SET UP DEEP Q NETWORK AND MEMORY
    memory = Memory(100000)
    TrainNet = DQN(lr, gamma, 100, batch_size, memory=memory, input_dims=states_size)
    TargetNet = DQN(lr, gamma, 100, batch_size, memory=memory, input_dims=states_size)
    ### TRAINING
    if training:
        # checkpoint_path += datetime.now().strftime("%Y%m%d-%H%M%S")
        ### PRE-POPULATING THE MEMORY
        s = env.reset()
        s, stacked_frames = frame_stack(None, s, True, n_stack)
        for i in range(npretrain):
            action = actions[np.random.randint(0, n_acoes)]
            s2, reward, done, _ = env.step(action)
            if reward > 0:
                reward *= 10
            else:
                reward /= 5
            s2, stacked_frames = frame_stack(stacked_frames, s2, False, n_stack)
            if done:
                s2 = np.zeros(s2.shape)
                memory.add((s, action, reward, s2, done))
                s = env.reset()
                s, stacked_frames = frame_stack(None, s, True, n_stack)
            else:
                memory.add((s, action, reward, s2, done))
                s = s2
        ### START OF TRAINING
        for episode in range(nepisodes):
            if episode == 49 or episode == 50:
                print(TrainNet.model.get_weights())
            episode_actions = []
            exploration *= np.power(0.01, 2 / (nepisodes * 1))
            step = 0
            episode_rewards = []
            s = env.reset()
            s, stacked_frames = frame_stack(None, s, True, n_stack)
            while step < max_steps:
                action, episode_actions = TrainNet.act(np.array(s, ndmin=4), episode_actions, exploration=exploration)
                s2, reward, done, _ = env.step(action)
                if reward > 0:
                    reward += 5
                if render:
                    env.render(mode='human')
                episode_rewards.append(reward)
                if done:
                    step = max_steps  # exit the while loop
                    s2 = np.zeros(s2.shape)
                    s2, stacked_frames = frame_stack(None, s2, True, n_stack)
                    total_reward = sum(episode_rewards)
                    desc = np.count_nonzero(episode_actions)
                    sub = len(episode_actions) - desc
                    print(f'Episode::{episode}, Rewards::{total_reward:.2f}, Probability of exploration::{exploration:.4f}, went up {sub} times and went down {desc} times')
                    memory.add((s, action, reward, s2, done))
                    TargetNet.copy_weights(TrainNet)
                else:
                    s2, stacked_frames = frame_stack(stacked_frames, s2, False, n_stack)
                    memory.add((s, action, reward, s2, done))
                    s = s2
                step += 1
                ### THE ACTUAL LEARNING STEP
                TrainNet.train(TargetNet, None)
            if episode % 10 == 0:
                TrainNet.model.save('treino/modelo.h5')
            if episode % 100 == 0 and episode != 0:
                os.mkdir(f'treino/{episode}/')
                TrainNet.model.save(f'treino/{episode}/modelo.h5')
    else:
        TrainNet.model = tf.keras.models.load_model('treino/modelo.h5')
        for episode in range(10):
            s = env.reset()
            s, stacked_frames = frame_stack(None, s, True, n_stack)
            episode_rewards = []
            episode_actions = []
            igual = '='
            print("****************************************************")
            print("EPISODE ", episode)
            done = False
            while not done:
                action = np.argmax(TrainNet.model.predict(np.array(s, ndmin=4)))
                print(s, action)
                episode_actions.append(action)
                s2, reward, done, _ = env.step(action)
                episode_rewards.append(reward)
                env.render(mode='human')
                descidas = np.count_nonzero(episode_actions) * 100 // len(episode_actions)  ## percentage of "do nothing" actions
                subidas = 100 - descidas                                                    ## percentage of "flap" actions, printed below as a bar chart
                s2, stacked_frames = frame_stack(stacked_frames, s2, False, n_stack)
                s = s2
            print(f'Score || {sum(episode_rewards)}')
            print(f'Ups || {igual * subidas}')
            print(f'Downs || {igual * descidas}')
if __name__ == "__main__":
    main(False, False)
    env.close()
However, when I call main(False, False) after training, the agent just goes up/down in every state and always dies. Yet in the training logs I can clearly see that it often passes through one or two pipes.
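To illustrate the discrepancy, here is a minimal diagnostic sketch (it reuses env and frame_stack from the script above and the treino/modelo.h5 checkpoint that main() saves) that prints the network's raw Q-values at each step:

# Diagnostic sketch: print the loaded model's Q-values per step
# (assumes env, frame_stack and the saved checkpoint from the script above).
import numpy as np
import tensorflow as tf

model = tf.keras.models.load_model('treino/modelo.h5')
s = env.reset()
s, stacked_frames = frame_stack(None, s, True, 4)
done = False
while not done:
    q = model.predict(np.array(s, ndmin=4))[0]
    action = int(np.argmax(q))
    print('Q-values:', q, '-> greedy action:', action)
    s2, reward, done, _ = env.step(action)
    if not done:
        s, stacked_frames = frame_stack(stacked_frames, s2, False, 4)

If the Q-value for one action is always larger for every stacked state, the greedy policy collapses to a single action, which matches the behaviour I see at test time.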
For this code I used the OpenAI Gym environment for Flappy Bird. To run it, the "gym_ple" package must be installed on the machine. This Gym environment gives the agent a frame from the game as the observation. The agent has two moves: 0 to flap (fly up) or 1 to do nothing. I am still polishing this script, so it may look rough.
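For reference, a minimal random-agent sketch of how this environment is driven (assuming gym_ple is installed and the classic four-value Gym step() API used above):

# Minimal interaction sketch for FlappyBird-v0 (classic Gym 0.x API).
import gym
import gym_ple  # importing registers the PLE environments, including FlappyBird-v0

env = gym.make('FlappyBird-v0')
obs = env.reset()                         # observation is an RGB frame of the game screen
done = False
total_reward = 0.0
while not done:
    action = env.action_space.sample()    # Discrete(2): 0 = flap, 1 = do nothing
    obs, reward, done, info = env.step(action)
    total_reward += reward
print('episode reward:', total_reward)
env.close()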
Training rewards
Testing rewards