обработка асинхронного потокового запроса в grpc python - PullRequest
2 голосов
/ 06 марта 2019

Я пытаюсь понять, как обработать API-интерфейс grpc с помощью двунаправленной потоковой передачи (используя Python API).

Скажите, что у меня есть следующее простое определение сервера:

syntax = "proto3";
package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg){}
}

message Msg
{
 string msg = 1;
}

Скажите, чтосообщения, которые будут отправлены от клиента, поступают асинхронно (вследствие выбора пользователем некоторых элементов пользовательского интерфейса).

Сгенерированная заглушка Python для клиента будет содержать метод Translate, который будет принимать функцию генератора ивернет итератор.

Что мне не ясно, так это как написать функцию генератора, которая будет возвращать сообщения по мере их создания пользователем.Спать в цепочке в ожидании сообщений - не лучшее решение.

1 Ответ

3 голосов
/ 07 марта 2019

Сейчас это немного неуклюже, но вы можете выполнить свой вариант использования следующим образом:

#!/usr/bin/env python

from __future__ import print_function

import time
import random
import collections
import threading

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server


def translate_next(msg):
    return ''.join(reversed(msg))


class Translator(TestServiceServicer):
  def Translate(self, request_iterator, context):
    for req in request_iterator:
      print("Translating message: {}".format(req.msg))
      yield Msg(msg=translate_next(req.msg))

class TranslatorClient(object):
  def __init__(self):
    self._stop_event = threading.Event()
    self._request_condition = threading.Condition()
    self._response_condition = threading.Condition()
    self._requests = collections.deque()
    self._last_request = None
    self._expected_responses = collections.deque()
    self._responses = {}

  def _next(self):
    with self._request_condition:
      while not self._requests and not self._stop_event.is_set():
        self._request_condition.wait()
      if len(self._requests) > 0:
        return self._requests.popleft()
      else:
        raise StopIteration()

  def next(self):
    return self._next()

  def __next__(self):
    return self._next()

  def add_response(self, response):
    with self._response_condition:
      request = self._expected_responses.popleft()
      self._responses[request] = response
      self._response_condition.notify_all()

  def add_request(self, request):
    with self._request_condition:
      self._requests.append(request)
      with self._response_condition:
        self._expected_responses.append(request.msg)
      self._request_condition.notify()

  def close(self):
    self._stop_event.set()
    with self._request_condition:
      self._request_condition.notify()

  def translate(self, to_translate):
    self.add_request(to_translate)
    with self._response_condition:
      while True:
        self._response_condition.wait()
        if to_translate.msg in self._responses:
          return self._responses[to_translate.msg]


def _run_client(address, translator_client):
  with grpc.insecure_channel('localhost:50054') as channel:
    stub = TestServiceStub(channel)
    responses = stub.Translate(translator_client)
    for resp in responses:
      translator_client.add_response(resp)

def main():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_TestServiceServicer_to_server(Translator(), server)
  server.add_insecure_port('[::]:50054')
  server.start()
  translator_client = TranslatorClient()
  client_thread = threading.Thread(
      target=_run_client, args=('localhost:50054', translator_client))
  client_thread.start()

  def _translate(to_translate):
    return translator_client.translate(Msg(msg=to_translate)).msg

  translator_pool = futures.ThreadPoolExecutor(max_workers=4)
  to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
  translations = translator_pool.map(_translate, to_translate)
  print("Translations: {}".format(zip(to_translate, translations)))

  translator_client.close()
  client_thread.join()
  server.stop(None)


if __name__ == "__main__":
  main()

Основная идея состоит в том, чтобы объект с именем TranslatorClient работал в отдельном потоке, сопоставляя запросыи ответы.Ожидается, что ответы будут возвращаться в том порядке, в котором запросы были отправлены.Он также реализует интерфейс итератора, так что вы можете передать его непосредственно вызову метода Translate в вашей заглушке.

Мы запускаем поток, выполняющий _run_client, который извлекает ответы из TranslatorClient ивозвращает их на другом конце с помощью add_response.

Функция main, которую я здесь включил, на самом деле просто трубочист, так как у меня нет деталей вашего кода пользовательского интерфейса.Я запускаю _translate в ThreadPoolExecutor, чтобы продемонстрировать, что, хотя translator_client.translate является синхронным, он дает, что позволяет вам иметь несколько запросов в полете одновременно.

Мы понимаем, что этомного кода, чтобы написать для такого простого варианта использования.В конечном итоге ответом будет asyncio поддержка.У нас есть планы на это в недалеком будущем.Но на данный момент такое решение должно помочь вам работать с Python 2 или Python 3.

...