Python 3 неблокирующее синхронное поведение - PullRequest
0 голосов
/ 16 января 2019

Я делаю классическую игру Atari Snake на python3, используя Pygame . Я хочу порождать подпроцесс для прослушивания нажатий клавиш, чтобы каждый раз, когда игрок вводил клавишу (UP, DOWN, LEFT или RIGHT), подпроцесс отправлял ключ родительскому процессу. Но эта труба не должна блокировать, чтобы змея могла двигаться в том направлении, в котором она двигалась, до получения ключа.

Я нашел официальную документацию Python по мультипроцессам , но она не описывает желаемое поведение или, по крайней мере, не документирует, является ли пример использования блокирующим или нет. Может кто-нибудь привести пример того, как этого можно достичь?

1 Ответ

0 голосов
/ 16 января 2019

Вы сказали:

Я хочу создать интерфейс для ИИ, чтобы взять под контроль змею. Было бы несправедливо, если бы состояние игры просто передавалось ИИ на каждой итерации, так как это могло бы занять столько времени, сколько потребуется для вычисления следующего хода. Следовательно, почему он должен быть синхронным и неблокирующим.

Итак, чтобы получить то, что вы хотите, вам нужна абстракция. В приведенном ниже примере я создал класс Controller, который это делает. KeyboardController обрабатывает ввод с клавиатуры, в то время как AsyncController запускает поток и использует класс Queue для передачи состояния игры и решения «ИИ». Обратите внимание, что вы должны получать события pygame в основном потоке, поэтому я делаю это в основном цикле и просто передаю события в контроллер.

Ваш ИИ должен вызываться функцией worker. Как вы можете видеть, в настоящее время «ИИ» в рабочей функции действует только каждые 0,5 секунды, а частота кадров равна 120. Для игры не имеет значения, что ИИ принимает решение так долго.

Вот код:

import pygame
import time
import random
from queue import Queue, Empty
from threading import Thread

class Controller():
    def __init__(self, color, message, actor):
        self.color = color
        self.message = message
        if actor: self.attach(actor)

    def attach(self, actor):
        self.actor = actor
        self.actor.controller = self
        self.actor.image.fill(self.color)

class AsyncController(Controller):
    def __init__(self, actor=None):
        super().__init__(pygame.Color('orange'), "AI is in control.", actor)
        self.out_queue = Queue()
        self.in_queue  = Queue()
        t = Thread(target=self.worker)
        t.daemon = True
        t.start()

    def update(self, events, dt):
        for e in events:
            if e.type == pygame.KEYDOWN:
                if e.key == pygame.K_SPACE: self.actor.controller = KeyboardController(self.actor)

        self.out_queue.put_nowait((self.actor, events, dt))
        try: return self.in_queue.get_nowait()
        except Empty: pass

    def worker(self):
        while True:
            try:
                actor, events, dt = self.out_queue.get_nowait()
                if actor.rect.x < 100: self.in_queue.put_nowait(pygame.Vector2(1, 0))
                if actor.rect.x > 600: self.in_queue.put_nowait(pygame.Vector2(-1, 0))
                if actor.rect.y < 100: self.in_queue.put_nowait(pygame.Vector2(0, 1))
                if actor.rect.y > 400: self.in_queue.put_nowait(pygame.Vector2(0, -1))
                if random.randrange(1, 100) < 15:
                    self.in_queue.put_nowait(random.choice([
                        pygame.Vector2(1, 0),
                        pygame.Vector2(-1, 0),
                        pygame.Vector2(0, -1), 
                        pygame.Vector2(0, 1)]))

                time.sleep(0.5)
            except Empty:
                pass

class KeyboardController(Controller):
    def __init__(self, actor=None):
        super().__init__(pygame.Color('dodgerblue'), "You're in control.", actor)

    def update(self, events, dt):
        for e in events:
            if e.type == pygame.KEYDOWN:
                if e.key == pygame.K_SPACE: self.actor.controller = AsyncController(self.actor)
                if e.key == pygame.K_UP: return pygame.Vector2(0, -1)
                if e.key == pygame.K_DOWN: return pygame.Vector2(0, 1)
                if e.key == pygame.K_LEFT: return pygame.Vector2(-1, 0)
                if e.key == pygame.K_RIGHT: return pygame.Vector2(1, 0)

class Actor(pygame.sprite.Sprite):
    def __init__(self):
        super().__init__()
        self.image = pygame.Surface((32, 32))
        self.image.fill(pygame.Color('dodgerblue'))
        self.rect = self.image.get_rect(center=(100, 100))
        self.direction = pygame.Vector2(1, 0)
        self.pos = self.rect.center

    def update(self, events, dt):
        new_direction = self.controller.update(events, dt)
        if new_direction:
            self.direction = new_direction
        self.pos += (self.direction * dt * 0.2)
        self.rect.center = self.pos

def main():
    pygame.init()

    actor   = Actor()
    sprites = pygame.sprite.Group(actor)
    screen  = pygame.display.set_mode([800,600])
    clock   = pygame.time.Clock()
    font    = pygame.font.SysFont("consolas", 20, True)
    dt      = 0
    KeyboardController(actor)

    while True:
        events = pygame.event.get()
        for e in events:
            if e.type == pygame.QUIT:
                return

        sprites.update(events, dt)
        screen.fill(pygame.Color('grey12'))
        screen.blit(font.render(actor.controller.message + ' [SPACE] to change to keyboard control.', True, pygame.Color('white')), (10, 10))
        sprites.draw(screen)
        dt = clock.tick(120)
        pygame.display.update()

if __name__ == '__main__':
    main()

enter image description here

Обратите внимание, что эта реализация использует бесконечные очереди. Вы хотите добавить логику для очистки очередей, чтобы в ваших играх не использовалось много памяти.

...