Как исключить дублирование сообщений в Pubsub без использования Dataflow и без использования ACK в Python? - PullRequest
0 голосов
/ 22 апреля 2019

У меня есть сценарий использования, в котором я хочу читать сообщения в Pubsub без подтверждения сообщений. Мне нужна помощь в том, как исключить возможность «дублирования сообщений», которые останутся в хранилище Pubsub, если я не подтвердю доставленное сообщение.

Решения, о которых я подумал:

  1. Сохраните извлеченные сообщения в Datastore и посмотрите, совпадают ли они.
  2. Сохраняйте извлеченные сообщения во время выполнения и проверяйте, является ли мое сообщение дубликатом O (n), сложность времени и пространственная сложность O (n).
  3. Сохраните извлеченные сообщения в файле и сравните новые входящие сообщения из сообщений в файле.
  4. Использование потока данных и исключение возможности (наименее ожидаемый)

Я вижу, что в Pubsub нет функции, подобной смещению, которая, по-моему, похожа на Kafka.

Какой наилучший подход вы бы предложили в этом вопросе / или любой другой альтернативный подход, который я могу использовать?

Я использую Python google-cloud-pubsub_v1 для создания клиента Python и получения сообщений из Pubsub.

Я делюсь кодом, который является логикой для извлечения данных

subscription_path = subscriber.subscription_path(
    project_id, subscription_name)
    NUM_MESSAGES = 3

    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

    for received_message in response.received_messages:
        print(received_message.message.data)

1 Ответ

2 голосов
/ 22 апреля 2019

Похоже, что Pub / Sub, вероятно, не подходящий инструмент для работы. Кажется, что вы пытаетесь использовать Pub / Sub как постоянное хранилище данных, что не является предполагаемым вариантом использования. Акинг является фундаментальной частью жизненного цикла сообщения Cloud Pub / Sub. Сообщения Pub / Sub удаляются, если они не распакованы по истечении указанного срока хранения сообщений, который не может превышать 7 дней.

Вместо этого я бы предложил использовать базу данных SQL, например Cloud Spanner . Затем вы можете сгенерировать uuid для каждого сообщения, использовать его в качестве первичного ключа для дедупликации и обновлять базу данных транзакционно, чтобы гарантировать отсутствие дубликатов.

Возможно, я смогу дать вам лучший ответ, если вы предоставите больше информации о том, что вы планируете делать с дедуплицированными сообщениями.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...