Смешивание синхронного и A-sync кода в Python - PullRequest
0 голосов
/ 23 января 2019

Я пытаюсь преобразовать синхронный поток в коде Python, который основан на обратных вызовах, в A-синхронный поток с использованием asyncio. По сути, код тесно взаимодействует с сокетами TCP / UNIX. Он читает данные из сокетов, манипулирует ими для принятия решений и записывает данные обратно на другую сторону. Это происходит по нескольким сокетам одновременно, и данные совместно используются контекстами, чтобы иногда принимать решения.

EDIT :: Код в настоящее время в основном основан на регистрации обратного вызова в центральном объекте для определенного сокета и на том, что этот объект запускает обратный вызов, когда соответствующий сокет доступен для чтения (что-то вроде «вызова этой функции, когда этот сокет содержит данные»). быть прочитанным "). Как только вызывается обратный вызов, происходит куча вещей, и в конечном итоге регистрируется новый обратный вызов, когда появляются новые данные. Центральный объект выполняет выборку по всем зарегистрированным сокетам, чтобы выяснить, какие обратные вызовы следует вызывать.

Я пытаюсь сделать это без рефакторинга всего моего кода и сделать это как можно более понятным для программиста - поэтому я пытался думать об этом примерно так - весь код должен работать так же, как и сегодня - но всякий раз, когда текущий код выполняет socket.recv () для получения новых данных, процесс будет выполнять другие задачи. Когда чтение возвращается, оно должно вернуться к обработке данных из той же точки, используя новые полученные данные.

Для этого я написал новый класс под названием AsyncSocket, который взаимодействует с потоками ввода-вывода asyncIO, и поместил там почти исключительно исключительно операторы Async / await, полагая, что я реализую метод recv в своем классе, чтобы он выглядел как «обычный IO сокет» для остальной части моего кода. Пока - это мое понимание того, что должно разрешить программирование A-sync.

Теперь к проблеме:

Мой код ожидает подключения клиентов - когда это происходит, контекст каждого клиента может считывать и записывать из своего собственного подключения. Я упростил переход к следующему, чтобы прояснить проблему:

class AsyncSocket():
    def __init__(self,reader,writer):
        self.reader = reader
        self.writer = writer
    def recv(self,numBytes):
        print("called recv!")
        data = self.read_mitigator(numBytes)
        return data
    async def read_mitigator(self,numBytes):
        print("Awaiting of AsyncSocket.reader.read")
        data = await self.reader.read(numBytes)
        print("Done Awaiting of AsyncSocket.reader.read data is %s " % data)
        return data 

def mit2(aSock):
    return mit3(aSock)

def mit3(aSock):
    return aSock.recv(100)

async def echo_server(reader, writer):
    print ("New Connection!")
    aSock = AsyncSocket(reader,writer) # create a new A-sync socket class and pass it on the to regular code

    while True:
        data = await some_func(aSock) # this would eventually read from the socket
        print ("Data read is %s" % (data))
        if not data:
            break
        writer.write(data) # echo everything back

async def main(host, port):
    server = await asyncio.start_server(echo_server, host, port)
    await server.serve_forever()
asyncio.run(main('127.0.0.1', 5000))

mit2 () и mit3 () - синхронные функции, которые обрабатывают данные на обратном пути, прежде чем вернуться к циклу основного клиента - но здесь я просто использую их как пустые функции. Проблема начинается, когда я играю с реализацией some_func ().

Сквозная реализация (правка: вид работ) - но все еще есть проблемы:

def some_func(aSock):
    try:
        return (mit2(aSock)) # works
    except:
        print("Error!!!!")

Хотя реализация, которая читает данные и что-то с ними делает - например, добавляет суффикс перед возвратом, выдает ошибку:

def some_func(aSock):
    try:
        return (mit2(aSock) + "something") # doesn't work
    except:
        print("Error!!!!")

Ошибка (насколько я понимаю) означает, что он на самом деле не делает то, что должен:

New Connection!
called recv!
/Users/user/scripts/asyncServer.py:36: RuntimeWarning: coroutine 'AsyncSocket.read_mitigator' was never awaited
  return (mit2(aSock) + "something") # doesn't work
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Error!!!!
Data read is None

И эхо-сервер, очевидно, не работает. Очевидно, мой код больше похож на вариант №2 с гораздо большим количеством вещей в some_func (), mit2 () и mit3 () - но я не могу заставить это работать. Я довольно новичок в использовании asyncio / async / await - так что (довольно базовая концепция, я думаю) мне не хватает?

1 Ответ

0 голосов
/ 23 января 2019

Этот код не будет работать так, как предполагалось:

def recv(self,numBytes):
    print("called recv!")
    data = self.read_mitigator(numBytes)
    return data

async def read_mitigator(self,numBytes):
    ...

Вы не можете вызвать асинхронную функцию из функции синхронизации и получить результат, вы должны ожидать этого, что гарантирует, что вы вернетесь в цикл обработки событий, если данные еще не готовы. Это несоответствие между асинхронным и синхронизирующим кодом иногда называют проблемой функция цвета .

Поскольку ваш код уже использует неблокирующие сокеты и цикл обработки событий, хорошим подходом для его переноса в asyncio может быть сначала переключиться на цикл обработки событий asyncio. Вы можете использовать методы цикла событий, такие как sock_recv для запроса данных:

def start():
    loop = asyncio.get_event_loop()
    sock = make_socket()  # make sure it's non-blocking
    future_data = loop.sock_recv(sock, 1024)
    future_data.add_done_callback(continue_read)
    # return to the event loop - when some data is ready
    # continue_read will be invoked

def continue_read(future):
    data = future.result()
    print('got', data)
    # ... do something with data, e.g. process it
    # and call sock_sendall with the response

asyncio.get_event_loop().call_soon(start())
asyncio.get_event_loop().run_forever()

После того, как программа работает в этом режиме, вы можете перейти к сопрограммам , которые позволяют коду выглядеть как код синхронизации, но работают точно так же:

async def start():
    loop = asyncio.get_event_loop()
    sock = make_socket()  # make sure it's non-blocking
    data = await loop.sock_recv(sock, 1024)
    # data is available "immediately", meaning the coroutine gets
    # automatically suspended when awaiting data that is not yet
    # ready, and automatically re-scheduled when the data is ready
    print('got', data)

asyncio.run(start())

Следующим шагом может быть устранение make_socket и переключение на asyncio streams .

...