Как сделать асинхронные вызовы gRPC в Python? - PullRequest
2 голосов
/ 17 марта 2019

Я, должно быть, что-то делаю не так ... мой сервер gRPC реализован в node.js:

function handler(call, callback) {
   console.log('Received request at ' + Date.now());
   setTimeout(() => {
      callback({ message: 'Done and done' });
   }, 100);
}

Если я назову это 1000 в узле, я получу 1000 ответов примерно за 100 мс:

const resps = [];
for (let i = 0; i < 1000; i += 1) {
    client.doit({ data }, (err, resp) => {
      resps.push(resp);
      if (resps.length === 1000) {
        onDone();
      }
    });
}

Тем не менее, при вызове сервера из Python с использованием service.future я вижу, что сервер получает запрос только после того, как предыдущий вернул:

for _ in range(1000):
    message = Message(data=data)
    resp = client.doit.future(message)
    resp = resp.result()
    resps.append(resp)

Я получаю, что парадигма IO узла отличается (все асинхронно; цикл обработки событий и т. Д.), И приведенный выше пример Python блокирует out.result(), но мой вопрос: могу ли я изменить / оптимизировать клиент Python, чтобы он можно сделать несколько звонков на мой сервер, не дожидаясь возвращения первого?

1 Ответ

2 голосов
/ 22 марта 2019

Вы можете делать асинхронные унарные вызовы в Python следующим образом:

class RpcHandler:
    def rpc_async_req(self, stub):
        def process_response(future):
            duck.quack(future.result().quackMsg)

        duck = Duck()
        call_future = stub.Quack.future(pb2.QuackRequest(quackTimes=5)) #non-blocking call
        call_future.add_done_callback(process_response) #non-blocking call
        print('sent request, we could do other stuff or wait, lets wait this time. . .')
        time.sleep(12) #the main thread would drop out here with no results if I don't sleep
        print('exiting')

class Duck:
    def quack(self, msg):
        print(msg)


def main():
    channel = grpc.insecure_channel('localhost:12345')
    stub = pb2_grpc.DuckServiceStub(channel)
    rpc_handler = RpcHandler()
    rpc_handler.rpc_async_req(stub=stub)

if __name__ == '__main__':
    main()

proto

syntax = "proto3";

package asynch;

service DuckService {
    rpc Quack (QuackRequest) returns (QuackResponse);
}

message QuackRequest {
    int32 quackTimes = 1;
}

message QuackResponse {
    string quackMsg = 1;
}
...