Я пытаюсь прочитать сообщения из потока publi c projects / pubsub-publi c -data / themes / taxirides-realtime , и кажется, что я не обрабатываю данные быстро достаточно или есть проблема с подтверждением. «Количество непрочитанных сообщений» постоянно увеличивается, что бы я ни делал (даже если я очищаю сообщения перед запуском моего кода). Я попытался запустить один и тот же код из моего дома Windows 10 P C, из виртуальной машины Ubuntu на основе GCP и из консольного терминала GCP с тем же результатом.
Дополнительная информация: в одном из моих GCP проекты Я создал подписку «taxi-ride-client» для publi c projects / pubsub-publi c -data / themes / taxirides-realtime PubSub topi c и мое приложение читало это. Сообщения приходят в мою программу, но они обрабатываются медленно или неправильно.
Я что-то делаю не так или Python слишком медленно для этого? Вот мой код:
import os
from google.cloud import pubsub_v1
def callback(message):
''' Processing PubSub messages '''
message.ack()
if __name__ == '__main__':
project_name = '<projectname>'
credfile = '<credfilename>.json'
subscription_name = 'taxi-ride-client'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path(project_name, subscription_name)
subscr_future = subscriber.subscribe(subscription, callback=callback)
print('Listening for messages via: {}'.format(subscription))
try:
subscr_future.result(timeout=600) # running for 10 minutes
except Exception as ex:
subscr_future.cancel()
print('\nNormal program termination.\n')
Поток, производящий около 8-10 миллионов записей на час, из которых менее 0,5% соответствует условию IF в моем обратном вызове. Во всяком случае, я также попробовал полностью пустой обратный вызов, который содержал только строку подтверждения.
Я также запустил эту маленькую программу в 5 отдельных экземплярах, чтобы прочитать из той же подписки, но даже в этом случае я не мог сделать разницу , Это говорит о том, что у меня есть проблема с подтверждением.
Что я делаю не так?
Кстати, я реализовал решение, используя G C DataFlow с первым шагом в качестве чтения из TopS PubSub c и это нормально работает под Python. Это другая библиотека и другая архитектура. Но он легко обрабатывает 9 000 000 сообщений в час.
Тем не менее мне любопытно, как это можно сделать с помощью python и чистого PubSub (без Beam).
(ОБНОВЛЕНИЕ)
Воспроизведение
- Проект GCP создан с именем:
<your-test-project>
- Файл учетной записи службы создан с ролью Проект / Владелец и файлом учетных данных, загруженным в JSON формате
- Подписка, созданная в командной оболочке:
gcloud pubsub subscriptions create projects/<your-test-project>/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h
- Python 3.7 виртуальная среда с google-cloud-pubsub (версия 1.1.0)
- Запуск кода после замены
<projectname>
и <credfilename>
. Исходный код здесь
Габор