Меня зовут Энди, и я новичок в stackoverflow, и это мой первый вопрос.
Я начал учиться python 40i sh дней go благодаря covid19 и перешел в машинное обучение / qlearning около 3 недель a go и застрял там с тех пор.
Цель: заставить компьютер играть в Rad Racer 2 (гоночная игра для NES), используя обучение с подкреплением.
Планы по выполнению этой работы:
После различных туториалов / сайтов я решил использовать двойную сеть для обучения / обучения. 2x 256 сверточная сеть с использованием keras, так как я просмотрел несколько обучающих видеороликов по keras basi c
3 действия (удерживайте ускорение (J), ускорение влево (JA), ускорение вправо (JD) Я использую Коды ключей прямого ввода, которые я нашел в Интернете для отправки входных данных в игру, поскольку отправка обычных ключей не работает.
Я знаю, что ppl использует ретро-тренажерный зал для таких игр, но я хотел увидеть внутреннюю работу вознаграждения / наблюдения и т. поэтому я использовал yolov5 для обнаружения линий / объектов. На основе результата yolov5 я вычисляю награду за шаг.
Мой ввод представляет собой серию изображений в градациях серого (4) для представления движения с использованием deque, затем сложенных с numpy. Как только я наберу достаточно опыта / воспроизведу память (1500), я начал тренировку в конце каждого эпизода, а не на каждом шаге. Я обнаружил, что после каждого шага тренировка задерживается.
Проблема:
Моя самая большая проблема в настоящее время заключается в том, что модель, похоже, не обучается должным образом. Кажется, я немного в порядке эпизод 20-30, потом становится все хуже и хуже. Дошло до того, что он выполняет только одно действие в течение нескольких часов.
Я пробовал поиграть со скоростью обучения (0,1 - 0,00001), различными входами (1 слой bgr, слой оттенков серого, 4 слоя ... c), разная скорость затухания эпсилона. Я прокомментировал большую часть наград, на данный момент только basi c reward.
большинство кодов, кроме yolo, пришлось удалить несколько строк из-за # ограничения символов
# parameters
training = True
learning_rate = 0.0001
DISCOUNT = 0.99
REPLAY_MEMORY_SIZE = 50_000 # How many last steps to keep for model training
MIN_REPLAY_MEMORY_SIZE = 1500 # Minimum number of steps in a memory to start training
MINIBATCH_SIZE = 1000 # How many steps (samples) to use for training
batch_size = 32
UPDATE_TARGET_EVERY = 0 # Terminal states (end of episodes)
MODEL_NAME = 'RC'
MIN_REWARD = 0 # For model save
save_every = 5 # save every x episodes
EPISODES = 2_000
# Exploration settings
if training is True:
epsilon = 1 # not a constant, going to be decayed
else:
epsilon = 0
MIN_EPSILON = 0.01
START_EPISODE_DECAY = 0
END_EPISODE_DECAY = 20
if epsilon > MIN_EPSILON:
EPS_DECAY = -(epsilon/((END_EPISODE_DECAY-START_EPISODE_DECAY)/epsilon))
else:
EPS_DECAY = 0
# Agent class
class DQNAgent:
def __init__(self):
# Main model
self.model = self.create_model()
# self.model = self.load_model()
# Target network
self.target_model = self.create_model()
self.target_model.set_weights(self.model.get_weights())
# An array with last n steps for training
self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
# Used to count when to update target network with main network's weights
self.target_update_counter = 0
def create_model(self):
dropout = 0.1
model = Sequential()
model.add(Conv2D(256, (2, 2), input_shape=(int(height/resize_ratio), int(width/resize_ratio), img_channels)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(dropout))
model.add(Conv2D(256, (2, 2)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(dropout))
model.add(Flatten())
model.add(Dense(64))
model.add(Dense(env.ACTION_SPACE_SIZE, activation='linear')) # ACTION_SPACE_SIZE = how many choices (9)
model.compile(loss="mse", optimizer=Adam(lr=learning_rate), metrics=['accuracy'])
return model
# Trains main network at end of episode
def train(self, terminal_state):
# Start training only if certain number of samples is already saved
if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
return
minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)
current_states = np.array([transition[0] for transition in minibatch])
# from (MINIBATCH_SIZE, 1, h, w, 4) > (MINIBATCH_SIZE, h, w, 4)
current_states = current_states.reshape(current_states.shape[0], current_states.shape[2],
current_states.shape[3], current_states.shape[4])
current_qs_list = self.model.predict(current_states)
new_current_states = np.array([transition[3] for transition in minibatch])
new_current_states = new_current_states.reshape(new_current_states.shape[0], new_current_states.shape[2],
new_current_states.shape[3], new_current_states.shape[4])
# new_current_states = np.expand_dims(new_current_states, axis=-1)
future_qs_list = self.target_model.predict(new_current_states)
X = []
y = []
for index, (current_state_img, current_action, current_reward, new_current_img, current_done) in enumerate(minibatch):
if not current_done:
max_future_q = np.max(future_qs_list[index])
new_q = current_reward + (DISCOUNT * max_future_q)
else:
new_q = 0.0
current_qs = current_qs_list[index]
current_qs[current_action] = new_q
X.append(np.squeeze(current_state_img, axis=0))
y.append(current_qs)
X = np.array(X)
# X = np.expand_dims(X, axis=-1)
# X = X.reshape(X.shape[0], X.shape[2], X.shape[3], X.shape[4])
y = np.array(y)
self.model.fit(X, y, batch_size=batch_size, verbose=0, shuffle=False)
# self.model.train_on_batch(X, y)
if terminal_state:
self.target_update_counter += 1
# If counter reaches set value, update target network with weights of main network
if self.target_update_counter > UPDATE_TARGET_EVERY:
self.target_model.set_weights(self.model.get_weights())
self.target_update_counter = 0
print('target_model trained!')
# Queries main network for Q values given current observation space (environment state)
def get_qs(self, state):
result = agent.model.predict(state)
result = result[0]
return result
agent = DQNAgent()
current_img_stack = deque(maxlen=4)
# make the game active
game = gw.getWindowsWithTitle('Mesen')[0]
game.activate()
time.sleep(1)
release_all()
# Iterate over episodes
for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit='episodes'):
episode_reward = 0
step = 1
if episode <= START_EPISODE_DECAY - 1:
start_epsilon = False
elif episode >= END_EPISODE_DECAY + 1:
start_epsilon = False
else:
start_epsilon = True
# Reset environment and get initial state
# blackscreens followed by the 1st screen starting out
current_state = env.reset()
blackscreen = np.zeros_like(current_state)
current_img_stack.append(blackscreen)
current_img_stack.append(blackscreen)
current_img_stack.append(blackscreen)
current_img_stack.append(current_state)
stacked_state = np.stack(current_img_stack, axis=2)
stacked_state = np.ascontiguousarray(stacked_state, dtype=np.float32) / 255
stacked_state = np.transpose(stacked_state, (1, 0, 2))
stacked_state = np.expand_dims(stacked_state, axis=0)
start_time = time.time()
# Reset flag and start iterating until episode ends
done = False
while not done:
if np.random.random() > epsilon:
action = np.argmax(agent.get_qs(stacked_state))
else:
action = np.random.randint(0, env.ACTION_SPACE_SIZE)
new_state, reward, done, prediction, preview = env.step(action)
if done is False:
next_img_stack = current_img_stack
next_img_stack.append(new_state)
next_stack = np.stack(next_img_stack, axis=2)
next_stack = np.ascontiguousarray(next_stack, dtype=np.float32) / 255
next_stack = np.transpose(next_stack, (1, 0, 2))
next_stack = np.expand_dims(next_stack, axis=0)
# current_state = new_state
current_img_stack = next_img_stack
stacked_state = next_stack
else:
next_img_stack = current_img_stack
next_img_stack.append(blackscreen)
next_stack = np.stack(next_img_stack, axis=2)
next_stack = np.ascontiguousarray(next_stack, dtype=np.float32) / 255
next_stack = np.transpose(next_stack, (1, 0, 2))
next_stack = np.expand_dims(next_stack, axis=0)
step += 1
episode_reward += reward
ep_rewards.append(episode_reward)
if SHOW_PREVIEW:
env.render(preview, prediction)
if training is True:
agent.update_replay_memory((stacked_state, action, reward, next_stack, done))
# print(episode_reward)
if done is True:
ep_reward_final.append(episode_reward)
print(' Epsilon(' + str(epsilon) + ') EPtimes(' + str(time.time() - start_time) + ') done('
+ str(done) + ') step(' + str(step) + ') EPreward(' + str(episode_reward) +
') best_reward_this_session(' + str(max(ep_reward_final)) + ') fps(' +
str(step/(time.time() - start_time)) + ')')
# plot(ep_reward_final)
if training is True:
agent.train(done)
# Decay epsilon
if show_info is False and epsilon <= MIN_EPSILON:
print(f"\nEPS_DECAY ended on episode {episode} - epsilon {epsilon}")
epsilon = MIN_EPSILON
show_info = True
elif start_epsilon is True:
epsilon += EPS_DECAY