как завершить сервер серверный поток от клиента grpc - PullRequest
0 голосов
/ 12 октября 2019

Я хочу реализовать метод, который может вызываться клиентом для завершения потоковой передачи сервера. Есть ли что-нибудь в GRPC Python или как я могу это реализовать. Проще говоря, я хочу, чтобы клиент сказал «больше не транслировать». Кроме того, я хочу, чтобы мой сервер не выполнял обратный вызов после завершения потоковой передачи.

мой протокол выглядит следующим образом:

syntax = "proto3";

service TestService {
  rpc GetOneToOne(TestRequest) returns (TestResponse) {}
  rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
  rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
  rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}

message TestRequest {
  string message = 1;
}

message TestResponse {
  string message = 1;
}

client.py:

import grpc
import string
import random
from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
    test_pb2, test_pb2_grpc
)

 # open a gRPC channel
channel = grpc.insecure_channel('localhost:50052')

 # create a stub (client)
stub = test_pb2_grpc.TestServiceStub(channel)

# create a valid request message
number = test_pb2.TestRequest(message="pri")

# One to One
response1 = stub.GetOneToOne(number)

print(response1.message)

# stream to stream
iterator = iter([test_pb2.TestRequest(message=x) for x in ["gri","tree","dree","bri"]])
response4 = stub.GetStreamToStream(iterator)
for resp in response4:
    print("StreamToStream",resp)

server.py:

from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
    test_pb2, test_pb2_grpc
)
from system_test.sw_test.vstars.lib.low_level import endpoint_lib

import grpc
from concurrent import futures
import time
from queue import Empty
import queue


class _Servicer(test_pb2_grpc.TestServiceServicer):
    def GetOneToOne(self, request, context):
        return test_pb2.TestResponse(message='response: {}'.format(request.message))

    def GetStreamToStream(self, request_iterator, context):
        # yield from map(
        #     lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
        #     request_iterator
        # )
        def remove_feed():
            print("fsdfdsf",context)

        def stop_stream():
            remove_feed()

        # context.add_callback(stop_stream)
        i=0
        while True:
            try:
                print("in here",i)
                yield from map(
                    lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
                    request_iterator
                )
                i+=1
            except KeyboardInterrupt:
                print("Key")
                context.add_callback(stop_stream)

# create a gRPC server
rx_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)

# listen on port 50051
print('Starting server. Listening on port 50051.')
rx_server.add_insecure_port('[::]:50052')
rx_server.start()

# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    rx_server.stop(0)

...