Асинхронный и ПАРАЛЛЕЛЬНЫЙ генератор - PullRequest
0 голосов
/ 17 мая 2018

У меня есть скрипт на python, который лениво собирает данные, создает обучающие образцы и передает их в мою модель ML для обучения. На данный момент я генерирую данные, используя стандартный генератор Python, который, насколько мне известно, является синхронным. Я ищу умный чистый способ сделать мой генератор действительно асинхронным, поэтому, когда я использую его в качестве итератора, обработка следующих выборок данных начнется сразу после того, как я извлечу последние выборки. Рассмотрим следующий пример:

def asyncgen():
    for i in range(5):
        print("I want this part to work asynchronously :(")
        i = 0;
        while(i<1e8):
            i+=1
        yield "Hi"

a = asyncgen()
for w in a:
    print(w)
    i = 0
    while (i < 1e8):
        i += 1

Как мне заставить мой генератор начать обрабатывать вещи (и асинхронно, в другом процессе) сразу после получения «Привет»? В настоящее время обработка начинается только после вызова цикла next ().

Я изучал Асинхронные генераторы PEP 525, но они, кажется, работают только одновременно, а не параллельно (черт возьми, ГИЛ!). Какой хороший, желательно родной способ сделать это в Python.

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Сокеты также являются отличным способом решения этой проблемы.
По сути, вместо того, чтобы иметь одну программу с потоками или несколькими процессами, создайте внешнюю, которая выполняет асинхронную, к которой вы подключаетесь извне.

Узнайте больше о сокетах на PMOTW

Вот довольно полный пример (python 2.7):

import argparse
import logging
import socket
import time
import random

description = """
This program sends data across a socket
"""
arg_parser = argparse.ArgumentParser(description=description)
arg_parser.add_argument('--AgentIP', '-i', action='store'    
    , default='localhost' , type=str
    , help="The IP address of this Server that client will connect to."
    )
arg_parser.add_argument('--AgentPort', '-p', action='store'     
    , default='13000' , type=int
    , help="The port number of the socket that client will connect to."
    )
arg_parser.add_argument('--log', action='store'     
    , default='INFO' , type=str
    , help="Log level"
    , choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    )
args = arg_parser.parse_args()

# use Logger for great justice and heavenly output
logging.basicConfig(level=getattr(logging, args.log, 'CRITICAL'))
log = logging.getLogger(name='SERVER')

### vvv YOUR CODE vvv ###
def asyncgen(log):
    for i in range(5):
        log.debug("I want this part to work asynchronously :(") 
        time.sleep(random.random())
        yield "Hi"
### ^^^ YOUR CODE ^^^ ###

def make_a_connection(server, log):
    # Accept outside connections
    (client_socket, address) = server.accept()
    log.info("Got a connection : {}:{}".format(client_socket,address))

    for value in asyncgen(log):
        client_socket.send(value)
        log.info("SEND:{}".format(value))

    client_socket.close()


def main(args, log):
    server   = socket.socket( socket.AF_INET , socket.SOCK_STREAM )
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    hostname = socket.gethostbyname(args.AgentIP)
    port     = args.AgentPort
    address  = (hostname, port)

    server.bind(address)
    server.listen(1)
    log.info("Server started on {}:{}".format(hostname, port))

    try:
        while True:
          make_a_connection(server, log)

    except (KeyboardInterrupt, SystemExit):
       server.close()
       log.info("Server connections closed")


if __name__=='__main__':
    main(args, log)
0 голосов
/ 17 мая 2018

Единственный способ обойти GIL - это использовать многопроцессорную обработку .

from multiprocessing import Process

def asynch_part(i):
    print("I want this part to work asynchronously :(")
    k = 0;
    while(k<1e8):
        k+=1
    yield "Hi" # +" from " + str(i)

if __name__ == '__main__':
    p=[]
    for i in range(5): # I am keeping the processes listed and trackable,  
                       # perhaps you do not care. os.getpid() tracks them anyway
        p[i] = Process(target=asynch_part, args=(i))
        p[i].start()

    for i in range(5):
        p[i].join()

Таким образом, в приведенном выше коде ваш асинкген выполняется 5 раз независимо, как параллельные процессы. Затем они присоединяются до окончания программы. Ведение списка p просто иллюстративно.

...