Как увидеть, когда Google Pub / Sub завершится - PullRequest
0 голосов
/ 14 ноября 2018

От клиента у меня есть следующий код:

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))

Есть ли способ проверить, когда элемент закончил обрабатываться?Если так, как это будет сделано?Теперь у меня нет возможности узнать, «работает» ли кто-то или нет.

Ответы [ 2 ]

0 голосов
/ 14 ноября 2018

Один из способов настроить это - сохранить базу данных на основе message_id. Например, вот пример кода сервера:

def callback(message):

    # Message has been received by the Server/Subscriber
    cursor.execute('INSERT IGNORE INTO pubsub (id, message, received) VALUES (%s, %s, NOW())', (message.message_id, message.data))
    connection.commit()

    # Message is processed by the Server/Subscriber
    data_obj = loads(message.data)
    _process(data_obj)

    # Message has finished being processed by the Server/Subscriber
    cursor.execute('UPDATE pubsub SET completed=NOW() WHERE id=%s', (message.message_id,))
    connection.commit()
    message.ack()

Клиент имеет доступ к id через future.result(), поэтому может легко запросить его, чтобы увидеть статус. Это может быть особенно полезно, если вы просматриваете статусы в отдельном процессе (например, если запущено 100 длительных процессов и мы хотим отслеживать, какие из них были завершены).

0 голосов
/ 14 ноября 2018

Чтобы узнать, что сообщение было успешно опубликовано, вам нужно посмотреть на результат в будущем.Предпочтительный способ сделать это асинхронно:

def callback(future):
  try:
    print(future.result()) # future.result() is the message ID for the published message.
  except Exception as e:
    print("Error publishing: " + str(e))

future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
future.add_done_callback(callback)

Вы также можете сделать это синхронно, если хотите.Вызов result() в будущем будет блокироваться до тех пор, пока не станет доступен результат публикации:

future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
try:
  print(future.result()) # future.result() is the message ID for the published message.
except Exception as e:
  print("Error publishing: " + str(e))

Нет встроенного способа узнать, когда подписчики закончили обработку сообщения.Требование, чтобы издатели знали, когда подписчики обрабатывали сообщения, - это антишаблон;издатели и подписчики предназначены для разделения сущностей, которые непосредственно не знают друг о друге.При этом, если вам нужна такая информация, лучший способ сделать это - создать вторую тему, в которой ваши первоначальные подписчики публикуют сообщения после завершения обработки, на которые могут подписаться ваши оригинальные издатели, чтобы знать, когда обработказавершено.

...