Как «создать» / «назначить» обработчик ведения журнала для Google Cloud Pubsub? - PullRequest
0 голосов
/ 21 января 2019

Разработка из предыдущей темы обнаружила, что предположения при задании вопроса были не по теме (подпроцесс фактически не вызывал проблем), поэтому я делаю более сфокусированный пост.

Мое сообщение об ошибке:

Не удалось найти обработчики для регистратора "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

Мое намерение:

Передать атрибуты сообщения Google PubSub в качестве переменных Python для повторного использования в более позднем коде.

Мой код:

import time
import logging

from google.cloud import pubsub_v1

project_id = "redacted"
subscription_name = "redacted"

def receive_messages_with_custom_attributes(project_id, subscription_name):
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_sync_pull_custom_attributes]

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        if message.attributes:
            #print('Attributes:')
            for key in message.attributes:
                value = message.attributes.get(key);
                #commented out to not print to terminal
                #which should not be necessary
                #print('{}: {}'.format(key, value))
        message.ack()

        print("this is before variables")
        dirpath = "~/subfolder1/"
        print(dirpath)
        namepath = message.data["name"]
        print(namepath)
        fullpath = dirpath + namepath
        print(fullpath)
        print("this is after variables")


    subscriber.subscribe(subscription_path, callback=callback)
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_sync_pull_custom_attributes]

receive_messages_with_custom_attributes(project_id, subscription_name)

Мой полный вывод на консоль после запуска приведенного выше кода:

Listening for messages on projects/[redacted]
Received message: {
  "kind": "storage#object",
  "id": "[redacted]/0.testing/1548033442364022",
  "selfLink": "https://www.googleapis.com/storage/v1/b/[redacted]/o/BSD%2F0.testing",
  "name": "BSD/0.testing",
  "bucket": "[redacted]",
  "generation": "1548033442364022",
  "metageneration": "1",
  "contentType": "application/octet-stream",
  "timeCreated": "2019-01-21T01:17:22.363Z",
  "updated": "2019-01-21T01:17:22.363Z",
  "storageClass": "MULTI_REGIONAL",
  "timeStorageClassUpdated": "2019-01-21T01:17:22.363Z",
  "size": "0",
  "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
  "mediaLink": "https://www.googleapis.com/download/storage/v1/b/[redacted]/o/BSD%2F0.testing?generation=1548033442364022&alt=media",
  "crc32c": "AAAAAA==",
  "etag": "CPb0uvvZ/d8CEAE="
}

this is before variables
/home/[redacted]
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

Как видите, первая строка и строка-определена-как переменная были напечатаны,но код прерывается при попытке определить переменные из только что сгенерированного словаря, и дальнейшие print() s не выполняются.

Потенциально связанный поток , который пользователь публиковал с заданиями cron,и нашел исправление из envpath crontab, но мой СиУчеба получает и не использует какие-либо задания cron, но может намекнуть на другой слой позади / внутри python?

Может кто-нибудь помочь мне с добавлением обработчика, чтобы этот код работал как задумано?

Ответы [ 2 ]

0 голосов
/ 22 января 2019

Как объяснили Науэль и Трипли, проблема в том, что сообщения представляют собой БАЙТЫ, а не строки.Тем не менее, их код не работал точно, и все равно выбрасывал ошибки, и я понятия не имею, почему.Перекрестные ссылки с примером кода Google для веб-сайта pubsub appengine и еще несколько часов проб и ошибок показали, что следующий код работает. Может быть не элегантным и / или иметь плохие практики, в этом случае, пожалуйста, отредактируйте его и сделайте его более надежным.

#Continues from after message.ack(), above code remains unchanged
#except needing to <import json>

    #this makes a message.data a true python dict with strings.
    payload = json.loads(message.data.decode('utf-8')) 

    #this finds the value of the dict with key "name"
    namepath = payload["name"]

    #this is just a static string to pre-pend to the file path
    dirpath = "/home/[redacted]/"

    #combine them into a single functioning path
    fullpath = dirpath + namepath

    #currently type 'unicode', so convert them to type 'str'
    fullpath = fullpath.encode("utf-8")

И в конце у нас будет полный путь, который имеет чисто тип'str' будет использоваться более поздними функциями / командами.

0 голосов
/ 21 января 2019

Во-первых, если я правильно понимаю, что вы показываете в своих выходных данных, вы используете уведомление Pub / Sub для отправки сообщения всякий раз, когда вы вносите изменения в объект облачного хранилища. Эта информация может быть полезной.

Теперь message.data["name"] не будет работать, потому что message.data является BYTES-объектом . Таким образом, не может быть проиндексирован как диктат.

Чтобы рассматривать его как диктовку, сначала нужно декодировать его как base64 (import base64) . После этого остается строка, которая выглядит как формат JSON. Затем вы используете json.load() (не забудьте import json) , чтобы преобразовать эту строку в диктовку. Теперь вы можете проиндексировать сообщение.

Код для этого будет:

print("This is before variables")
dirpath = "/subfolder1/"
print(dirpath)

#Transform the bytes object into a string by decoding it
namepath = base64.b64decode(message.data).decode('utf-8')

#Transform the json formated string into a dict
namepath = json.loads(namepath)

print(namepath["name"])
fullpath = dirpath + namepath["name"]
print(fullpath)
print("this is after variables")

Теперь, если вы хотите прочитать только атрибуты, они правильно определены вверху, например:

    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))

Итак, вы можете использовать:

    print("this is before variables")
    dirpath = "~/subfolder1/"
    print(dirpath)
    namepath = message.attributes["objectId"]
    print(namepath)
    fullpath = dirpath + namepath
    print(fullpath)
    print("this is after variables")

Имейте в виду, что для этого конкретного случая "objectId" - это имя файла, поскольку это атрибут, который использует уведомление от Pub / Sub для Cloud Storage. Если вы притворяетесь, что отправляете пользовательские сообщения, измените "objectId" на желаемое имя атрибута.

...