Как вернуть подтверждение из фоновой функции в pubsub, используя python - PullRequest
1 голос
/ 30 апреля 2019

Я настраиваю новый проект GCP для чтения и анализа файла CSV, как только он загружен в корзину.Для этого я создал триггер, который публикует в паб / саб.Pub / Sub сам отправляет сообщения в фоновую функцию.

Кажется, все работает нормально, например, как только загруженный файл запускается, триггер начинает отправлять сообщение в Pubsub, а затем в функцию.Я также вижу сообщение, поступающее в функцию.

Проблема, однако, заключается в отправке Ack обратно в паб / саб.Где-то я читал, что отправка обратно любого статуса 2xx должна выполнять работу (чтобы удалить сообщение из очереди), но это не так.В результате pubsub «думает», что сообщение не было доставлено, и отправляет сообщение снова и снова.

def parse_data(data, context):


    if 'data' in data:
        args = base64.b64decode(data['data']).decode('utf-8')
        pubsub_message = args.replace('\n', ' ')
        properties = json.loads(pubsub_message)
        myBucket = validate_message(properties, 'bucket')   
        myFileName = validate_message(properties, "name")
        fileLocation = 'gs://'+myBucket+'/'+myFileName
        readAndEnhanceData(fileLocation)
        return 'OK', 200
    else:
        return 'Something went wrong, no data received'   

и вот файл журнала, который показывает, что функция вызывается постоянно.

D  CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
 finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287

D  CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766 

D  CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006 

D  CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237

D  CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079 

D  CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842 

D  CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453 

Так что я был бы рад узнать, чего мне здесь не хватает!Т.е. как я могу разорвать этот цикл?

* ОБНОВЛЕНИЕ в выпуске * Благодаря @kamal, который заставил меня открыть глаза, поручил мне воссоздать ведра / темы и т. Д., Пока я былна задаче, пересмотрел все и понял, я использовал временный файл в подпапке, но в том же ведре, как загружать файлы!Это была проблема.Событие Finalize предназначено для ЛЮБОГО объекта, созданного ВСЕГДА в корзине.Итак, Камаль был прав: несколько загрузок происходили!

Если вы работаете с таким же образом, создайте папку tmp и не добавляйте в нее ЛЮБОЙ триггер.


Ответы [ 2 ]

1 голос
/ 30 апреля 2019

Как правило, Google Cloud Pub / Sub гарантирует хотя бы раз доставку сообщений.Это означает, что всегда можно получить дубликаты, хотя они должны быть относительно редкими.В вашем случае, это не то, что одно и то же сообщение обрабатывается снова и снова, это разные сообщения.Цифры, такие как 518626734652287, являются идентификаторами сообщений.Поскольку каждый раз они разные, это означает, что было опубликовано несколько сообщений.Вероятно, происходит одно из двух:

  1. Файлы загружаются несколько раз.
  2. Триггер GCS устанавливается несколько раз.Вы можете проверить это, запустив gsutil notification list gs://<bucket name>.

Если проблема последняя, ​​вы увидите несколько записей, например:

projects/_/buckets/my-bucket/notificationConfigs/1
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/2
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/3
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

Вы можете удалить дополнительные уведомления с помощьювыдача удаления с именем конфигурации, например, gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2.

Стоит также отметить, что с Cloud Functions и Pub / Sub можно настроить два типа подписок: настроенные пользователем инастроенные самими Cloud Functions.По умолчанию крайний срок подтверждения для первого составляет 10 секунд.Это означает, что если сообщение не будет подтверждено в течение 10 секунд, оно будет доставлено.Для последнего значение по умолчанию составляет 600 секунд.Если обработка сообщений занимает больше времени, чем этот период времени, вероятно, произойдет повторная доставка.

Вы можете попытаться уменьшить время, необходимое для обработки сообщения, или увеличить срок подтверждения.Вы можете увеличить срок подтверждения, используя инструмент gcloud:

gcloud pubsub subscriptions update <subscription name> --ack-deadline=180

Это увеличит срок до 3 минут.Вы также можете сделать это на странице Cloud Console Pub / Sub , нажав на подписку, нажав «Изменить», а затем изменив «Срок подтверждения» на большее значение.

Облачные функции не требуют возврата HTTP-статуса.Это необходимо, только если вы используете push-подписку напрямую.

0 голосов
/ 30 апреля 2019

Вы не можете просто вернуть 200 из своей функции.Вам действительно нужно «подтвердить» сообщение pubsub.Вы не показали код, который фактически получает сообщение от pubsub, но я предполагаю, что где-то в этом коде у вас есть что-то вроде:

queue = Queue.Queue()
message = queue.get()
parse_data(message.data, context)

Вот где вам нужно подтвердить сообщение:

queue = Queue.Queue()
message = queue.get()
if parse_data(message.data, context):
    message.ack()
...