Как я могу гарантировать, что каждый поток gRPC обновляется один раз и избегает условий гонки? - PullRequest
0 голосов
/ 06 марта 2019

Что я пытаюсь сделать: когда я обновляю состояние объекта, все клиенты gRPC должны получать обновление через поток gRPC. Важно, чтобы каждый клиент получал каждое обновление и получал его ровно один раз.

То, что я ожидаю, произойдет: когда я выполняю event.set (), а затем event.clear () сразу после этого, все клиенты запускаются один раз, возвращая новый статус.

Что на самом деле происходит: на клиентах отсутствуют обновления. Например, моя сервисная функция отправляет 10 обновлений версии. На стороне клиента пропущены эти обновления, я посмотрю, где у него обновление 1 2, затем пропустит 3 или какое-то другое обновление, затем начнет получать их снова.

Версия сервера 1, это не работает, потому что клиентам не хватает некоторых обновлений:

class StatusStreamer(pb2_grpc.StatusServiceServicer):
    def __init__(self, status, event):
        self.continue_running = True
        self.status = status
        self.event = event


    def StatusSubscribe(self, request, context):
        while self.continue_running:
            self.event.wait()
            yield self.status


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    status = status_builder()
    event = threading.Event()
    status_streamer = StatusStreamer(status, event)
    pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('server started')
    try:
        while True:
            _ = input('enter a key to update')
            for _ in range(10):
                #make an update and send it out to all clients
                status.version = str(int(status.version) + 1)
                print('update:',status.version)
                event.set()
                event.clear()
    except KeyboardInterrupt:
        print('\nstopping...')
        event.set()
        status_streamer.continue_running = False
        server.stop(0)

Сервер версии 2, этот работает, но я думаю, что есть условие гонки: Во второй версии вместо использования threading.Event я использую логическое значение new_update, которое используется всеми потоками. Внутри функции serve я устанавливаю значение true, а затем все потоки устанавливают значение False.

class StatusStreamer(pb2_grpc.StatusServiceServicer):
    def __init__(self, status):
        self.continue_running = True
        self.new_update = False
        self.status = status


    def StatusSubscribe(self, request, context):
        while self.continue_running:
            if self.new_update:
                yield self.status
                self.new_update = False #race condition I believe, that maybe doesn't occur because of the GIL.  




def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    status = status_builder()
    status_streamer = StatusStreamer(status)
    pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('server started')
    try:
        while True:
            _ = input('enter a key to update')
            for _ in range(10):
                #make an update and send it out to all clients
                status.version = str(int(status.version) + 1)
                print('update:', status.version)
                status_streamer.new_update = True #Also a race condition I believe.
    except KeyboardInterrupt:
        print('\nstopping...')
        status_streamer.continue_running = False
        server.stop(0)

Я полагаю, что вторая версия работает только потому, что она использует глобальную блокировку интерпретатора CPython, гарантирующую, что ни один поток не будет мутировать в то же время new_update. Мне не нравится это решение, какие у меня варианты? Кроме того, я знаю, что могу создать очередь или список и сохранить все изменения, а затем отслеживать, где находится каждый подключенный клиент, Я не хочу выделять память для этого.

1 Ответ

1 голос
/ 06 марта 2019

Для версии сервера 1 причина отсутствия обновления заключается в том, что, как только основной поток удерживает GIL, он может выполнить несколько event.set(), прежде чем передать GIL другим потокам.Поэтому другой поток может не блокироваться event.wait(), что приводит к отсутствию обновлений.Потенциальным исправлением будет сохранение счетчика соединений и блокирование обновления версии до тех пор, пока сервер не отправит обновление для всех соединений.

Для версии сервера 2 используйте threading.Lock или threading.RLock.состояние гонки.Кроме того, эта версия будет потреблять много циклов ЦП при проверке флагов, что может ухудшить вашу бизнес-логику в других потоках.И также возможно, что основной поток слишком долго удерживает GIL, чтобы сервер еще не отправлял сообщения всем соединениям.

К сожалению, у меня нет идеального решения для удовлетворения ваших требований.Команда gRPC имеет реализацию обслуживающего устройства с аналогичной функциональностью на https://github.com/grpc/grpc/blob/v1.18.x/src/python/grpcio_health_checking/grpc_health/v1/health.py.

. В реализации обслуживающее устройство будет сохранять ссылку на возвращенные итераторы ответа.Когда статус обновляется, сервисер явно добавляет сообщение к соответствующим итераторам ответа.Следовательно, обновление статуса не будет пропущено.

Надеюсь, что это может ответить на ваш вопрос.

...