Что я пытаюсь сделать: когда я обновляю состояние объекта, все клиенты gRPC должны получать обновление через поток gRPC. Важно, чтобы каждый клиент получал каждое обновление и получал его ровно один раз.
То, что я ожидаю, произойдет: когда я выполняю event.set (), а затем event.clear () сразу после этого, все клиенты запускаются один раз, возвращая новый статус.
Что на самом деле происходит: на клиентах отсутствуют обновления. Например, моя сервисная функция отправляет 10 обновлений версии. На стороне клиента пропущены эти обновления, я посмотрю, где у него обновление 1 2, затем пропустит 3 или какое-то другое обновление, затем начнет получать их снова.
Версия сервера 1, это не работает, потому что клиентам не хватает некоторых обновлений:
class StatusStreamer(pb2_grpc.StatusServiceServicer):
def __init__(self, status, event):
self.continue_running = True
self.status = status
self.event = event
def StatusSubscribe(self, request, context):
while self.continue_running:
self.event.wait()
yield self.status
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
status = status_builder()
event = threading.Event()
status_streamer = StatusStreamer(status, event)
pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
server.add_insecure_port('[::]:50051')
server.start()
print('server started')
try:
while True:
_ = input('enter a key to update')
for _ in range(10):
#make an update and send it out to all clients
status.version = str(int(status.version) + 1)
print('update:',status.version)
event.set()
event.clear()
except KeyboardInterrupt:
print('\nstopping...')
event.set()
status_streamer.continue_running = False
server.stop(0)
Сервер версии 2, этот работает, но я думаю, что есть условие гонки:
Во второй версии вместо использования threading.Event я использую логическое значение new_update, которое используется всеми потоками. Внутри функции serve я устанавливаю значение true, а затем все потоки устанавливают значение False.
class StatusStreamer(pb2_grpc.StatusServiceServicer):
def __init__(self, status):
self.continue_running = True
self.new_update = False
self.status = status
def StatusSubscribe(self, request, context):
while self.continue_running:
if self.new_update:
yield self.status
self.new_update = False #race condition I believe, that maybe doesn't occur because of the GIL.
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
status = status_builder()
status_streamer = StatusStreamer(status)
pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
server.add_insecure_port('[::]:50051')
server.start()
print('server started')
try:
while True:
_ = input('enter a key to update')
for _ in range(10):
#make an update and send it out to all clients
status.version = str(int(status.version) + 1)
print('update:', status.version)
status_streamer.new_update = True #Also a race condition I believe.
except KeyboardInterrupt:
print('\nstopping...')
status_streamer.continue_running = False
server.stop(0)
Я полагаю, что вторая версия работает только потому, что она использует глобальную блокировку интерпретатора CPython, гарантирующую, что ни один поток не будет мутировать в то же время new_update. Мне не нравится это решение, какие у меня варианты? Кроме того, я знаю, что могу создать очередь или список и сохранить все изменения, а затем отслеживать, где находится каждый подключенный клиент, Я не хочу выделять память для этого.