Google Cloud Pubsub Python синхронная тяга - PullRequest
1 голос
/ 10 октября 2019

У меня одна тема и одна подписка с несколькими подписчиками. Сценарий моего приложения заключается в том, что я хочу обрабатывать сообщения от разных подписчиков с определенным количеством сообщений, которые должны обрабатываться одновременно. Вначале подразумевается, что 8 сообщений обрабатываются, затем, если выполняется одна обработка сообщения, после подтверждения обработанного сообщения следующее сообщение должно быть извлечено из темы, при этом не должно быть никаких повторяющихся сообщений, которые могут быть найдены ни у одного подписчика, и каждый раз, когда 8 сообщений должны обрабатываться в фоновом режиме.

Для этого я использовал метод синхронного извлечения с max_messages = 8, но следующее извлечение выполняется после завершения всех сообщений. Итак, мы создали собственный планировщик, в котором одновременно должен выполняться 8 процессов в фоновом режиме и извлекать по 1 сообщению за раз, но все же после того, как все 8 обработанных сообщений завершены, следующее сообщение доставляется.

Вот мой код:

    #!/usr/bin/env python3

    import logging
    import multiprocessing
    import time
    import sys
    import random
    from google.cloud import pubsub_v1

    project_id = 'xyz'
    subscription_name = 'abc'

    NUM_MESSAGES = 4
    ACK_DEADLINE = 50
    SLEEP_TIME = 20

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        logger.info("Received message:{}".format(msg.message.data))
        random_sleep = random.randint(200,800)
        logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
        time.sleep(random_sleep)

    def message_puller():
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        while(True):
            try:
                response = subscriber.pull(subscription_path, max_messages=1)
                message = response.received_messages[0]
                msg = message
                ack_id = message.ack_id
                process = multiprocessing.Process(target=worker, args=(message,))
                process.start()
                while process.is_alive():
                    # `ack_deadline_seconds` must be between 10 to 600.
                    subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
                    time.sleep(SLEEP_TIME)
                # Final ack.
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info("Acknowledging message: {}".format(msg.message.data))
    except Exception as e:
        print (e)
        continue

    def synchronous_pull():
        p = []
        for i in range(0,NUM_MESSAGES):
            p.append(multiprocessing.Process(target=message_puller))

        for i in range(0,NUM_MESSAGES):
            p[i].start()

        for i in range(0,NUM_MESSAGES):
            p[i].join()

    if __name__ == '__main__':
        synchronous_pull()

Также на некоторое время subscriber.pull не тянет никакие сообщения, даже когда цикл while всегда имеет значение True. Это дает мне ошибку, так как индекс списка (0) выходит за пределы допустимого. Вывод, что subscriber.pull не извлекает сообщения, даже сообщения находятся в теме, но через некоторое время начинает вытягивать. Почему это так?

Я пробовал с асинхронным вытягиванием и управлением потоком, но повторное сообщение найдено на нескольких подписчиках. Если какой-либо другой метод решит мою проблему, то дайте мне знать. Заранее спасибо.

1 Ответ

1 голос
/ 10 октября 2019

Google Cloud PubSub обеспечивает Как минимум один раз ( документы ). Это означает, что сообщения могут быть доставлены более одного раза. Чтобы решить эту проблему, вам нужно сделать вашу программу / систему idempotent

У вас есть несколько подписчиков, каждый из которых тянет по 8 сообщений.
Чтобы избежать обработки одного и того же сообщения несколькими подписчиками, acknowledge сообщение, как только любой подписчик извлекает это сообщение и переходит к дальнейшей обработке, а не подтверждает его в конце, после полной обработки сообщения.

Кроме того, вместо непрерывного запуска основного сценария используйте sleep в течение некоторого постоянного времени, когда в очереди нет сообщений.

У меня был похожий код, где я использовал синхронное извлечение, за исключением того, что я не использовал параллельную обработку.

Вот код:

PubSubHandler - класс для обработки операций, связанных с Pubsub

from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded


class PubSubHandler:

    def __init__(self, subscriber_config):

        self.project_name = subscriber_config['PROJECT_NAME']
        self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']

        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)


    def pull_messages(self,number_of_messages):

        try:
            response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
            received_messages = response.received_messages
        except DeadlineExceeded as e:
            received_messages = []
            print('No messages caused error')
        return received_messages


    def ack_messages(self,message_ids):

        if len(message_ids) > 0:
            self.subscriber.acknowledge(self.subscriber_path, message_ids)
            return True

Utils - класс для утилитарных методов

import json

class Utils:


    def __init__(self):
        pass


    def decoded_data_to_json(self,decoded_data):
        try:
            decoded_data = decoded_data.replace("'", '"')
            json_data = json.loads(decoded_data)
            return json_data
        except Exception as e:
            raise Exception('error while parsing json')


    def raw_data_to_utf(self,raw_data):
        try:
            decoded_data = raw_data.decode('utf8')
            return decoded_data
        except Exception as e:
            raise Exception('error converting to UTF')

Orcestrator - Основной скрипт


import time
import json
import logging

from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler

class Orcestrator:

    def __init__(self):

        self.MAX_NUM_MESSAGES = 2
        self.SLEEP_TIME = 10
        self.util_methods = Utils()
        self.pub_sub_handler = PubSubHandler(subscriber_config)


    def main_handler(self):
        to_ack_ids = []
        pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)

        if len(pulled_messages) < 1:
            self.SLEEP_TIME = 1
            print('no messages in queue')
            return

        logging.info('messages in queue')
        self.SLEEP_TIME = 10

        for message in pulled_messages:
            raw_data = message.message.data
            try: 
                decoded_data = self.util_methods.raw_data_to_utf(raw_data)  
                json_data = self.util_methods.decoded_data_to_json(decoded_data)
                print(json_data)

            except Exception as e:
                logging.error(e)
            to_ack_ids.append(message.ack_id)

        if self.pub_sub_handler.ack_messages(to_ack_ids):
            print('acknowledged msg_ids')


if __name__ == "__main__":

    orecestrator = Orcestrator()
    print('Receiving data..')
    while True:
        orecestrator.main_handler()
        time.sleep(orecestrator.SLEEP_TIME)

...