Как подтвердить сообщение Google PubSub, используя AckID в Python - PullRequest
0 голосов
/ 10 февраля 2020

Я просматривал документы PubSub здесь

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0  # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
    streaming_pull_future.result(timeout=timeout)
except:  # noqa
    streaming_pull_future.cancel()

В приведенном выше примере сообщение подтверждается сразу после его получения. Но я хочу подтвердить это только тогда, когда мои местные работники сельдерея завершат обработку сообщения sh, чтобы PubSub мог повторно доставить сообщение в случае сбоя. Поэтому я беру ack_id сообщения и передаю его работнику.

params["ack_id"] = message._ack_id
start_aggregation.delay(params)

Я просто не могу понять, как я могу использовать ack_id в работнике для подтверждения сообщения. Я знаю, что вы можете использовать конечную точку pubsub для подтверждения сообщения типа здесь . Но я не могу понять, как я могу использовать учетные данные учетной записи службы, чтобы сделать то же самое - они делают это, используя OAuth в том, что делают c. Любые указатели приветствуются. Спасибо.

1 Ответ

2 голосов
/ 10 февраля 2020

Получение сообщений от клиентской библиотеки с прямым вызовом API acknowledge может вызвать проблемы на клиенте. Клиент имеет ограничения управления потоком , которые определяют максимальное количество сообщений, которые могут быть ожидающими (доставленными, но не подтвержденными). Удаление сообщений из подсчета происходит, когда один из них вызывает message.ack() или message.nack(). Если бы вы вызывали API acknowledge напрямую, то это число не изменилось бы, в результате чего сообщения не будут передаваться после достижения предела.

Если вы пытаетесь использовать сельдерей для получения большего параллелизма в вашем обработки, вы, вероятно, можете сделать это напрямую без этого промежуточного шага. Один из вариантов - запускать экземпляры клиентских подписчиков с одинаковой подпиской в ​​разных процессах. Сообщения будут распространяться среди подписчиков. В качестве альтернативы, вы можете заменить планировщик на тот, который основан на процессах, а не на потоках, хотя это потребует дополнительных усилий.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...