Я хочу реализовать метод, который может вызываться клиентом для завершения потоковой передачи сервера. Есть ли что-нибудь в GRPC Python или как я могу это реализовать. Проще говоря, я хочу, чтобы клиент сказал «больше не транслировать». Кроме того, я хочу, чтобы мой сервер не выполнял обратный вызов после завершения потоковой передачи.
мой протокол выглядит следующим образом:
syntax = "proto3";
service TestService {
rpc GetOneToOne(TestRequest) returns (TestResponse) {}
rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}
message TestRequest {
string message = 1;
}
message TestResponse {
string message = 1;
}
client.py:
import grpc
import string
import random
from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
test_pb2, test_pb2_grpc
)
# open a gRPC channel
channel = grpc.insecure_channel('localhost:50052')
# create a stub (client)
stub = test_pb2_grpc.TestServiceStub(channel)
# create a valid request message
number = test_pb2.TestRequest(message="pri")
# One to One
response1 = stub.GetOneToOne(number)
print(response1.message)
# stream to stream
iterator = iter([test_pb2.TestRequest(message=x) for x in ["gri","tree","dree","bri"]])
response4 = stub.GetStreamToStream(iterator)
for resp in response4:
print("StreamToStream",resp)
server.py:
from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
test_pb2, test_pb2_grpc
)
from system_test.sw_test.vstars.lib.low_level import endpoint_lib
import grpc
from concurrent import futures
import time
from queue import Empty
import queue
class _Servicer(test_pb2_grpc.TestServiceServicer):
def GetOneToOne(self, request, context):
return test_pb2.TestResponse(message='response: {}'.format(request.message))
def GetStreamToStream(self, request_iterator, context):
# yield from map(
# lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
# request_iterator
# )
def remove_feed():
print("fsdfdsf",context)
def stop_stream():
remove_feed()
# context.add_callback(stop_stream)
i=0
while True:
try:
print("in here",i)
yield from map(
lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
request_iterator
)
i+=1
except KeyboardInterrupt:
print("Key")
context.add_callback(stop_stream)
# create a gRPC server
rx_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)
# listen on port 50051
print('Starting server. Listening on port 50051.')
rx_server.add_insecure_port('[::]:50052')
rx_server.start()
# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
rx_server.stop(0)