Как написать облачную функцию для получения, анализа и публикации сообщений PubSub? - PullRequest
0 голосов
/ 04 января 2019

Это можно считать продолжением этой темы , но мне нужна дополнительная помощь в продвижении вещей.Надеюсь, кто-то может просмотреть мои попытки ниже и дать дальнейшие рекомендации.

Подводя итог, мне нужна облачная функция, которая

  1. запускается сообщением PubSub, опубликованным в теме A (это можно сделать в пользовательском интерфейсе).
  2. читает грязное уведомление об изменении объекта в "push" теме PubSub A .
  3. "разобрать" его
  4. опубликовать сообщение в теме PubSub B с исходным идентификатором сообщения в качестве данных и другими метаданными (например, именем файла, размером, временем)в качестве атрибутов.

.1:

Пример уведомления о смене беспорядочного объекта:

\ n "kind": "storage # object", \ n "id": "bucketcfpubsub / test.txt /1544681756538155 ", \ n" selfLink ":" https://www.googleapis.com/storage/v1/b/bucketcfpubsub/o/test.txt",\n "name": "test.txt", \ n "bucket": "bucketcfpubsub", \ n "generation": "1544681756538155", \ n "metageneration":" 1 ", \ n" contentType ":" text / plain ", \ n" timeCreated ":" 2018-12-13T06: 15: 56.537Z ", \ n" updated ":" 2018-12-13T06:15: 56.537Z ", \ n" storageClass ":" STANDARD ", \ n" timeStorageClassUpdated ":" 2018-12-13T06: 15: 56.537Z ", \ n" size ":" 1938 ", \ n" md5Hash ": "sDSXIvkR / PBg4mHyIUIvww ==", \ n "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucketcfpubsub/o/test.txt?generation=1544681756538155&alt=media",\n" crc32c ":" UDhyzw == ", \ n" etag ":" CKvqjvuTnN8CEAE = "\ n} \ n

Чтобы уточнить, это сообщение с пустым полем «данные», и вся информация выше представлена ​​в парах атрибутов (например, «имя атрибута»: «данные атрибута»)?Или это просто длинная строка, вставленная в поле «данные», без «атрибутов»?

.2:

В вышеупомянутой теме используется подписка по запросу.Это лучше, чем использовать «push» подписку?Пример ниже:

def create_push_subscription(project_id,
                             topic_name,
                             subscription_name,
                             endpoint):
    """Create a new push subscription on the given topic."""
    # [START pubsub_create_push_subscription]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO endpoint = "https://my-test-project.appspot.com/push"

    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    push_config = pubsub_v1.types.PushConfig(
        push_endpoint=endpoint)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path, push_config)

    print('Push subscription created: {}'.format(subscription))
    print('Endpoint for subscription is: {}'.format(endpoint))
    # [END pubsub_create_push_subscription]

Или после этого мне нужен дополнительный код для получения сообщений?

Кроме того, разве это не создает нового подписчика каждый раз, когда функция облакавызвано публикацией сообщения pubsub?Должен ли я добавить код удаления подписки в конце CF или есть более эффективные способы сделать это?

.3:

Далее, чтобы проанализировать код, этот пример кода имеет несколько атрибутов следующим образом:

def summarize(message):
    # [START parse_message]
    data = message.data
    attributes = message.attributes

    event_type = attributes['eventType']
    bucket_id = attributes['bucketId']
    object_id = attributes['objectId']

Будет ли это работать с моим вышеупомянутым уведомлением в 1:?

.4:

Как мне отделить имя_файла?Шаги 1 и 2 используют тему A , тогда как этот шаг предназначен для публикации в теме B .Это так же просто, как переписать topic_name в приведенном ниже примере кода?

# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

Исходный код, из которого я получил большую часть примера кода (кроме указанного выше потока): python-docs-samples.Будет ли адаптация и связывание приведенных выше примеров кода создавать полезный код?Или мне все еще не хватает таких вещей, как "import ****"?

1 Ответ

0 голосов
/ 23 апреля 2019

Не пытайтесь вручную создать подписчика, работающего в облачных функциях.Вместо этого следуйте документации здесь для настройки облачной функции, которая будет вызываться со всеми сообщениями, отправляемыми в заданную тему, путем передачи параметра командной строки --trigger-topic.

Для решения некоторых изВаши другие проблемы:

«Должен ли я добавить код удаления подписки в конце CF» - подписки - это долгоживущие ресурсы, соответствующие определенному резерву сообщений.Если подписка создается и удаляется в конце облачной функции, сообщения, отправленные, когда она не существует, не будут приниматься.

«Как отделить имя_платы» - в этом примере используется «имя_позиции»до последней части строки, отформатированной как projects/project_id/topics/topic_name, которая появится на этой странице в облачной консоли для вашей темы после ее создания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...